4591566 发表于 2015-11-10 13:58:33

php脚本(功能ios/android推送;技术mongodb队列,MQTT,锁文件。linux运行)

  主文件:MongoPushSend.php
  包含文件:include.php
  配置文件:config.php
  自定义类库目录:library
  ios ck.pem文件:ck.pem (ios推送所需的一个钥匙,重要!!!手机端提供)
  
  目录结构:
  主文件:scripts/MongoPushSend.php
  包含文件:scripts/include.php
  配置文件:scripts/config.php
  ios ck.pem文件:scripts/ck.pem(ios推送所需的一个钥匙,重要!!!手机端提供)
  自定义类库目录:
  scripts/library
  scripts/library/Hrs
  scripts/library/Hrs/Mongo
  scripts/library/Hrs/SAM(新增的MQTT,android推送需要用到的类库,重要!!!)
  数据库
  mongodb数据库连接:scripts/library/Hrs/Mongo/Config.php
  mongodb数据库推送消息的操纵(要继承于Table.php文件):scripts/library/Hrs/Mongo/QueuePush.php
  mongodb数据库操纵的类文件:scripts/library/Hrs/Mongo/Table.php
  推送消息
  scripts/library/Hrs/Push
  推送消息的类文件:scripts/library/Hrs/Push/Sender.php
  
  MongoPushSend.php

<?php
/*
 * Reads pending push notifications from the MongoDB queue and hands each one
 * to send_push(), then records the outcome on the queue row.
 */
// include.php loads config.php, extends the include path, registers the
// autoloader and defines the helpers used below (get_temp_dir(), lock_up(),
// un_lock(), send_push()).
include_once 'include.php';
// Optional run lock: while the lock file is fresh a second run refuses to
// start. The lock calls are commented out; enable both of them together.
$lock_file = get_temp_dir().'/mongopushsend.lock';
// Take the lock for one hour.
//lock_up($lock_file,3600);
// MongoDB connection settings come from config.php.
$server = array(
    'host' => $config['databases']['mongodb']['host'],
    'port' => $config['databases']['mongodb']['port'],
    'database' => $config['databases']['mongodb']['database'],
);
// Open the connection and select the default database.
Hrs_Mongo_Config::set($server);
// Queue collection wrapper.
$queue = new Hrs_Mongo_QueuePush();
// Number of rows whose delivery failed in this run.
$send_status = 0;
// Timestamp written to sent_time for every row handled in this run.
$nowtime = time();
// Cursor over the rows waiting to be pushed (getQueuePush() limits to 100 by default).
$result = $queue->getQueuePush();
// Remember the queue size up front. The original reused $result for the
// return value of changePush() inside the loop, which clobbered the cursor
// being iterated and made the final report count the wrong thing.
$queued = $result->count();
if ($queued) {
    foreach ($result as $rowid => $row) {
        // Only rows that are new (0) or previously failed (2), with fewer than 4 failures.
        if (in_array($row['status'], array(0, 2)) && $row['fail_times'] < 4) {
            $data = $row['alert'];
            // NOTE(review): the original comments say a status of 0 means success,
            // but this code treats a truthy status as success. Hrs_Push_Sender::send()
            // is not visible here - confirm which is right.
            $status = send_push($row['device_type'], $row['device_id'], $data);
            if ($status) {
                $queue->changePush(array('$set'=>array('status'=>'1', 'sent_time'=>$nowtime)), $rowid);
            } else {
                $send_status += 1;
                $queue->changePush(array('$set'=>array('status'=>'2','sent_time'=>$nowtime), '$inc'=>array('fail_times'=>1)), $rowid);
            }
        }
    }
}
// Final report: failures first, then "nothing queued", otherwise success.
if ($send_status) {
    echo "sms num=".$send_status." message send fail\n";
} elseif (!$queued) {
    echo "no sms message send,".date("Y-m-d H:i:s")."\n";
} else {
    echo "sms send success\n";
}
// Release the lock.
//un_lock($lock_file);

  

include.php

<?php
/*
 * Shared bootstrap for the scripts in this directory: sets the time zone,
 * loads config.php, puts the bundled library directory on the include path
 * and loads the MQTT client library.
 */
// All date/time functions use the Asia/Shanghai time zone.
date_default_timezone_set('Asia/Shanghai');
// config.php lives in the same directory as this file.
include_once dirname(__FILE__).'/config.php';
// SCRIPT_PATH is this file's directory unless something defined it earlier.
if (!defined('SCRIPT_PATH')) {
    define('SCRIPT_PATH', dirname(__FILE__));
}
// Search <SCRIPT_PATH>/library before the existing include path entries.
// realpath() yields false (an empty entry) when the directory does not exist.
set_include_path(realpath(SCRIPT_PATH.'/library').PATH_SEPARATOR.get_include_path());
// MQTT library used for Android pushes.
require_once 'Hrs/SAM/php_sam.php';
/**
 * Takes a time-based lock stored as a Unix timestamp inside $file.
 *
 * - No lock file: it is created and the lock is taken.
 * - Lock file older than $time seconds: it is considered stale and re-stamped.
 * - Otherwise the script terminates with "file is locked!".
 *
 * The file is created with fopen() mode 'x', which fails if the file already
 * exists, so two processes starting together cannot both create the lock
 * (the original file_exists() + file_put_contents() pair could).
 *
 * @param string $file path of the lock file
 * @param int    $time seconds after which an existing lock is stale
 * @return void
 */
function lock_up($file, $time=1800)
{
    $now = time();
    // The warning is expected whenever the lock file already exists.
    $handle = @fopen($file, 'x');
    if ($handle !== false) {
        fwrite($handle, (string) $now);
        fclose($handle);
        return;
    }
    // An unreadable lock counts as stale (intval(false) is 0), as before.
    $content = @file_get_contents($file);
    if ($content !== false && ($now - intval($content)) <= $time) {
        die('file is locked!');
    }
    file_put_contents($file, $now, LOCK_EX);
}
/**
 * Releases the lock by deleting the lock file.
 *
 * Errors from unlink() (for example, the file is already gone) are suppressed.
 *
 * @param string $file path of the lock file
 * @return void
 */
function un_lock($file)
{
    @unlink($file);
}
/**
 * Returns the directory where script lock files are kept.
 *
 * On Windows (PHP_OS starts with "WIN") this is the TEMP environment
 * variable; everywhere else it is /tmp.
 *
 * @return string|false getenv() returns false when TEMP is not set
 */
function get_temp_dir()
{
    $onWindows = (strncmp(PHP_OS, 'WIN', 3) === 0);
    return $onWindows ? getenv('TEMP') : '/tmp';
}
/**
 * Sends a text message through Hrs_Sms_Sender.
 *
 * @param string $tel destination phone number
 * @param string $msg message text
 * @return mixed whatever Hrs_Sms_Sender::send() returns; the original note
 *               says 0 means success - that class is not visible here, confirm
 */
function send_sms($tel, $msg)
{
    $sender = new Hrs_Sms_Sender();
    $addressed = $sender->addDestinationAddr($tel);
    $composed = $addressed->setMsgText($msg);
    return $composed->send();
}
/**
 * Decodes the serialized attachment list of a queued e-mail.
 *
 * Attachments must always be serialized as an array of attachments, even
 * when there is only one:
 *   right: serialize(array($file1));
 *   wrong: serialize($file1);
 * where each file is an array of attachment fields (send_mail() reads
 * 'body', 'mimeType', 'disposition', 'encoding' and 'filename').
 *
 * @param string|null $att serialized attachment list
 * @return array|string the decoded array, or '' when there is nothing usable
 */
function attaformat($att){
    // send_mail() passes NULL when there are no attachments; skip unserialize()
    // for anything that cannot be a serialized payload.
    if (!is_string($att) || $att === '') {
        return '';
    }
    // NOTE(review): unserialize() can instantiate arbitrary classes. Only feed
    // it data written by this application, never user-supplied input.
    $attArr = unserialize($att);
    return is_array($attArr) ? $attArr : '';
}
/**
 * Sends an HTML e-mail through Hrs_Mail_Sender.
 *
 * Progress and failures are echoed; exceptions thrown while building or
 * sending the message are caught and reported through the return value.
 *
 * @param string      $tomail      recipient address
 * @param string      $toName      recipient display name
 * @param string      $subject     subject (HTML entities are decoded before sending)
 * @param string      $bodyText    HTML body
 * @param string|null $attachments serialized attachment list, see attaformat()
 * @param string      $mailfrom    sender address (optional)
 * @param string      $fromname    sender display name (optional)
 * @param string      $replyto     reply-to address (optional)
 * @param bool        $open_bcc    add the configured BCC address (system monitoring)
 * @return boolean true when the mail was sent, false on any exception
 */
