#java #ignite
Вопрос:
У меня есть тайник с зажиганием:
IgniteCache<String, Record> cache;
Приведена коллекция ключей этого кэша. Мне нужно сделать следующее:
- Получение записей с указанными ключами
- … но дополнительно фильтруйте их по некоторой логике, определенной динамически (например, «где поле
name
имеет значениеJohn
«). - … сделайте это как можно быстрее
- … в рамках сделки
Одним из способов, который я попробовал, было использование getAll()
метода и применение фильтрации на моей стороне:
cache.getAll(keys).values().stream()
.filter(... filter logic...)
.collect(toList());
Это работает, но если дополнительный фильтр обладает высокой избирательностью (т. е. он отбрасывает много данных), мы потратим много времени на отправку ненужных данных по сети.
Другой вариант-использовать сканирование:
cache.query(new ScanQuery<>(new IsKeyIn(keys).and(new CustomFilter())))
Это заставляет всю фильтрацию работать на стороне серверных узлов, но это полное сканирование, и если в кэше много записей, в то время как ключи ввода составляют лишь небольшую его часть, снова тратится много времени, на этот раз на ненужное сканирование.
И есть invokeAll()
то, что позволяет фильтровать на стороне серверных узлов:
cache.invokeAll(new TreeSet<>(keys), new AdditionalFilter())
.values().stream()
.map(EntryProcessorResult::get)
.collect(toList());
где
private static class AdditionalFilter implements CacheEntryProcessor<String, Record, Record> {
@Override
public Record process(MutableEntry<String, Record> entry,
Object... arguments) throws EntryProcessorException {
if (... record matches the filter ...) {
return entry.getValue();
}
return null;
}
}
Он находит записи по их ключам, выполняет логику фильтрации на стороне серверных узлов, но для моих данных это даже медленнее, чем решение для сканирования. Я полагаю (но не уверен), что это связано с invokeAll()
возможной операцией обновления, поэтому (в соответствии с его Javadoc) он блокирует соответствующие ключи.
Я хотел бы иметь возможность находить записи по заданным ключам, применять дополнительную фильтрацию на стороне серверных узлов и не платить за дополнительные блокировки (как в моем случае это операция только для чтения).
Возможно ли это?
Мой кэш распределен между 3 узлами сервера, и его атомарность такова TRANSACTIONAL_SNAPSHOT
. Считывание выполняется в рамках транзакции.
Ответ №1:
- SQL — это самое простое решение и, возможно, самое быстрое при наличии надлежащих индексов.
IgniteCompute#broadcast
IgniteCache#localPeek
:
Collection<Key> keys = ...;
Collection<Collection<Value>> results = compute.broadcast(new LocalGetter(), keys);
...
class LocalGetter implements IgniteClosure<Collection<Key>, Collection<Value>>
{
@Override public Collection<Value> apply(Collection<Key> keys) {
IgniteCache<Key, Value> cache = ...;
Collection<Value> res = new ArrayList<>(keys.size());
for (Key key : keys) {
Value val = cache.localPeek(key, CachePeekMode.PRIMARY);
if (val != null amp;amp; filterMatches(val)) {
res.add(val);
}
}
return res;
}
}
Таким образом, мы эффективно извлекаем записи кэша по ключу, затем применяем фильтр локально и отправляем только соответствующие записи обратно по сети. Существует только N сетевых вызовов, где N — количество узлов сервера.
Комментарии:
1. Спасибо тебе, Павел! Будет ли этот сценарий широковещания localPeek учитывать транзакции? Я имею в виду, что мне нужно запустить транзакцию, затем выполнить несколько раундов этой операции «идентификаторы->получить->>фильтр» внутри той же транзакции и убедиться, что она изолирована от внешних изменений (или, по крайней мере, она завершится неудачей, если кто-то внесет изменения извне).
2. @RomanPuchkovskiy извините, что я пропустил часть о требованиях к транзакциям. К сожалению, ни
Compute
те, ниlocalPeek
другие не участвуют в сделках.