#redis
#redis
Вопрос:
У меня есть интерфейс очереди, который я хочу реализовать в redis. Хитрость заключается в том, что каждый работник может претендовать на элемент в течение N секунд, после чего предполагается, что работник потерпел крах, и элемент должен быть востребован снова. Ответственность за удаление элемента по завершении лежит на работнике. Как бы вы это сделали в redis? Я использую phpredis, но это не имеет значения.
Ответ №1:
Чтобы реализовать простую очередь в redis, которую можно использовать для повторной отправки аварийных заданий, я бы попробовал что-то вроде этого:
- 1 список «up_for_grabs»
- 1 список «выполняется_работка_он»
- автоматические блокировки с истекающим сроком действия
работник, пытающийся получить задание, сделает что-то вроде этого:
timeout = 3600
#wrap this in a transaction so our cleanup wont kill the task
#Move the job away from the queue so nobody else tries to claim it
job = RPOPLPUSH(up_for_grabs, being_worked_on)
#Set a lock and expire it, the value tells us when that job will time out. This can be arbitrary though
SETEX('lock:' job, Time.now timeout, timeout)
#our application logic
do_work(job)
#Remove the finished item from the queue.
LREM being_worked_on -1 job
#Delete the item's lock. If it crashes here, the expire will take care of it
DEL('lock:' job)
И время от времени мы могли бы просто взять наш список и проверить, что все задания, которые там есть, на самом деле имеют блокировку.
Если мы найдем какие-либо задания, у которых НЕТ блокировки, это означает, что срок его действия истек, и наш рабочий, вероятно, потерпел крах.
В этом случае мы отправили бы повторно.
Это будет псевдокод для этого:
loop do
items = LRANGE(being_worked_on, 0, -1)
items.each do |job|
if !(EXISTS("lock:" job))
puts "We found a job that didn't have a lock, resubmitting"
LREM being_worked_on -1 job
LPUSH(up_for_grabs, job)
end
end
sleep 60
end
Комментарии:
1. Я бы хотел, чтобы вам не понадобился MULTI-EXEC в первом блоке.
2. Да, это был всего лишь комментарий, который остался. Но вы правы, нам, вероятно, нужен один, чтобы процесс очистки ничего не убивал
3. Я думаю, что в вашем решении есть проблема, согласно документам, LREM равен O (N), поэтому это может снизить производительность при работе с большими списками. Предпочтительнее использовать альтернативу O (1), если это возможно.
4. @MarcSeeger МУЛЬТИИСПОЛНИТЕЛЬ для переноса вызовов в RPOPLPUSH и SETEX не поможет, поскольку вам нужно значение, возвращаемое RPOPLPUSH для вызова SETEX. Начиная с Redis 2.6, это можно решить с помощью скрипта LUA.
Ответ №2:
Вы можете настроить стандартную схему синхронизированной блокировки в Redis с [SETNX][1]
помощью . По сути, вы используете SETNX
для создания блокировки, которую все пытаются получить. Чтобы снять блокировку, вы можете DEL
это сделать, и вы также можете настроить EXPIRE
, чтобы сделать блокировку снимаемой. Здесь есть и другие соображения, но ничего необычного в настройке блокировочных и критических разделов в распределенном приложении нет.
Комментарии:
1. Хм, я спросил об очереди. Речь идет о блокировке. Не уверен, как перейти от блокировки к очереди.
2. Вы должны использовать блокировку, чтобы указать, заявлен ли элемент. Если срок действия блокировки истекает, на нее снова можно претендовать. Прежде чем рабочий разблокирует ресурс, вы удаляете его, а затем снимаете блокировку. Именно так я бы сделал это в Redis.