Программирование сокетов на Python — Имитация радиопотока с несколькими клиентами с использованием потоков

#python #multithreading #sockets

Вопрос:

Я пытался написать программу на python, которая имитирует веб-поток радио, но я не совсем уверен, как это сделать правильно. Для этого я хотел бы, чтобы программа непрерывно «воспроизводила» музыку, даже если нет подключенных клиентов, чтобы она имитировала «живое» радио, где вы подключаетесь и слушаете все, что играет.

То, что у меня сейчас есть,-это связь между сервером и клиентом с базовым программированием сокетов TCP, на стороне сервера есть поток-производитель, который должен был продолжать чтение музыки, и потоки-потребители по требованию, которые должны отправлять аудиокадр клиенту, который воспроизводит его с помощью PyAudio. Проблема, вероятно, в том, как данные распределяются между потоками.

Сначала я попытался сделать это с одной очередью, но когда клиент читает данные из очереди, эти данные удаляются, и если у меня подключено несколько клиентов, это заставит музыку пропустить некоторые кадры.

Затем я попытался создать фиксированное число (10) объектов очереди, которые будут использоваться для каждого клиента, при этом поток-производитель будет подавать каждую очередь, но каждый клиент создаст собственный поток-потребитель и будет читать только из очереди, «назначенной» ему с помощью управляющей переменной. Проблема здесь в следующем: если какие-либо очереди не используются (например, если у меня подключен только один клиент), метод Queue.put() заблокируется, потому что эти очереди заполнены. Как мне сохранить все очереди «запущенными» и синхронизированными, даже если они не используются?

Вот где я сейчас нахожусь, и я ценю любой совет. Я еще не опытный программист, поэтому, пожалуйста, будьте терпеливы со мной. Я считаю, что очередь не является рекомендуемым методом IPC в данном случае, но если есть способ его использовать, дайте мне знать.

Ниже приведен код, который у меня есть на данный момент:

server.py

 #TCP config omitted

#Producer Thread
def readTheMusics(queue):
    #Control variable to keep looping through 2 music files
    i=1
   
    while i < 3:
       
        fname = "music"   str(i)   ".wav"
        wf = wave.open(fname, 'rb')
        data = wf.readframes(CHUNK)
        
        while data:
              
            for k in range (10):
                
                queue[k].put(data)
              
            data = wf.readframes(CHUNK)

        wf.close()
        i  = 1
        if i==3:
            i=1

#Consumer Thread
def connection(connectionSocket, addr, queue, index):
    while True:
        data = queue[index-1].get(True)
        connectionSocket.send(data)
    connectionSocket.close()        

def main():
    i = 1
    #Queue(1) was used to prevent an infinite queue and therefore a memory leak
    queueList = [Queue(1) for j in range(10)]
    th2 = threading.Thread(target=musicReading, args=(queueList, ))
    th2.start()
    while True:
        
        connectionSocket, addr = serverSocket.accept()
        
        print("connected - id {}".format(i))
        
        th = threading.Thread(target=connection, args=(connectionSocket, addr, queueList, i))
        
        th.start()
      
        i = i   1
                    
if __name__ == '__main__':
    main()
 

Комментарии:

1. Очередь — это неправильный ответ. Ваш поток подключения должен просто хранить текущий набор подключенных сокетов в списке. Затем, каждый раз, когда вы читаете фрагмент из файла wave, вы просто делаете for sock in socketlist: / sock.send(data) .

2. Привет, Тим! Как я мог бы получить данные из потока чтения таким образом?

3. Выполните отправку В потоке чтения. Вам не нужна нить подключения. Когда accept вернется, добавьте сокет в глобальный список. Вам нужно будет проверить, нет ли ошибки «отправить», чтобы вы могли удалить закрытые сокеты из списка.

4. Спасибо, Тим. У меня все еще есть некоторые проблемы, я не знаю, правильно ли я это делаю. Я протестировал его с 2 одновременными клиентами, и музыка все еще не синхронизирована. Возможно send(data) , метод не является мгновенным или чем-то другим, и цикл for застревает там… Есть какие-нибудь мысли?

5. Что именно вы подразумеваете под «синхронизированным»? Всегда будут переменные задержки в сети.

Ответ №1:

Комментариев Тима Робертса было достаточно, чтобы это сработало.