function send_mail($tomail,$toName,$subject,$bodyText,$attachments=NULL,$mailfrom="",$fromname="",$replyto="", $open_bcc=false){
    global $config;
    // The BCC address is optional configuration; treat a missing key as "none".
    $bcc_mail = isset($config['mail']['bcc_mail']) ? $config['mail']['bcc_mail'] : '';
    $attachment = attaformat($attachments);
    $status = true;
    try{
        $email = new Hrs_Mail_Sender('UTF-8');
        $email->setHeaderEncoding(Zend_Mime::ENCODING_BASE64);
        if($mailfrom)
            $email->setFrom($mailfrom, $fromname);
        if($replyto)
            $email->setReplyTo($replyto, $fromname);
        // Blind copy for system monitoring.
        if($open_bcc && $bcc_mail)
            $email->addBcc($bcc_mail);
        $email->addTo($tomail,$toName)
              ->setSubject(html_entity_decode($subject, ENT_COMPAT, 'UTF-8'))
              ->setBodyHtml($bodyText);
        if($attachment){
            $attaNum = count($attachment);
            for($j=0;$j<$attaNum;$j++){
                // An entry without a body cannot be attached; report it and skip.
                if(!isset($attachment[$j]['body'])) {
                    echo "可能有一个以上附件格式不正确被抛弃。\n";
                    continue;
                }
                $email->createAttachment(
                    $attachment[$j]['body'],
                    $attachment[$j]['mimeType'],
                    $attachment[$j]['disposition'],
                    $attachment[$j]['encoding'],
                    $attachment[$j]['filename']);
            }
        }
        $email->send();
        echo "mail success\n\r";
    }catch (Exception $e){
        // Sending failed: report why and signal failure to the caller.
        echo "mail fail:".$e->getMessage()."\n";
        $status = false;
    }
    unset($email);
    return $status;
}
/**
 * Pushes one message to one device through Hrs_Push_Sender.
 *
 * @param string $devicetype device type stored on the queue row
 * @param string $deviceid   device identifier (the deviceToken for iOS)
 * @param string $msg        message to push
 * @return mixed whatever Hrs_Push_Sender::send() returns; the original note
 *               says 0 means success - that class is not visible here, confirm
 */
function send_push($devicetype,$deviceid,$msg){
    global $config;
    $sender = new Hrs_Push_Sender($config);
    $withDevice = $sender->setDeviceID($deviceid);
    $withMessage = $withDevice->setMsg($msg);
    $withType = $withMessage->setDeviceType($devicetype);
    return $withType->send();
}
/**
 * Class autoloader: maps underscores in the class name to directory
 * separators (Hrs_Mongo_Config -> Hrs/Mongo/Config.php) and loads the file
 * from the include path.
 *
 * Changes from the original __autoload():
 *  - registered with spl_autoload_register(), because declaring a function
 *    named __autoload is no longer supported by PHP 8;
 *  - the file is located first, so a missing class no longer triggers a
 *    fatal require error (which the old try/catch could not intercept);
 *  - the always-true "!class_exists || !interface_exists" check, whose
 *    exception was swallowed immediately, is gone.
 *
 * @param string $class name of the class or interface being loaded
 * @return void
 */
function hrs_autoload($class)
{
    if (class_exists($class, false) || interface_exists($class, false)) {
        return;
    }
    $filename = str_replace('_', '/', $class).'.php';
    $resolved = stream_resolve_include_path($filename);
    if ($resolved !== false) {
        require_once $resolved;
    }
}
spl_autoload_register('hrs_autoload');
  



config.php

<?php
/*
 * Runtime configuration shared by the push scripts.
 *
 * MongoDB connection: host, port and database name.
 */
$config['databases']['mongodb']['host'] = '172.16.26.240';
$config['databases']['mongodb']['port'] = '27088';
$config['databases']['mongodb']['database'] = 'local';
/*
 * iOS push settings
 *   pem     certificate file (local_cert)
 *   pass    passphrase of the certificate
 *   url     push gateway, sandbox or production:
 *             sandbox    ssl://gateway.sandbox.push.apple.com:2195
 *             production ssl://gateway.push.apple.com:2195
 *   msg_url message address
 * NOTE(review): the passphrase is stored in plain text; keep this file out of
 * public repositories.
 */
$config['push']['ios']['pem']='ck.pem';
$config['push']['ios']['pass']='jetson';
$config['push']['ios']['url']='ssl://gateway.sandbox.push.apple.com:2195';
$config['push']['ios']['msg_url']='http://testing.portal.ataudc.com/message';
/*
 * Android push settings
 *   broker host  address of the MQTT broker
 *   broker port  port of the MQTT broker (1883 is the default port)
 */
$config['push']['android']['broker']['host'] = "127.0.0.1";
$config['push']['android']['broker']['port'] = "1883";
  


  library/Hrs/Mongo/Config.php

<?php
require_once 'Zend/Exception.php';
/**
 * Process-wide holder of the MongoDB connection and the default database.
 *
 * Call set() once; afterwards $conn, $defaultDb and $linkStatus are read by
 * the table classes.
 */
class Hrs_Mongo_Config
{
    const VERSION = '1.7.0';
    const DEFAULT_HOST = 'localhost';
    const DEFAULT_PORT = 27017;

    private static $host = self::DEFAULT_HOST;
    private static $port = self::DEFAULT_PORT;
    // Connection options handed to the Mongo constructor.
    private static $options = array(
        'connect' => true,
        'timeout' => 30,
        //'replicaSet' => '' //If this is given, the master will be determined by using the ismaster database command on the seeds
    );

    // Mongo connection object once set() succeeded, '' before that.
    public static $conn = '';
    // Database object chosen by selectDB(), '' until one is selected.
    public static $defaultDb = '';
    // 'success' or 'failed' after set(); '' before the first call.
    public static $linkStatus = '';

    /**
     * Opens the connection and, when a database name is supplied, selects it.
     *
     * @param string|array $server  either a connection string, or an array with
     *                              optional keys host, port, user, pass, database
     * @param array        $options overrides for the known connection options
     *                              ('connect', 'timeout'); unknown keys are ignored
     * @return void the outcome is published through self::$linkStatus
     */
    public static function set($server = 'mongodb://localhost:27017', $options = array('connect' => true)) {
        // Fallback when $server is empty or of an unexpected type.
        $url = 'mongodb://'.self::$host.':'.self::$port;
        if (is_array($server)) {
            if (isset($server['host'])) {
                self::$host = $server['host'];
            }
            if (isset($server['port'])) {
                self::$port = $server['port'];
            }
            $credentials = '';
            if (isset($server['user']) && isset($server['pass'])) {
                $credentials = $server['user'].':'.$server['pass'].'@';
            }
            $url = 'mongodb://'.$credentials.self::$host.':'.self::$port;
        } elseif (is_string($server) && $server !== '') {
            // A ready-made connection string. The original never assigned $url on
            // this path, so the documented default argument was silently ignored.
            $url = $server;
        }
        if (is_array($options)) {
            foreach (self::$options as $o_k => $o_v) {
                if (isset($options[$o_k])) {
                    // Take the caller's value. The original assigned $o_v back to
                    // itself, so overrides never took effect.
                    self::$options[$o_k] = $options[$o_k];
                }
            }
        }
        try {
            self::$conn = new Mongo($url, self::$options);
            self::$linkStatus = 'success';
        } catch (Exception $e) {
            self::$linkStatus = 'failed';
        }
        if (is_array($server) && isset($server['database'])) {
            self::selectDB($server['database']);
        }
    }

    /**
     * Selects the default database on the open connection.
     *
     * @param string $database database name
     * @return mixed the selected database object; when the link is not up the
     *               previous value of self::$defaultDb is returned unchanged
     * @throws Zend_Exception when the name is empty or rejected by the driver
     */
    public static function selectDB($database){
        if (!$database) {
            throw new Zend_Exception('Mongodb数据库名称不能为空');
        }
        try {
            if (self::$linkStatus == 'success') {
                self::$defaultDb = self::$conn->selectDB($database);
            }
            return self::$defaultDb;
        } catch (InvalidArgumentException $e) {
            throw new Zend_Exception('Mongodb数据库名称不正确');
        }
    }
}
  


  library/Hrs/Mongo/QueuePush.php

<?php
require_once 'Hrs/Mongo/Table.php';
/**
 * Access to the "push" collection, the queue of pending push notifications.
 *
 * status is stored as a string: '0' new, '1' sent, '2' failed.
 */
class Hrs_Mongo_QueuePush extends Hrs_Mongo_Table
{
    protected $_name = 'push';
    // Field defaults applied by addRow() to every new queue entry.
    protected $_row = array(
        'device_type' => '',
        'device_id' => '',
        'user_id' => '',
        'username' => '',
        'message_id' => '',
        'title' => '',
        'alert' => '',
        'status' => '0',
        'fail_times' => 0,
        'create_time' => 0,
        'sent_time' => 0
    );

    /**
     * Queues a notification. Keys of $data that are not part of $_row are
     * dropped; missing keys get their default.
     *
     * @param array $data field values for the new row
     * @return mixed result of insert() (false when the link is down);
     *               the original discarded it
     */
    public function addRow($data){
        $prepareData = array();
        foreach ($this->_row as $key => $val) {
            $prepareData[$key] = isset($data[$key]) ? $data[$key] : $val;
        }
        return $this->insert($prepareData);
    }

    /**
     * Returns a cursor over rows that are new or failed and have failed
     * fewer than 4 times.
     *
     * @param int $limit maximum number of rows
     * @return mixed the cursor returned by look(), limited to $limit rows
     * @throws Zend_Exception when the connection is not usable (look() returned
     *                        false); the original crashed on ->limit() instead
     */
    public function getQueuePush($limit=100){
        $cursor = $this->look(array(
            'status' => array('$in' => array('0', '2')),
            'fail_times' => array('$lt' => 4),
        ));
        if ($cursor === false) {
            throw new Zend_Exception('Mongodb服务器连接失败');
        }
        return $cursor->limit($limit);
    }

    /**
     * Applies an update document to the row with the given id.
     *
     * @param array  $data update document, e.g. array('$set' => ...)
     * @param string $_id  row id
     * @return mixed result of update()
     */
    public function changePush($data, $_id){
        return $this->update($data, array('_id' => new MongoId($_id)));
    }

    /**
     * Removes the row with the given id.
     *
     * @param string $_id row id
     * @return mixed result of delete(); the original computed it but returned nothing
     */
    public function deletePush($_id){
        return $this->delete(array('_id' => new MongoId($_id)));
    }
}

  


  library/Hrs/Mongo/Table.php

<?php
require_once 'Hrs/Mongo/Config.php';
/**
 * Base class for collection wrappers. A subclass sets $_name to its
 * collection name; the constructor binds $_db to that collection in the
 * default database held by Hrs_Mongo_Config.
 */
