如何在 laravel 中使用 apache kafka 消费者
我正在尝试使用Apache Kafka消费者和生产商在Laravel Framework中
使用Console命令和执行PHP Artisan命令,并在生产中运行NOHUP,直到发生。是否有任何最佳方法使用实时消费和审核并防止消息丢失 如果我用杀死PID消息杀死命令,那将是损失吗?
class JsonKafka extends Command
{
private $topic;
private $producer;
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'kafka:up';
/**
* The console command description.
*
* @var string
*/
protected $description = 'kafka';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list', '0.0.0.0:29092');
//$conf->set('enable.idempotence', 'true');
$this->producer = new \RdKafka\Producer($conf);
$this->topic = $this->producer->newTopic('test_php');
}
/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
//consume
$conf = new \RdKafka\Conf();
// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'service');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '0.0.0.0:29092');
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
$conf->set('auto.offset.reset', 'earliest');
$consumer = new \RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'messages'
$consumer->subscribe(['messages']);
echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";
while (true) {
$message = $consumer->consume(120 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
Service::getInstance()->main(json_decode($message->payload, true))
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
}
}
}
private function produce(string $key,array $message)
{
$this->topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message), $key);
$this->producer->poll(0);
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $this->producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
echo 'produced' . PHP_EOL;
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
}
I am trying to use apache kafka consumer and producer with in laravel framework
i create console command and execute php artisan command with nohup in production to be run until exeption happen. is there any best way to use real time consume and pruduce and prevent message loss
and if i kill command with kill pid messages will be loss ?
class JsonKafka extends Command
{
private $topic;
private $producer;
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'kafka:up';
/**
* The console command description.
*
* @var string
*/
protected $description = 'kafka';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list', '0.0.0.0:29092');
//$conf->set('enable.idempotence', 'true');
$this->producer = new \RdKafka\Producer($conf);
$this->topic = $this->producer->newTopic('test_php');
}
/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
//consume
$conf = new \RdKafka\Conf();
// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
echo "Revoke: ";
var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'service');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '0.0.0.0:29092');
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
$conf->set('auto.offset.reset', 'earliest');
$consumer = new \RdKafka\KafkaConsumer($conf);
// Subscribe to topic 'messages'
$consumer->subscribe(['messages']);
echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";
while (true) {
$message = $consumer->consume(120 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
Service::getInstance()->main(json_decode($message->payload, true))
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
}
}
}
private function produce(string $key,array $message)
{
$this->topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message), $key);
$this->producer->poll(0);
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
$result = $this->producer->flush(10000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
echo 'produced' . PHP_EOL;
break;
}
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论