#ruby-on-rails #websocket #connection
#ruby-on-rails #websocket #подключение
Вопрос:
Вот мой код для использования веб-сокетов HTML5:
Серверный код:
require 'em-websocket'
class WebSocketsServer
def self.instance
@inst ||= self.new
end
def initialize
@messages = Queue.new
Thread.new do
puts 'Initializing WebSocketsServer'
EventMachine.run {
puts 'EventMachine.run'
@channel = EM::Channel.new
puts 'EM::Channel.new'
EventMachine::WebSocket.start(:host => '0.0.0.0', :port => 9090) do |ws|
puts 'EventMachine::WebSocket.start'
ws.onopen {
puts 'ws.onopen'
sid = @channel.subscribe { |msg| ws.send msg }
ws.onmessage { |msg|
puts 'ws.onmessage'
@channel.push msg
}
ws.onclose {
puts 'ws.onclose'
@channel.unsubscribe(sid)
}
}
end
}
end
puts 'Initialized WebSocketsServer'
end
def send_message(msg)
@messages << msg
send_messages_from_queue
end
def send_messages_from_queue
#if some_condition
puts "Channel: #{@channel.inspect}n"
begin
while data = @messages.pop(true)
@channel.push(data)
end
rescue ThreadError
#raised if queue is empty
end
#end
end
end
Инициализатор:
require 'web_sockets_server'
unless ( File.basename($0) == 'rake' || defined?(Rails::Console))
ws = WebSocketsServer.instance
puts 'Sleeping'
sleep 10
puts 'Calling send_message'
ws.send_message 'Test'
puts 'Called send_message'
end
JavaScript на стороне клиента:
<html>
<head>
<script type="text/javascript">
var socket = new WebSocket("ws://192.168.0.12:9090");
socket.onopen = function(){
console.log('onopen');
};
socket.onmessage = function(event){
console.log(event.data);
socket.close();
};
</script>
</head>
<body>
</body>
</html>
Вопрос в том, как отправить сообщение со стороны сервера конкретному JavaScript-клиенту WebSocket? Предположим, что у меня есть current_user.id на стороне клиента, которая соответствует current_user.id на стороне сервера.
Комментарии:
1. К вашему сведению, я кратко ответил ниже, но я думаю, вам следует удалить тег javascript. Это полностью вопрос на стороне сервера ruby.
Ответ №1:
Я не знаю, как кодировать ruby, поэтому отнеситесь к этому с недоверием, но, просмотрев код, я бы сделал что-то следующее:
Поскольку у вас есть
sid = @channel.subscribe { |msg| ws.send msg }
и удалите его при отмене подписки, я предполагаю, что sid (идентификатор сокета) сохраняется в канале.
Поэтому вам нужно что-то вроде
ws = @channel.find sid
ws.send msg
итак, вам нужно реализовать @channel.найдите (если он не существует), чтобы получить сокет, соответствующий идентификатору.
Вы также можете самостоятельно сохранять очередь при подписке на канал, но это излишне, поскольку похоже, что это именно то, что должен делать канал.
Комментарии:
1. Извините, я хотел бы увидеть ваш ответ в answers, но это общее предположение. В любом случае, ваш ответ потенциально полезен, так что здесь у вас есть повышение репутации.
2. Спасибо. Вы когда-нибудь решали это? Если это так, пожалуйста, опубликуйте свое решение для других.
Ответ №2:
Итак, EventMachine::Channel
и EventMachine::WebSocket
разные и не пересекающиеся вещи живут своей жизнью отдельно, за исключением того, что канал может быть подписан на сообщения сокета.
Вы даже можете использовать решение только для WebSocket, не включающее каналы.
Ключ к одноадресной рассылке — создать новый канал в ws.onopen
обработчике и сохранить свой собственный список каналов в виде массива.
Вы можете создать один канал для каждого соединения с сокетом или подписаться на канал для нескольких сокетов (в этом случае все сокеты канала будут получать данные одновременно). Я выбрал смешанное решение: все окна браузера с одинаковым идентификатором пользователя подписаны на один и тот же канал, поэтому один канал представляет одного пользователя.
Обновить
Вот готовый код, используемый мной на стороне RoR. Я публикую его здесь так, как оно есть:
require 'em-websocket'
require 'socket'
class WebSocketsServer
def self.instance
@inst ||= self.new
end
def initialize
@channels = []
@messages = Queue.new
Thread.new do
EventMachine.run {
begin
EventMachine::WebSocket.start(:host => '0.0.0.0', :port => 9090) do |ws|
ws.class.module_eval { attr_accessor :channel, :custom_sid, :cookie_name, :cookie_value, :page_token }
ws.onopen { |handshake|
begin
log 'ws.onopen'
cookie = handshake.headers['Cookie']
ws.cookie_name, ws.cookie_value = cookie.split('=',2) if cookie
par = Rack::Utils.parse_nested_query(handshake.query_string)
user_id = par['user_id']
ptoken = par['page_token']
if user_id.nil? || ptoken.nil?
log 'user_id or page_token not defined. Closing websocket.'
ws.close
else
log 'init'
ws.page_token = ptoken
channel = @channels.detect {|ch| ch.custom_user_id = user_id}
unless channel
log 'Channel not found. Creating.'
channel = EventMachine::Channel.new
channel.class.module_eval { attr_accessor :custom_user_id, :sessions }
channel.custom_user_id = user_id
channel.sessions = []
@channels << channel
end
if channel.sessions.detect {|sessid| sessid == ws.page_token}
log 'Socket already subscribed'
else
log 'Subscribing channel to socket.'
ws.channel = channel
channel.sessions << ws.page_token
ws.custom_sid = channel.subscribe { |msg| ws.send msg }
end
end
rescue Exception => ex
ws.close
log "ws.onopen exception: #{ex.message}"
end
}
ws.onmessage { |msg|
begin
data = JSON.parse(msg)
message_type = data['message_type']
if message_type == 'ping'
ws.send({ message_type: 'pong' }.to_json)
end
if message_type == 'phone_call'
order_id = data['order_id']
user_id = data['user_id']
log "phone call: order_id=#{order_id.inspect}, user_id=#{user_id.inspect}"
self.send_phone_call(order_id, user_id) if order_id amp;amp; user_id
end
rescue Exception => ex
ws.close
log "ws.onmessage exception: #{ex.message}"
end
}
ws.onclose {
begin
log 'ws.onclose'
channel = ws.channel
if channel
log 'Unsubscribing channel.'
channel.sessions.delete(ws.page_token)
channel.unsubscribe(ws.custom_sid)
if channel.sessions.length==0
log 'No subscribers left. Deleting channel.'
#@channels.delete(channel)
end
end
rescue Exception => ex
log "ws.onclose exception: #{ex.message}"
end
}
end
rescue Exception => ex
log "EM.run exception: #{ex.message}"
end
}
end
init_ipc
end
def init_ipc
ipc = IPC.instance
ipc.on_send_message do |msg, user_id|
log 'on_send_message'
send_message_raw(msg, user_id)
end
end
def send_message_raw(msg, user_id=nil)
log "send_message_raw msg=#{msg.inspect}"
@messages << {data: msg, user_id: user_id}
send_messages_from_queue
end
def send_message(msg, user_id=nil)
IPC.instance.send_message(msg, user_id)
end
def send_messages_from_queue
while msg = @messages.pop(true)
if msg[:user_id]
#Сообщение определённому пользователю
channel = @channels.detect {|ch| ch.custom_user_id = msg[:user_id]}
channel.push(msg[:data]) if channel
else
#Широковещательное сообщение
@channels.each do |channel|
channel.push(msg[:data])
end
end
end
rescue ThreadError
#raised if queue is empty
end
def send_coordinates(work_shift, driver, coord, user_id = nil)
send_message({
message_type: 'coordinates',
workShift: {
id: work_shift.id,
#currentState: work_shift.current_state,
#openedAt: work_shift.opened_at,
#closedAt: work_shift.closed_at,
#createdAt: work_shift.created_at,
#updatedAt: work_shift.updated_at,
#scheduledOpenedAt: work_shift.scheduled_opened_at,
#scheduledClosedAt: work_shift.scheduled_closed_at,
#position: work_shift.position,
#driver: {
#id: driver.id,
#callsign: driver.callsign,
#name: driver.name,
#type: driver.condition.title
#},
#car: {
# id: work_shift.car.id,
# brand: work_shift.car.brand
#},
#client: {
# phoneNumber: (work_shift.trips.first) ? work_shift.trips.first.order.client.phone.number : ''
#},
coord: {
lat: coord.latitude,
lon: coord.longitude
}
}
}.to_json, user_id)
end
def send_order_acceptance(order, trip, is_accepted, order_id, user_id = nil)
send_message({
message_type: 'order_acceptance',
order: {
accepted: is_accepted,
id: order_id,
startPointName: (order amp;amp; order.points amp;amp; order.points.first) ? order.points.first.address : '',
endPointName: (order amp;amp; order.points amp;amp; order.points.last) ? order.points.last.address : ''
},
trip: {
id: trip.nil? ? nil : trip.id,
currentStateTbl: trip.nil? ? nil : trip.current_state_tbl
}
}.to_json, user_id)
end
def send_call_request(driver, client, link_type, user_id = nil)
par = {
message_type: 'call_request',
target: case link_type
when NavServer::LinkType::DRIVER_TO_CLIENT
'driver_to_client'
when NavServer::LinkType::DISPATCHER_TO_CLIENT
'dispatcher_to_client'
else
'dispatcher_to_driver'
end
}
if driver
par[:driver] = {
id: driver.id,
name: driver.name,
phone: driver.phone_number
}
end
if client amp;amp; client.phone
par[:client] = {
phone: client.phone.number
}
end
send_message par.to_json, user_id
end
def send_alarm(driver, user_id = nil)
send_message({
message_type: 'alarm',
driver: {
id: driver.id,
name: driver.name,
phone: driver.phone_number
}
}.to_json, user_id)
end
def send_log_item(text, type, user_id = nil)
send_message({
message_type: 'log',
event_type: type,
text: text
}.to_json, user_id)
end
def send_status(pars, user_id = nil)
send_message({
message_type: 'state',
driver: {
id: pars[:driver].nil? ? nil : pars[:driver].id
},
workshift: {
id: pars[:workshift_id].nil? ? nil : pars[:workshift_id],
state: pars[:workshift_state].nil? ? nil : pars[:workshift_state],
state_str: pars[:workshift_state_str].nil? ? nil : pars[:workshift_state_str],
tbl_class: pars[:workshift_tbl_class].nil? ? nil : pars[:workshift_tbl_class],
},
trip: {
state: pars[:trip_state].nil? ? nil : pars[:trip_state],
id: pars[:trip_id].nil? ? nil : pars[:trip_id],
tbl_class: pars[:trip_tbl_class].nil? ? nil : pars[:trip_tbl_class],
state_str: pars[:trip_state_str].nil? ? nil : pars[:trip_state_str],
},
order: {
id: pars[:order_id].nil? ? nil : pars[:order_id]
}
}.to_json, user_id)
end
def send_phone_call(order_id, user_id = nil)
send_message({
message_type: 'phone_call',
order_id: order_id
}.to_json, user_id)
end
private
def log(msg)
puts "WS: #{msg}n"
end
end
Комментарии:
1. Как вы управляете очисткой каналов для вышедших из системы пользователей? нет ли утечки памяти со временем?
2. В окончательной редакции я использую один канал на пользователя и решил не избавляться от каналов, на которых не осталось подписчиков. Я просто повторно использую их, когда пользователь решает повторно подключиться. В моем программном обеспечении не более 250 пользователей, поэтому я могу позволить себе такой подход.
3. Я вижу. Спасибо. В моем случае у нас есть динамический список пользователей, и нам нужно публиковать сообщения конкретным пользователям. Для этого требуется сопоставление пользователя с websocket и приводит к утечке памяти, которую я, похоже, не могу устранить — gist.github.com/AvnerCohen/72540e2dc13a56b4be87 возможна какая-то внутренняя циклическая ссылка.
4. Краткий обзор вашей сути показал мне, что вы вообще не используете EM-каналы и пытаетесь закрыть сокет, который, я думаю, должен закрываться сам по себе, а ресурсы должны быть освобождены сборщиком мусора. Разве это не так?
5. Вместо того, чтобы использовать массив для хранения записей всех сокетов, я использовал EM-каналы и «взломал» созданный канал во время выполнения, внедрив метод и пару средств доступа для хранения необходимых данных (например, UID) в канале. Внутри EventMachine уже есть множество каналов.