abstract class Hrs_Mongo_Table
{
    // The bound collection object.
    protected $_db = '';
    // Collection name, set by the subclass.
    protected $_name = '';
    // Result of the last find().
    protected $_data = array();
    // Options passed to insert().
    protected $c_options = array(
        'fsync'=>true,
        'safe'=>true
    );
    // Options passed to update().
    protected $u_options = array(
        //'upsert'=>false,
        'multiple'=>true,
        'fsync'=>true,
        'safe'=>true
    );
    /*
    protected $r_options = array(
    );*/
    // Options passed to remove().
    protected $d_options = array(
        'fsync'=>true,
        'justOne'=>false,
        'safe'=>true
    );

    /**
     * Switches the default database.
     *
     * @param string $database database name
     * @throws Zend_Exception when the name is empty
     */
    protected function _setAdapter($database=''){
        if (!$database) {
            throw new Zend_Exception('Mongodb数据库名称不能为空');
        }
        Hrs_Mongo_Config::selectDB($database);
    }

    /**
     * @throws Zend_Exception when there is no connection, or no default
     *                        database has been selected yet
     */
    public function __construct() {
        if (!(Hrs_Mongo_Config::$conn instanceof Mongo)) {
            throw new Zend_Exception('Mongodb服务器连接失败');
        }
        $defDb = Hrs_Mongo_Config::$defaultDb;
        // $defaultDb is still '' when set() was called without a database name;
        // fail here rather than on the first query.
        if (!is_object($defDb)) {
            throw new Zend_Exception('Mongodb数据库名称不能为空');
        }
        $name = $this->_name;
        $this->_db = $defDb->$name;
    }

    /** Inserts one document; returns false when the link is down. */
    public function insert($data){
        if (!$this->testLink()) return false;
        return $this->_db->insert($data, $this->c_options);
    }

    /** Updates all documents matching $where; returns false when the link is down. */
    public function update($data, $where){
        if (!$this->testLink()) return false;
        return $this->_db->update($where, $data, $this->u_options);
    }

    /**
     * Runs a query and keeps the snapshot cursor in $_data (see getCursor(),
     * toArray()). The original had two identical branches for $limit > 0 and
     * $limit <= 0; both applied ->limit($limit), so one path is enough.
     *
     * @param array $where query, empty for all documents
     * @param int   $limit value passed to the cursor's limit()
     * @return $this
     */
    public function find($where=array(),$limit=0){
        if ($this->testLink()) {
            $cursor = $where ? $this->_db->find($where) : $this->_db->find();
            $this->_data = $cursor->limit($limit)->snapshot();
        }
        return $this;
    }

    /**
     * Runs a query and returns the cursor directly.
     *
     * @param array $where  query, empty for all documents
     * @param array $fields fields to return, empty for all
     * @return mixed the cursor, or false when the link is down
     */
    public function look($where=array(),$fields=array()){
        if (!$this->testLink()) {
            return false;
        }
        if ($fields) {
            return $where ? $this->_db->find($where,$fields) : $this->_db->find()->fields($fields);
        }
        return $where ? $this->_db->find($where) : $this->_db->find();
    }

    /** Returns the result of the last find(). */
    public function getCursor(){
        return $this->_data;
    }

    /** Removes all documents matching $where; returns false when the link is down. */
    public function delete($where){
        if (!$this->testLink()) return false;
        return $this->_db->remove($where, $this->d_options);
    }

    /** Drops the whole collection; returns false when the link is down. */
    public function dropMe(){
        if (!$this->testLink()) return false;
        return $this->_db->drop();
    }

    /**
     * Human-readable dump of the last find() result. The original returned
     * $_data itself, which is not a string and makes PHP raise an error.
     */
    public function __toString(){
        return print_r($this->toArray(), true);
    }

    /**
     * Copies the last find() result into a plain list of rows; each row's
     * '_id' is replaced by the key the result was iterated with.
     *
     * @return array
     */
    public function toArray(){
        $tmpData = array();
        foreach ($this->_data as $id => $row) {
            $one_row = array();
            foreach ($row as $key => $col) {
                $one_row[$key] = $col;
            }
            $one_row['_id'] = $id;
            $tmpData[] = $one_row;
        }
        return $tmpData;
    }

    /** True when Hrs_Mongo_Config reports a working connection. */
    protected function testLink(){
        return Hrs_Mongo_Config::$linkStatus == 'success';
    }
}
  


  
  library/Hrs/SAM/MQTT/sam_mqtt.php

