
Am I using EventMachine in the right way?

I am using ruby-smpp and Redis to build a queue-based background worker that sends SMPP messages.

I am wondering whether I am using EventMachine in the right way. It works, but it doesn't feel right.

#!/usr/bin/env ruby

# Sample SMS gateway that can receive MOs (mobile originated messages) and
# DRs (delivery reports), and send MTs (mobile terminated messages).
# MTs are, in the name of simplicity, entered on the command line in the format
# <sender> <receiver> <message body>
# MOs and DRs will be dumped to standard out.

require 'smpp'
require 'redis/connection/hiredis'
require 'redis'
require 'yajl'
require 'time'

LOGFILE = File.dirname(__FILE__) + "/sms_gateway.log"
PIDFILE = File.dirname(__FILE__) + '/worker_test.pid'
Smpp::Base.logger = Logger.new(LOGFILE)
#Smpp::Base.logger.level = Logger::WARN

REDIS = Redis.new

class MbloxGateway

  # MT id counter. 
  @@mt_id = 0

  # expose SMPP transceiver's send_mt method
  def self.send_mt(sender, receiver, body)

    if sender =~ /[a-z]+/i
      source_addr_ton = 5
    else
      source_addr_ton = 2
    end

    @@mt_id += 1
    @@tx.send_mt(('smpp' + @@mt_id.to_s), sender, receiver, body, {
      :source_addr_ton => source_addr_ton
    #   :service_type => 1,
    #   :source_addr_ton => 5,
    #   :source_addr_npi => 0 ,
    #   :dest_addr_ton => 2, 
    #   :dest_addr_npi => 1, 
    #   :esm_class => 3 ,
    #   :protocol_id => 0, 
    #   :priority_flag => 0,
    #   :schedule_delivery_time => nil,
    #   :validity_period => nil,
    #   :registered_delivery=> 1,
    #   :replace_if_present_flag => 0,
    #   :data_coding => 0,
    #   :sm_default_msg_id => 0 
    #     
    })
  end

  def logger
    Smpp::Base.logger
  end

  def start(config)
    # Write this workers pid to a file
    File.open(PIDFILE, 'w') { |f| f << Process.pid }
    # The transceiver sends MT messages to the SMSC. It needs a storage with Hash-like
    # semantics to map SMSC message IDs to your own message IDs.
    pdr_storage = {} 

    # Run EventMachine in loop so we can reconnect when the SMSC drops our connection.
    loop do
      EventMachine::run do             
        @@tx = EventMachine::connect(
          config[:host], 
          config[:port], 
          Smpp::Transceiver, 
          config, 
          self    # delegate that will receive callbacks on MOs and DRs and other events
        )

      # Let the connection start before we check for messages
      EM.add_timer(3) do
        # Maybe there is some better way to do this. IDK, But it works!
        EM.defer do
          loop do
            # Pop a message
            message = REDIS.lpop 'messages:send:queue'
            if message # If there is a message. Process it and check the queue again
              message = Yajl::Parser.parse(message, :check_utf8 => false) # Parse the message from Json to Ruby hash
              if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.now)

                self.class.send_mt(message['sender'], message['receiver'], message['body']) # Send the message
                REDIS.publish 'log:messages', "#{message['sender']} -> #{message['receiver']}: #{message['body']}" # Push the message to the redis queue so we can listen to the channel
              else
                REDIS.lpush 'messages:queue', Yajl::Encoder.encode(message)
              end
            else # If there is no message. Sleep for a second
              sleep 1
            end
          end
        end
      end
    end
      sleep 2
    end
  end

  # ruby-smpp delegate methods 

  def mo_received(transceiver, pdu)
    logger.info "Delegate: mo_received: from #{pdu.source_addr} to #{pdu.destination_addr}: #{pdu.short_message}"
  end

  def delivery_report_received(transceiver, pdu)
    logger.info "Delegate: delivery_report_received: ref #{pdu.msg_reference} stat #{pdu.stat}"
  end

  def message_accepted(transceiver, mt_message_id, pdu)
    logger.info "Delegate: message_accepted: id #{mt_message_id} smsc ref id: #{pdu.message_id}"
  end

  def message_rejected(transceiver, mt_message_id, pdu)
    logger.info "Delegate: message_rejected: id #{mt_message_id} smsc ref id: #{pdu.message_id}"
  end

  def bound(transceiver)
    logger.info "Delegate: transceiver bound"
  end

  def unbound(transceiver)  
    logger.info "Delegate: transceiver unbound"
    EventMachine::stop_event_loop
  end

end

# Start the Gateway
begin   
  puts "Starting SMS Gateway. Please check the log at #{LOGFILE}"  

  # SMPP properties. These parameters work well with the Logica SMPP simulator.
  # Consult the SMPP spec or your mobile operator for the correct settings of 
  # the other properties.
  config = {
    :host => 'server.com',
    :port => 3217,
    :system_id => 'user',
    :password => 'password',
    :system_type => 'type', # default given according to SMPP 3.4 Spec
    :interface_version => 52,
    :source_ton  => 0,
    :source_npi => 1,
    :destination_ton => 1,
    :destination_npi => 1,
    :source_address_range => '',
    :destination_address_range => '',
    :enquire_link_delay_secs => 10
  }  
  gw = MbloxGateway.new
  gw.start(config)

rescue Exception => ex
  puts "Exception in SMS Gateway: #{ex} at #{ex.backtrace.join("\n")}"
end

Lisinge asked Dec 06 '22 21:12


2 Answers

Some easy steps to make this code more EventMachine-ish:

  • Get rid of the blocking Redis driver; use em-hiredis instead.
  • Stop using defer. Pushing work out to threads with the Redis driver will make things even worse, as it relies on locks around the socket it's using.
  • Get rid of the add_timer(3).
  • Get rid of the inner loop; replace it by rescheduling a block for the next iteration of the event loop with EM.next_tick. The outer loop is unnecessary too: rather than looping around EM.run and stopping and restarting the event loop, it's cleaner to handle a disconnect by reconnecting in your unbound method, by calling @@tx.reconnect.
  • Don't sleep, just wait. EventMachine will tell you when new things come in on a network socket.

Here's what the core code around EventMachine would look like with some of these improvements:

def start(config)
  File.open(PIDFILE, 'w') { |f| f << Process.pid }
  pdr_storage = {} 

  EventMachine::run do
    @@tx = EventMachine::connect(
      config[:host], 
      config[:port], 
      Smpp::Transceiver, 
      config, 
      self
    )
    # Use a local variable: assigning the REDIS constant inside a method
    # is a SyntaxError (dynamic constant assignment). Needs require 'em-hiredis'.
    redis = EM::Hiredis.connect

    pop_message = lambda do
      redis.lpop 'messages:send:queue' do |message|
        if message # If there is a message, process it
          message = Yajl::Parser.parse(message, :check_utf8 => false) # Parse the message from JSON to a Ruby hash
          if !message['send_after'] or (message['send_after'] and Time.parse(message['send_after']) < Time.now)
            self.class.send_mt(message['sender'], message['receiver'], message['body'])
            redis.publish 'log:messages', "#{message['sender']} -> #{message['receiver']}: #{message['body']}"
          else
            redis.lpush 'messages:queue', Yajl::Encoder.encode(message)
          end
        end
        EM.next_tick(&pop_message) # Check the queue again on the next reactor tick
      end
    end

    pop_message.call # Start the chain; without this the lambda is never invoked
  end
end
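
The send-or-requeue condition inside the callback can be pulled out into a small predicate, which makes it easier to read and to test on its own. This is only a sketch: `due?` is a hypothetical helper name, not part of ruby-smpp or the original code, and it uses the stdlib `json` library in place of Yajl so that it runs without extra gems.

```ruby
require 'json'
require 'time'

# Hypothetical helper (an assumption, not from the original code):
# a message is due when it has no 'send_after' field, or when that
# timestamp is already in the past relative to `now`.
def due?(message, now = Time.now)
  send_after = message['send_after']
  !send_after || Time.parse(send_after) < now
end

# Fixed reference time so the example does not depend on the wall clock.
now = Time.parse('2022-12-06 21:12:00 UTC')

# Sample payloads are made up for illustration.
immediate = JSON.parse('{"sender":"Shop","receiver":"4670000000","body":"hi"}')
later     = immediate.merge('send_after' => '2022-12-07 09:00:00 UTC')
earlier   = immediate.merge('send_after' => '2022-12-06 09:00:00 UTC')

puts due?(immediate, now) # no send_after field
puts due?(later, now)     # scheduled for the following day
puts due?(earlier, now)   # scheduled time has already passed
```

With a helper like this, the branch in the callback reduces to `if due?(message)`, with the requeue in the `else`.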

It's not perfect and could use some cleaning up, but this is closer to how it should look in EventMachine style: no sleeps, avoid defer where possible, don't use network drivers that can block, and implement a traditional loop by rescheduling work on the next reactor tick. In terms of Redis the difference is not that big, but it's more EventMachine-y this way, IMHO.
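
To see the "reschedule instead of loop" idea in isolation, here is a stdlib-only toy. `ToyReactor` and `ToyQueue` are hypothetical stand-ins written for this sketch; they are not EventMachine or em-hiredis and only mimic the shape of `EM.next_tick` and a callback-style `lpop`.

```ruby
# Stdlib-only stand-in for a reactor; NOT EventMachine itself.
class ToyReactor
  def initialize
    @pending = [] # blocks waiting for the next loop iteration
  end

  # Similar in spirit to EM.next_tick: queue a block for a later iteration.
  def next_tick(&block)
    @pending << block
  end

  # Run iterations until nothing is scheduled (or max_ticks is reached).
  # Returns the number of iterations that ran.
  def run(max_ticks = 1_000)
    ticks = 0
    until @pending.empty? || ticks >= max_ticks
      batch = @pending
      @pending = []
      batch.each(&:call)
      ticks += 1
    end
    ticks
  end
end

# Hypothetical callback-style queue client: the popped value is handed to
# a block on a later tick instead of being returned to the caller.
class ToyQueue
  def initialize(reactor, items)
    @reactor = reactor
    @items = items.dup
  end

  def lpop(&callback)
    value = @items.shift
    @reactor.next_tick { callback.call(value) }
  end
end

reactor = ToyReactor.new
queue = ToyQueue.new(reactor, %w[first second third])
sent = []

# The "loop" is a lambda that reschedules itself rather than `loop do`.
pop_message = lambda do
  queue.lpop do |message|
    if message
      sent << message
      reactor.next_tick(&pop_message) # look for the next message
    end
    # On an empty queue the chain ends here so the demo terminates.
  end
end

pop_message.call # prime the chain once
reactor.run
puts sent.inspect
```

The toy stops when the queue is empty so that it terminates. A long-running worker would keep rescheduling, and rescheduling unconditionally on every tick means polling Redis continuously while the queue is empty, so a short delay via `EM.add_timer` between empty polls may be worth considering.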

Hope this helps. Happy to explain further if you still have questions.

roidrage answered Dec 27 '22 21:12


You're doing blocking Redis calls in EM's reactor loop. It works, but isn't the way to go. You could take a look at em-hiredis to properly integrate Redis calls with EM.

Pieter answered Dec 27 '22 21:12