#python #python-3.x #multithreading
#python #python-3.x #многопоточность
Вопрос:
В настоящее время я пишу приложение на python, которое одновременно загружает несколько файлов с использованием многопоточности. Я не совсем разбираюсь в многопоточности в Python, поэтому я не уверен, сколько или какую информацию я должен предоставить.
Когда загрузчик почти заканчивает загрузку файла, он выходит из строя, и я получаю это сообщение об ошибке.
fatal Python error: _enter_buffered_busy: could not aquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
Python runtime state: finalizing (tstate=000001EF6024D440)
Current thread 0x00002cf8 (most recent call first):
<no Python frame>
Я тестировал это довольно много раз и заметил следующее:
- Эта ошибка появляется не всегда. В большинстве случаев у меня нет проблем с приложением
- Я попробовал это и получил ошибки в
- Linux Ubuntu 18.04 (ноутбук) под управлением Python3.6
- Linux Ubuntu 20.04 (рабочий стол) под управлением Python3.6
- Windows 10 (рабочий стол)
- Контейнер Docker под управлением Ubuntu / python3.6-базовый образ slim-buster
Мне жаль, что мне приходится сбрасывать весь этот код на SO; Я действительно понятия не имею, что означает эта ошибка, и не знаю, с чего начать устранение неполадок. Наш репозиторий находится здесь
Вот код, который обрабатывает многопоточность:
import datetime
import json
import multiprocessing
import os
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import List, Dict
try:
from dozent.downloader_tools import DownloaderTools
from dozent.progress_tracker import ProgressTracker
except ModuleNotFoundError:
from downloader_tools import DownloaderTools
from progress_tracker import ProgressTracker
CURRENT_FILE_PATH = Path(__file__)
DEFAULT_DATA_DIRECTORY = CURRENT_FILE_PATH.parent.parent / 'data'
TWITTER_ARCHIVE_STREAM_LINKS_PATH = CURRENT_FILE_PATH.parent / 'twitter-archive-stream-links.json'
FIRST_DAY_OF_SUPPORT = datetime.date(2017, 6, 1)
LAST_DAY_OF_SUPPORT = datetime.date(2020, 6, 30)
class _DownloadWorker(Thread): # skip_tests
def __init__(self, queue: Queue, download_dir: Path, tracker: ProgressTracker = None):
Thread.__init__(self)
self.queue = queue
self.download_dir = download_dir
self.tracker = tracker
def run(self):
while True:
# Get the work from the queue and expand the tuple
link = self.queue.get()
try:
DownloaderTools.download_with_pysmartdl(link, str(self.download_dir), tracker=self.tracker)
finally:
self.queue.task_done()
#
# Skipping non-multithreading code
#
def download_timeframe(self, start_date: datetime.date, end_date: datetime.date, verbose: bool = True,
download_dir: Path = DEFAULT_DATA_DIRECTORY):
"""
Download all tweet archives from self.start_date to self.end_date
:return: None
"""
# Create a queue to communicate with the worker threads
queue = Queue()
if verbose:
tracker = ProgressTracker()
tracker.daemon = True
tracker.start()
else:
tracker = None
os.makedirs(download_dir, exist_ok=True)
for x in range(multiprocessing.cpu_count()):
worker = _DownloadWorker(queue, download_dir, tracker=tracker)
# worker.set_verbosity(verbose=verbosity)
# Setting daemon to True will let the main thread exit even though the workers are blocking
worker.daemon = True
worker.start()
for sample_date in self.get_links_for_days(start_date=start_date, end_date=end_date):
print(f"Queueing tweets download for {sample_date['month']}-{sample_date['year']}")
queue.put(sample_date['link'])
queue.join()
tracker.join()
import threading
import sys
import traceback
import os
import signal
"""
usage: install()
Once installed, all exceptions caught from within threads will cause the program to end.
"""
def sendKillSignal(etype, value, tb):
lines = traceback.format_exception(etype, value, tb)
for line in lines:
print(line, flush=True)
os.kill(os.getpid(), signal.SIGTERM)
original_init = threading.Thread.__init__
def patched_init(self, *args, **kwargs):
original_init(self, *args, **kwargs)
original_run = self.run
def patched_run(*args, **kw):
try:
original_run(*args, **kw)
except Exception:
sys.excepthook(*sys.exc_info())
self.run = patched_run
def install():
sys.excepthook = sendKillSignal
threading.Thread.__init__ = patched_init
import sys
import os
import numpy as np
import threading
import shutil
import queue
from typing import Dict, Callable, Tuple
from collections import namedtuple
Task = namedtuple('Task', 'id progress_callback progress prefix suffix line_length')
class ProgressTracker(threading.Thread):
"""It is assumed that no other printing to stdout will occur while running this ProgressTracker thread.
"""
def __init__(self):
threading.Thread.__init__(self)
self.tasks: Dict[int, Task] = {}
self.message_queue = queue.Queue()
self.sum_progress = 0
self.next_task = 0
self.terminal_size = None
resized = self.refresh_terminal_size()
assert(resized) # This won't work if we couldn't read the terminal size
self.lock = threading.Lock()
self.tasks_pending = False
self.running = True
def refresh_terminal_size(self) -> bool:
"""
Records any change in the terminal size.
:returns: True if a change is detected, else False
"""
old_size = self.terminal_size
try:
self.terminal_size = os.get_terminal_size()[0]
except OSError:
self.terminal_size = shutil.get_terminal_size()[0]
return old_size != self.terminal_size
def join(self):
self.message_queue.join()
self.stop()
def stop(self):
self.running = False
self.update(0)
def update(self, task_id: int):
self.message_queue.put(task_id)
def register_task(self, name: str, progress_callback: Callable[[], Tuple[float, str, str]]) -> int:
"""Register a new task with the given progress callback.
:param name: A unique name for this task.
:param progress_callback: A function, which will report a tuple of:
a progress percentage in [0,100],
a string prefix to be output before the progress bar,
and a string suffix to be output after the progress bar.
:return: The ID of the registered task.
"""
task_id = self.next_task
self.next_task = 1
# Perform operation in helper function on own thread, so that this function does not block
thread = threading.Thread(target=self._register_task_helper, args=(name, task_id, progress_callback))
thread.daemon = True
thread.start()
return task_id
def _register_task_helper(self, name: str, task_id: int, progress_callback: Callable[[], Tuple[float, str]]):
with self.lock:
# Create the task, with initial progress 0
self.tasks[task_id] = (Task(id=task_id,
progress_callback=progress_callback,
progress=0,
prefix=None,
suffix=None,
line_length=0))
self.tasks_pending = True
self._delete_progress_bars()
print(f"Queueing tweet download {task_id}: {name}", flush=True)
def _delete_progress_bars(self, starting_id: int = 0):
if self.tasks:
sys.stdout.write('b' * (np.sum([self.tasks[i].line_length for i in range(starting_id, len(self.tasks))])))
def _update_task_progress(self, task_id: int, draw: bool = True):
"""Update the progress bar for the given task
:param task_id: Identifier of the task to update.
:param draw: Whether to draw the updated progress bar, defaults to True
"""
# Update the progress value
task = self.tasks[task_id]
assert(task.id == task_id)
progress, prefix, suffix = task.progress_callback()
assert(0 <= progress <= 100)
self.sum_progress = progress - task.progress
task = Task(id=task_id,
progress_callback=task.progress_callback,
progress=progress,
prefix=prefix,
suffix=suffix,
line_length=task.line_length)
self.tasks[task_id] = task
if draw:
# Redraw all bars if the terminal size changes
if self.refresh_terminal_size():
task_id = 0
# Return to start of the progress bar for task_id
self._delete_progress_bars(starting_id=task_id)
self._draw_task_progress(task_id)
def _draw_task_progress(self, task_id: int):
"""Updates the progress bar for task_id and after.
Assumes the console begins at the end of any previously drawn progress bars.
"""
# We re-write the progress for all tasks, in order to finish at the end of all progress bars (our assumption)
for task in [self.tasks[i] for i in range(task_id, len(self.tasks))]:
if task.prefix is None or task.suffix is None:
continue
prefix = f"{task.id:3.0f}: "
bar = ' '*len(prefix)
bar_width = self.terminal_size - len(prefix) - 2
assert(bar_width > 0)
num_dashes = int(task.progress * bar_width / 100.0)
bar = f"[{'-' * num_dashes}{' ' * (bar_width - num_dashes)}]"
prefix = task.prefix[:int(self.terminal_size/2)]
suffix = task.suffix[:int(self.terminal_size/3)]
spaces = ' ' * (self.terminal_size - len(suffix) - len(prefix))
line = f"{prefix}{spaces}{suffix}{bar}"
assert(self.tasks[task.id] == task)
self.tasks[task.id] = Task(id=task.id,
progress_callback=task.progress_callback,
progress=task.progress,
prefix=task.prefix,
suffix=task.suffix,
line_length=len(line))
sys.stdout.write(line)
sys.stdout.flush()
def _run_task(self):
task_id = self.message_queue.get(True)
try:
with self.lock:
if self.tasks_pending:
self._update_task_progress(0, draw=False)
self.tasks_pending = False
else:
self._update_task_progress(task_id, draw=True)
finally:
self.message_queue.task_done()
def run(self):
while self.running:
self._run_task()
self._delete_progress_bars()
print('')
Опять же, я не специалист по многопоточности в Python и не уверен, с чего начать устранение неполадок. Я был бы очень признателен за любые подсказки!
Редактировать:
Приведенные мной примеры не воспроизводимы
Как воспроизвести
Извлеките изображение Docker:
docker pull socialmediapublicanalysis/dozent:latest
Загрузите все твиты за день (12 мая 2020 г.):
docker run -it socialmediapublicanalysis/dozent:latest python -m dozent -s 2020-05-12 -e 2020-05-12
Ответ №1:
В вашем коде многое происходит, и это определенно не минимальный, воспроизводимый пример…
Тем не менее, и в соответствии с сообщением об ошибке, похоже, что вы используете потоки демонов, и они, как известно, вызывают проблемы, если они пытаются печатать (to stdout
) при завершении работы (вкратце: вполне вероятно, что вы пытаетесь выполнить запись в более не существующий stdout
поток демона).
Вы должны убедиться, что .join()
все ваши потоки (и, если ожидается, что они будут длительными, снабдите их методом завершения работы, который переключает флаг, который по очереди проверяется любым циклом, в который вы входите; или отправьте ядовитую таблетку через рабочую очередь).
Лучшим подходом для многопоточных загрузок было бы использовать ThreadPoolExecutor
, как в этом примере, из документов.
Если вам нужны отчеты о ходе выполнения, вас может заинтересовать tqdm
. В конце концов, ваш код будет намного проще и проще в обслуживании.
Вот полный пример, который загружает ряд (818, на момент написания статьи) URL-адресов из Википедии, используя ThreadPoolExecutor и показывая прогресс по мере его выполнения:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
tld = 'https://en.wikipedia.org'
url = '/wiki/Index_of_computing_articles'
soup = BeautifulSoup(requests.get(urljoin(tld, url)).content)
urllist = [urljoin(tld, a.get('href')) for a in soup.find_all(href=True)]
def load_url(url):
return requests.get(url).content
def load_all(urllist):
with ThreadPoolExecutor() as executor:
results = list(tqdm(
executor.map(load_url, urllist),
total=len(urllist), unit=' pages'))
return results
results = load_all(urllist)