Как исправить «фатальную ошибку Python: _enter_buffered_busy: не удалось получить блокировку для <_io.BufferedWriter name='’> при завершении работы интерпретатора» ошибка?

#python #python-3.x #multithreading

#python #python-3.x #многопоточность

Вопрос:

В настоящее время я пишу приложение на python, которое одновременно загружает несколько файлов с использованием многопоточности. Я не совсем разбираюсь в многопоточности в Python, поэтому я не уверен, сколько или какую информацию я должен предоставить.

Когда загрузчик почти заканчивает загрузку файла, он выходит из строя, и я получаю это сообщение об ошибке.

 fatal Python error: _enter_buffered_busy: could not aquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads
Python runtime state: finalizing (tstate=000001EF6024D440)

Current thread 0x00002cf8 (most recent call first):
<no Python frame>
 

Я тестировал это довольно много раз и заметил следующее:

  • Эта ошибка появляется не всегда. В большинстве случаев у меня нет проблем с приложением
  • Я попробовал это и получил ошибки в
    • Linux Ubuntu 18.04 (ноутбук) под управлением Python3.6
    • Linux Ubuntu 20.04 (рабочий стол) под управлением Python3.6
    • Windows 10 (рабочий стол)
    • Контейнер Docker под управлением Ubuntu / python3.6-базовый образ slim-buster

Мне жаль, что мне приходится сбрасывать весь этот код на SO; Я действительно понятия не имею, что означает эта ошибка, и не знаю, с чего начать устранение неполадок. Наш репозиторий находится здесь

Вот код, который обрабатывает многопоточность:

dozent.py

 import datetime
import json
import multiprocessing
import os
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import List, Dict

try:
    from dozent.downloader_tools import DownloaderTools
    from dozent.progress_tracker import ProgressTracker
except ModuleNotFoundError:
    from downloader_tools import DownloaderTools
    from progress_tracker import ProgressTracker

CURRENT_FILE_PATH = Path(__file__)
DEFAULT_DATA_DIRECTORY = CURRENT_FILE_PATH.parent.parent / 'data'
TWITTER_ARCHIVE_STREAM_LINKS_PATH = CURRENT_FILE_PATH.parent / 'twitter-archive-stream-links.json'

FIRST_DAY_OF_SUPPORT = datetime.date(2017, 6, 1)
LAST_DAY_OF_SUPPORT = datetime.date(2020, 6, 30)

class _DownloadWorker(Thread):  # skip_tests

    def __init__(self, queue: Queue, download_dir: Path, tracker: ProgressTracker = None):
        Thread.__init__(self)
        self.queue = queue
        self.download_dir = download_dir
        self.tracker = tracker

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            link = self.queue.get()
            try:
                DownloaderTools.download_with_pysmartdl(link, str(self.download_dir), tracker=self.tracker)
            finally:
                self.queue.task_done()

#
# Skipping non-multithreading code
#

    def download_timeframe(self, start_date: datetime.date, end_date: datetime.date, verbose: bool = True,
                           download_dir: Path = DEFAULT_DATA_DIRECTORY):
        """
        Download all tweet archives from self.start_date to self.end_date
        :return: None
        """

        # Create a queue to communicate with the worker threads
        queue = Queue()
        if verbose:
            tracker = ProgressTracker()
            tracker.daemon = True
            tracker.start()
        else:
            tracker = None

        os.makedirs(download_dir, exist_ok=True)

        for x in range(multiprocessing.cpu_count()):
            worker = _DownloadWorker(queue, download_dir, tracker=tracker)
            # worker.set_verbosity(verbose=verbosity)
            # Setting daemon to True will let the main thread exit even though the workers are blocking
            worker.daemon = True
            worker.start()

        for sample_date in self.get_links_for_days(start_date=start_date, end_date=end_date):
            print(f"Queueing tweets download for {sample_date['month']}-{sample_date['year']}")
            queue.put(sample_date['link'])

        queue.join()
        tracker.join()
 

catch_thread_execptions.py

 import threading
import sys
import traceback
import os
import signal

"""
usage: install()
Once installed, all exceptions caught from within threads will cause the program to end.
"""


def sendKillSignal(etype, value, tb):
    lines = traceback.format_exception(etype, value, tb)
    for line in lines:
        print(line, flush=True)

    os.kill(os.getpid(), signal.SIGTERM)


original_init = threading.Thread.__init__


def patched_init(self, *args, **kwargs):
    original_init(self, *args, **kwargs)
    original_run = self.run

    def patched_run(*args, **kw):
        try:
            original_run(*args, **kw)
        except Exception:
            sys.excepthook(*sys.exc_info())
    self.run = patched_run


def install():
    sys.excepthook = sendKillSignal
    threading.Thread.__init__ = patched_init
 

progress_tracker.py

  
import sys
import os

import numpy as np
import threading
import shutil
import queue

from typing import Dict, Callable, Tuple
from collections import namedtuple

Task = namedtuple('Task', 'id progress_callback progress prefix suffix line_length')


class ProgressTracker(threading.Thread):
    """It is assumed that no other printing to stdout will occur while running this ProgressTracker thread.
    """

    def __init__(self):
        threading.Thread.__init__(self)

        self.tasks: Dict[int, Task] = {}
        self.message_queue = queue.Queue()
        self.sum_progress = 0
        self.next_task = 0

        self.terminal_size = None

        resized = self.refresh_terminal_size()
        assert(resized)  # This won't work if we couldn't read the terminal size

        self.lock = threading.Lock()
        self.tasks_pending = False
        self.running = True

    def refresh_terminal_size(self) -> bool:
        """
        Records any change in the terminal size.
        :returns: True if a change is detected, else False
        """

        old_size = self.terminal_size
        try:
            self.terminal_size = os.get_terminal_size()[0]
        except OSError:
            self.terminal_size = shutil.get_terminal_size()[0]

        return old_size != self.terminal_size

    def join(self):
        self.message_queue.join()
        self.stop()

    def stop(self):
        self.running = False
        self.update(0)

    def update(self, task_id: int):
        self.message_queue.put(task_id)

    def register_task(self, name: str, progress_callback: Callable[[], Tuple[float, str, str]]) -> int:
        """Register a new task with the given progress callback.
        :param name: A unique name for this task.
        :param progress_callback: A function, which will report a tuple of:
                                    a progress percentage in [0,100],
                                    a string prefix to be output before the progress bar,
                                    and a string suffix to be output after the progress bar.
        :return: The ID of the registered task.
        """

        task_id = self.next_task
        self.next_task  = 1

        # Perform operation in helper function on own thread, so that this function does not block
        thread = threading.Thread(target=self._register_task_helper, args=(name, task_id, progress_callback))
        thread.daemon = True
        thread.start()

        return task_id

    def _register_task_helper(self, name: str, task_id: int, progress_callback: Callable[[], Tuple[float, str]]):
        with self.lock:
            # Create the task, with initial progress 0
            self.tasks[task_id] = (Task(id=task_id,
                                        progress_callback=progress_callback,
                                        progress=0,
                                        prefix=None,
                                        suffix=None,
                                        line_length=0))
            self.tasks_pending = True
            self._delete_progress_bars()
            print(f"Queueing tweet download {task_id}: {name}", flush=True)

    def _delete_progress_bars(self, starting_id: int = 0):
        if self.tasks:
            sys.stdout.write('b' * (np.sum([self.tasks[i].line_length for i in range(starting_id, len(self.tasks))])))

    def _update_task_progress(self, task_id: int, draw: bool = True):
        """Update the progress bar for the given task
        :param task_id: Identifier of the task to update.
        :param draw: Whether to draw the updated progress bar, defaults to True
        """

        # Update the progress value
        task = self.tasks[task_id]
        assert(task.id == task_id)

        progress, prefix, suffix = task.progress_callback()
        assert(0 <= progress <= 100)

        self.sum_progress  = progress - task.progress
        task = Task(id=task_id,
                    progress_callback=task.progress_callback,
                    progress=progress,
                    prefix=prefix,
                    suffix=suffix,
                    line_length=task.line_length)
        self.tasks[task_id] = task

        if draw:
            # Redraw all bars if the terminal size changes
            if self.refresh_terminal_size():
                task_id = 0

            # Return to start of the progress bar for task_id
            self._delete_progress_bars(starting_id=task_id)
            self._draw_task_progress(task_id)

    def _draw_task_progress(self, task_id: int):
        """Updates the progress bar for task_id and after.
        Assumes the console begins at the end of any previously drawn progress bars.
        """

        # We re-write the progress for all tasks, in order to finish at the end of all progress bars (our assumption)
        for task in [self.tasks[i] for i in range(task_id, len(self.tasks))]:
            if task.prefix is None or task.suffix is None:
                continue

            prefix = f"{task.id:3.0f}: "

            bar = ' '*len(prefix)
            bar_width = self.terminal_size - len(prefix) - 2
            assert(bar_width > 0)

            num_dashes = int(task.progress * bar_width / 100.0)
            bar  = f"[{'-' * num_dashes}{' ' * (bar_width - num_dashes)}]"

            prefix  = task.prefix[:int(self.terminal_size/2)]
            suffix = task.suffix[:int(self.terminal_size/3)]
            spaces = ' ' * (self.terminal_size - len(suffix) - len(prefix))

            line = f"{prefix}{spaces}{suffix}{bar}"

            assert(self.tasks[task.id] == task)
            self.tasks[task.id] = Task(id=task.id,
                                       progress_callback=task.progress_callback,
                                       progress=task.progress,
                                       prefix=task.prefix,
                                       suffix=task.suffix,
                                       line_length=len(line))

            sys.stdout.write(line)

        sys.stdout.flush()

    def _run_task(self):
        task_id = self.message_queue.get(True)

        try:
            with self.lock:
                if self.tasks_pending:
                    self._update_task_progress(0, draw=False)
                    self.tasks_pending = False

                else:
                    self._update_task_progress(task_id, draw=True)

        finally:
            self.message_queue.task_done()

    def run(self):
        while self.running:
            self._run_task()

        self._delete_progress_bars()
        print('')
 

Опять же, я не специалист по многопоточности в Python и не уверен, с чего начать устранение неполадок. Я был бы очень признателен за любые подсказки!

Редактировать:

Приведенные мной примеры не воспроизводимы

Как воспроизвести

Извлеките изображение Docker:

 docker pull socialmediapublicanalysis/dozent:latest
 

Загрузите все твиты за день (12 мая 2020 г.):

 docker run -it socialmediapublicanalysis/dozent:latest python -m dozent -s 2020-05-12 -e 2020-05-12
 

Ответ №1:

В вашем коде многое происходит, и это определенно не минимальный, воспроизводимый пример…

Тем не менее, и в соответствии с сообщением об ошибке, похоже, что вы используете потоки демонов, и они, как известно, вызывают проблемы, если они пытаются печатать (to stdout ) при завершении работы (вкратце: вполне вероятно, что вы пытаетесь выполнить запись в более не существующий stdout поток демона).

Вы должны убедиться, что .join() все ваши потоки (и, если ожидается, что они будут длительными, снабдите их методом завершения работы, который переключает флаг, который по очереди проверяется любым циклом, в который вы входите; или отправьте ядовитую таблетку через рабочую очередь).

Лучшим подходом для многопоточных загрузок было бы использовать ThreadPoolExecutor , как в этом примере, из документов.

Если вам нужны отчеты о ходе выполнения, вас может заинтересовать tqdm . В конце концов, ваш код будет намного проще и проще в обслуживании.

Вот полный пример, который загружает ряд (818, на момент написания статьи) URL-адресов из Википедии, используя ThreadPoolExecutor и показывая прогресс по мере его выполнения:

 import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm


tld = 'https://en.wikipedia.org'
url = '/wiki/Index_of_computing_articles'

soup = BeautifulSoup(requests.get(urljoin(tld, url)).content)
urllist = [urljoin(tld, a.get('href')) for a in soup.find_all(href=True)]

def load_url(url):
    return requests.get(url).content

def load_all(urllist):
    with ThreadPoolExecutor() as executor:
        results = list(tqdm(
            executor.map(load_url, urllist),
            total=len(urllist), unit=' pages'))
    return results

results = load_all(urllist)
 

Во время выполнения отображается индикатор выполнения:
введите описание изображения здесь