Отправить сообщение обратно в определенный веб-сокет

#ruby-on-rails #websocket #connection

#ruby-on-rails #websocket #подключение

Вопрос:

Вот мой код для использования веб-сокетов HTML5:

Серверный код:

 require 'em-websocket'

class WebSocketsServer

  def self.instance
    @inst ||= self.new
  end

  def initialize
    @messages = Queue.new
    Thread.new do
      puts 'Initializing WebSocketsServer'
      EventMachine.run {
        puts 'EventMachine.run'
        @channel = EM::Channel.new
        puts 'EM::Channel.new'
        EventMachine::WebSocket.start(:host => '0.0.0.0', :port => 9090) do |ws|
          puts 'EventMachine::WebSocket.start'
          ws.onopen {
            puts 'ws.onopen'
            sid = @channel.subscribe { |msg| ws.send msg }

            ws.onmessage { |msg|
              puts 'ws.onmessage'
              @channel.push msg
            }

            ws.onclose {
              puts 'ws.onclose'
              @channel.unsubscribe(sid)
            }
          }
        end
      }
    end
    puts 'Initialized WebSocketsServer'
  end

  def send_message(msg)
    @messages << msg
    send_messages_from_queue
  end

  def send_messages_from_queue
    #if some_condition
      puts "Channel: #{@channel.inspect}n"
      begin
        while data = @messages.pop(true)
          @channel.push(data)
        end
      rescue ThreadError
        #raised if queue is empty
      end
    #end
  end
end
  

Инициализатор:

 require 'web_sockets_server'

unless ( File.basename($0) == 'rake' || defined?(Rails::Console))
  ws = WebSocketsServer.instance
  puts 'Sleeping'
  sleep 10
  puts 'Calling send_message'
  ws.send_message 'Test'
  puts 'Called send_message'
end
  

JavaScript на стороне клиента:

 <html>
<head>
<script type="text/javascript">
var socket = new WebSocket("ws://192.168.0.12:9090");
socket.onopen = function(){
  console.log('onopen');
};
socket.onmessage = function(event){
  console.log(event.data);
  socket.close();
};
</script>
</head>
<body>
</body>
</html>
  

Вопрос в том, как отправить сообщение со стороны сервера конкретному JavaScript-клиенту WebSocket? Предположим, что у меня есть current_user.id на стороне клиента, которая соответствует current_user.id на стороне сервера.

Комментарии:

1. К вашему сведению, я кратко ответил ниже, но я думаю, вам следует удалить тег javascript. Это полностью вопрос на стороне сервера ruby.

Ответ №1:

Я не знаю, как кодировать ruby, поэтому отнеситесь к этому с недоверием, но, просмотрев код, я бы сделал что-то следующее:

Поскольку у вас есть
sid = @channel.subscribe { |msg| ws.send msg }

и удалите его при отмене подписки, я предполагаю, что sid (идентификатор сокета) сохраняется в канале.

Поэтому вам нужно что-то вроде


ws = @channel.find sid
ws.send msg

итак, вам нужно реализовать @channel.найдите (если он не существует), чтобы получить сокет, соответствующий идентификатору.

Вы также можете самостоятельно сохранять очередь при подписке на канал, но это излишне, поскольку похоже, что это именно то, что должен делать канал.

Комментарии:

1. Извините, я хотел бы увидеть ваш ответ в answers, но это общее предположение. В любом случае, ваш ответ потенциально полезен, так что здесь у вас есть повышение репутации.

2. Спасибо. Вы когда-нибудь решали это? Если это так, пожалуйста, опубликуйте свое решение для других.

Ответ №2:

Итак, EventMachine::Channel и EventMachine::WebSocket разные и не пересекающиеся вещи живут своей жизнью отдельно, за исключением того, что канал может быть подписан на сообщения сокета.

Вы даже можете использовать решение только для WebSocket, не включающее каналы.

Ключ к одноадресной рассылке — создать новый канал в ws.onopen обработчике и сохранить свой собственный список каналов в виде массива.

Вы можете создать один канал для каждого соединения с сокетом или подписаться на канал для нескольких сокетов (в этом случае все сокеты канала будут получать данные одновременно). Я выбрал смешанное решение: все окна браузера с одинаковым идентификатором пользователя подписаны на один и тот же канал, поэтому один канал представляет одного пользователя.

Обновить

Вот готовый код, используемый мной на стороне RoR. Я публикую его здесь так, как оно есть:

 require 'em-websocket'
require 'socket'