<?php
/*
+----------------------------------------------------------------------+
| Copyright IBM Corporation 2006, 2007.                                    |
| All Rights Reserved.                                                 |
+----------------------------------------------------------------------+
|                                                                      |
| Licensed under the Apache License, Version 2.0 (the "License"); you  |
| may not use this file except in compliance with the License. You may |
| obtain a copy of the License at                                    |
| http://www.apache.org/licenses/LICENSE-2.0                           |
|                                                                      |
| Unless required by applicable law or agreed to in writing, software|
| distributed under the License is distributed on an "AS IS" BASIS,    |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or      |
| implied. See the License for the specific language governing         |
| permissions and limitations under the License.                     |
+----------------------------------------------------------------------+
| Author: Dave Renshaw                                                 |
+----------------------------------------------------------------------+
$Id: sam_mqtt.php,v 1.1 2007/02/02 15:36:46 dsr Exp $
*/
/* Option names understood by the MQTT connection; each constant's value is its own name. */
define("SAM_MQTT_CLEANSTART", "SAM_MQTT_CLEANSTART");
define("SAM_MQTT_QOS", "SAM_MQTT_QOS");
/* Separator between the subscriber id and the topic inside a subscription id. */
define("SAM_MQTT_SUB_SEPARATOR", "#-#");
/* ---------------------------------
SAMConnection
--------------------------------- */
class SAMConnection_MQTT {
var $debug = false;
var $errno = 0;
var $error = '';
/*
Info we need to keep between calls...
*/
var $sub_id = '';
var $port = '';
var $host = '';
var $cleanstart = false;
var $virtualConnected = false;
var $connected = false;
/*
Our current open socket...
*/
var $sock;
/*
Table of available operations using the MQTT protocol...
*/
var $operations = array(&quot;MQTT_CONNECT&quot;   => 1,
&quot;MQTT_CONNACK&quot;   => 2,
&quot;MQTT_PUBLISH&quot;   => 3,
&quot;MQTT_PUBACK&quot;      => 4,
&quot;MQTT_PUBREC&quot;      => 5,
&quot;MQTT_PUBREL&quot;      => 6,
&quot;MQTT_PUBCOMP&quot;   => 7,
&quot;MQTT_SUBSCRIBE&quot;   => 8,
&quot;MQTT_SUBACK&quot;      => 9,
&quot;MQTT_UNSUBSCRIBE&quot; => 10,
&quot;MQTT_UNSUBACK&quot;    => 11,
&quot;MQTT_PINGREC&quot;   => 12,
&quot;MQTT_PINGRESP&quot;    => 13,
&quot;MQTT_DISCONNECT&quot;=> 14);
/* ---------------------------------
Constructor
--------------------------------- */
function SAMConnection_MQTT() {
    /* Nothing to set up; only emit the entry and exit trace records when debugging. */
    if ($this->debug) {
        e('SAMConnection_MQTT()');
        x('SAMConnection_MQTT()');
    }
}
/* ---------------------------------
Commit
--------------------------------- */
function Commit() {
    /* Not supported over MQTT: always fails with error 100. */
    if ($this->debug) e('SAMConnection_MQTT.Commit()');
    /* Record the error on the connection; the original assigned unused locals. */
    $this->errno = 100;
    $this->error = 'Unsupported operation for MQTT protocol!';
    $rc = false;
    if ($this->debug) x("SAMConnection_MQTT.Commit() rc=$rc");
    return $rc;
}
/* ---------------------------------
Connect
--------------------------------- */
function Connect($proto, $options=array()) {
    /*
    Remembers host, port and the clean-start flag, and checks that a socket
    to the broker can be opened. The MQTT CONNECT itself is sent later by
    do_connect_now(). Returns true when the broker is reachable.

    As published, the code assigned the whole $options array to both port and
    host, which cannot work. The array subscripts appear to have been lost.
    NOTE(review): SAM_PORT and SAM_HOST are not defined in this file; they are
    presumed to be defined by php_sam.php - confirm.
    */
    if ($this->debug) e('SAMConnection_MQTT.Connect()');
    if (!is_array($options)) {
        $options = array();
    }
    /* Check our optional parameter array for the necessary bits...   */
    if (isset($options[SAM_PORT]) && $options[SAM_PORT] != '') {
        $this->port = $options[SAM_PORT];
    } else {
        $this->port = 1883;
    }
    if (isset($options[SAM_HOST]) && $options[SAM_HOST] != '') {
        $this->host = $options[SAM_HOST];
    } else {
        $this->host = 'localhost';
    }
    /* Accept the flag as a key (SAM_MQTT_CLEANSTART => true) or as a bare value.
       The strict match stops unrelated option values from matching through
       loose comparison. */
    if (isset($options[SAM_MQTT_CLEANSTART])) {
        $this->cleanstart = (bool) $options[SAM_MQTT_CLEANSTART];
    } else {
        $this->cleanstart = in_array(SAM_MQTT_CLEANSTART, $options, true);
    }
    if ($this->debug) t("SAMConnection_MQTT.Connect() host=$this->host, port=$this->port, cleanstart=$this->cleanstart");
    if ($this->checkHost($this->host, $this->port)) {
        $this->virtualConnected = true;
    } else {
        $this->virtualConnected = false;
    }
    if ($this->debug) x("SAMConnection_MQTT.Connect() rc=$this->virtualConnected");
    return $this->virtualConnected;
}
/* ---------------------------------
Disconnect
--------------------------------- */
function Disconnect() {
    /*
    Sends MQTT DISCONNECT when a session is open and releases the socket.
    Returns true when there was a (virtual) connection to tear down.
    */
    if ($this->debug) e('SAMConnection_MQTT.Disconnect()');
    $rc = false;
    if ($this->virtualConnected) {
        if ($this->connected && is_resource($this->sock)) {
            $msg = $this->fixed_header("MQTT_DISCONNECT").pack('C', 0);
            fwrite($this->sock, $msg);
            $response = fgets($this->sock, 128);
            if ($this->debug) t('SAMConnection_MQTT.Disconnect() response is '.strlen($response).' bytes');
        }
        /* Always release the socket. The original closed it only when the read
           above came back empty, and never when just the probe socket opened by
           Connect() existed, so the handle leaked. */
        if (is_resource($this->sock)) {
            fclose($this->sock);
        }
        $this->sock = NULL;
        $this->virtualConnected = false;
        $this->connected = false;
        $rc = true;
    }
    if ($this->debug) x("SAMConnection_MQTT.Disconnect() rc=$rc");
    return $rc;
}
/* ---------------------------------
IsConnected
--------------------------------- */
function IsConnected() {
    /* True once the broker has accepted the MQTT CONNECT. */
    if ($this->debug) e('SAMConnection_MQTT.IsConnected()');
    $rc = $this->connected ? true : false;
    if ($this->debug) x("SAMConnection_MQTT.IsConnected() rc=$rc");
    return $rc;
}
/* ---------------------------------
Peek
--------------------------------- */
function Peek() {
    /* Not supported over MQTT: always fails with error 100. */
    if ($this->debug) e('SAMConnection_MQTT.Peek()');
    /* Record the error on the connection; the original assigned unused locals. */
    $this->errno = 100;
    $this->error = 'Unsupported operation for MQTT protocol!';
    $rc = false;
    if ($this->debug) x("SAMConnection_MQTT.Peek() rc=$rc");
    return $rc;
}
/* ---------------------------------
PeekAll
--------------------------------- */
function PeekAll() {
    /* Not supported over MQTT: always fails with error 100. */
    if ($this->debug) e('SAMConnection_MQTT.PeekAll()');
    /* Record the error on the connection; the original assigned unused locals. */
    $this->errno = 100;
    $this->error = 'Unsupported operation for MQTT protocol!';
    $rc = false;
    if ($this->debug) x("SAMConnection_MQTT.PeekAll() rc=$rc");
    return $rc;
}
/* ---------------------------------
Receive
--------------------------------- */
function Receive($sub_id, $options=array()) {
    /*
    Waits for one PUBLISH message on the subscription and returns it as a
    SAMMessage (body = payload, header carries topic, QoS and type).
    Returns false on timeout or malformed data; errno/error describe why.

    $sub_id is the value returned by Subscribe(): subscriber id, separator,
    topic. The receive timeout is given in milliseconds.
    NOTE(review): as published, $options itself was compared with 1 and used
    as the timeout although it defaults to an array. The [SAM_WAIT] subscript
    appears to have been lost. SAM_WAIT is not defined in this file; it is
    presumed to come from php_sam.php - confirm.
    */
    if ($this->debug) e('SAMConnection_MQTT.Receive()');
    $rc = false;

    /* strip the topic from the rear of the subscription id...*/
    $x = strpos($sub_id, SAM_MQTT_SUB_SEPARATOR);
    if (!$x) {
        $this->errno = 279;
        $this->error = 'Specified subscription id ('.$sub_id.') is not valid!';
        return false;
    }
    $si = substr($sub_id, 0, $x);

    /* Are we already connected?               */
    if (!$this->connected) {
        if ($this->debug) t('SAMConnection_MQTT.Receive() Not connected.');
        /* No, so open up the connection...    */
        $this->sub_id = $si;
        $rc = $this->do_connect_now();
    } else if ($this->sub_id != $si) {
        if ($this->debug) t('SAMConnection_MQTT.Receive() Connected with wrong sub_id.');
        /* Connected under another subscriber id: reconnect with the right one. */
        $this->Disconnect();
        $this->sub_id = $si;
        $rc = $this->do_connect_now();
    } else {
        if ($this->debug) t('SAMConnection_MQTT.Receive() Connected OK.');
        $rc = true;
    }

    if ($rc) {
        /* have we got a timeout specified?    */
        $wait = 0;
        if (is_array($options)) {
            if (isset($options[SAM_WAIT])) {
                $wait = (int) $options[SAM_WAIT];
            }
        } else if (is_numeric($options)) {
            $wait = (int) $options;
        }
        if ($wait > 1) {
            $m = $wait % 1000;
            $s = ($wait - $m) / 1000;
            if ($this->debug) t('SAMConnection_MQTT.Receive() timeout='.$wait." ($s secs $m millisecs)");
            /* The third argument is in microseconds; the original passed milliseconds. */
            stream_set_timeout($this->sock, $s, $m * 1000);
            if ($this->debug) t('SAMConnection_MQTT.Receive() timeout set.');
        } else {
            if ($this->debug) t('SAMConnection_MQTT.Receive() no timeout value found!');
        }

        $hdr = $this->read_fixed_header($this->sock);
        if (!$hdr) {
            $this->errno = 500;
            $this->error = 'Receive request failed, timed out with no data!';
            $rc = false;
        } else if ($hdr['mtype'] == $this->operations['MQTT_PUBLISH']) {
            $len = $this->read_remaining_length($this->sock);
            if ($len > 1) {
                /* read the topic length...   */
                $topic = $this->read_topic($this->sock);
                if (!$topic) {
                    $this->errno = 303;
                    $this->error = 'Receive request failed, message format invalid!';
                    $rc = false;
                } else {
                    if ($this->debug) t('SAMConnection_MQTT.Receive() topic='.$topic);
                    $len -= (strlen($topic) + 2);
                    /* If QoS 1 or 2 then read the message id...   */
                    if ($hdr['qos'] > 0) {
                        $idb = fread($this->sock, 2);
                        $len -= 2;
                        $fields = unpack('na', $idb);
                        $mid = $fields['a'];
                        if ($this->debug) t('SAMConnection_MQTT.Receive() mid='.$mid);
                    }
                    /* A socket read may return fewer bytes than requested, and an
                       empty payload must not be read at all. */
                    $payload = '';
                    while (strlen($payload) < $len && !feof($this->sock)) {
                        $chunk = fread($this->sock, $len - strlen($payload));
                        if ($chunk === false || $chunk === '') {
                            break;
                        }
                        $payload .= $chunk;
                    }
                    if ($this->debug) t('SAMConnection_MQTT.Receive() payload='.$payload);
                    $rc = new SAMMessage();
                    $rc->body = $payload;
                    $rc->header->SAM_MQTT_TOPIC = 'topic://'.$topic;
                    $rc->header->SAM_MQTT_QOS = $hdr['qos'];
                    $rc->header->SAM_TYPE = 'SAM_BYTES';
                }
            } else {
                $this->errno = 303;
                $this->error = 'Receive request failed, received message too short! No topic data';
                $rc = false;
            }
        } else {
            /* The original referenced an undefined $mtype here. */
            if ($this->debug) t('SAMConnection_MQTT.Receive() Receive failed response mtype = '.$hdr['mtype']);
            $rc = false;
        }
    }
    if ($this->debug) x("SAMConnection_MQTT.Receive() rc=$rc");
    return $rc;
}
/* ---------------------------------
Remove
--------------------------------- */
function Remove() {
    /* Not supported over MQTT: always fails with error 100. */
    if ($this->debug) e('SAMConnection_MQTT.Remove()');
    /* Record the error on the connection; the original assigned unused locals. */
    $this->errno = 100;
    $this->error = 'Unsupported operation for MQTT protocol!';
    $rc = false;
    if ($this->debug) x("SAMConnection_MQTT.Remove() rc=$rc");
    return $rc;
}
/* ---------------------------------
Rollback
--------------------------------- */
function Rollback() {
    /* Not supported over MQTT: always fails with error 100. */
    if ($this->debug) e('SAMConnection_MQTT.Rollback()');
    /* Record the error on the connection; the original assigned unused locals. */
    $this->errno = 100;
    $this->error = 'Unsupported operation for MQTT protocol!';
    $rc = false;
    if ($this->debug) x("SAMConnection_MQTT.Rollback() rc=$rc");
    return $rc;
}
/* ---------------------------------
Send
--------------------------------- */
function Send($topic, $message, $options=array()) {
    /*
    Publishes $message->body to a topic given as "topic://<name>".
    The quality of service is read from $options[SAM_MQTT_QOS] (default 0).
    For QoS 1 the PUBACK is awaited; for QoS 2 the PUBREC / PUBREL / PUBCOMP
    exchange is completed. Returns true on success, false otherwise, with
    errno/error set for the failures handled here.

    Fixes over the published code: $qos was assigned the whole $options array;
    a failed connect was ignored; an undefined $mtype was used in traces; and
    a missing acknowledgement was reported as success.
    */
    if ($this->debug) e('SAMConnection_MQTT.Send()');
    $rc = true;

    /* check the format of the topic...   */
    if (strncmp($topic, 'topic://', 8) == 0) {
        $t = substr($topic, 8);
    } else {
        $this->errno = 279;
        $this->error = 'Specified target ('.$topic.') is not a valid topic!';
        return false;
    }

    $qos = 0;
    if (is_array($options) && isset($options[SAM_MQTT_QOS])) {
        $qos = (int) $options[SAM_MQTT_QOS];
    }

    /* Are we already connected? If not, open the connection and give up when that fails. */
    if (!$this->connected && !$this->do_connect_now()) {
        return false;
    }

    /* Message id, packed as an unsigned 16-bit value below. */
    $mid = rand(1, 65535);
    $variable = $this->utf($t);
    if ($qos > 0) {
        $variable .= pack('n', $mid);
    }
    $payload = $message->body;
    // add in the remaining length field and fix it together
    $msg = $this->fixed_header("MQTT_PUBLISH", 0, $qos) . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload;
    fwrite($this->sock, $msg);

    if ($qos > 0) {
        $hdr = $this->read_fixed_header($this->sock);
        if (!$hdr) {
            /* QoS 1 and 2 require an acknowledgement; none arrived. */
            if ($this->debug) t('SAMConnection_MQTT.Send() send failed, no response from broker!');
            $this->errno = 302;
            $this->error = 'Send request failed!';
            $rc = false;
        } else if ($qos == 1) {
            /* QoS 1: we should get a PUBACK response message...    */
            if ($hdr['mtype'] == $this->operations['MQTT_PUBACK']) {
                $len = $this->read_remaining_length($this->sock);
                if ($len > 0) {
                    fread($this->sock, $len);
                }
                if ($len < 2) {
                    if ($this->debug) t("SAMConnection_MQTT.Send() send failed, incorrect length response ($len) received!");
                    $this->errno = 302;
                    $this->error = 'Send request failed!';
                    $rc = false;
                } else {
                    $rc = true;
                }
            } else {
                if ($this->debug) t('SAMConnection_MQTT.Send() Send failed response mtype = '.$hdr['mtype'].' Expected PUBACK!');
                $rc = false;
            }
        } else {
            /* lets assume it's QoS level 2...               */
            /* We should get a PUBREC response message...    */
            if ($hdr['mtype'] == $this->operations['MQTT_PUBREC']) {
                $len = $this->read_remaining_length($this->sock);
                if ($len > 0) {
                    fread($this->sock, $len);
                }
                if ($len < 2) {
                    if ($this->debug) t("SAMConnection_MQTT.Send() send failed, incorrect length response ($len) received!");
                    $this->errno = 302;
                    $this->error = 'Send request failed!';
                    $rc = false;
                } else {
                    /* Now we can send a PUBREL message...       */
                    $variable = pack('n', $mid);
                    $msg = $this->fixed_header("MQTT_PUBREL").$this->remaining_length(strlen($variable)).$variable;
                    fwrite($this->sock, $msg);
                    /* get a response...                         */
                    $hdr = $this->read_fixed_header($this->sock);
                    if ($hdr && $hdr['mtype'] == $this->operations['MQTT_PUBCOMP']) {
                        $len = $this->read_remaining_length($this->sock);
                        if ($len > 0) {
                            fread($this->sock, $len);
                        }
                        if ($len < 2) {
                            if ($this->debug) t("SAMConnection_MQTT.Send() send failed, incorrect length response ($len) received!");
                            $this->errno = 302;
                            $this->error = 'Send request failed!';
                            $rc = false;
                        } else {
                            $rc = true;
                        }
                    } else {
                        if ($this->debug) t('SAMConnection_MQTT.Send() Send failed response mtype = '.($hdr ? $hdr['mtype'] : 'none').' Expected PUBCOMP!');
                        $rc = false;
                    }
                }
            } else {
                if ($this->debug) t('SAMConnection_MQTT.Send() Send failed response mtype = '.$hdr['mtype']);
                $rc = false;
            }
        }
    }
    if ($this->debug) x("SAMConnection_MQTT.Send() rc=$rc");
    return $rc;
}
/* ---------------------------------
SetDebug
--------------------------------- */
function SetDebug($option=false) {
    /* Switches the trace output of this connection on (true) or off (false). */
    $this->debug = $option;
}
/* ---------------------------------
Subscribe
--------------------------------- */
function Subscribe($topic, $options=array()) {
    /*
    Subscribes to a topic given as "topic://<name>" with the quality of
    service from $options[SAM_MQTT_QOS] (default 0).
    Returns the subscription id "<subscriber id><separator><topic>" to pass
    to Receive() and Unsubscribe(), or false on failure.

    Fixes over the published code: $qos was assigned the whole $options array;
    an undefined $mtype was used in a trace; and the length in the error text
    was never interpolated because the string was single-quoted.
    */
    if ($this->debug) e("SAMConnection_MQTT.Subscribe($topic)");
    $rc = true;

    /* check the format of the topic...   */
    if (strncmp($topic, 'topic://', 8) == 0) {
        $t = substr($topic, 8);
    } else {
        $this->errno = 279;
        $this->error = 'Specified target ('.$topic.') is not a valid topic!';
        return false;
    }

    $qos = 0;
    if (is_array($options) && isset($options[SAM_MQTT_QOS])) {
        $qos = (int) $options[SAM_MQTT_QOS];
    }

    /* Are we already connected? If not, open the connection and give up when that fails. */
    if (!$this->connected && !$this->do_connect_now()) {
        return false;
    }

    // variable header: message id (16 bits)
    $x = rand(1, 16000);
    $variable = pack('n', $x);
    // payload: topic name followed by the requested QoS
    $payload = $this->utf($t).pack('C', $qos);
    // add in the remaining length field and fix it together
    $msg = $this->fixed_header("MQTT_SUBSCRIBE", 0, 1) . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload;
    fwrite($this->sock, $msg);

    $hdr = $this->read_fixed_header($this->sock);
    if (!$hdr) {
        if ($this->debug) t("SAMConnection_MQTT.Subscribe() subscribe failed, no response from broker!");
        $this->errno = 301;
        $this->error = 'Subscribe request failed, no response from broker!';
        $rc = false;
    } else if ($hdr['mtype'] == $this->operations['MQTT_SUBACK']) {
        $len = $this->read_remaining_length($this->sock);
        if ($len > 0) {
            fread($this->sock, $len);
            /* Return the subscription id with the topic appended to it so we can unsubscribe easily... */
            $rc = $this->sub_id.SAM_MQTT_SUB_SEPARATOR.$t;
        }
        if ($len < 3) {
            if ($this->debug) t("SAMConnection_MQTT.Subscribe() subscribe failed, incorrect length response ($len) received!");
            $this->errno = 301;
            $this->error = "Subscribe request failed, incorrect length response ($len) received!";
            $rc = false;
        }
    } else {
        if ($this->debug) t('SAMConnection_MQTT.Subscribe() subscribe failed response mtype = '.$hdr['mtype']);
        $rc = false;
    }
    if ($this->debug) x("SAMConnection_MQTT.Subscribe() rc=$rc");
    return $rc;
}
/* ---------------------------------
Unsubscribe
--------------------------------- */
function Unsubscribe($sub_id) {
    /*
    Cancels a subscription. $sub_id is the value returned by Subscribe():
    subscriber id, separator, topic.
    Returns the subscriber id on success and false on failure.

    The published code ignored a failed (re)connect and wrote the request to
    an unusable socket; the request is now abandoned in that case.
    */
    if ($this->debug) e("SAMConnection_MQTT.Unsubscribe($sub_id)");

    /* Detach the topic from the rear of the subscription id...   */
    $x = strpos($sub_id, SAM_MQTT_SUB_SEPARATOR);
    if (!$x) {
        $this->errno = 279;
        $this->error = 'Specified subscription id ('.$sub_id.') is not valid!';
        return false;
    }
    $topic = substr($sub_id, $x + strlen(SAM_MQTT_SUB_SEPARATOR));
    $si = substr($sub_id, 0, $x);

    /* Are we already connected?               */
    if (!$this->connected) {
        if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() Not connected.');
        /* No, so open up the connection...    */
        $this->sub_id = $si;
        $rc = $this->do_connect_now();
    } else if ($this->sub_id != $si) {
        if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() Connected with wrong sub_id.');
        /* Connected under another subscriber id: reconnect with the right one. */
        $this->Disconnect();
        $this->sub_id = $si;
        $rc = $this->do_connect_now();
    } else {
        if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() Connected OK.');
        $rc = true;
    }
    if (!$rc) {
        if ($this->debug) x("SAMConnection_MQTT.Unsubscribe() rc=$rc");
        return false;
    }

    /* variable header: message id (16 bits)*/
    $x = rand(1, 16000);
    $variable = pack('n', $x);
    /* payload: topic name    */
    $payload = $this->utf($topic);
    /* add in the remaining length field and fix it together   */
    $msg = $this->fixed_header("MQTT_UNSUBSCRIBE", 0, 1) . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload;
    fwrite($this->sock, $msg);

    $hdr = $this->read_fixed_header($this->sock);
    if (!$hdr) {
        if ($this->debug) t("SAMConnection_MQTT.Unsubscribe() unsubscribe failed, no response from broker!");
        $this->errno = 302;
        $this->error = 'Unsubscribe request failed, no response from broker!';
        $rc = false;
    } else if ($hdr['mtype'] == $this->operations['MQTT_UNSUBACK']) {
        $len = $this->read_remaining_length($this->sock);
        if ($len > 0) {
            fread($this->sock, $len);
            $rc = $this->sub_id;
        }
        if ($len != 2) {
            if ($this->debug) t("SAMConnection_MQTT.Unsubscribe() unsubscribe failed, incorrect length response ($len) received!");
            $this->errno = 301;
            $this->error = "Unsubscribe request failed, incorrect length response ($len) received!";
            $rc = false;
        }
    } else {
        if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() unsubscribe failed response mtype = '.$hdr['mtype']);
        $rc = false;
    }
    if ($this->debug) x("SAMConnection_MQTT.Unsubscribe() rc=$rc");
    return $rc;
}

function remaining_length($l) {
    /*
    Encodes an integer as the MQTT "remaining length" field: base-128 digits,
    least significant first, one per byte, with the top bit of a byte set
    when more digits follow. Returns the encoded bytes as a string.
    */
    if ($this->debug) t("SAMConnection_MQTT.remaining_length() l=$l");
    $rlf = '';
    do {
        $digit = $l % 128;
        $l = ($l - $digit)/128;
        if ($this->debug) t("SAMConnection_MQTT.remaining_length() digit=$digit l=$l");
        /* if there are more digits to encode, set the top bit of this digit */
        if ( $l > 0 ) {
            $digit += 128;
        }
        $digit = pack('C', $digit);
        $rlf .= $digit;
        if ($this->debug) t("SAMConnection_MQTT.remaining_length() rlf=$rlf");
    } while ($l > 0);
    return $rlf;
}
function utf($s) {
    /*
    Returns $s prefixed with its byte length as a 16-bit big-endian integer,
    the string layout MQTT uses. The bytes of $s are copied unchanged; no
    character-set conversion happens here.
    pack('n') writes both length bytes at once and avoids passing the float
    result of a division to pack('C').
    */
    return pack('n', strlen($s)).$s;
}
function fixed_header($operation, $dup=0, $qos=0, $retain=0) {
    /*
    Builds the first byte of an MQTT fixed header.
    Bit layout: message type in bits 7-4, DUP in bit 3, QoS in bits 2-1,
    RETAIN in bit 0. $operation is a key of $this->operations.
    DUP is bit 3, so its weight is 8. The original multiplied by 4, which
    lands on the high QoS bit.
    */
    return pack('C', ($this->operations[$operation] * 16) + ($dup * 8) + ($qos * 2) + $retain);
}
function checkHost($hostname, $port) {
    /*
    Tries to open a socket to the broker. On success the socket is kept in
    $this->sock and true is returned. On failure false is returned and
    errno/error are set to the same values do_connect_now() uses for a failed
    fsockopen().
    */
    if ($this->debug) e("SAMConnection_MQTT.checkHost($hostname)");
    $errno = 0;
    $errstr = '';
    $fp = fsockopen($hostname, $port, $errno, $errstr);
    if (!$fp) {
        if ($this->debug) t("SAMConnection_MQTT.checkHost() fsockopen failed! ($errno) $errstr");
        $this->errno = 208;
        $this->error = 'Unable to open socket to broker!';
        $rc = false;
    } else {
        $this->sock = $fp;
        $rc = true;
    }
    if ($this->debug) x("SAMConnection_MQTT.checkHost(rc=$rc)");
    return $rc;
}
/**
 * Open the socket if needed, send an MQTT CONNECT and wait for the CONNACK.
 *
 * On success $this->connected is set to true. On any failure after the
 * socket is open, the socket is closed and $this->virtualConnected is reset.
 *
 * @return bool True when the broker accepted the connection, false otherwise
 *              (see $this->errno / $this->error for the codes set here).
 */
function do_connect_now() {
    /* Do we have a client/subscriber id yet? */
    if ($this->sub_id == '') {
        /* No, so create a unique one... */
        $this->sub_id = uniqid('', true);
        if ($this->debug) t("SAMConnection_MQTT.do_connect_now() sub_id=$this->sub_id");
    } else {
        if ($this->debug) t("SAMConnection_MQTT.do_connect_now() using existing sub_id=$this->sub_id");
    }

    $x = $this->cleanstart ? "\x03" : "\x00";
    /* Variable header: protocol name, the byte 0x03, the byte chosen from
       cleanstart above, then two zero bytes. */
    $variable = $this->utf('MQIsdp') . "\x03" . $x . "\x00\x00";
    /* payload is subscriber id */
    $payload = $this->utf($this->sub_id);
    /* add in the remaining length field and fix it together */
    $msg = $this->fixed_header("MQTT_CONNECT")
         . $this->remaining_length(strlen($variable) + strlen($payload))
         . $variable
         . $payload;

    if (!$this->virtualConnected) {
        $errno = 0;
        $errstr = '';
        $fp = fsockopen($this->host, $this->port, $errno, $errstr);
        if (!$fp) {
            if ($this->debug) t("SAMConnection_MQTT.do_connect_now() fsockopen failed! ($errno) $errstr");
            $this->errno = 208;
            $this->error = 'Unable to open socket to broker!';
            $this->sock = NULL;
            return false;
        }
        $this->virtualConnected = true;
        $this->sock = $fp;
    }

    stream_set_timeout($this->sock, 10);
    fwrite($this->sock, $msg);

    /* Assume failure until a CONNACK with return code 0 has been read. */
    $rc = false;
    $hdr = $this->read_fixed_header($this->sock);
    if (!$hdr) {
        /* Nothing came back (timeout or closed socket): this must not be
           reported as a successful connect. */
        if ($this->debug) t('SAMConnection_MQTT.do_connect_now() connect failed, no response received!');
        $this->errno = 218;
        $this->error = 'Unable to open connection to broker!';
    } elseif ($hdr['mtype'] != $this->operations['MQTT_CONNACK']) {
        if ($this->debug) t('SAMConnection_MQTT.do_connect_now() connect failed response mtype = '.$hdr['mtype']);
        $this->errno = 218;
        $this->error = 'Unable to open connection to broker!';
    } else {
        $len = $this->read_remaining_length($this->sock);
        $response = ($len >= 2) ? fread($this->sock, $len) : false;
        if ($response === false || strlen($response) < 2) {
            if ($this->debug) t("SAMConnection_MQTT.do_connect_now() connect failed, incorrect length response ($len) received!");
            $this->errno = 218;
            $this->error = 'Unable to open connection to broker!';
        } else {
            $fields = unpack('Ccomp/Cretcode', $response);
            if ($fields['retcode'] == 0) {
                $this->connected = true;
                $rc = true;
                if ($this->debug) t('SAMConnection_MQTT.do_connect_now() connected OK');
            } else {
                if ($this->debug) t('SAMConnection_MQTT.do_connect_now() connect failed retcode = '.$fields['retcode']);
                if ($fields['retcode'] == 2) {
                    /* Forget the rejected id so a later attempt generates a new one. */
                    $this->sub_id = '';
                    $this->errno = 279;
                    $this->error = 'Invalid subscription id!';
                }
            }
        }
    }

    if (!$rc) {
        fclose($this->sock);
        $this->sock = NULL;
        $this->virtualConnected = false;
    }
    return $rc;
}
/**
 * Read one byte from the stream and split it into the MQTT fixed-header
 * fields.
 *
 * @param resource $conn Stream to read from.
 * @return array|false Array with keys 'mtype', 'dup', 'qos' and 'retain',
 *                     or false when no byte could be read.
 */
function read_fixed_header($conn) {
    $response = fread($conn, 1);
    if ($response === false || strlen($response) == 0) {
        return false;
    }
    $x = ord($response);
    $ret   = $x & 0x01;         // bit 0
    $qos   = ($x >> 1) & 0x03;  // bits 1-2
    $dup   = ($x >> 3) & 0x01;  // bit 3
    $mtype = $x >> 4;           // bits 4-7
    if ($this->debug) t("SAMConnection_MQTT.read_fixed_header() mtype=$mtype, dup=$dup, qos=$qos, retain=$ret");
    return array('mtype' => $mtype, 'dup' => $dup, 'qos' => $qos, 'retain' => $ret);
}
/**
 * Decode an MQTT "remaining length" field from the stream.
 *
 * Each byte contributes its low seven bits, least-significant group first;
 * a byte below 128 ends the field.
 *
 * @param resource $conn Stream to read from.
 * @return int The decoded length; whatever was accumulated so far if the
 *             stream ends early (0 when nothing could be read).
 */
function read_remaining_length($conn) {
    $rc = 0;
    $m = 1;
    while (!feof($conn)) {
        $byte = fgetc($conn);
        if ($byte === false) {
            /* No byte available: stop instead of handing false to the decoder. */
            break;
        }
        $x = ord($byte);
        if ($this->debug) t('SAMConnection_MQTT.read_remaining_length() byte ('.strlen($byte).') = '.$x);
        $rc += ($x & 0x7F) * $m;
        if ($x < 128) {
            break;
        }
        $m *= 128;
    }
    if ($this->debug) t('SAMConnection_MQTT.read_remaining_length() remaining length = '.$rc);
    return $rc;
}
/**
 * Read a length-prefixed topic name from the stream.
 *
 * The topic is preceded by its length as a 16-bit big-endian integer.
 *
 * @param resource $conn Stream to read from.
 * @return string|false The topic bytes, or false when the stream is at EOF,
 *                      the length prefix is incomplete, or the length is 0.
 */
function read_topic($conn) {
    if ($this->debug) e('SAMConnection_MQTT.read_topic()');
    $rc = false;
    if (!feof($conn)) {
        $tlen = fread($conn, 2);
        if ($tlen !== false && strlen($tlen) == 2) {
            $fields = unpack('na', $tlen);
            if ($this->debug) t('SAMConnection_MQTT.read_topic() topic length='.$fields['a']);
            /* fread() rejects a zero length (ValueError on PHP 8), so only
               read when there is something to read. */
            if ($fields['a'] > 0) {
                $rc = fread($conn, $fields['a']);
            }
        }
    }
    if ($this->debug) x("SAMConnection_MQTT.read_topic(rc=$rc)");
    return $rc;
}
}
?>

  


  library/Hrs/SAM/php_sam.php

