Очереди Redis — Как возобновить

#redis #queue

Вопрос:

Давайте предположим, что у нас есть список задач, которые должны быть выполнены, и некоторые работники, которые загружают элементы из этого списка. Если работник неожиданно выходит из строя до завершения выполнения задачи, то эта задача теряется. Какой механизм мог бы предотвратить это, чтобы мы могли повторно обрабатывать заброшенные задачи?

Комментарии:

1. Вы используете потоки Redis?

2. Нет, это не так. Использование списков.

Ответ №1:

Вам нужно использовать ZSET для решения этой проблемы

Операция Pop

  • Добавить в ZSET со временем истечения срока действия
  • Удалить из списка

Операция Ack

  • Удалить из ZSET

Работник

Вам нужно запустить запланированного работника, который будет перемещать элементы из ZSET в список, если они истекли

Прочитайте это подробно, как я это сделал в Rqueue https://medium.com/@sonus21/introducing-rqueue-redis-queue-d344f5c36e1b

Код на Github: https://github.com/sonus21/rqueue

Ответ №2:

Нет EXPIRE элементов для набора или zset, и нет атомной операции, чтобы выскочить из zset и нажать на список. Поэтому я закончил писать этот lua-скрипт, который работает атомарно.

Сначала я добавляю задачу в executing-tasks zset с отметкой времени ( (new Date()).valueOf() в javascript).:

ZADD 1619028226766 executing-tasks

Затем я запускаю сценарий:

EVAL [THE SCRIPT] 2 executing-tasks tasks 1619028196766

Если задача старше 30 секунд, она будет отправлена в tasks список. Если нет, он будет отправлен обратно в executing-tasks zset.

Вот сценарий

   local source = KEYS[1]
  local destination = KEYS[2]
  local min_score = ARGV[1]
  local popped = redis.call('zpopmin', source)
  local id = popped[1]
  local score = popped[2]
  if table.getn(popped) > 0 then
    if score < min_score then
      redis.call('rpush', destination, id)
      return { "RESTORED", id }
    else
      redis.call('zadd', source, score, id)
      return { "SENT_BACK", id }
    end
  end
  return { "NOTHING_DONE" }