Почему потребители redis stream должны указывать отдельное имя в одной и той же группе потребителей?

#python #redis #python-asyncio #aioredis

Вопрос:

Приведенные ниже коды показывают, как я тестирую функции потока Redis.

И я обнаружил, что разные процессы с одним и тем же именем потребителя конкурируют за использование сообщений в одном потоке. В моем понимании, если эта производительность нормальная, Redis не должен разрабатывать функцию для указания имени потребителя.

Есть ли какие-то проблемы с моим пониманием? Или я использую неправильный метод?

 import asyncio
import aioredis

# consumer with name "a", subscribing two streams
async def consume_a():
    try:
        while True:
            redis = aioredis.from_url("redis://localhost:36379")
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_a",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()
 

потребитель с именем «b», подписавшийся на два потока

 async def consume_b():
    try:
        while True:
            redis = aioredis.from_url("redis://localhost:36379")
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_b",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()
 

создайте группу перед запуском скрипта

 async def config_group_0():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        res = await redis.xgroup_create("test_stream", "test_consumer_group")
        print(res)
    finally:
        await redis.close()

async def config_group_1():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        res = await redis.xgroup_create("test_stream_1", "test_consumer_group")
        print(res)
    finally:
        await redis.close()
 

производители

 async def produce_0():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        i = 0
        while i < 100:
            res = await redis.xadd(
                "test_stream",
                {"domain_name": "test_domain_name_0", "sid": 0},
                maxlen=5,
            )
            print(res)
            i  = 1
    finally:
        await redis.close()

async def produce_1():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        i = 0
        while i < 100:
            res = await redis.xadd(
                "test_stream_1",
                {"domain_name": "test_domain_name_1", "sid": 1},
                maxlen=2,
            )
            print(res)
            i  = 1
    finally:
        await redis.close()
 

тестовый код

 if __name__ == "__main__":
    # two coroutines consume messages from two streams with the same consumer name
    asyncio.run(asyncio.gather(consume_a(), consume_a(), produce_0(), produce_1()))
 

Ответ №1:

На основе документа Redis:

Одна из гарантий групп потребителей заключается в том, что данный потребитель может видеть только историю сообщений, которые были ему доставлены, поэтому у сообщения есть только один владелец.

Прочитайте этот документ для получения дополнительной информации:

Комментарии:

1. Итак, нормально ли, чтобы несколько процессов использовали одно и то же имя потребителя? Или лучше, чтобы каждый процесс имел уникальное имя?