<?php
/*
+----------------------------------------------------------------------+
| Copyright IBM Corporation 2006, 2007.                                    |
| All Rights Reserved.                                                 |
+----------------------------------------------------------------------+
|                                                                      |
| Licensed under the Apache License, Version 2.0 (the "License"); you  |
| may not use this file except in compliance with the License. You may |
| obtain a copy of the License at                                    |
| http://www.apache.org/licenses/LICENSE-2.0                           |
|                                                                      |
| Unless required by applicable law or agreed to in writing, software|
| distributed under the License is distributed on an "AS IS" BASIS,    |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or      |
| implied. See the License for the specific language governing         |
| permissions and limitations under the License.                     |
+----------------------------------------------------------------------+
| Author: Dave Renshaw                                                 |
+----------------------------------------------------------------------+
$Id: php_sam.php,v 1.1 2007/02/02 15:38:53 dsr Exp $
*/
/* Debugging flags and functions available to sub packages...   */
/* Line terminator for trace output: a newline on the command line, <br>
   when serving a web request. Stored through $GLOBALS so that the
   `global $eol` lookups below still find it if this file is included
   from inside a function. */
$GLOBALS['eol'] = "\n";
if (isset($_SERVER['REQUEST_URI'])) {
    $GLOBALS['eol'] = '<br>';
}

