#python #data-structures #queue
#python #структуры данных #очередь
Вопрос:
Я хотел бы создать структуру данных, которая представляет собой набор очередей (в идеале хэш, карту или поиск, подобный dict), где сообщения в очередях активно удаляются после того, как они достигли определенного возраста. Значение ttl было бы глобальным; сообщения не нуждались бы в отдельных ttl и не имели бы их. Разрешение для ttl не обязательно должно быть ужасно точным — только в течение секунды или около того.
Я даже не уверен, что здесь искать. Я мог бы создать отдельную глобальную очередь, которую отслеживает фоновый поток, просматривая и извлекая указатели на сообщения из глобальной очереди, которые указывают ему удалять элементы из отдельных очередей, но поведение должно быть обоюдным. Если элемент удаляется из индивидуальной очереди, его необходимо удалить из глобальной очереди.
Я хотел бы, чтобы эта структура данных была реализована на Python, в идеале, и, как всегда, скорость имеет первостепенное значение (больше, чем использование памяти). Есть предложения, с чего начать?
Ответ №1:
Я бы начал с простого моделирования поведения, которое вы ищете, в одном классе, выраженном как можно проще. Производительность может быть достигнута позже за счет итеративной оптимизации, но только при необходимости (возможно, вам это не понадобится).
Приведенный ниже класс делает что-то примерно подобное тому, что вы описываете. Очереди — это просто списки, которые называются и хранятся в словаре. Каждое сообщение помечается временной меткой и вставляется в начало списка (FIFO). Получение сообщений осуществляется путем проверки временной метки сообщения в конце списка и выталкивания ее до тех пор, пока не будет найдено сообщение, возраст которого ниже порогового значения.
Если вы планируете получить доступ к этому из нескольких потоков, вам нужно будет добавить некоторую детализированную блокировку, чтобы выжать из этого максимальную производительность. Например, reap()
метод должен блокировать только 1 очередь за раз, а не блокировать все очереди (синхронизация на уровне метода), поэтому вам также необходимо сохранить блокировку для каждой именованной очереди.
Обновлено — Теперь используется глобальный набор сегментов (по временной метке, разрешение 1 секунда), чтобы отслеживать, в каких очередях хранятся сообщения с этого времени. Это уменьшает количество очередей, подлежащих проверке при каждом проходе.
import time
from collections import defaultdict
class QueueMap(object):
def __init__(self):
self._expire = defaultdict(lambda *n: defaultdict(int))
self._store = defaultdict(list)
self._oldest_key = int(time.time())
def get_queue(self, name):
return self._store.get(name, [])
def pop(self, name):
queue = self.get_queue(name)
if queue:
key, msg = queue.pop()
self._expire[key][name] -= 1
return msg
return None
def set(self, name, message):
key = int(time.time())
# increment count of messages in this bucket/queue
self._expire[key][name] = 1
self._store[name].insert(0, (key, message))
def reap(self, age):
now = time.time()
threshold = int(now - age)
oldest = self._oldest_key
# iterate over buckets we need to check
for key in range(oldest, threshold 1):
# for each queue with items, expire the oldest ones
for name, count in self._expire[key].iteritems():
if count <= 0:
continue
queue = self.get_queue(name)
while queue:
if queue[-1][0] > threshold:
break
queue.pop()
del self._expire[key]
# set oldest_key for next pass
self._oldest_key = threshold
Использование:
qm = QueueMap()
qm.set('one', 'message 1')
qm.set('one', 'message 2')
qm.set('two', 'message 3')
print qm.pop('one')
print qm.get_queue('one')
print qm.get_queue('two')
# call this on a background thread which sleeps
time.sleep(2)
# reap messages older than 1 second
qm.reap(1)
# queues should be empty now
print qm.get_queue('one')
print qm.get_queue('two')
Комментарии:
1. Единственная проблема, которую я вижу здесь, заключается в том, что для этого требуется перебирать все очереди (которые в моем случае могут составлять сотни тысяч (очень маленькие очереди)), даже если они пусты. Я бы предпочел найти способ сразу перейти к любым сообщениям, срок действия которых истекает.
2. @dave 100k очередей? Вау, ОК, я понимаю, к чему ты клонишь.. Я обновил его глобальным списком сегментов на основе временных меток, который отслеживает количество сообщений, вставленных в каждую очередь за данное 1-секундное окно. Это делает
reap()
операцию более эффективной.
Ответ №2:
Рассмотрите возможность проверки TTLS при каждом обращении к очередям вместо использования потока для постоянной проверки. Я не уверен, что вы имеете в виду, говоря о хэше / карте / dict (что такое ключ?), Но как насчет чего-то подобного:
import time
class EmptyException(Exception): pass
class TTLQueue(object):
TTL = 60 # seconds
def __init__(self):
self._queue = []
def push(self, msg):
self._queue.append((time.time() self.TTL, msg))
def pop(self):
self._queue = [(t, msg) for (t, msg) in self._queue if t > time.time()]
if len(self._queue) == 0:
raise EmptyException()
return self._queue.pop(0)[1]
queues = [TTLQueue(), TTLQueue(), TTLQueue()] # this could be a dict or set or
# whatever if I knew what keys
# you expected
Комментарии:
1. Единственная проблема с отложенным истечением срока действия заключается в том, что из-за природы приложения некоторые очереди могут никогда не проверяться снова. Они будут задерживаться, тратя пространство. Что касается хэша, это просто моя таблица поиска для очередей. Когда приходит клиент и запрашивает очередь X, я нахожу это в хэше и возвращаю все, что есть в очереди.