#python #queue
#python #очередь
Вопрос:
У меня есть очередь Python, в которую я добавляю элементы в поток следующим образом
import queue
import threading
_myQueue = queue.Queue()
threading.Thread(target=worker, daemon=True).start()
def worker():
while True:
item = _myQueue.get()
# Do something with this item
_myQueue.task_done()
def add_item(item):
_myQueue.put(item)
Мой вопрос в том, могу ли я редактировать или удалять элементы из этой очереди, пока мой рабочий поток выполняет что-то между _myQueue.get() и _myQueue.task_done():
item = _myQueue.get()
# Do something with this item
_myQueue.task_done()
Например, если элемент «отменен» и больше не нуждается в обработке.
Комментарии:
1. Я думаю, что для каждого вопроса нужен ответ, но я думаю, что это интересно. Вы, конечно, можете безопасно выполнить a
_myQueue.get()
в любом фрагменте кода, что отменит это действие. Но вы получите только следующий элемент в списке. Вы хотите выбрать, какой элемент отменить? Это более сложный вопрос.2. Я не хочу отменять текущий обрабатываемый элемент. Я хочу проверить очередь на наличие определенного элемента и отменить его (удалить его из очереди) Возможно ли это?
Ответ №1:
Если у вас нет полного контроля над определением класса элементов, поставленных в очередь, вы можете использовать collections.OrderedDict
вместо queue.Queue
для эффективной реализации очереди с произвольным доступом:
from collections import OrderedDict
_myQueue = OrderedDict()
def worker():
while True:
item, _ = _myQueue.popitem(last=False)
# Do something with this item
def add_item(item):
_myQueue[item] = 1
def delete_item(item):
del _myQueue[item]
Если объекты item не являются хешируемыми, вам просто нужно будет идентифицировать их по определенным ключам, таким как инкрементное число.
Комментарии:
1. Интересно, но в документах не упоминается
last
параметр, и он не работает на моем 3.8.5.popitem
это LIFO, а не то, что вы хотите в очереди задач.2. @tdelaney Вы уверены, что читаете документацию
collections.OrderedDict
, а неdict
? Пожалуйста, обратитесь к: docs.python.org/3.8/library /…3. @tdelaney Ой, я виноват. Я случайно создал экземпляр
_myQueue
как dict в приведенном выше примере кода. Исправлено, чтобы теперь он сталOrderedDict
объектом, который поддерживаетlast
параметр.
Ответ №2:
Если у вас есть полный контроль над определением класса элементов, поставленных в очередь, вы можете добавить флаг в качестве атрибута экземпляра, чтобы указать, отменен ли элемент, чтобы рабочий мог пропустить отмененный элемент, когда он будет удален из очереди:
class Item:
def __init__(self):
self.cancelled = False
def cancel():
self.cancelled = True
def worker():
while True:
item = _myQueue.get()
if item.cancelled:
continue
# Do something with this item
_myQueue.task_done()
# call item.cancel() to mark an item as cancelled
Комментарии:
1. Как мне на самом деле получить доступ к элементу из очереди, чтобы пометить его как отмененный?
2. Ну, как вы определяете, какой элемент отменить в первую очередь? У вас должен быть отдельный объект контейнера (например, list, dict или set), который отслеживает все элементы, верно? Поэтому, как только вы решите отменить элемент, который вы получаете из указанного контейнера, вы вызываете
cancel
метод элемента, чтобы отменить его.3. Потенциальная проблема заключается в том, что, поскольку операции set-cancel и test-cancel не являются атомарными, установщик не может быть уверен, что операция была отменена. Добавьте к этому, что нет способа узнать, что задача выполняется. Это нормально, если дизайн является наилучшим, но может потребоваться добавить дополнительные элементы управления.
4. Существует еще одна функция, которая активируется при обратном вызове базы данных, поэтому для этого мне нужно будет получить доступ к _myQueue, найти конкретный элемент и установить cancelled=True
5. Итак, как я могу получить доступ и «отредактировать» элемент в очереди? из другой функции?