#java #redis #concurrency #quarkus
Вопрос:
У меня есть приложение, которое сохраняет данные в redis, эти данные поступают через события в очереди, которые генерируют среду с высоким уровнем параллелизма. Для поддержания согласованности данных я использовал инструкцию Redis MULTI / EXEC, и в случае сбоя данные не подтверждаются. Перед открытием MULTI мне нужно запросить данные в Redis.
Я привожу пример.
Тестовая конечная точка, с помощью которой я моделирую события:
@GET
@Path("/foo")
public Response foo() {
String queue = "foo";
for (int i = 0; i < 10000; i ) {
activeMqClient.addToQueue(queue, new Foo(i, "Message " 1));
}
return OkResponse.toResponse("Foo send OK");
}
Запись событий (запуск нескольких экземпляров в потоках):
public class FooRedisCmd {
public void process(Foo foo) {
// Example
RedisClient redisClient = null;
Response multiResponse = null;
try {
redisClient = RedisClient.createClient();
//get info
redisClient.hget("foos", "foo_field1:" foo.getId());
redisClient.hget("foos", "foo_field2:" foo.getId());
redisClient.hget("foos", "foo_field3:" foo.getId());
redisClient.hget("foos", "foo_field4:" foo.getId());
redisClient.hget("foos", "foo_field5:" foo.getId());
redisClient.hget("foos", "foo_field6:" foo.getId());
redisClient.hget("foos", "foo_field7:" foo.getId());
//write info
multiResponse = redisClient.multi();
log.info("Get multi: {} for foo: {}", multiResponse, foo);
redisClient.hincrby("foos", "foo_field1:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field2:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field3:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field4:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field5:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field6:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field7:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field8:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field9:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field10:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field11:" foo.getId(), Integer.toString(1));
redisClient.hincrby("foos", "foo_field12:" foo.getId(), Integer.toString(1));
Response execResponse = redisClient.exec();
log.info("Multi result: {} for foo: {}", execResponse, foo);
} catch (Exception e) {
if (multiResponse != null) {
try {
redisClient.discard();
} catch (Exception e2) {
log.error("Error on discard for foo: {}", foo, e2);
}
}
log.error("Error on exec for foo: {}", foo, e);
} finally {
if (redisClient != null) {
redisClient.close();
}
}
}
}
Ошибки:
INFO [com.foo.FooRedisCmd] (pool-17-thread-3) Get multi: OK for foo: Foo(id=9965, message=Message 1)
INFO [com.foo.FooRedisCmd] (pool-17-thread-3) Multi result: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3] for foo: Foo(id=9965, message=Message 1)
INFO [com.foo.FooRedisCmd] (pool-17-thread-1) Get multi: OK for foo: Foo(id=9966, message=Message 1)
ERROR [com.foo.FooRedisCmd] (pool-17-thread-1) Error on discard for foo: Foo(id=9966, message=Message 1): java.util.concurrent.CompletionException: ERR DISCARD without MULTI
at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:73)
at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:61)
at io.quarkus.redis.client.runtime.RedisClientImpl.await(RedisClientImpl.java:1026)
at io.quarkus.redis.client.runtime.RedisClientImpl.discard(RedisClientImpl.java:142)
at com.foo.FooRedisCmd.process(FooRedisCmd.java:48)
at com.foo.FooRedisCmd_Subclass.process$superforward1(FooRedisCmd_Subclass.zig:94)
at com.foo.FooRedisCmd_Subclass$function$1.apply(FooRedisCmd_Subclass$function$1.zig:33)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:51)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at com.foo.FooRedisCmd_Subclass.process(FooRedisCmd_Subclass.zig:158)
at com.foo.FooRedisCmd_ClientProxy.process(FooRedisCmd_ClientProxy.zig:128)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: ERR DISCARD without MULTI
INFO [com.foo.FooRedisCmd] (pool-17-thread-2) Get multi: OK for foo: Foo(id=9967, message=Message 1)
ERROR [com.foo.FooRedisCmd] (pool-17-thread-1) Error on exec for foo: Foo(id=9966, message=Message 1): java.util.concurrent.CompletionException: ERR EXEC without MULTI
at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:73)
at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:61)
at io.quarkus.redis.client.runtime.RedisClientImpl.await(RedisClientImpl.java:1026)
at io.quarkus.redis.client.runtime.RedisClientImpl.exec(RedisClientImpl.java:167)
at com.foo.FooRedisCmd.process(FooRedisCmd.java:43)
at com.foo.FooRedisCmd_Subclass.process$superforward1(FooRedisCmd_Subclass.zig:94)
at com.foo.FooRedisCmd_Subclass$function$1.apply(FooRedisCmd_Subclass$function$1.zig:33)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:51)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at com.foo.FooRedisCmd_Subclass.process(FooRedisCmd_Subclass.zig:158)
at com.foo.FooRedisCmd_ClientProxy.process(FooRedisCmd_ClientProxy.zig:128)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: ERR EXEC without MULTI
INFO [com.foo.FooRedisCmd] (pool-17-thread-2) Multi result: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3] for foo: Foo(id=9967, message=Message 1)
Ошибки генерируются при выполнении как exec, так и discard при сбое первого. Почему это происходит? Я использую quarkus 2 и его собственный клиент для redis quarkus-redis-client.
Ответ №1:
Поскольку вы запускали несколько экземпляров в потоках, команда multi
and exec
выполнялась поочередно.
вот так.
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set dst jjj
QUEUED
127.0.0.1:6379> multi
(error) ERR MULTI calls can not be nested
127.0.0.1:6379> exec
1) OK
127.0.0.1:6379> exec
(error) ERR EXEC without MULTI
127.0.0.1:6379> discard
(error) ERR DISCARD without MULTI
Решение заключается pipeline
в том, чтобы упаковать ваши multi
hincrby
exec
команды в одну команду. Redis выполнит их в правильном порядке.
вы можете обратиться https://redis.io/topics/pipelining для получения дополнительной информации.
Ответ №2:
Вы можете использовать конвейер, если вы используете клиент jedis для Redis, вы можете проверить это
https://www.tabnine.com/code/java/methods/redis.clients.jedis .Jedis / конвейерный