/** Trace entry into a function. */
function e($s) {
    global $eol;
    echo '-->' . $s . $eol;
}

/** Trace a message from inside a function. */
function t($s) {
    global $eol;
    echo '   ' . $s . $eol;
}

/** Trace exit from a function. */
function x($s) {
    global $eol;
    echo '<--' . $s . $eol;
}

define('SAM_MQTT', 'mqtt');
/* ---------------------------------
SAMConnection
--------------------------------- */
/**
 * Protocol-independent front end for a messaging connection.
 *
 * Connect() asks a factory script for the real connection object; every
 * other method forwards to that object and copies its errno/error onto
 * this instance. Errors raised by this class itself:
 *   101 - Connect() called without a protocol
 *   102 - no connection object could be created for the protocol
 *   106 - a method was called without an active connection
 */
class SAMConnection {
    public $debug = false;
    public $errno = 0;
    public $error = '';
    public $connection;

    /* ---------------------------------
    Constructor
    --------------------------------- */
    function __construct() {
        if ($this->debug) e('SAMConnection()');
        if ($this->debug) x('SAMConnection()');
    }

    /* ---------------------------------
    Create
    --------------------------------- */
    /**
     * Include the factory script for a protocol and return what it produces.
     *
     * The factory name comes from the php.ini entry "sam.factory.<proto>";
     * without one it is 'mqtt' for the mqtt protocol and 'xms' otherwise.
     *
     * @param string $proto Protocol name.
     * @return object|false The connection object, or false when the factory
     *                      did not return an object.
     */
    function Create($proto) {
        if ($this->debug) e("SAMConnection.Create(proto=$proto)");
        /* search the PHP config for a factory to use... */
        $x = get_cfg_var('sam.factory.'.$proto);
        if ($this->debug) t('SAMConnection.Create() get_cfg_var() "'.(is_string($x) ? $x : '').'"');
        /* If there is no configuration (php.ini) entry for this protocol, default it. */
        if (!is_string($x) || $x === '') {
            /* for every protocol other than MQTT assume we will use XMS */
            $x = ($proto != 'mqtt') ? 'xms' : 'mqtt';
        }
        /* Invoke the chosen factory to create a real connection object... */
        $x = 'sam_factory_'.$x.'.php';
        if ($this->debug) t("SAMConnection.Create() calling factory - $x");
        $rc = include $x;
        /* include yields false or 1 when the script is missing or returns
           nothing; neither is usable as a connection. */
        if (!is_object($rc)) {
            $rc = false;
        }
        if ($this->debug && $rc) t('SAMConnection.Create() rc = '.get_class($rc));
        if ($this->debug) x('SAMConnection.Create()');
        return $rc;
    }

