Commit 11d64f3c authored by hangjun83's avatar hangjun83

药明康德

parent f477dea9
...@@ -47,6 +47,7 @@ class KafkaConsumerCommand extends Command ...@@ -47,6 +47,7 @@ class KafkaConsumerCommand extends Command
$consumer_type = $this->option('type'); $consumer_type = $this->option('type');
switch($consumer_type){ switch($consumer_type){
case 'zkh' : app(KafkaService::class)->startZkhConsumer();break; case 'zkh' : app(KafkaService::class)->startZkhConsumer();break;
case 'wuxilab' : app(KafkaService::class)->startWuxiLabConsumer();break;
default : app(KafkaService::class)->startConsumer();break; default : app(KafkaService::class)->startConsumer();break;
} }
} }
......
...@@ -99,6 +99,28 @@ class KafkaService ...@@ -99,6 +99,28 @@ class KafkaService
$consumer->close(); $consumer->close();
} }
/**
* 药明康德消费
*/
public function startWuxiLabConsumer()
{
$this->config['topic'] = 'wuxilab';
$this->kafkaConsumerConfig = $this->getConsumerConfig($this->config);
$consumer = new Consumer($this->kafkaConsumerConfig, function (ConsumeMessage $message) {
try{
$consumer = $message->getConsumer();
$kafkaMessage = json_decode($message->getValue(),true);
$consumer->ack($message); // 手动提交
app($kafkaMessage['consumer'])->{$kafkaMessage['method']}($kafkaMessage['params']);
}catch(\Throwable $exception){
$this->errLog($exception,$this->kafkaConsumerConfig->getGroupInstanceId().'_consumer');
}
$consumer->stop();
});
$consumer->start();
$consumer->close();
}
protected function errLog($exception, $errorType) protected function errLog($exception, $errorType)
{ {
SimpleLogs::writeLog([ SimpleLogs::writeLog([
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment