#python #zeromq #distributed-computing #pyzmq
#python #zeromq #распределенные вычисления #pyzmq
Вопрос:
Я собрал около 12000 подписчиков на компьютер с потоковой передачей следующим образом
сторона подписчика:
def client(id):
context=zmq.Context()
subscriber=context.socket(zmq.SUB)
subscriber.connect('ip:port')
subscriber.setsockopt(zmq.SUBSCRIBE,(id 'e').encode())
while 1:
signal=subscriber.recv_multipart()
write logs...
for i in range(12000):
threading.Thread(target=client,args=(str(i j*12000),)).start()
#j is arbitrary unduplicated int
сторона издателя:
subscriber=zmq.Context().socket(zmq.PUB)
subscriber.bind('tcp://*:port')
while 1:
for id in client_id:
subscriber.send_multipart([(id 'e').encode()] [message])
Когда я использовал более одного компьютера (используя разные j) для создания подписчиков, иногда некоторые подписчики вообще не могли получить сообщение.
Если я перезапущу подписчиков, те, кто не смог получить сообщение, станут нормальными. Но те, кто был нормальным, не смогли получить сообщение.
Эта проблема не будет показывать никаких ошибок, их можно найти только в моих журналах.
Возникает ли проблема с чрезмерным подключением?
Комментарии:
1. Рассматривали ли вы проблему с протоколированием?
2. ДА. Проблема в том, что некоторые подписчики могут получать, а некоторые — нет.
Ответ №1:
По мере увеличения количества подключений / сообщений / размеров некоторые предположения по умолчанию обычно перестают быть достаточными. Попробуйте расширить некоторые другие рабочие настройки по умолчанию на PUB
стороне конфигурации, где проблема, похоже, начинает захлебываться (не забывайте, что начиная с версии v3.? обработка списка подписки была перенесена с SUB
стороны (ов) на центральную PUB
сторону. Это уменьшает объемы потока данных, но при некоторых (здесь растущих до значительных величин) дополнительных затратах на PUB
стороне ~ RAM-for-buffers CPU-for-TOPIC-list-filtering…
Итак, давайте начнем с этих шагов на PUB
стороне :
aSock2SUBs = zmq.Context( _tweak_nIOthreads ).socket( zmq.PUB ) # MORE CPU POWER
aSock2SUBs.setsockopt( zmq.SNDBUF, _tweak_SIZE_with_SO_SNDBUF ) # ROOM IN SNDBUF
И последнее, но не менее важное: PUB
-s автоматически отбрасывает любые сообщения, которые не «подходят» под его текущий уровень highWaterMark, поэтому давайте также изменим это :
aSock2SUBs.setsockopt( zmq.SNDHWM, _tweak_HWM_till_no_DROPs ) # TILL NO DROPS
Другие { TCP_* | TOS | RECONNECT_IVL* | BACKLOG | IMMEDIATE | HEARTBEAT_* | ... }
настройки параметров низкого уровня могут помочь в дальнейшем сделать ваше стадо из 12 тыс. SUB
-ов мирно сосуществующими бок о бок с другим (как дружественным, так и враждебным) трафиком и сделать ваше приложение более надежным, чем если бы вы полагались только на готовые API-настройки по умолчанию.
Обратитесь к документации ZeroMQ API в целом, а также к настройкам по умолчанию O / S, поскольку многие из этих низкоуровневых атрибутов ZeroMQ также зависят от фактических значений конфигурации O / S.
Вы также должны быть предупреждены, что создание потоков 12k в Python по-прежнему оставляет чисто [SERIAL]
выполнение кода, поскольку центральное владение Python GIL-lock (эксклюзивное) позволяет избежать (да, принципиально избегает) любой формы [CONCURRENT]
совместного выполнения, поскольку само владение GIL-lock является эксклюзивным и повторно [SERIAL]
помещает любое количество потоков в очередь ожидания и приводит к простой последовательности выполнения блоков (по умолчанию Python 2 будет переключать потоки каждые 100 инструкций. Начиная с Python 3.2 , по умолчанию GIL будет выпущен через 5 миллисекунд (5000 [us] ), чтобы у другого потока была возможность попробовать и также получить блокировку GIL. Вы можете изменить эти значения по умолчанию, если война потоков 12k при смене владельца GIL-lock фактически приводит к «почти блокировке» любого TCP / IP-инструментария для буферизации сообщений, стекирования, отправки, повторной передачи до своевременного подтверждения приема. Можно тестировать его до предела, но выбор более безопасного потолка может помочь, если другие параметры были хорошо отрегулированы для обеспечения надежности.
И последнее, но не менее важное: наслаждайтесь Zen-of-Zero, шедевром Мартина СУСТРИКА в области распределенных вычислений, так хорошо разработанным для создания масштабируемой, практически с нулевой задержкой, очень удобной, широко портируемой платформы сигнализации и обмена сообщениями.
Комментарии:
1. @Ian Рад это слышать! Не стесняйтесь устанавливать флажок Принять ответ — именно так StackOverflow отмечает лучший из всех предоставленных ответов. Это также помогает другим членам сообщества распознавать полезные идеи и рабочие решения. Поэтому не стесняйтесь придерживаться этих практик сообщества StackOverflow. В любом случае, удачи с Zen of Zero и наслаждайтесь тем, что вы новый участник сообщества, активно вносящий свой вклад.
Ответ №2:
В дополнение к ответу пользователя 3666197 вам, возможно, также придется учитывать время, затраченное на подключение всех этих клиентов. Издатель понятия не имеет, сколько подписчиков должно быть, и просто продолжит работу по отправке сообщений тем подписчикам, которые подключены в данный момент, с момента установления самого первого соединения. Сокет PUBlisher не зависает от отправленных сообщений на случай, если в будущем в неопределенное время подключится больше подписчиков. Как только сообщение было передано 1 или более подписчикам, оно удаляется из очереди издателя. Кроме того, соединения не выполняются мгновенно, и 12 000 — это довольно много для прохождения.
Не имеет значения, запускаете ли вы сначала свою программу издателя или подписчика; ваши 12 000 подключений будут выполняться в течение определенного периода времени после запуска обеих программ, это происходит асинхронно по отношению к вашим собственным потокам. Некоторые подписчики начнут получать сообщения, в то время как другие по-прежнему будут неизвестны издателю. Когда, наконец, будут выполнены все 12 000 подключений, это сгладится.