    /* ---------------------------------
    Commit
    --------------------------------- */
    function Commit() {
        if ($this->debug) e('SAMConnection.Commit()');
        $rc = false;
        if ($this->hasConnection()) {
            $rc = $this->syncResult('Commit', 'commit', $this->connection->commit());
        }
        return $this->traceExit('Commit', $rc);
    }

    /* ---------------------------------
    Connect
    --------------------------------- */
    /**
     * Create the connection object for $proto and connect it.
     *
     * @param string $proto   Protocol name, e.g. SAM_MQTT.
     * @param array  $options Options passed through to the connection.
     * @return bool True on success, a false-like value otherwise.
     */
    function Connect($proto='', $options=array()) {
        if ($this->debug) e('SAMConnection.Connect()');
        $rc = false;
        if ($proto == '') {
            $this->errno = 101;
            $this->error = 'Incorrect number of parameters on connect call!';
        } else {
            $this->connection = $this->Create($proto);
            if (!$this->connection) {
                $this->errno = 102;
                $this->error = 'Unsupported protocol!';
            } else {
                if ($this->debug) t("SAMConnection.Connect() connection created for protocol $proto");
                $this->connection->setdebug($this->debug);
                /* Call the connect method on the newly created connection object... */
                $rc = $this->syncResult('Connect', 'connect', $this->connection->connect($proto, $options), false);
                if ($rc) {
                    $rc = true;
                }
            }
        }
        return $this->traceExit('Connect', $rc);
    }

    /* ---------------------------------
    Disconnect
    --------------------------------- */
    /**
     * Disconnect the underlying connection and drop it on success.
     *
     * @return bool True on success, a false-like value otherwise.
     */
    function Disconnect() {
        if ($this->debug) e('SAMConnection.Disconnect()');
        $rc = false;
        if ($this->hasConnection()) {
            $rc = $this->syncResult('Disconnect', 'Disconnect', $this->connection->Disconnect(), false);
            if ($rc) {
                $rc = true;
                $this->connection = false;
            }
        }
        return $this->traceExit('Disconnect', $rc);
    }

    /* ---------------------------------
    IsConnected
    --------------------------------- */
    function IsConnected() {
        if ($this->debug) e('SAMConnection.IsConnected()');
        $rc = false;
        if ($this->hasConnection()) {
            $rc = $this->syncResult('IsConnected', 'isconnected', $this->connection->isconnected());
        }
        return $this->traceExit('IsConnected', $rc);
    }

    /* ---------------------------------
    Peek
    --------------------------------- */
    function Peek($target, $options=array()) {
        if ($this->debug) e('SAMConnection.Peek()');
        $rc = false;
        if ($this->hasConnection()) {
            $rc = $this->syncResult('Peek', 'peek', $this->connection->peek($target, $options));
        }
        return $this->traceExit('Peek', $rc);
    }

    /* ---------------------------------
    PeekAll
    --------------------------------- */
    function PeekAll($target, $options=array()) {
        if ($this->debug) e('SAMConnection.PeekAll()');
        $rc = false;
        if ($this->hasConnection()) {
            $rc = $this->syncResult('PeekAll', 'peekall', $this->connection->peekall($target, $options));
        }
        return $this->traceExit('PeekAll', $rc);
    }

    /* ---------------------------------
    Receive
    --------------------------------- */
    function Receive($target, $options=array()) {
        if ($this->debug) e('SAMConnection.Receive()');
        $rc = false;
        if ($this->hasConnection()) {
            /* Receive hands back the connection's result untouched, even when it is false-like. */
            $rc = $this->syncResult('Receive', 'receive', $this->connection->receive($target, $options), false);
        }
        return $this->traceExit('Receive', $rc);
    }

    /* ---------------------------------
    Remove
    --------------------------------- */
    function Remove($target, $options=array()) {
        if ($this->debug) e('SAMConnection.Remove()');
        $rc = false;
        if ($this->hasConnection()) {
            $rc = $this->syncResult('Remove', 'remove', $this->connection->remove($target, $options));
        }
        return $this->traceExit('Remove', $rc);
    }

    /* ---------------------------------
    Rollback
    --------------------------------- */
    function Rollback() {
        if ($this->debug) e('SAMConnection.Rollback()');
        $rc = false;
        if ($this->hasConnection()) {
            $rc = $this->syncResult('Rollback', 'rollback', $this->connection->rollback());
        }
        return $this->traceExit('Rollback', $rc);
    }

    /* ---------------------------------
    Send
    --------------------------------- */
    function Send($target, $msg, $options=array()) {
        if ($this->debug) e('SAMConnection.Send()');
        $rc = false;
        if ($this->hasConnection()) {
            $rc = $this->syncResult('Send', 'send', $this->connection->send($target, $msg, $options));
        }
        return $this->traceExit('Send', $rc);
    }

    /* ---------------------------------
    SetDebug
    --------------------------------- */
    function SetDebug($option=false) {
        if ($this->debug) e("SAMConnection.SetDebug($option)");
        $this->debug = $option;
        if ($this->connection) {
            $this->connection->setdebug($option);
        }
        if ($this->debug) x('SAMConnection.SetDebug()');
        return;
    }

    /* ---------------------------------
    Subscribe
    --------------------------------- */
    function Subscribe($topic, $options=array()) {
        if ($this->debug) e("SAMConnection.Subscribe($topic)");
        $rc = false;
        if ($this->hasConnection()) {
            $rc = $this->syncResult('Subscribe', 'subscribe', $this->connection->subscribe($topic, $options));
        }
        return $this->traceExit('Subscribe', $rc);
    }

    /* ---------------------------------
    Unsubscribe
    --------------------------------- */
    function Unsubscribe($sub_id) {
        if ($this->debug) e("SAMConnection.Unsubscribe($sub_id)");
        $rc = false;
        if ($this->hasConnection()) {
            $rc = $this->syncResult('Unsubscribe', 'unsubscribe', $this->connection->unsubscribe($sub_id));
        }
        return $this->traceExit('Unsubscribe', $rc);
    }

    /** Return true when a connection object is present; otherwise record error 106. */
    private function hasConnection() {
        if ($this->connection) {
            return true;
        }
        $this->errno = 106;
        $this->error = 'No active connection!';
        return false;
    }

