Commit 5a1dd6eb authored by hangjun83's avatar hangjun83

openapi 震坤行

parent 2cef0817
......@@ -13,7 +13,8 @@ class KafkaConsumerCommand extends Command
*
* @var string
*/
protected $signature = 'kafka:consumer';
protected $signature = 'kafka:consumer
{--type= : 消费类型}';
/**
* 命令行的概述。
......@@ -43,8 +44,11 @@ class KafkaConsumerCommand extends Command
return 1;
}
//开启消费者模式
app(KafkaService::class)->startConsumer();
$consumer_type = $this->option('type');
switch($consumer_type){
case 'zkh' : app(KafkaService::class)->startZkhConsumer();break;
default : app(KafkaService::class)->startConsumer();break;
}
}
}
......@@ -76,6 +76,30 @@ class KafkaService
$consumer->close();
}
/**
* 震坤行消费
*/
public function startZkhConsumer()
{
$this->config['topic'] = 'zkh';
$this->kafkaConsumerConfig = $this->getConsumerConfig($this->config);
$consumer = new Consumer($this->kafkaConsumerConfig, function (ConsumeMessage $message) {
try{
$consumer = $message->getConsumer();
$kafkaMessage = json_decode($message->getValue(),true);
$consumer->ack($message); // 手动提交
app($kafkaMessage['consumer'])->{$kafkaMessage['method']}($kafkaMessage['params']);
}catch(\Throwable $exception){
$this->errLog($exception,$this->kafkaConsumerConfig->getGroupInstanceId().'_consumer');
}
$consumer->stop();
});
$consumer->start();
$consumer->close();
}
protected function errLog($exception, $errorType)
{
SimpleLogs::writeLog([
......
......@@ -171,7 +171,7 @@ class ZhenKhService
'params' => ['orders' => $order],
'consumer' => __CLASS__,
'method' => 'consumerBatchUpdateOrdersList'
]);
],'zkh');
}
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment