Error: Address already in use[98]
laravel+swoole+amqp实现异步的方式推送消息,之前都是自己手写redis队列,这次想用下amqp。
创建websocket_server,接收到消息放到队列。然后另外一个websocket_server取队列消息进行推送。问题就出在第二个websocket_server,启动时报错如下:
下面贴上代码,望大牛指正,谢谢!
websocket_server : app/Lib/websocket.php
<?php
namespace App\Lib;
class Websocket
{
//
private static $instance = null;
public static $websocket_server;
private function __construct()
{
}
private function __clone()
{
}
public static function get_instance(){
if(self::$instance == null){
self::$instance = new \swoole_websocket_server('0.0.0.0',9502);
}
return self::$instance;
}
}
接收消息,放入队列:
app/Console/Commands/MessagePublish.php
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use App\Lib\Websocket;
class MessagePublish extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'message:publish';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$ws = Websocket::get_instance();
$ws->on('open',function ($ws,$request){
DB::table('clients')->insert(['fd'=>$request->fd,'ip'=>$request->server['remote_addr']]);
//$ws->push($request->fd,"hello,welcome!\n");
});
$ws->on('message',function($ws,$frame){
echo "Message:{$frame->data}\n";
//把消息放入队列
$host = config('queue.connections.amqp.host');
$port = config('queue.connections.amqp.port');
$user = config('queue.connections.amqp.user');
$password = config('queue.connections.amqp.password');
$queue = config('queue.connections.amqp.queue');
$exchange = config('queue.connections.amqp.exchange');
$key = config('queue.connections.amqp.key');
try {
$connection = new AMQPStreamConnection($host,$port,$user,$password);
$channel = $connection->channel();
$channel->exchange_declare($exchange,'direct',true,true,false);
$channel->queue_declare($queue,true,true);
$channel->queue_bind($queue,$exchange,$key);
$data = new AMQPMessage($frame->data);
$channel->basic_publish($data,$exchange,$queue);
}catch (\Exception $e){
echo $e->getMessage() . "\n";
}
});
$ws->on('close',function ($ws,$fd){
DB::table('clients')->where('fd',$fd)->delete();
echo "Client:{$fd} is closed\n";
});
$ws->start();
}
}
处理队列消息,执行推送:
app/Console/Commands/MessageConsume.php
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Lib\Websocket;
class MessageConsume extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'message:consume';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
* @throws \ErrorException
*/
public function handle()
{
$host = config('queue.connections.amqp.host');
$port = config('queue.connections.amqp.port');
$user = config('queue.connections.amqp.user');
$password = config('queue.connections.amqp.password');
$queue = config('queue.connections.amqp.queue');
$ws = Websocket::get_instance();
try {
$connection = new AMQPStreamConnection($host,$port,$user,$password);
$channel = $connection->channel();
$channel->queue_declare($queue,true,true);
echo "waiting for message\n";
$callback = function ($msg) use ($ws) {
echo "Received ", $msg->body,"\n";
//执行消息发送
$clients = DB::table('clients')->get();
foreach($clients as $client){
$ws->push($client->fd,$msg->body);
}
};
$channel->basic_consume($queue,'',false,true,false,false,$callback);
while (count($channel->callbacks)){
$channel->wait();
}
}catch (Exception $e){
echo $e->getMessage() . "\n";
}
}
}
前台就是常规的js代码发送websocket消息。服务端可以启动message:publish,amqp队列中也有数据。就是启动message:consume报错。看报错信息,就是先启动的脚本已经监听了9502,不能再在9502上创建一个新的websocket_server,我想到的就是用单例模式来加载websocket_server,但是依然报错。不知道是不是我的单例写的不对还是单例的方式根本解决不了这个问题。
我能想到的其他方式:
1、js端ajax请求到php,php把数据放入队列,这样就可以启动一个websocket_server来处理队列了。我没有尝试,估计是没问题的。
2、使用swoole的异步,swoole官方文档如下,看了下,我不知道怎么改造到我的这个例子来。
我想实现的就是消息放入队列,异步的方式来实现推送。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
自己回答。
还是用swoole的任务投递来实现,贴上代码
端口只能一个程序监听的,为什么要启动多个服务监听同一个端口?