class WebSocketsServer

  def self.instance
    @inst ||= self.new
  end

  def initialize
    @channels = []
    @messages = Queue.new
    Thread.new do
      EventMachine.run {
        begin
        EventMachine::WebSocket.start(:host => '0.0.0.0', :port => 9090) do |ws|
          ws.class.module_eval { attr_accessor :channel, :custom_sid, :cookie_name, :cookie_value, :page_token }

          ws.onopen { |handshake|
            begin
              log 'ws.onopen'
              cookie = handshake.headers['Cookie']
              ws.cookie_name, ws.cookie_value = cookie.split('=',2) if cookie
              par = Rack::Utils.parse_nested_query(handshake.query_string)
              user_id = par['user_id']
              ptoken = par['page_token']
              if user_id.nil? || ptoken.nil?
                log 'user_id or page_token not defined. Closing websocket.'
                ws.close
              else
                log 'init'
                ws.page_token = ptoken
                channel = @channels.detect {|ch| ch.custom_user_id = user_id}
                unless channel
                  log 'Channel not found. Creating.'
                  channel = EventMachine::Channel.new
                  channel.class.module_eval { attr_accessor :custom_user_id, :sessions }
                  channel.custom_user_id = user_id
                  channel.sessions = []
                  @channels << channel
                end
                if channel.sessions.detect {|sessid| sessid == ws.page_token}
                  log 'Socket already subscribed'
                else
                  log 'Subscribing channel to socket.'
                  ws.channel = channel
                  channel.sessions << ws.page_token
                  ws.custom_sid = channel.subscribe { |msg| ws.send msg }
                end
              end
            rescue Exception => ex
              ws.close
              log "ws.onopen exception: #{ex.message}"
            end
          }

          ws.onmessage { |msg|
            begin
              data = JSON.parse(msg)
              message_type = data['message_type']
              if message_type == 'ping'
                ws.send({ message_type: 'pong' }.to_json)
              end
              if message_type == 'phone_call'
                order_id = data['order_id']
                user_id = data['user_id']
                log "phone call: order_id=#{order_id.inspect}, user_id=#{user_id.inspect}"
                self.send_phone_call(order_id, user_id) if order_id amp;amp; user_id
              end
            rescue Exception => ex
              ws.close
              log "ws.onmessage exception: #{ex.message}"
            end
          }

          ws.onclose {
            begin
              log 'ws.onclose'
              channel = ws.channel
              if channel
                log 'Unsubscribing channel.'
                channel.sessions.delete(ws.page_token)
                channel.unsubscribe(ws.custom_sid)
                if channel.sessions.length==0
                  log 'No subscribers left. Deleting channel.'
                  #@channels.delete(channel)
                end
              end
            rescue Exception => ex
              log "ws.onclose exception: #{ex.message}"
            end
          }
        end
        rescue Exception => ex
          log "EM.run exception: #{ex.message}"
        end
      }
    end
    init_ipc
  end

  def init_ipc
    ipc = IPC.instance
    ipc.on_send_message do |msg, user_id|
      log 'on_send_message'
      send_message_raw(msg, user_id)
    end
  end

  def send_message_raw(msg, user_id=nil)
    log "send_message_raw msg=#{msg.inspect}"
    @messages << {data: msg, user_id: user_id}
    send_messages_from_queue
  end

  def send_message(msg, user_id=nil)
    IPC.instance.send_message(msg, user_id)
  end

  def send_messages_from_queue
    while msg = @messages.pop(true)
      if msg[:user_id]
        #Сообщение определённому пользователю
        channel = @channels.detect {|ch| ch.custom_user_id = msg[:user_id]}
        channel.push(msg[:data]) if channel
      else
        #Широковещательное сообщение
        @channels.each do |channel|
          channel.push(msg[:data])
        end
      end
    end
  rescue ThreadError
    #raised if queue is empty
  end

  def send_coordinates(work_shift, driver, coord, user_id = nil)
    send_message({
        message_type: 'coordinates',
        workShift: {
            id: work_shift.id,
            #currentState: work_shift.current_state,
            #openedAt: work_shift.opened_at,
            #closedAt: work_shift.closed_at,
            #createdAt: work_shift.created_at,
            #updatedAt: work_shift.updated_at,
            #scheduledOpenedAt: work_shift.scheduled_opened_at,
            #scheduledClosedAt: work_shift.scheduled_closed_at,
            #position: work_shift.position,
            #driver: {
                #id: driver.id,
                #callsign: driver.callsign,
                #name: driver.name,
                #type: driver.condition.title
            #},
            #car: {
            #    id: work_shift.car.id,
            #    brand: work_shift.car.brand
            #},
            #client: {
            #    phoneNumber: (work_shift.trips.first) ? work_shift.trips.first.order.client.phone.number : ''
            #},
            coord: {
                lat: coord.latitude,
                lon: coord.longitude
            }
        }
    }.to_json, user_id)
  end

  def send_order_acceptance(order, trip, is_accepted, order_id, user_id = nil)
    send_message({
        message_type: 'order_acceptance',
        order: {
            accepted: is_accepted,
            id: order_id,
            startPointName: (order amp;amp; order.points amp;amp; order.points.first) ? order.points.first.address : '',
            endPointName: (order amp;amp; order.points amp;amp; order.points.last) ? order.points.last.address : ''
        },
        trip: {
            id: trip.nil? ? nil : trip.id,
            currentStateTbl: trip.nil? ? nil : trip.current_state_tbl
        }
    }.to_json, user_id)
  end

  def send_call_request(driver, client, link_type, user_id = nil)
    par = {
        message_type: 'call_request',
        target: case link_type
                  when NavServer::LinkType::DRIVER_TO_CLIENT
                    'driver_to_client'
                  when NavServer::LinkType::DISPATCHER_TO_CLIENT
                    'dispatcher_to_client'
                  else
                    'dispatcher_to_driver'
                end
    }
    if driver
      par[:driver] = {
          id: driver.id,
          name: driver.name,
          phone: driver.phone_number
      }
    end
    if client amp;amp; client.phone
      par[:client] = {
          phone: client.phone.number
      }
    end

    send_message par.to_json, user_id
  end

  def send_alarm(driver, user_id = nil)
    send_message({
        message_type: 'alarm',
        driver: {
            id: driver.id,
            name: driver.name,
            phone: driver.phone_number
        }
    }.to_json, user_id)
  end

  def send_log_item(text, type, user_id = nil)
    send_message({
        message_type: 'log',
        event_type: type,
        text: text
    }.to_json, user_id)
  end

  def send_status(pars, user_id = nil)
    send_message({
        message_type: 'state',
        driver: {
            id: pars[:driver].nil? ? nil : pars[:driver].id
        },
        workshift: {
            id: pars[:workshift_id].nil? ? nil : pars[:workshift_id],
            state: pars[:workshift_state].nil? ? nil : pars[:workshift_state],
            state_str: pars[:workshift_state_str].nil? ? nil : pars[:workshift_state_str],
            tbl_class: pars[:workshift_tbl_class].nil? ? nil : pars[:workshift_tbl_class],
        },
        trip: {
            state: pars[:trip_state].nil? ? nil : pars[:trip_state],
            id: pars[:trip_id].nil? ? nil : pars[:trip_id],
            tbl_class: pars[:trip_tbl_class].nil? ? nil : pars[:trip_tbl_class],
            state_str: pars[:trip_state_str].nil? ? nil : pars[:trip_state_str],
        },
        order: {
            id: pars[:order_id].nil? ? nil : pars[:order_id]
        }
    }.to_json, user_id)
  end

  def send_phone_call(order_id, user_id = nil)
    send_message({
        message_type: 'phone_call',
        order_id: order_id
    }.to_json, user_id)
  end

  private

  def log(msg)
    puts "WS: #{msg}n"
  end