    /**
     * Copy errno/error from the underlying connection and trace a failure.
     *
     * @param string $label     Public method name, used in trace output.
     * @param string $verb      Underlying method name, used in trace output.
     * @param mixed  $rc        Result returned by the underlying call.
     * @param bool   $normalise When true, any false-like result becomes false.
     * @return mixed The (possibly normalised) result.
     */
    private function syncResult($label, $verb, $rc, $normalise = true) {
        $this->errno = $this->connection->errno;
        $this->error = $this->connection->error;
        if (!$rc) {
            if ($this->debug) t("SAMConnection.$label() $verb failed ($this->errno) $this->error");
            if ($normalise) {
                $rc = false;
            }
        }
        return $rc;
    }

    /** Emit the exit trace for a public method and pass the result through. */
    private function traceExit($label, $rc) {
        if ($this->debug) {
            /* Objects and arrays cannot be interpolated safely; show their type instead. */
            $shown = (is_scalar($rc) || $rc === null) ? $rc : gettype($rc);
            x("SAMConnection.$label() rc=$shown");
        }
        return $rc;
    }
}
/* ---------------------------------
SAMMessage
--------------------------------- */
/**
 * Simple message container handed to SAMConnection::Send().
 */
class SAMMessage {
    /** Message body; stays null when the message was created without one. */
    public $body;

    /* ---------------------------------
    Constructor
    --------------------------------- */
    /**
     * A method named after the class is no longer a constructor on PHP 8,
     * so the body would be dropped; __construct() works on PHP 5 and later.
     *
     * @param mixed $body Message body; an empty string leaves the body unset.
     */
    function __construct($body='') {
        if ($body != '') {
            $this->body = $body;
        }
    }
}
?>

  


  library/Hrs/SAM/sam_factory_mqtt.php

<?php
/*
+----------------------------------------------------------------------+
| Copyright IBM Corporation 2006, 2007.                                    |
| All Rights Reserved.                                                 |
+----------------------------------------------------------------------+
|                                                                      |
| Licensed under the Apache License, Version 2.0 (the "License"); you  |
| may not use this file except in compliance with the License. You may |
| obtain a copy of the License at                                    |
| http://www.apache.org/licenses/LICENSE-2.0                           |
|                                                                      |
| Unless required by applicable law or agreed to in writing, software|
| distributed under the License is distributed on an "AS IS" BASIS,    |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or      |
| implied. See the License for the specific language governing         |
| permissions and limitations under the License.                     |
+----------------------------------------------------------------------+
| Author: Dave Renshaw                                                 |
+----------------------------------------------------------------------+
$Id: sam_factory_mqtt.php,v 1.1 2007/02/02 15:40:41 dsr Exp $
*/
/* Factory script: load the MQTT connection class once, then hand a new
   instance back to whoever included this file. */
require_once 'MQTT/sam_mqtt.php';

return new SAMConnection_MQTT;
?>
  


  library/Hrs/SAM/sam_factory_xms.php

<?php
/*
+----------------------------------------------------------------------+
| Copyright IBM Corporation 2006, 2007.                                    |
| All Rights Reserved.                                                 |
+----------------------------------------------------------------------+
|                                                                      |
| Licensed under the Apache License, Version 2.0 (the "License"); you  |
| may not use this file except in compliance with the License. You may |
| obtain a copy of the License at                                    |
| http://www.apache.org/licenses/LICENSE-2.0                           |
|                                                                      |
| Unless required by applicable law or agreed to in writing, software|
| distributed under the License is distributed on an "AS IS" BASIS,    |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or      |
| implied. See the License for the specific language governing         |
| permissions and limitations under the License.                     |
+----------------------------------------------------------------------+
| Author: Dave Renshaw                                                 |
+----------------------------------------------------------------------+
$Id: sam_factory_xms.php,v 1.1 2007/02/02 15:40:00 dsr Exp $
*/
/* Factory script: return an XMS connection when the SAMXMSConnection class
   is available, otherwise print a hint naming the expected library file
   and return false. */
if (class_exists('SAMXMSConnection')) {
    return new SAMXMSConnection();
}

global $eol;
/* The library is named php_sam_xms.dll when PHP_OS contains "WIN",
   sam_xms.so otherwise. */
$onWindows = (strstr(PHP_OS, 'WIN') !== false);
$l = ($onWindows ? 'php_' : '') . 'sam_xms.' . ($onWindows ? 'dll' : 'so');
echo $eol.'<font color="red"><b>Unable to access SAM XMS capabilities. Ensure the '.$l.' library is defined as an extension.</b></font>'.$eol;
return false;
?>
  


  
  
  library/Hrs/Push/Sender.php

<?php
/**
 * @category   Hrs
 * @package    Hrs_Push
 * @copyright  Copyright (c) 2010-2011 ATA Inc. (http://www.ata.net.cn)
 * @description push client class
 */
/*
 * $_device_type     device type; send() dispatches to the method "send_<type>"
 * $_device_id       the device's unique deviceToken
 * $_message         the message to push
 * $_apns_url        Apple push gateway address (sandbox or production):
 *                   ssl://gateway.sandbox.push.apple.com:2195 sandbox;
 *                   ssl://gateway.push.apple.com:2195 production
 * $_apns_pass       passphrase for the Apple ck.pem
 * $_apns_pem        the Apple ck.pem file
 * $_apns_badge
 * $_apns_sound
 * $_apns_msg_url
 * $_android_broker  Android push server (broker) settings
 * $err              error code
 * $errstr           error message
 */
class Hrs_Push_Sender {
    private $_device_type;
    private $_device_id;
    private $_message;
    private $_apns_url = "ssl://gateway.sandbox.push.apple.com:2195";
    private $_apns_pem = "ck.pem";
    private $_apns_pass = "jetson";
    private $_apns_badge = 1;
    private $_apns_sound = 'bingbong.aiff';
    private $_apns_msg_url = '';
    private $_android_broker = ''; // Android push server (broker) settings
    public $err;
    public $errstr;

    /**
     * @param array $config Must provide $config['push']['ios'] with the keys
     *                      pem, pass, url and msg_url, and
     *                      $config['push']['android']['broker'].
     */
    public function __construct($config) {
        $ios = $config['push']['ios'];
        $this->_apns_pem = $ios['pem'];
        $this->_apns_pass = $ios['pass'];
        $this->_apns_url = $ios['url'];
        $this->_apns_msg_url = $ios['msg_url'];
        $this->_android_broker = $config['push']['android']['broker'];
    }

    public function setDeviceID($deviceid) {
        $this->_device_id = $deviceid;
        return $this;
    }

    public function setDeviceType($devicetype) {
        $this->_device_type = $devicetype;
        return $this;
    }

    public function setMsg($msg) {
        $this->_message = $msg;
        return $this;
    }

    public function setBadge($badge) {
        $this->_apns_badge = $badge;
        return $this;
    }

    public function setSound($sound) {
        $this->_apns_sound = $sound;
        return $this;
    }

    /**
     * Send the message using the sender that matches the device type.
     *
     * @return bool False when no "send_<device type>" method exists or the
     *              chosen sender reports failure.
     */
    public function send() {
        $fnc = 'send_'.$this->_device_type;
        if (!method_exists($this, $fnc)) { // make sure the method exists
            return false;
        }
        return $this->$fnc();
    }

    /**
     * Push the message to an iOS device through the configured APNs gateway.
     *
     * @return bool True when the whole frame was written to the gateway.
     */
    public function send_ios() {
        /* Validate the token before opening a connection: it must be a
           non-empty, even-length hex string once spaces are removed. */
        $token = str_replace(' ', '', (string) $this->_device_id);
        if ($token === '' || strlen($token) % 2 !== 0 || !ctype_xdigit($token)) {
            $this->errstr = 'Invalid device token';
            return false;
        }
        $deviceToken = pack('H*', $token);

        $body = array();
        $body['aps'] = array('alert' => $this->_message);
        if ($this->_apns_badge) {
            $body['aps']['badge'] = $this->_apns_badge;
        }
        if ($this->_apns_sound) {
            $body['aps']['sound'] = $this->_apns_sound;
        }
        $payload = json_encode($body);
        if ($payload === false) {
            $this->errstr = 'Unable to encode payload';
            return false;
        }

        $ctx = stream_context_create();
        stream_context_set_option($ctx, 'ssl', 'local_cert', SCRIPT_PATH.'/'.$this->_apns_pem);
        stream_context_set_option($ctx, 'ssl', 'passphrase', $this->_apns_pass);
        $fp = stream_socket_client($this->_apns_url, $this->err, $this->errstr, 60, STREAM_CLIENT_CONNECT, $ctx);
        if (!$fp) {
            return false;
        }

        /* Frame: command byte 0, token length, token, payload length, payload.
           The token length is taken from the decoded token, not assumed. */
        $msg = chr(0) . pack("n", strlen($deviceToken)) . $deviceToken . pack("n", strlen($payload)) . $payload;
        $written = fwrite($fp, $msg);
        fclose($fp);

        /* A failed or partial write counts as failure. */
        return $written !== false && $written === strlen($msg);
    }

    /**
     * Push the message to an Android device through the configured MQTT broker.
     *
     * @return bool True when both connecting and sending succeeded; on
     *              failure $err/$errstr carry the connection's error.
     */
    public function send_android() {
        $conn = new SAMConnection();
        $options = array(
            SAM_HOST => $this->_android_broker['host'],
            SAM_PORT => $this->_android_broker['port'],
        );
        if (!$conn->connect(SAM_MQTT, $options)) {
            $this->err = $conn->errno;
            $this->errstr = $conn->error;
            return false;
        }
        $msgCpu = new SAMMessage(json_encode($this->_message));
        $sent = $conn->send('topic://xxx/'.$this->_device_id, $msgCpu);
        if (!$sent) {
            $this->err = $conn->errno;
            $this->errstr = $conn->error;
        }
        $conn->disconnect();
        return $sent ? true : false;
    }
}
  
页: [1]
查看完整版本: php脚本(功能ios/android推送;技术mongodb队列,MQTT,锁文件。linux运行)