Commit c35cf030 authored by hangjun83's avatar hangjun83

药明康德

parent 11d64f3c
......@@ -47,7 +47,7 @@ class KafkaConsumerCommand extends Command
$consumer_type = $this->option('type');
switch($consumer_type){
case 'zkh' : app(KafkaService::class)->startZkhConsumer();break;
case 'wuxilab' : app(KafkaService::class)->startWuxiLabConsumer();break;
case 'wuxilab' : app(KafkaService::class)->startConsumer('wuxilab');break;
default : app(KafkaService::class)->startConsumer();break;
}
}
......
......@@ -56,8 +56,11 @@ class KafkaService
/**
* 开启消费者
*/
public function startConsumer()
public function startConsumer($topic = null)
{
if(is_null($topic)){
$this->config['topic'] = $topic;
}
$this->kafkaConsumerConfig = $this->getConsumerConfig($this->config);
$consumer = new Consumer($this->kafkaConsumerConfig, function (ConsumeMessage $message) {
......@@ -99,28 +102,6 @@ class KafkaService
$consumer->close();
}
/**
* 药明康德消费
*/
public function startWuxiLabConsumer()
{
$this->config['topic'] = 'wuxilab';
$this->kafkaConsumerConfig = $this->getConsumerConfig($this->config);
$consumer = new Consumer($this->kafkaConsumerConfig, function (ConsumeMessage $message) {
try{
$consumer = $message->getConsumer();
$kafkaMessage = json_decode($message->getValue(),true);
$consumer->ack($message); // 手动提交
app($kafkaMessage['consumer'])->{$kafkaMessage['method']}($kafkaMessage['params']);
}catch(\Throwable $exception){
$this->errLog($exception,$this->kafkaConsumerConfig->getGroupInstanceId().'_consumer');
}
$consumer->stop();
});
$consumer->start();
$consumer->close();
}
protected function errLog($exception, $errorType)
{
SimpleLogs::writeLog([
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment