php进程通信-消息队列

举报
仙士可 发表于 2023/06/14 11:40:37 2023/06/14
【摘要】 php多进程通信,有各种各样的方法(进程信号,消息队列,管道,共享内存,socket等等)本文主要讲php利用linux 消息队列的通信方法注意:多进程系列文章,都建立在linux环境,php-cli运行模式下一:消息队列通信介绍消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。  每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息...

php多进程通信,有各种各样的方法(进程信号,消息队列,管道,共享内存,socket等等)

本文主要讲php利用linux 消息队列的通信方法

注意:多进程系列文章,都建立在linux环境,php-cli运行模式下

一:消息队列通信介绍

消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。  每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。我们可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。

消息队列的最佳定义是:内核地址空间中的内部链表。消息可以顺序地发送到队列中,并以几种不同的方式从队列中获取。当然,每个消息队列都是由 IPC标识符所唯一标识的。

二:php消息队列扩展

php如果要使用linux的消息队列,需要安装sysvmsg扩展,官方文档地址:http://php.net/manual/zh/book.sem.php

三:php使用消息队列

1:获取一个IPC标识符ftok();

ftok,可将项目路径与文件标识转换成一个IPC标识符,该标识符可用于创建消息队列

2:获取/创建一个消息队列msg_get_queue()

使用linux命令ipcs -q 可查看系统当前的消息队列数

3:插入数据到队列msg_send()

msg_send (

 resource $queue(消息队列资源句柄) , 

int $msgtype(插入数据的类型,用来标识该队列自己的消息类型,自己自定义,必须大于0) , 

mixed $message(插入的数据,可以为数组,下一个参数可以序列化数据)

 [, bool $serialize = TRUE(是否序列化数据,默认是)

 [, bool $blocking = TRUE (如果消息太大而无法放入队列(linux消息队列限制),则脚本将等待另一个进程从队列中读取消息,并释放足够的空间以发送消息。这被称为阻塞; 您可以通过设置可选blocking参数来防止阻塞FALSE,在这种情况下,如果消息对于队列来说太大,msg_send()将立即返回,并将可选参数FALSE设置 errorcode为MSG_EAGAIN,表示您应稍后尝试再次发送消息。)

[, int &$errorcode ]]] (错误标识));

插入成功之后,ipcs可看到message多了一条:

4:取出一条数据msg_receive

msg_receive ( 

resource $queue , (消息队列资源句柄)

int $desiredmsgtype (要取出的消息队列类型,如果为0,则不筛选类型,直接返回最先插入的那条,大于0,则筛选类型,返回最先插入的类型数据,小于0,则返回小于等于绝对值的数据,如果消息队列暂无满足要求的数据,则阻塞或者返回false,由flag参数配置), 

int &$msgtype (当取出数据时,该变量会赋值为该数据的类型),

 int $maxsize (消息的最大大小被指定的被接受 maxsize; 如果队列中的消息大于此大小,则该功能将失败(除非flags按照以下说明设置 )该参数较迷,没有理解), 

mixed &$message (当取出数据时,该变量会赋值为该数据)

[, bool $unserialize = TRUE(是否反序列化数据)

 [, int $flags = 0 

该选项flags允许您将标志传递给低级msgrcv系统调用。它默认为0,但您可以指定一个或多个以下值(通过将它们相加或相加)。

MSG_IPC_NOWAIT

如果没有消息 desiredmsgtype,立即返回,不要等待。该函数将失败并返回对应的整数值MSG_ENOMSG。

MSG_EXCEPT

将此标志与desiredmsgtype大于0 结合使用 会导致函数接收到不等于的第一条消息 desiredmsgtype。

MSG_NOERROR

如果消息长于maxsize,则设置此标志将截断消息, maxsize并且不会发出错误信号。

[, int &$errorcode ]]] )如果该函数失败,errorcode 则可选项将被设置为系统errno变量的值。

5:删除队列msg_remove_queue ( resource $queue )

顾名思义,该函数可删除一个消息队列

四:linux相关操作

在linux中,主要用ipcs(查看) ipcrm(删除)

1:ipcs  

ipcs -h,可查看帮助

主要需要记住的是:

ipcs -q (查看消息队列)

ipcs -l  (查看系统配置)

2:ipcrm

ipcrm -h:

ipcrm,只要能删除就行啦~~

ipcrm -q id  (删除指定消息队列)

3:注意!

在使用消息队列时,请注意消息队列的默认限制(限制消息队列数,和消息队列大小),

当到达上限时,会使得写入消息队列操作阻塞(默认阻塞)

五:封装类

创建队列方法,好像有点问题(创建后无法正确使用队列,估计是__FILE__常量问题),暂时没查

使用封装类方法:

$message_queue_key= ftok(__FILE__, 'a');
if(msg_queue_exists($message_queue_key)){//如果有该消息队列,则删除,用于清空之前队列的无用数据
    msg_remove_queue(msg_get_queue($message_queue_key, 0666));
}
$message_queue= msg_get_queue($message_queue_key, 0666);
$msg_queue = new MsgQueue($message_queue);
复制
<?php

/**
 * Created by PhpStorm.
 * User: tioncico
 * Date: 18-5-29
 * Time: 下午11:00
 */
class MsgQueue
{

    public $queue;

    public function __construct($queue)
    {
        $this->queue = $queue;
    }

    public function push($data, $type = 1)
    {
        $result = msg_send($this->queue, $type, $data);
        return $result;
    }

    public function pop($type = 0,$flags = MSG_IPC_NOWAIT)
    {
        msg_receive($this->queue, $type, $message_type, 1024, $message,true,$flags);
//        var_dump($message_type);
//        msg_receive($this->queue,$type,$message_type,1024,$message);
        return $message;
    }

    public function close()
    {
        return msg_remove_queue($this->queue);
    }

    /**
     * 创建一个队列(TODO:疑问待解决)
     * @param string $path_name
     * @param string $prop
     * @param string $perms
     * @return array
     */
    public static function getQueue($path_name = __FILE__, $prop = '1', $perms = '0666')
    {
        $data              = array();
        $data['queue_key'] = ftok($path_name, $prop);
        $data['queue']     = msg_get_queue($data['queue_key'], $perms);
        return $data;
    }
}
复制

七:使用例子

<?php
include_once 'new/MsgQueue.php';
$message_queue_key= ftok(__FILE__, 'a');
$message_queue= msg_get_queue($message_queue_key, 0666);
$queue_obj = new MsgQueue($message_queue);
$pid = pcntl_fork();
if($pid>0){//主进程入列
    while(1){
        $msg = $queue_obj->push((array('a'=>321312,'v'=>'casd')),12456);
        sleep(2);
    }
}else{//子进程出列
    while(1){
        $message = $queue_obj->pop();
        var_dump($message);
        sleep(1);
    }
}
复制

输出:

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。