`

关于消息队列——ZeroMQ的router/dealer

 
阅读更多

为了不将服务端直接暴露给“客户端”,增加可扩展性,我们可以制作个中间层broker(参照官方示例代码)

比如request-reply这种最简单的模型,我们可以通过一个中间组件将后面的服务透明化,增强系统的可扩展性,后台只需要增加service数目就可以增强服务能力

 

架构模型如下:


1,服务端worker.php

(不用bind端口,只需要"监听"中间件broker的套接口即可)

<?php
/*
* Hello World server
* Connects REP socket to tcp://*:5560
* Expects "Hello" from client, replies with "World"
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/

$context = new ZMQContext();

// Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->connect("tcp://localhost:5560");

while (true) {
    // Wait for next request from client
    $string = $responder->recv();
    printf ("Received request: [%s]%s", $string, PHP_EOL);

    // Do some 'work'
    sleep(1);

    // Send reply back to client
    $responder->send("World");
}
?>

2,客户端client.php

<?php
/*
* Hello World client
* Connects REQ socket to tcp://localhost:5559
* Sends "Hello" to server, expects "World" back
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/

$context = new ZMQContext();

// Socket to talk to server
$requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
$requester->connect("tcp://localhost:5559");

for ($request_nbr = 0; $request_nbr < 10; $request_nbr++) {
    $requester->send("Hello");
    $string = $requester->recv();
    printf ("Received reply %d [%s]%s", $request_nbr, $string, PHP_EOL);
}
?>

3,中间件broker.php
不处理任务,只是转发消息,通过poll实现异步IO(连接客户端client的不能再是REP而应该是ROUTER,同理连接服务端worker的不能是REQ而应该是DEALER)

<?php
/*
* Simple request-reply broker
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/

// Prepare our context and sockets
$context = new ZMQContext();
$frontend = new ZMQSocket($context, ZMQ::SOCKET_ROUTER);
$backend = new ZMQSocket($context, ZMQ::SOCKET_DEALER);
$frontend->bind("tcp://*:5559");
$backend->bind("tcp://*:5560");

// Initialize poll set
$poll = new ZMQPoll();
$poll->add($frontend, ZMQ::POLL_IN);
$poll->add($backend, ZMQ::POLL_IN);
$readable = $writeable = array();

// Switch messages between sockets
while (true) {
    $events = $poll->poll($readable, $writeable);

    foreach ($readable as $socket) {
        if ($socket === $frontend) {
            // Process all parts of the message
            while (true) {
                $message = $socket->recv();
                // Multipart detection
                $more = $socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
                $backend->send($message, $more ? ZMQ::MODE_SNDMORE : null);
                if (!$more) {
                    break; // Last message part
                }
            }
        } elseif ($socket === $backend) {
            $message = $socket->recv();
            // Multipart detection
            $more = $socket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
            $frontend->send($message, $more ? ZMQ::MODE_SNDMORE : null);
            if (!$more) {
                break; // Last message part
            }
        }
    }
}
?>

运行broker.php以及多个worker.php,然后运行client.php(也可以是多个),可以看到10个任务被分担给多个worker.php运行了。
这样就很简单的实现一个n:m的通信了

根据这个模型我们还可以定制一个支持负载均衡的broker,详细代码可以参照

https://github.com/imatix/zguide/blob/master/examples/PHP/lbbroker.php

  • 大小: 10.8 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics