Как синхронизировать гринлеты из gevent в среде WSGIServer / flask

#python #flask #raspberry-pi #gevent #greenlets

#python #flask #raspberry-pi #gevent #гринлеты

Вопрос:

на моем HTTP-сервере на базе flask, предназначенном для удаленного управления некоторыми сервисами в RPI, я столкнулся с проблемой, которую не могу решить в одиночку, поэтому любезно прошу вас дать мне подсказку.

Концепция: С помощью flask и gevent я могу остановить и запустить некоторые (две) службы, работающие на RPI. Я использую gevent и событие на стороне сервера в отношении javascript, чтобы прослушивать обновления html.

html-страница показывает состояние (включено / выключено / обработка) сервисов и предоставляет кнопки для их включения / выключения. Дополнительно отобразите некоторые системные параметры (CPU, RAM, HDD, NET).

Пока открыт только один пользователь / страница, все работает так, как требуется. Как только появляется больше пользователей, обращающихся к серверу flask, возникает гонка между гринлетами, обслуживающими каждого пользователя / страницу, и не все страницы перезагружаются.

Проблема: Как я могу отправить сообщение всем запущенным гринлетам sse_worker() и обработать его поверх их обычной работы?

Ниже приведен код высокого уровня. Полный исходный код можно найти здесь:https://github.com/petervflocke/flasksse_rpi проверьте sse.py файл

 def sse_worker(): #neverending task
   while True:
       if there_is_a_change_in_process_status:
          reload_page=True
       else: 
          reload_page=False
       Do some other tasks:
          update some_single_parameters_to_be_passed_to_html_page
       yield 'data: '   json.dumps(all_parameters)
       gevent.sleep(1)

@app.route('/stream/', methods=['GET', 'POST'])
def stream():
    return Response(sse_worker(), mimetype="text/event-stream")

if __name__ == "__main__":
    gevent.signal(signal.SIGTERM, stop)
    http_server = WSGIServer(('', 5000), app)
    http_server.serve_forever()
  

…на html-странице потоковые данные json обрабатываются соответствующим образом. Если статус службы был изменен на основе переменной reload_page, javascript перезагрузит полную страницу — выдержка кода ниже:

 <script>
    function listen() {
        var source = new EventSource("/stream/");
        var target1 = document.getElementById("time");
        ....  
        source.onmessage = function(msg) {
          obj = JSON.parse(msg.data);
          target1.innerHTML = obj.time;
        ....
          if (obj.reload == "1") {
            location.reload();
          }
        }
    }
    listen();
</script>
  

Моим желаемым решением было бы расширить sse_worker() следующим образом:

 def sse_worker():
   while True:
       if there_is_a_change_in_process_status:
          reload_page=True
          # NEW: set up a semaphore/flag that there is a change on the page
          message_set(reload)
       elif message_get(block=false)==reload: # NEW: check the semaphore
          # issue: the message_get must retun "reload" for _all_ active sse_workers, that all of them can push the reload to "their" pages 
          reload_page=True
       else: 
          reload_page=False
       Do some other tasks:
          update some_single_parameters_to_be_passed_to_html_page
       yield 'data: '   json.dumps(all_parameters)
       gevent.sleep(1)
  

Я надеюсь, что смог передать свое сообщение. Есть идеи с вашей стороны, как я могу решить проблему синхронизации? Пожалуйста, обратите внимание, что здесь у нас есть производитель и потребитель в одной и той же функции sse_worker.

Любая идея приветствуется! с наилучшими пожеланиями, Питер

Комментарии:

1. Насколько интенсивна работа процессора, выполняемая sse_worker функцией? Мне интересно, получают ли ваши гринлеты недостаточно процессорного времени. Кроме того, кажется, что данные, которые отправляются всем клиентам, не зависят от конкретного клиента, верно? Если это правда, то нет смысла запускать отдельную sse_worker задачу для каждого клиента. Вместо этого у вас должен быть один фоновый гринлет, который генерирует информацию о состоянии, а затем у каждого клиента должен быть потоковый ответ, который отправляет данные, а затем переходит в режим ожидания на некоторое количество секунд или до тех пор, пока данные не изменятся.

2. > Насколько интенсивна работа процессора, выполняемая функцией sse_worker?

3. (… нажмите enter, чтобы быстро) @Miguel, спасибо за чтение и ваши мысли. Насколько интенсивна работа процессора, выполняемая функцией sse_worker? Вовсе нет, просто использую psutil и time для извлечения базовой информации. Но мне нравится ваша идея иметь один единственный фоновый гринлет. Вы правы, информация о состоянии не зависит от клиента. Как только кто-то меняет статус службы — это справедливо для всех остальных и должно быть представлено им соответствующим образом. Вопрос: как я могу избежать создания новых гринлетов и обслуживать все новые входящие запросы через один-единственный — пример кода был бы отличным.

4.@Miguel, через некоторое время я понял, что не могу сократить поток sse_worker до одного, поскольку запросы могут поступать с разных адресов. Обходной путь заключается в том, чтобы уменьшить количество подключений к серверу путем добавления в WSGIServer(('', 5000), app).serve_forever() spawn=pool и предварительно сократить пул до одного. Однако это обходной путь, который на самом деле не решает мою проблему. Я должен найти способ перебирать все созданные sse_workers.

5. Ваш поток sse_worker может быть независимым от любых запросов. вы можете запустить его при запуске сервера. Этот гринлет просто получает данные и сохраняет их в наборе переменных, которые могут получить ваши обработчики запросов (глобальные переменные, переменные в совместно используемом объекте и т.д.). Затем, когда клиент подключается, вы создаете гринлет для этого клиента, который просто опрашивает данные, полученные гринлетом sse_worker.