Возможно ли вложить конвейеры Hazelcast Jet таким образом, чтобы внутренний конвейер мог вычислять результаты для внешнего конвейера?

#java #deadlock #distributed-computing #hazelcast #hazelcast-jet

#java #взаимоблокировка #распределенные вычисления #hazelcast #hazelcast-jet

Вопрос:

Рассмотрим следующий сценарий:

Мы хотим взять большую распределенную коллекцию объектов, и для каждого объекта в коллекции мы хотим запустить другое вычисление, которое использует текущий объект и другую большую распределенную коллекцию для вычисления результата, который преобразует текущий объект.

Например.

коллекция A: 1,2,3,4,5,6,7,8……

коллекция B: 1,2,3,4,5,6,7,8……

Для каждого значения в A мы повторяем все значения в B, умножая каждое на 2 и суммируя эти значения, мы сопоставляем каждое значение в A с этой суммой, умноженной на текущее значение A.

Ниже приведена моя попытка, которая приводит к взаимоблокировке при использовании следующего:

 c2.newJob(p2).join()
  

при использовании следующего взаимоблокировки не возникает:

 c2.newJob(p2)
  

однако мы хотим, чтобы p2 был завершен, чтобы гарантировать, что мы получим правильную сумму.

Это может показаться неидиоматическим способом использования Jet для данного конкретного варианта использования, однако я хочу использовать этот шаблон для решения других проблем, и поэтому я был бы очень признателен за вашу помощь в этом.

 JetInstance jet =  Jet.newJetInstance();
JetInstance c1 =  Jet.newJetClient();

Pipeline p1 = Pipeline.create();

List<Integer> aIn =  jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);

p1.drawFrom(Sources.list("a-in"))
        .map(e -> {
          Pipeline p2 = Pipeline.create();
          JetInstance c2 =  Jet.newJetClient();

          List<Integer> bIn = c2.getList("b-in");
          bIn.add(1);
          bIn.add(2);
          bIn.add(3);

          p2.drawFrom(Sources.list("b-in"))
                  .map(i->((Integer)i)*2)
                  .drainTo(Sinks.list("b-out"));

          List<Integer> bOut = c2.getList("b-out");

          // I would have thought it should just wait for the computation to complete,
          // instead the join here causes jet to block itself,
          c2.newJob(p2).join();

          int sum = 0;
          for (Integer i : bOut){
            sum =i;
          }

          return ((Integer)e)*sum;
        }).drainTo(Sinks.list("a-out"));
c1.newJob(p1).join();
  

Ответ №1:

В вашем коде есть несколько проблем:

  1. map функция не должна блокироваться. В следующей версии мы добавляем, mapUsingContextAsync где вы можете использовать клиентское соединение в качестве контекста, отправлять задание и возвращаться job.getFuture() .

  2. map операции будут выполняться параллельно. Вам нужно убедиться, что они не совместно используют временный список. В ваших примерах все подчиненные задания используют b-out и они перезаписывают данные друг друга.

  3. Причиной взаимоблокировки было следующее: join() in map() заблокировал совместного рабочего и ожидал завершения подзадачи, но подзадача не может завершиться из-за заблокированного потока совместного рабочего.

Кроме того, Jet не оптимизирован для очень маленьких пакетных заданий, но я предполагаю, что ваша фактическая работа больше. Для развертывания задания требуются довольно большие накладные расходы; если само задание выполняется всего несколько мс, накладные расходы значительны. В этом конкретном случае вам было бы лучше просто использовать list.stream().map(i->i*2).sum() вместо подзадачи.

 JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();

Pipeline p1 = Pipeline.create();

List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);

List<Integer> bIn = jet.getList("b-in");
bIn.add(1);
bIn.add(2);
bIn.add(3);

p1.drawFrom(Sources.list("a-in"))
  .mapUsingContextAsync(
          ContextFactory
                  .withCreateFn(inst -> tuple2(inst, inst.<UUID, Long>getMap("tmpResults")))
                  // mark as non-cooperative, job submission does some blocking
                  .toNonCooperative()
                  .withLocalSharing()
                  .withMaxPendingCallsPerProcessor(2)
                  .withDestroyFn(ctx -> ctx.f1().destroy()),
          (ctx, item) -> {
              Pipeline p2 = Pipeline.create();
              JetInstance instance = ctx.f0();
              UUID key = UUID.randomUUID();
              IMapJet<UUID, Long> tmpResultsMap = ctx.f1();

              p2.drawFrom(Sources.list("b-in"))
                .map(i -> ((Integer) i) * 2L)
                .aggregate(summingLong(Long::longValue))
                .map(sum -> entry(key, sum))
                .drainTo(Sinks.map(tmpResultsMap));

              return instance.newJob(p2).getFuture()
                             .thenApply(r -> entry(item, tmpResultsMap.remove(key)));
          })
  .drainTo(Sinks.list("a-out"));

c1.newJob(p1).join();
jet.getList("a-out").forEach(System.out::println);
  

При этом выводится следующий результат:

 1=12
2=12
3=12
  

Приведенный выше код работает в текущем снимке и должен работать в Jet 3.0, который должен появиться через несколько недель.

Ответ №2:

@newlogic, попробуйте этот подход:

  1. Создайте задание, которое считывает данные из b-in и записывает в b-out map, а не list. Вы можете использовать известный ключ или просто использовать временную метку и т.д. В качестве ключа и определить TTL в этой таблице, чтобы удалить старые результаты.
  2. Создайте прослушиватель в b-out таблице (локальный прослушиватель, чтобы уведомлялся только узел, который содержит обновленный ключ) для прослушивания событий entryAdded / Updated, зависит от того, что вы выбрали на первом шаге, и отправьте новое задание из этого метода прослушивания для обработки a-in .

Таким образом, вам не нужно ждать, как только первое задание будет завершено, оно автоматически запустит второе задание.