end
  

Комментарии:

1. Как вы управляете очисткой каналов для вышедших из системы пользователей? нет ли утечки памяти со временем?

2. В окончательной редакции я использую один канал на пользователя и решил не избавляться от каналов, на которых не осталось подписчиков. Я просто повторно использую их, когда пользователь решает повторно подключиться. В моем программном обеспечении не более 250 пользователей, поэтому я могу позволить себе такой подход.

3. Я вижу. Спасибо. В моем случае у нас есть динамический список пользователей, и нам нужно публиковать сообщения конкретным пользователям. Для этого требуется сопоставление пользователя с websocket и приводит к утечке памяти, которую я, похоже, не могу устранить — gist.github.com/AvnerCohen/72540e2dc13a56b4be87 возможна какая-то внутренняя циклическая ссылка.

4. Краткий обзор вашей сути показал мне, что вы вообще не используете EM-каналы и пытаетесь закрыть сокет, который, я думаю, должен закрываться сам по себе, а ресурсы должны быть освобождены сборщиком мусора. Разве это не так?

5. Вместо того, чтобы использовать массив для хранения записей всех сокетов, я использовал EM-каналы и «взломал» созданный канал во время выполнения, внедрив метод и пару средств доступа для хранения необходимых данных (например, UID) в канале. Внутри EventMachine уже есть множество каналов.