Commit 7d905db5 authored by hangjun83's avatar hangjun83

队列更新

parent a15f87be
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
namespace App\Console\Commands; namespace App\Console\Commands;
use App\Jobs\BideJob; use App\Jobs\BideJob;
use App\Services\BideService;
use Illuminate\Console\Command; use Illuminate\Console\Command;
use Illuminate\Console\ConfirmableTrait; use Illuminate\Console\ConfirmableTrait;
...@@ -56,8 +57,15 @@ class BideJobCommand extends Command ...@@ -56,8 +57,15 @@ class BideJobCommand extends Command
exit; exit;
} }
$productUpdateJob = (new BideJob($action_type,$params))->delay(100)->onQueue('slow'); $service = app(BideService::class);
app('Illuminate\Contracts\Bus\Dispatcher')->dispatch($productUpdateJob); switch($action_type){
case 'batchUpdateProduct' :
default:
$service->batchUpdateProducts($params);
break;
}
//$productUpdateJob = (new BideJob($action_type,$params))->delay(100)->onQueue('slow');
//app('Illuminate\Contracts\Bus\Dispatcher')->dispatch($productUpdateJob);
} }
} }
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
namespace App\Console\Commands; namespace App\Console\Commands;
use App\Jobs\BjsJob; use App\Jobs\BjsJob;
use App\Services\BjsService;
use Illuminate\Console\Command; use Illuminate\Console\Command;
use Illuminate\Console\ConfirmableTrait; use Illuminate\Console\ConfirmableTrait;
...@@ -56,8 +57,16 @@ class BjsJobCommand extends Command ...@@ -56,8 +57,16 @@ class BjsJobCommand extends Command
exit; exit;
} }
$productUpdateJob = (new BjsJob($action_type,$params))->delay(100)->onQueue('slow'); $service = app(BjsService::class);
app('Illuminate\Contracts\Bus\Dispatcher')->dispatch($productUpdateJob); switch($action_type){
case 'batchUpdateProduct' :
default:
$service->batchUpdateProducts($params);
break;
}
//$productUpdateJob = (new BjsJob($action_type,$params))->delay(100)->onQueue('slow');
//app('Illuminate\Contracts\Bus\Dispatcher')->dispatch($productUpdateJob);
} }
} }
......
...@@ -10,6 +10,7 @@ namespace App\Console\Commands; ...@@ -10,6 +10,7 @@ namespace App\Console\Commands;
use App\Jobs\InteglePlatformJob; use App\Jobs\InteglePlatformJob;
use App\Jobs\WuxiLabJob; use App\Jobs\WuxiLabJob;
use App\Services\InteglePlatformService;
use Illuminate\Console\Command; use Illuminate\Console\Command;
use Illuminate\Console\ConfirmableTrait; use Illuminate\Console\ConfirmableTrait;
...@@ -62,8 +63,17 @@ class IntegleJobCommand extends Command ...@@ -62,8 +63,17 @@ class IntegleJobCommand extends Command
$this->error('缺少命令参数,请输入具体的参数命令.如需帮助请输入 --help'); $this->error('缺少命令参数,请输入具体的参数命令.如需帮助请输入 --help');
exit; exit;
} }
$productUpdateJob = (new InteglePlatformJob($action_type,$params))->delay(100)->onQueue('slow');
app('Illuminate\Contracts\Bus\Dispatcher')->dispatch($productUpdateJob); $service = app(InteglePlatformService::class);
switch($action_type){
case 'batchUpdateProduct' :
$service->batchUpdateProducts();
break;
default:
}
//$productUpdateJob = (new InteglePlatformJob($action_type,$params))->delay(100)->onQueue('slow');
//app('Illuminate\Contracts\Bus\Dispatcher')->dispatch($productUpdateJob);
} }
} }
......
...@@ -3,7 +3,7 @@ namespace App\Jobs\queue; ...@@ -3,7 +3,7 @@ namespace App\Jobs\queue;
use App\Jobs\Job; use App\Jobs\Job;
class ZhenkhQueueJob extends Job class QueueJob extends Job
{ {
public $params; public $params;
/** /**
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
namespace App\Services; namespace App\Services;
use App\Jobs\queue\QueueJob;
use App\Repositories\Contracts\ThirdApiPlatformRepository; use App\Repositories\Contracts\ThirdApiPlatformRepository;
use App\Services\Kafka\KafkaService; use App\Services\Kafka\KafkaService;
use App\Services\ThirdPlatform\Api\BideApiService; use App\Services\ThirdPlatform\Api\BideApiService;
...@@ -33,14 +34,12 @@ class BideService ...@@ -33,14 +34,12 @@ class BideService
// yield 循环逐行读取csv内容 // yield 循环逐行读取csv内容
while ($generator->valid()) { while ($generator->valid()) {
$content = $generator->current(); $content = $generator->current();
app(KafkaService::class)->produerSend( $this->apiService->pushQueue([
[ 'params' => ['cas' => current($content)],
'params' => ['cas' => current($content)], 'consumer' => __CLASS__,
'consumer' => __CLASS__, 'method' => 'batchUpdateApi'
'method' => 'batchUpdateApi'
],'bide');
]
);
$generator->next(); $generator->next();
} }
//$generator->rewind(); //$generator->rewind();
......
...@@ -33,14 +33,12 @@ class BjsService ...@@ -33,14 +33,12 @@ class BjsService
// yield 循环逐行读取csv内容 // yield 循环逐行读取csv内容
while ($generator->valid()) { while ($generator->valid()) {
$content = $generator->current(); $content = $generator->current();
app(KafkaService::class)->produerSend( $this->apiService->pushQueue([
[ 'params' => ['cas' => current($content)],
'params' => ['cas' => current($content)], 'consumer' => __CLASS__,
'consumer' => __CLASS__, 'method' => 'batchUpdateApi'
'method' => 'batchUpdateApi'
],'bjs');
]
);
$generator->next(); $generator->next();
} }
//$generator->rewind(); //$generator->rewind();
......
...@@ -45,13 +45,11 @@ class InteglePlatformService ...@@ -45,13 +45,11 @@ class InteglePlatformService
} }
} }
if(!empty($rawList)){ if(!empty($rawList)){
SimpleKafka::produerSend( $this->apiService->pushQueue([
[ 'params' => ['rawList' => $rawList,'packageList' => $newPackages],
'params' => ['rawList' => $rawList,'packageList' => $newPackages], 'consumer' => 'App\Services\InteglePlatformService',
'consumer' => 'App\Services\InteglePlatformService', 'method' => 'batchUpdateApi'
'method' => 'batchUpdateApi' ],'integle');
]
);
$page ++; $page ++;
} }
} }
...@@ -63,13 +61,11 @@ class InteglePlatformService ...@@ -63,13 +61,11 @@ class InteglePlatformService
$result = $this->apiService->pushBatchUpdateProduct($updateData); $result = $this->apiService->pushBatchUpdateProduct($updateData);
if($result){ if($result){
if($result['status'] === false){ if($result['status'] === false){
SimpleKafka::produerSend( $this->apiService->pushQueue([
[ 'consumer' => 'App\Services\InteglePlatformService',
'consumer' => 'App\Services\InteglePlatformService', 'method' => 'batchUpdateApi',
'method' => 'batchUpdateApi', 'params' => $updateData
'params' => $updateData ],'integle');
]
);
} }
} }
unset($result['status']); unset($result['status']);
......
...@@ -11,9 +11,11 @@ ...@@ -11,9 +11,11 @@
namespace App\Services\ThirdPlatform; namespace App\Services\ThirdPlatform;
use App\Jobs\queue\QueueJob;
use App\Repositories\Contracts\ThirdApiPlatformRepository; use App\Repositories\Contracts\ThirdApiPlatformRepository;
use App\Services\Kafka\KafkaService; use App\Services\Kafka\KafkaService;
use App\Services\PlatformDataEntriesService; use App\Services\PlatformDataEntriesService;
use App\Support\Facades\SimpleKafka;
abstract class PlatformAbstractService abstract class PlatformAbstractService
{ {
...@@ -114,4 +116,15 @@ abstract class PlatformAbstractService ...@@ -114,4 +116,15 @@ abstract class PlatformAbstractService
{ {
return $this->dataEntriesService->removeEntriesValues($this->getPlatformInfo('id'),$dataKey); return $this->dataEntriesService->removeEntriesValues($this->getPlatformInfo('id'),$dataKey);
} }
public function pushQueue($pushContent, $queueName)
{
if(env('QUEUE_CONNECTION') == 'database'){
app('Illuminate\Contracts\Bus\Dispatcher')->dispatch(
(new QueueJob($pushContent))->onQueue($queueName)
);
}else{
SimpleKafka::produerSend($pushContent,$queueName);
}
}
} }
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
namespace App\Services; namespace App\Services;
use App\Jobs\queue\ZhenkhQueueJob; use App\Jobs\queue\QueueJob;
use App\Mailer\MailService; use App\Mailer\MailService;
use App\Mailer\MailTempl; use App\Mailer\MailTempl;
use App\Repositories\Contracts\ThirdApiPlatformRepository; use App\Repositories\Contracts\ThirdApiPlatformRepository;
...@@ -127,7 +127,7 @@ class ZhenKhService ...@@ -127,7 +127,7 @@ class ZhenKhService
'consumer' => __CLASS__, 'consumer' => __CLASS__,
'method' => 'consumerBatchUpdateGoodsList' 'method' => 'consumerBatchUpdateGoodsList'
]; ];
$this->pushQueue($sendContent); $this->apiService->pushQueue($sendContent,'zkh');
} }
} }
...@@ -176,11 +176,11 @@ class ZhenKhService ...@@ -176,11 +176,11 @@ class ZhenKhService
if($orderList && count($orderList['list']) > 0){ if($orderList && count($orderList['list']) > 0){
foreach($orderList['list']['data'] as $order){ foreach($orderList['list']['data'] as $order){
if($order['orderStatus'] == 0){ if($order['orderStatus'] == 0){
$this->pushQueue([ $this->apiService->pushQueue([
'params' => ['orders' => $order], 'params' => ['orders' => $order],
'consumer' => __CLASS__, 'consumer' => __CLASS__,
'method' => 'consumerBatchUpdateOrdersList' 'method' => 'consumerBatchUpdateOrdersList'
]); ],'zkh');
} }
} }
} }
...@@ -546,15 +546,4 @@ class ZhenKhService ...@@ -546,15 +546,4 @@ class ZhenKhService
} }
} }
private function pushQueue($pushContent)
{
if(env('QUEUE_CONNECTION') == 'database'){
app('Illuminate\Contracts\Bus\Dispatcher')->dispatch(
(new ZhenkhQueueJob($pushContent))->onQueue('zkh')
);
}else{
SimpleKafka::produerSend($pushContent,'zkh');
}
}
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment