Приложение Flask: обновляет индикатор выполнения во время выполнения функции

#python #html #flask

#python #flask #индикатор выполнения

Вопрос:

Я создаю довольно простое веб-приложение в Flask, которое выполняет функции через API веб-сайта. Мои пользователи заполняют форму с URL-адресом своей учетной записи и токеном API; когда они отправляют форму, у меня есть скрипт python, который экспортирует PDF-файлы из их учетной записи через API. Эта функция может занять много времени, поэтому я хочу отобразить индикатор выполнения начальной загрузки на странице формы, показывающий, как далеко продвинулся процесс выполнения скрипта. Мой вопрос в том, как обновить индикатор выполнения во время выполнения функции? Вот упрощенная версия того, о чем я говорю.

views.py:

 @app.route ('/export_pdf', methods = ['GET', 'POST'])
def export_pdf():
    form = ExportPDF()
    if form.validate_on_submit():
      try:
        export_pdfs.main_program(form.account_url.data,
          form.api_token.data)
        flash ('PDFs exported')
        return redirect(url_for('export_pdf'))
      except TransportException as e:
        s = e.content
        result = re.search('<error>(.*)</error>', s)
        flash('There was an authentication error: '   result.group(1))
      except FailedRequest as e:
        flash('There was an error: '   e.error)
    return render_template('export_pdf.html', title = 'Export PDFs', form = form)
 

export_pdf.html:

 {% extends "base.html" %}

{% block content %}
{% include 'flash.html' %}
<div class="well well-sm">
  <h3>Export PDFs</h3>
  <form class="navbar-form navbar-left" action="" method ="post" name="receipt">
    {{form.hidden_tag()}}
    <br>
    <div class="control-group{% if form.errors.account_url %} error{% endif %}">
      <label class"control-label" for="account_url">Enter Account URL:</label>
      <div class="controls">
        {{ form.account_url(size = 50, class = "span4")}}
        {% for error in form.errors.account_url %}
          <span class="help-inline">[{{error}}]</span><br>
        {% endfor %}
      </div>
    </div>
    <br>
    <div class="control-group{% if form.errors.api_token %} error{% endif %}">
      <label class"control-label" for="api_token">Enter API Token:</label>
      <div class="controls">
        {{ form.api_token(size = 50, class = "span4")}}
        {% for error in form.errors.api_token %}
          <span class="help-inline">[{{error}}]</span><br>
        {% endfor %}
      </div>
    </div>
    <br>
    <button type="submit" class="btn btn-primary btn-lg">Submit</button>
  <br>
  <br>
  <div class="progress progress-striped active">
  <div class="progress-bar"  role="progressbar" aria-valuenow="0" aria-valuemin="0" aria-valuemax="100" style="width: 0%">
    <span class="sr-only"></span>
  </div>
</form>
</div>
</div>
{% endblock %}
 

и export_pdfs.py:

 def main_program(url, token):
    api_caller = api.TokenClient(url, token)
    path = os.path.expanduser('~/Desktop/' url '_pdfs/')
    pdfs = list_all(api_caller.pdf.list, 'pdf')
    total = 0
    count = 1
    for pdf in pdfs:
        total = total   1
    for pdf in pdfs:
        header, body = api_caller.getPDF(pdf_id=int(pdf.pdf_id))
        with open('%s.pdf' % (pdf.number), 'wb') as f:
          f.write(body)
        count = count   1
        if count % 50 == 0:
          time.sleep(1)
 

В этой последней функции у меня есть общее количество PDF-файлов, которые я буду экспортировать, и есть постоянный подсчет во время его обработки. Как я могу отправить текущий прогресс в свой HTML-файл, чтобы он соответствовал тегу ‘style =’ индикатора выполнения? Желательно таким образом, чтобы я мог повторно использовать тот же инструмент для индикаторов выполнения на других страницах. Дайте мне знать, если я не предоставил достаточно информации.

Комментарии:

1. Я не хочу давать закодированный ответ, но позвольте мне указать вам на решение. Одна из обычных идей — запустить поток для экспорта pdf. Поток сообщает о ходе выполнения в таблицу базы данных. Интерфейс вашего браузера выполняет ajax-опрос, чтобы получить значение прогресса из базы данных. В качестве альтернативы ajax-опросу вы можете посмотреть на flask-socketio, чтобы перенести значение прогресса в ваш браузер. Эта альтернатива может потребовать дополнительных инженерных усилий.

2. Идея @chfw заключается в том, как вы должны подходить к нему. Но вместо потока это должен быть дополнительный процесс, ожидающий заданий. И вместо базы данных я бы использовал что-то вроде Redis и общался через очереди сообщений. И, наконец, вместо использования AJAX или WebSockets я бы рекомендовал SSE , который проще в настройке.

3. @FreshCrichard — Как вы в конечном итоге запустили его?

Ответ №1:

Как некоторые другие предлагали в комментариях, самое простое решение — запустить вашу функцию экспорта в другом потоке и позволить вашему клиенту получать информацию о ходе выполнения с другим запросом. Существует несколько подходов для решения этой конкретной задачи. В зависимости от ваших потребностей вы можете выбрать более или менее сложный вариант.

Вот очень (очень) минимальный пример того, как это сделать с потоками:

 import random
import threading
import time

from flask import Flask


class ExportingThread(threading.Thread):
    def __init__(self):
        self.progress = 0
        super().__init__()

    def run(self):
        # Your exporting stuff goes here ...
        for _ in range(10):
            time.sleep(1)
            self.progress  = 10


exporting_threads = {}
app = Flask(__name__)
app.debug = True


@app.route('/')
def index():
    global exporting_threads

    thread_id = random.randint(0, 10000)
    exporting_threads[thread_id] = ExportingThread()
    exporting_threads[thread_id].start()

    return 'task id: #%s' % thread_id


@app.route('/progress/<int:thread_id>')
def progress(thread_id):
    global exporting_threads

    return str(exporting_threads[thread_id].progress)


if __name__ == '__main__':
    app.run()
 

В индексном маршруте (/) мы создаем поток для каждой задачи экспорта и возвращаем идентификатор этой задаче, чтобы клиент мог получить его позже с помощью маршрута выполнения (/progress/[exporting_thread] ).
Экспортирующий поток обновляет свое значение прогресса каждый раз, когда считает это целесообразным.

На стороне клиента вы получите что-то вроде этого (в этом примере используется jQuery):

 function check_progress(task_id, progress_bar) {
    function worker() {
        $.get('progress/'   task_id, function(data) {
            if (progress < 100) {
                progress_bar.set_progress(progress)
                setTimeout(worker, 1000)
            }
        })
    }
}
 

Как уже было сказано, этот пример очень минималистичен, и вам, вероятно, следует использовать немного более сложный подход.
Обычно мы сохраняем ход выполнения определенного потока в базе данных или в каком-либо кэше, чтобы не полагаться на общую структуру, что позволяет избежать большинства проблем с памятью и параллелизмом, которые возникают в моем примере.

Redis (https://redis.io ) — это хранилище баз данных в памяти, которое, как правило, хорошо подходит для такого рода задач. Он хорошо интегрируется с Python (https://pypi.python.org/pypi/redis ).

Комментарии:

1. Удивительные. Спасибо.

2. Этот ответ хорошо сработал для меня, за исключением того, что я изо всех сил пытаюсь понять, как остановить цикл setTimeout в случае ошибки на стороне сервера.

3. Привет @Alvae, этот метод отличный и очень простой, и в настоящее время я его внедряю. У меня есть вопрос, будет ли когда-либо освобождена память, связанная с потоком? Я имею в виду, безопасно ли это? как я могу освободить его через некоторое время

4. @FrancescoPegoraro Если ваш поток завершится, его память будет восстановлена с помощью обычного механизма сбора мусора. Вам не нужно явное освобождение памяти в Python (или Javascript). Является ли это безопасным или нет, гораздо более сложный вопрос, который зависит от вашего определения «безопасный».

Ответ №2:

Я запускаю эту простую, но обучающую реализацию Flask SSE на локальном хосте. Для обработки сторонней (загруженной пользователем) библиотеки в GAE:

  1. Создайте каталог с именем lib в вашем корневом пути.
  2. скопируйте gevent каталог библиотеки lib в каталог.
  3. Добавьте эти строки в свой main.py :
     import sys
    sys.path.insert(0,'lib')
     
  4. Вот и все. Если вы используете lib каталог из дочерней папки, используйте относительную ссылку: sys.path.insert(0, ../../blablabla/lib')

Из http://flask.pocoo.org/snippets/116 /

 # author: oskar.blom@gmail.com
#
# Make sure your gevent version is >= 1.0
import gevent
from gevent.wsgi import WSGIServer
from gevent.queue import Queue

from flask import Flask, Response

import time


# SSE "protocol" is described here: http://mzl.la/UPFyxY
class ServerSentEvent(object):

    def __init__(self, data):
        self.data = data
        self.event = None
        self.id = None
        self.desc_map = {
            self.data : "data",
            self.event : "event",
            self.id : "id"
        }

    def encode(self):
        if not self.data:
            return ""
        lines = ["%s: %s" % (v, k) 
                 for k, v in self.desc_map.iteritems() if k]

        return "%snn" % "n".join(lines)

app = Flask(__name__)
subscriptions = []

# Client code consumes like this.
@app.route("/")
def index():
    debug_template = """
     <html>
       <head>
       </head>
       <body>
         <h1>Server sent events</h1>
         <div id="event"></div>
         <script type="text/javascript">

         var eventOutputContainer = document.getElementById("event");
         var evtSrc = new EventSource("/subscribe");

         evtSrc.onmessage = function(e) {
             console.log(e.data);
             eventOutputContainer.innerHTML = e.data;
         };

         </script>
       </body>
     </html>
    """
    return(debug_template)

@app.route("/debug")
def debug():
    return "Currently %d subscriptions" % len(subscriptions)

@app.route("/publish")
def publish():
    #Dummy data - pick up from request for real data
    def notify():
        msg = str(time.time())
        for sub in subscriptions[:]:
            sub.put(msg)

    gevent.spawn(notify)

    return "OK"

@app.route("/subscribe")
def subscribe():
    def gen():
        q = Queue()
        subscriptions.append(q)
        try:
            while True:
                result = q.get()
                ev = ServerSentEvent(str(result))
                yield ev.encode()
        except GeneratorExit: # Or maybe use flask signals
            subscriptions.remove(q)

    return Response(gen(), mimetype="text/event-stream")

if __name__ == "__main__":
    app.debug = True
    server = WSGIServer(("", 5000), app)
    server.serve_forever()
    # Then visit http://localhost:5000 to subscribe 
    # and send messages by visiting http://localhost:5000/publish