#redis #queue
Вопрос:
Давайте предположим, что у нас есть список задач, которые должны быть выполнены, и некоторые работники, которые загружают элементы из этого списка. Если работник неожиданно выходит из строя до завершения выполнения задачи, то эта задача теряется. Какой механизм мог бы предотвратить это, чтобы мы могли повторно обрабатывать заброшенные задачи?
Комментарии:
1. Вы используете потоки Redis?
2. Нет, это не так. Использование списков.
Ответ №1:
Вам нужно использовать ZSET для решения этой проблемы
Операция Pop
- Добавить в ZSET со временем истечения срока действия
- Удалить из списка
Операция Ack
- Удалить из ZSET
Работник
Вам нужно запустить запланированного работника, который будет перемещать элементы из ZSET в список, если они истекли
Прочитайте это подробно, как я это сделал в Rqueue https://medium.com/@sonus21/introducing-rqueue-redis-queue-d344f5c36e1b
Код на Github: https://github.com/sonus21/rqueue
Ответ №2:
Нет EXPIRE
элементов для набора или zset, и нет атомной операции, чтобы выскочить из zset и нажать на список. Поэтому я закончил писать этот lua-скрипт, который работает атомарно.
Сначала я добавляю задачу в executing-tasks
zset с отметкой времени ( (new Date()).valueOf()
в javascript).:
ZADD 1619028226766 executing-tasks
Затем я запускаю сценарий:
EVAL [THE SCRIPT] 2 executing-tasks tasks 1619028196766
Если задача старше 30 секунд, она будет отправлена в tasks
список. Если нет, он будет отправлен обратно в executing-tasks
zset.
Вот сценарий
local source = KEYS[1]
local destination = KEYS[2]
local min_score = ARGV[1]
local popped = redis.call('zpopmin', source)
local id = popped[1]
local score = popped[2]
if table.getn(popped) > 0 then
if score < min_score then
redis.call('rpush', destination, id)
return { "RESTORED", id }
else
redis.call('zadd', source, score, id)
return { "SENT_BACK", id }
end
end
return { "NOTHING_DONE" }