Ошибки Redis и параллелизма в Quarkus: ОШИБКА EXEC без переполнения нескольких стеков.

#java #redis #concurrency #quarkus

Вопрос:

У меня есть приложение, которое сохраняет данные в redis, эти данные поступают через события в очереди, которые генерируют среду с высоким уровнем параллелизма. Для поддержания согласованности данных я использовал инструкцию Redis MULTI / EXEC, и в случае сбоя данные не подтверждаются. Перед открытием MULTI мне нужно запросить данные в Redis.

Я привожу пример.

Тестовая конечная точка, с помощью которой я моделирую события:

 @GET
@Path("/foo")
public Response foo() {
    String queue = "foo";

    for (int i = 0; i < 10000; i  ) {
        activeMqClient.addToQueue(queue,  new Foo(i, "Message "   1));
    }

    return OkResponse.toResponse("Foo send OK");
}
 

Запись событий (запуск нескольких экземпляров в потоках):

 public class FooRedisCmd {

    public void process(Foo foo) {
        // Example
        RedisClient redisClient = null;
        Response multiResponse = null;
        try {
            redisClient = RedisClient.createClient();
            //get info
            redisClient.hget("foos", "foo_field1:"   foo.getId());
            redisClient.hget("foos", "foo_field2:"   foo.getId());
            redisClient.hget("foos", "foo_field3:"   foo.getId());
            redisClient.hget("foos", "foo_field4:"   foo.getId());
            redisClient.hget("foos", "foo_field5:"   foo.getId());
            redisClient.hget("foos", "foo_field6:"   foo.getId());
            redisClient.hget("foos", "foo_field7:"   foo.getId());

            //write info
            multiResponse = redisClient.multi();
            log.info("Get multi: {} for foo: {}", multiResponse, foo);
            redisClient.hincrby("foos", "foo_field1:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field2:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field3:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field4:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field5:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field6:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field7:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field8:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field9:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field10:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field11:"   foo.getId(), Integer.toString(1));
            redisClient.hincrby("foos", "foo_field12:"   foo.getId(), Integer.toString(1));
            Response execResponse = redisClient.exec();
            log.info("Multi result: {} for foo: {}", execResponse, foo);
        } catch (Exception e) {
            if (multiResponse != null) {
                try {
                    redisClient.discard();
                } catch (Exception e2) {
                    log.error("Error on discard for foo: {}", foo, e2);
                }
            }
            log.error("Error on exec for foo: {}", foo, e);
        } finally {
            if (redisClient != null) {
                redisClient.close();
            }
        }
    }
}
 

Ошибки:

 INFO  [com.foo.FooRedisCmd] (pool-17-thread-3) Get multi: OK for foo: Foo(id=9965, message=Message 1)
INFO  [com.foo.FooRedisCmd] (pool-17-thread-3) Multi result: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3] for foo: Foo(id=9965, message=Message 1)
INFO  [com.foo.FooRedisCmd] (pool-17-thread-1) Get multi: OK for foo: Foo(id=9966, message=Message 1)
ERROR [com.foo.FooRedisCmd] (pool-17-thread-1) Error on discard for foo: Foo(id=9966, message=Message 1): java.util.concurrent.CompletionException: ERR DISCARD without MULTI
        at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:73)
        at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:61)
        at io.quarkus.redis.client.runtime.RedisClientImpl.await(RedisClientImpl.java:1026)
        at io.quarkus.redis.client.runtime.RedisClientImpl.discard(RedisClientImpl.java:142)
        at com.foo.FooRedisCmd.process(FooRedisCmd.java:48)
        at com.foo.FooRedisCmd_Subclass.process$superforward1(FooRedisCmd_Subclass.zig:94)
        at com.foo.FooRedisCmd_Subclass$function$1.apply(FooRedisCmd_Subclass$function$1.zig:33)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:51)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
        at com.foo.FooRedisCmd_Subclass.process(FooRedisCmd_Subclass.zig:158)
        at com.foo.FooRedisCmd_ClientProxy.process(FooRedisCmd_ClientProxy.zig:128)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: ERR DISCARD without MULTI

INFO  [com.foo.FooRedisCmd] (pool-17-thread-2) Get multi: OK for foo: Foo(id=9967, message=Message 1)
ERROR [com.foo.FooRedisCmd] (pool-17-thread-1) Error on exec for foo: Foo(id=9966, message=Message 1): java.util.concurrent.CompletionException: ERR EXEC without MULTI
        at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:73)
        at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:61)
        at io.quarkus.redis.client.runtime.RedisClientImpl.await(RedisClientImpl.java:1026)
        at io.quarkus.redis.client.runtime.RedisClientImpl.exec(RedisClientImpl.java:167)
        at com.foo.FooRedisCmd.process(FooRedisCmd.java:43)
        at com.foo.FooRedisCmd_Subclass.process$superforward1(FooRedisCmd_Subclass.zig:94)
        at com.foo.FooRedisCmd_Subclass$function$1.apply(FooRedisCmd_Subclass$function$1.zig:33)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:51)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
        at com.foo.FooRedisCmd_Subclass.process(FooRedisCmd_Subclass.zig:158)
        at com.foo.FooRedisCmd_ClientProxy.process(FooRedisCmd_ClientProxy.zig:128)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: ERR EXEC without MULTI

INFO  [com.foo.FooRedisCmd] (pool-17-thread-2) Multi result: [3, 3, 3, 3, 3, 3, 3, 3, 3, 3] for foo: Foo(id=9967, message=Message 1)
 

Ошибки генерируются при выполнении как exec, так и discard при сбое первого. Почему это происходит? Я использую quarkus 2 и его собственный клиент для redis quarkus-redis-client.

Ответ №1:

Поскольку вы запускали несколько экземпляров в потоках, команда multi and exec выполнялась поочередно.

вот так.

 127.0.0.1:6379> multi
OK
127.0.0.1:6379> set dst jjj
QUEUED
127.0.0.1:6379> multi
(error) ERR MULTI calls can not be nested
127.0.0.1:6379> exec
1) OK
127.0.0.1:6379> exec
(error) ERR EXEC without MULTI
127.0.0.1:6379> discard
(error) ERR DISCARD without MULTI
 

Решение заключается pipeline в том, чтобы упаковать ваши multi hincrby exec команды в одну команду. Redis выполнит их в правильном порядке.
вы можете обратиться https://redis.io/topics/pipelining для получения дополнительной информации.

Ответ №2:

Вы можете использовать конвейер, если вы используете клиент jedis для Redis, вы можете проверить это

https://www.tabnine.com/code/java/methods/redis.clients.jedis .Jedis / конвейерный