#java #deadlock #distributed-computing #hazelcast #hazelcast-jet
#java #взаимоблокировка #распределенные вычисления #hazelcast #hazelcast-jet
Вопрос:
Рассмотрим следующий сценарий:
Мы хотим взять большую распределенную коллекцию объектов, и для каждого объекта в коллекции мы хотим запустить другое вычисление, которое использует текущий объект и другую большую распределенную коллекцию для вычисления результата, который преобразует текущий объект.
Например.
коллекция A: 1,2,3,4,5,6,7,8……
коллекция B: 1,2,3,4,5,6,7,8……
Для каждого значения в A мы повторяем все значения в B, умножая каждое на 2 и суммируя эти значения, мы сопоставляем каждое значение в A с этой суммой, умноженной на текущее значение A.
Ниже приведена моя попытка, которая приводит к взаимоблокировке при использовании следующего:
c2.newJob(p2).join()
при использовании следующего взаимоблокировки не возникает:
c2.newJob(p2)
однако мы хотим, чтобы p2 был завершен, чтобы гарантировать, что мы получим правильную сумму.
Это может показаться неидиоматическим способом использования Jet для данного конкретного варианта использования, однако я хочу использовать этот шаблон для решения других проблем, и поэтому я был бы очень признателен за вашу помощь в этом.
JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();
Pipeline p1 = Pipeline.create();
List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);
p1.drawFrom(Sources.list("a-in"))
.map(e -> {
Pipeline p2 = Pipeline.create();
JetInstance c2 = Jet.newJetClient();
List<Integer> bIn = c2.getList("b-in");
bIn.add(1);
bIn.add(2);
bIn.add(3);
p2.drawFrom(Sources.list("b-in"))
.map(i->((Integer)i)*2)
.drainTo(Sinks.list("b-out"));
List<Integer> bOut = c2.getList("b-out");
// I would have thought it should just wait for the computation to complete,
// instead the join here causes jet to block itself,
c2.newJob(p2).join();
int sum = 0;
for (Integer i : bOut){
sum =i;
}
return ((Integer)e)*sum;
}).drainTo(Sinks.list("a-out"));
c1.newJob(p1).join();
Ответ №1:
В вашем коде есть несколько проблем:
-
map
функция не должна блокироваться. В следующей версии мы добавляем,mapUsingContextAsync
где вы можете использовать клиентское соединение в качестве контекста, отправлять задание и возвращатьсяjob.getFuture()
. -
map
операции будут выполняться параллельно. Вам нужно убедиться, что они не совместно используют временный список. В ваших примерах все подчиненные задания используютb-out
и они перезаписывают данные друг друга. -
Причиной взаимоблокировки было следующее:
join()
inmap()
заблокировал совместного рабочего и ожидал завершения подзадачи, но подзадача не может завершиться из-за заблокированного потока совместного рабочего.
Кроме того, Jet не оптимизирован для очень маленьких пакетных заданий, но я предполагаю, что ваша фактическая работа больше. Для развертывания задания требуются довольно большие накладные расходы; если само задание выполняется всего несколько мс, накладные расходы значительны. В этом конкретном случае вам было бы лучше просто использовать list.stream().map(i->i*2).sum()
вместо подзадачи.
JetInstance jet = Jet.newJetInstance();
JetInstance c1 = Jet.newJetClient();
Pipeline p1 = Pipeline.create();
List<Integer> aIn = jet.getList("a-in");
aIn.add(1);
aIn.add(2);
aIn.add(3);
List<Integer> bIn = jet.getList("b-in");
bIn.add(1);
bIn.add(2);
bIn.add(3);
p1.drawFrom(Sources.list("a-in"))
.mapUsingContextAsync(
ContextFactory
.withCreateFn(inst -> tuple2(inst, inst.<UUID, Long>getMap("tmpResults")))
// mark as non-cooperative, job submission does some blocking
.toNonCooperative()
.withLocalSharing()
.withMaxPendingCallsPerProcessor(2)
.withDestroyFn(ctx -> ctx.f1().destroy()),
(ctx, item) -> {
Pipeline p2 = Pipeline.create();
JetInstance instance = ctx.f0();
UUID key = UUID.randomUUID();
IMapJet<UUID, Long> tmpResultsMap = ctx.f1();
p2.drawFrom(Sources.list("b-in"))
.map(i -> ((Integer) i) * 2L)
.aggregate(summingLong(Long::longValue))
.map(sum -> entry(key, sum))
.drainTo(Sinks.map(tmpResultsMap));
return instance.newJob(p2).getFuture()
.thenApply(r -> entry(item, tmpResultsMap.remove(key)));
})
.drainTo(Sinks.list("a-out"));
c1.newJob(p1).join();
jet.getList("a-out").forEach(System.out::println);
При этом выводится следующий результат:
1=12
2=12
3=12
Приведенный выше код работает в текущем снимке и должен работать в Jet 3.0, который должен появиться через несколько недель.
Ответ №2:
@newlogic, попробуйте этот подход:
- Создайте задание, которое считывает данные из
b-in
и записывает вb-out
map, а не list. Вы можете использовать известный ключ или просто использовать временную метку и т.д. В качестве ключа и определить TTL в этой таблице, чтобы удалить старые результаты. - Создайте прослушиватель в
b-out
таблице (локальный прослушиватель, чтобы уведомлялся только узел, который содержит обновленный ключ) для прослушивания событий entryAdded / Updated, зависит от того, что вы выбрали на первом шаге, и отправьте новое задание из этого метода прослушивания для обработкиa-in
.
Таким образом, вам не нужно ждать, как только первое задание будет завершено, оно автоматически запустит второе задание.