#laravel #laravel-queue
Вопрос:
Я собираю и обрабатываю данные из нескольких сторонних API. Сначала я обрабатываю все ранее собранные данные api на своем конце, затем я собираю, обновляю все данные api для следующего цикла. И мне нужно делать это как можно чаще. Поскольку (общее) затраченное время меняется, я не могу использовать планирование и нуждаюсь в цикле. Когда весь процесс будет завершен, мне нужно перезапустить процесс.
WholeProcessStarterJob.php
public function handle()
{
self::calculateParents();
}
static function calculateParents()
{
$batch = Bus::batch([])
->name('CalculateParentChunkJob')
->onQueue('main')
->then(function () {
self::calculateChildren();
})->dispatch();
Parent::with('modifiers')
->chunkById(300, function ($chunk) use ($batch) {
$batch->add(new AppJobsCalculateParentChunkJob($chunk));
});
}
static function calculateChildren()
{
$batch = Bus::batch([])
->name('CalculateChildChunkJob')
->onQueue('main')
->then(function () {
self::collectApiData();
})->dispatch();
Child::with('parent.modifiers')
->chunkById(300, function ($chunk) use ($batch) {
$batch->add(new AppJobsMainCalculateChildChunkJob($chunk));
});
}
static function collectAllApiData()
{
$batch = Bus::batch([])
->finally(function () {
self::calculateParents();
})
->name("collectAllApiData")
->onQueue("main")
->dispatch();
if (!$synchronization_lock) {
$apis = Api::where('is_sync', 1)->get();
foreach ($apis as $api) {
$class = "\App\Jobs\" . $api->class . "\SyncApiJob";
$update_batch->add(new $class());
}
}
}
ApiA/SyncApiJob.php (каждый api имеет отдельный дескриптор)
public function handle()
{
$child = Child::with('parent')
// some ->where conditions... this parts changes for each api
->chunkById(100, function ($chunk){
$this->batch()->add(new AppJobsApiAUpdateChunkJob($chunk));
});
}
Здесь я использую один пакет для всех API, и я могу использовать then()
обратный вызов и снова перезапустить процесс whool, но таким образом я должен обрабатывать все API в одной очереди (все пакетные задания должны выполняться в одном соединении и очереди). Но мне нужно управлять каждой очередью api отдельно, потому что каждый api потребляет разные ресурсы. В то время как один api может работать только с 4 работниками, другому требуется 40 работников.
Мне нужно как-то использовать несколько пакетов с взаимным then()
обратным вызовом. Или способ определить, что все API обработаны, и перезапустить весь процесс.