当前位置: 首页>后端>正文

django channel 消息队列 消息队列 php

1. 什么是消息队列

消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式

 

2. 为什么使用消息队列

消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读出。通过消息队列,应用程序可独立地执行,它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。

 

3. 什么场合使用消息队列

你首先需要弄清楚,消息队列与远程过程调用的区别,在很多读者咨询我的时候,我发现他们需要的是RPC(远程过程调用),而不是消息队列。

消息队列有同步或异步实现方式,通常我们采用异步方式使用消息队列,远程过程调用多采用同步方式。

MQ与RPC有什么不同? MQ通常传递无规则协议,这个协议由用户定义并且实现存储转发;而RPC通常是专用协议,调用过程返回结果。

 

4. 什么时候使用消息队列

同步需求,远程过程调用(PRC)更适合你。

异步需求,消息队列更适合你。

目前很多消息队列软件同时支持RPC功能,很多RPC系统也能异步调用。

消息队列用来实现下列需求

  1. 存储转发
  2. 分布式事务
  3. 发布订阅
  4. 基于内容的路由
  5. 点对点连接

 

以下是一个消息队列的运用实例

<?php

/**
 * Created by PhpStorm.
 * User: lin
 * Date: 2017/6/9
 * Time: 11:19
 * 实现php共享内存消息队列
 */
class ShmQueue
{
    private $maxQSize = 0;//队列最大长度
    private $front = 0;//队头指针
    private $rear = 0; //队尾指针
    private $blockSize = 256;  // 块的大小(byte)
    private $memSize = 1280;  // 最大共享内存(byte)
    private $shmId = 0;//根据这个id可以操作该共享内存片段
    private $filePtr = APP_PATH.'public/shmq.ptr';
    private $semId = 0;
    public function __construct()
    {
        $shmkey = ftok(__FILE__, 't');//产生系统id
        $this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize );//创建一个内存段
        $this->maxQSize = $this->memSize / $this->blockSize;
        // 申請一个信号量
        $this->semId = sem_get($shmkey, 1);
        sem_acquire($this->semId); // 申请进入临界
        $this->init();
    }
    private function init()
    {
        if ( file_exists($this->filePtr) ){
            $contents = file_get_contents($this->filePtr);
            $data = explode( '|', $contents );
            if ( isset($data[0]) && isset($data[1])){
                $this->front = (int)$data[0];
                $this->rear  = (int)$data[1];
            }
        }
    }
    public function getLength()
    {
        return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;
    }
    public function enQueue( $value )
    {
        if ( $this->ptrInc($this->rear) == $this->front ){ // 队满
            return false;
        }
        //echo $this->front;
        $data = $this->encode($value);
        shmop_write($this->shmId, $data, $this->rear );
        $this->rear = $this->ptrInc($this->rear);
        return  $this->decode($data);
    }
    public function deQueue()
    {
        if ( $this->front == $this->rear ){ // 队空
            throw new Exception(" block size is null!");
        }
        $value = shmop_read($this->shmId, $this->front, $this->blockSize-1);
        $this->front = $this->ptrInc($this->front);
        return $this->decode($value);
    }
    private function ptrInc( $ptr )
    {
        return ($ptr + $this->blockSize) % ($this->memSize);
    }
    private function encode( $value )
    {
        $data = serialize($value) . "__eof";
        //echo '';
        //echo strlen($data);
        //echo '';
       // echo $this->blockSize -1;
       // echo '';

        if ( strlen($data) > $this->blockSize -1 ){
            throw new Exception(strlen($data)." is overload block size!");
        }
        return $data;
    }
    public function exist($value){//判断队头的数据

        $data = shmop_read($this->shmId, $this->front, $this->blockSize-1);
        if($value == $this->decode($data)){
            return 1;
        }
        return 0;
    }
    private function decode( $value )
    {
        //return $value;
        $data = explode("__eof", $value);
        return unserialize($data[0]);
    }
    public function __destruct()
    {
        //保存队头和队尾指针
        $data = $this->front . '|' . $this->rear;
        file_put_contents($this->filePtr, $data);
        sem_release($this->semId); // 出临界区, 释放信号量
    }

}

 

如何调用

$shmq = new ShmQueue();
入队:
$data = 125;
$shmq->enQueue($data);

出队:
$shmq->deQueue();



https://www.xamrdz.com/backend/3sf1964171.html

相关文章: