Как использовать обратный вызов then() для нескольких пакетов?

#laravel #laravel-queue

Вопрос:

Я собираю и обрабатываю данные из нескольких сторонних API. Сначала я обрабатываю все ранее собранные данные api на своем конце, затем я собираю, обновляю все данные api для следующего цикла. И мне нужно делать это как можно чаще. Поскольку (общее) затраченное время меняется, я не могу использовать планирование и нуждаюсь в цикле. Когда весь процесс будет завершен, мне нужно перезапустить процесс.

WholeProcessStarterJob.php

 public function handle()
{
    self::calculateParents();
}

static function calculateParents()
{
    $batch = Bus::batch([])
        ->name('CalculateParentChunkJob')
        ->onQueue('main')
        ->then(function () {
            self::calculateChildren();
        })->dispatch();

    Parent::with('modifiers')
        ->chunkById(300, function ($chunk) use ($batch) {
            $batch->add(new AppJobsCalculateParentChunkJob($chunk));
        });
}

static function calculateChildren()
{
    $batch = Bus::batch([])
        ->name('CalculateChildChunkJob')
        ->onQueue('main')
        ->then(function () {
            self::collectApiData();
        })->dispatch();

    Child::with('parent.modifiers')
        ->chunkById(300, function ($chunk) use ($batch) {
            $batch->add(new AppJobsMainCalculateChildChunkJob($chunk));
        });
}

static function collectAllApiData()
{
    $batch = Bus::batch([])
        ->finally(function () {
            self::calculateParents();
        })
        ->name("collectAllApiData")
        ->onQueue("main")
        ->dispatch();

    if (!$synchronization_lock) {
        $apis = Api::where('is_sync', 1)->get();

        foreach ($apis as $api) {
            $class = "\App\Jobs\" . $api->class . "\SyncApiJob";
            $update_batch->add(new $class());
        }
    }

}
 

ApiA/SyncApiJob.php (каждый api имеет отдельный дескриптор)

 public function handle()
{
    $child = Child::with('parent')
        // some ->where conditions... this parts changes for each api
        ->chunkById(100, function ($chunk){
            $this->batch()->add(new AppJobsApiAUpdateChunkJob($chunk));
        });

}
 

Здесь я использую один пакет для всех API, и я могу использовать then() обратный вызов и снова перезапустить процесс whool, но таким образом я должен обрабатывать все API в одной очереди (все пакетные задания должны выполняться в одном соединении и очереди). Но мне нужно управлять каждой очередью api отдельно, потому что каждый api потребляет разные ресурсы. В то время как один api может работать только с 4 работниками, другому требуется 40 работников.

Мне нужно как-то использовать несколько пакетов с взаимным then() обратным вызовом. Или способ определить, что все API обработаны, и перезапустить весь процесс.