#ruby #redis #eventmachine #smpp
#ruby #redis #eventmachine #smpp
Вопрос:
Я использую ruby-smpp и redis для создания фонового рабочего на основе очереди для отправки SMPP-сообщений.
И мне интересно, правильно ли я использую eventmachine. Это работает, но кажется неправильным.
#!/usr/bin/env ruby
# Sample SMS gateway that can receive MOs (mobile originated messages) and
# DRs (delivery reports), and send MTs (mobile terminated messages).
# MTs are, in the name of simplicity, entered on the command line in the format
# <sender> <receiver> <message body>
# MOs and DRs will be dumped to standard out.
require 'smpp'
require 'redis/connection/hiredis'
require 'redis'
require 'yajl'
require 'time'
LOGFILE = File.dirname(__FILE__) "/sms_gateway.log"
PIDFILE = File.dirname(__FILE__) '/worker_test.pid'
Smpp::Base.logger = Logger.new(LOGFILE)
#Smpp::Base.logger.level = Logger::WARN
REDIS = Redis.new
class MbloxGateway
# MT id counter.
@@mt_id = 0
# expose SMPP transceiver's send_mt method
def self.send_mt(sender, receiver, body)
if sender =~ /[a-z] /i
source_addr_ton = 5
else
source_addr_ton = 2
end
@@mt_id = 1
@@tx.send_mt(('smpp' @@mt_id.to_s), sender, receiver, body, {
:source_addr_ton => source_addr_ton
# :service_type => 1,
# :source_addr_ton => 5,
# :source_addr_npi => 0 ,
# :dest_addr_ton => 2,
# :dest_addr_npi => 1,
# :esm_class => 3 ,
# :protocol_id => 0,
# :priority_flag => 0,
# :schedule_delivery_time => nil,
# :validity_period => nil,
# :registered_delivery=> 1,
# :replace_if_present_flag => 0,
# :data_coding => 0,
# :sm_default_msg_id => 0
#
})
end
def logger
Smpp::Base.logger
end
def start(config)
# Write this workers pid to a file
File.open(PIDFILE, 'w') { |f| f << Process.pid }
# The transceiver sends MT messages to the SMSC. It needs a storage with Hash-like
# semantics to map SMSC message IDs to your own message IDs.
pdr_storage = {}
# Run EventMachine in loop so we can reconnect when the SMSC drops our connection.
loop do
EventMachine::run do
@@tx = EventMachine::connect(
config[:host],
config[:port],
Smpp::Transceiver,
config,
self # delegate that will receive callbacks on MOs and DRs and other events
)
# Let the connection start before we check for messages
EM.add_timer(3) do
# Maybe there is some better way to do this. IDK, But it works!
EM.defer do
loop do
# Pop a message
message = REDIS.lpop 'messages:send:queue'
if message # If there is a message. Process it and check the queue again
message = Yajl::Parser.parse(message, :check_utf8 => false) # Parse the message from Json to Ruby hash
if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.now)
self.class.send_mt(message['sender'], message['receiver'], message['body']) # Send the message
REDIS.publish 'log:messages', "#{message['sender']} -> #{message['receiver']}: #{message['body']}" # Push the message to the redis queue so we can listen to the channel
else
REDIS.lpush 'messages:queue', Yajl::Encoder.encode(message)
end
else # If there is no message. Sleep for a second
sleep 1
end
end
end
end
end
sleep 2
end
end
# ruby-smpp delegate methods
def mo_received(transceiver, pdu)
logger.info "Delegate: mo_received: from #{pdu.source_addr} to #{pdu.destination_addr}: #{pdu.short_message}"
end
def delivery_report_received(transceiver, pdu)
logger.info "Delegate: delivery_report_received: ref #{pdu.msg_reference} stat #{pdu.stat}"
end
def message_accepted(transceiver, mt_message_id, pdu)
logger.info "Delegate: message_accepted: id #{mt_message_id} smsc ref id: #{pdu.message_id}"
end
def message_rejected(transceiver, mt_message_id, pdu)
logger.info "Delegate: message_rejected: id #{mt_message_id} smsc ref id: #{pdu.message_id}"
end
def bound(transceiver)
logger.info "Delegate: transceiver bound"
end
def unbound(transceiver)
logger.info "Delegate: transceiver unbound"
EventMachine::stop_event_loop
end
end
# Start the Gateway
begin
puts "Starting SMS Gateway. Please check the log at #{LOGFILE}"
# SMPP properties. These parameters work well with the Logica SMPP simulator.
# Consult the SMPP spec or your mobile operator for the correct settings of
# the other properties.
config = {
:host => 'server.com',
:port => 3217,
:system_id => 'user',
:password => 'password',
:system_type => 'type', # default given according to SMPP 3.4 Spec
:interface_version => 52,
:source_ton => 0,
:source_npi => 1,
:destination_ton => 1,
:destination_npi => 1,
:source_address_range => '',
:destination_address_range => '',
:enquire_link_delay_secs => 10
}
gw = MbloxGateway.new
gw.start(config)
rescue Exception => ex
puts "Exception in SMS Gateway: #{ex} at #{ex.backtrace.join("n")}"
end
Ответ №1:
Несколько простых шагов, чтобы сделать этот код более похожим на EventMachine:
- Избавьтесь от блокирующего драйвера Redis, используйте em-hiredis
- Прекратите использовать defer. Передача работы потокам с помощью драйвера Redis сделает ситуацию еще хуже, поскольку она зависит от блокировок вокруг используемого сокета.
- Избавьтесь от add_timer(3)
- Избавьтесь от внутреннего цикла, замените его, перепланировав блок для следующего цикла событий, используя EM.next_tick . Внешний интерфейс несколько не нужен. Также не следует зацикливаться на EM.run, проще правильно обработать отключение, выполнив переподключение в вашем несвязанном методе вместо остановки и перезапуска цикла событий, вызвав @@tx.reconnect .
- Не спи, просто подожди. EventMachine сообщит вам, когда в сетевой сокет поступят новые данные.
Вот как выглядел бы основной код EventMachine с некоторыми улучшениями:
def start(config)
File.open(PIDFILE, 'w') { |f| f << Process.pid }
pdr_storage = {}
EventMachine::run do
@@tx = EventMachine::connect(
config[:host],
config[:port],
Smpp::Transceiver,
config,
self
)
REDIS = EM::Hiredis.connect
pop_message = lambda do
REDIS.lpop 'messages:send:queue' do |message|
if message # If there is a message. Process it and check the queue again
message = Yajl::Parser.parse(message, :check_utf8 => false) # Parse the message from Json to Ruby hash
if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.now)
self.class.send_mt(message['sender'], message['receiver'], message['body'])
REDIS.publish 'log:messages', "#{message['sender']} -> #{message['receiver']}: #{message['body']}"
else
REDIS.lpush 'messages:queue', Yajl::Encoder.encode(message)
end
end
EM.next_tick amp;pop_message
end
end
end
end
Не идеально, и его тоже следовало бы немного почистить, но это больше похоже на то, как это должно быть в EventMachine. Никаких спящих режимов, избегайте использования defer, если это возможно, и не используйте сетевые драйверы, которые потенциально блокируют, реализуйте традиционный цикл, перенеся все в следующий цикл reactor. С точки зрения Redis, разница не такая уж большая, но, имхо, так больше похоже на EventMachine.
Надеюсь, это поможет. С удовольствием объясню подробнее, если у вас все еще есть вопросы.
Комментарии:
1. Я думал, что синтаксис Module::method (двоеточия) в наши дни вроде как отказались в пользу точек. Есть ли причина, по которой вы используете двоеточия здесь?
2. Думаю, это остаток от исходного кода, который я скопировал. Я тоже предпочитаю синтаксис dot, но я все еще вижу, что в коде используются оба варианта.
3. Большое вам спасибо! Это здорово! но не могли бы вы ознакомиться с моим окончательным кодом, который отлично работает здесь: gist.github.com/1009106 . Но у меня есть несколько мыслей: 1. Я думаю, что нет необходимости запрашивать redis при каждом тике. Или, может быть, это не так сильно беспокоит redis? 2. Правильно ли выполнен мой новый код. Достаточно ли сейчас evenmachiny? 😉 3. Лично мне не нравятся лямбды. Могу ли я вместо этого переместить его в метод. 4. Я думал о создании 2 дочерних рабочих элементов в дочерних процессах с использованием ядра. разветвляйте и позвольте материнскому монитору выполнять дочерние функции. Как вы думаете, это сработает и есть ли от этого польза?
4. Извините за беспорядочное форматирование. Я отформатировал его, но ТАК сжал его вместе.
5. Это немного необычно. Вместо этого вы можете использовать blpop. Это не вернется, пока что-то не будет добавлено в список. Однако коду потребуется небольшая корректировка в том смысле, что next_tick вызывается только в обратных вызовах как для REDIS.publish, так и для REDIS.lpush, а не в конце блока для REDIS.lpop. Что касается вашего кода, я бы все же посоветовал избавиться от проверки @unbound и вместо этого просто переподключиться при отключении.
Ответ №2:
Вы блокируете вызовы Redis в цикле reactor EM. Это работает, но это не правильный путь. Вы могли бы взглянуть на em-hiredis, чтобы правильно интегрировать вызовы Redis с EM.
Комментарии:
1. Да, спасибо. Я новичок в eventmachine. Как я могу реализовать redis puller без использования уродливого цикла? Можете ли вы привести мне пример того, как это исправить?