
Welcome to the Cool Spot of the Sun

Background: How to use Ruby to quickly write a distributed application.

Environment: Multiple client machines are available. The test program (the program to execute) is present on each machine; it is called 't' and takes the parameters 'start' and 'end'. In your case the program could take other parameters as well.

Difficulty Level: Intermediate

Solution: DRb (Distributed Ruby) is a way to implement distributed programming in the Ruby language.


DRb always runs as a service. For both the client and the server, the code they have most in common is the part where each declares itself a service. This means that the distinction between a client and a server is somewhat blurred.


The library implementing DRb must also be loaded:

require 'drb'

A service is started using the following command. 

 DRb.start_service()

The call usually differs between the server and the client.

The reason? The client need not announce its IP and port to everyone, but the server has to publish its IP and port so that clients can find it. This is a convention rather than a strict requirement: both can simply call DRb.start_service().
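As a minimal sketch of the two styles, assuming everything runs on one machine: the first call lets DRb pick a port, the second names a URI and a front object. The front hash and the use of port 0 (any free port) are placeholders for illustration; a real server would publish a fixed port such as the 9000 used in this article.

```ruby
require 'drb'

# Client style: no URI given, so DRb binds to a free port on its own.
DRb.start_service
client_uri = DRb.uri
puts "client-style service at #{client_uri}"
DRb.stop_service

# Server style: an explicit URI plus a front object for remote callers.
# Port 0 means "any free port" so this sketch runs anywhere; a real
# server would use a fixed, published port instead.
front = { 'status' => 'ready' } # placeholder front object
DRb.start_service('druby://localhost:0', front)
server_uri = DRb.uri
puts "server-style service at #{server_uri}"
DRb.stop_service
```

In both cases DRb.uri reports the address the service actually bound to, which is what a peer needs in order to call back.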

In our example the server's address is known to everyone, while each client's address needs to be known only to the server.
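To illustrate why each client still runs a service of its own, here is a small sketch in which a coordinator remembers a worker's URI and later calls back into it. Coordinator, Worker, and dispatch are hypothetical names, not part of the article's code, and both services run in one process on free local ports only to keep the sketch self-contained.

```ruby
require 'drb'

# Hypothetical worker: the front object of the client's own DRb service.
class Worker
  def doWork(command)
    "ran #{command}"
  end
end

# Hypothetical coordinator: stores client URIs and calls back into them.
class Coordinator
  def initialize
    @clients = []
  end

  def register(client_uri)
    @clients << client_uri
    true
  end

  def dispatch(command)
    @clients.map { |uri| DRbObject.new_with_uri(uri).doWork(command) }
  end
end

# Two independent services, each on any free local port.
coordinator_service = DRb::DRbServer.new('druby://localhost:0', Coordinator.new)
worker_service = DRb::DRbServer.new('druby://localhost:0', Worker.new)

# The worker side announces its own URI to the coordinator.
coordinator = DRbObject.new_with_uri(coordinator_service.uri)
coordinator.register(worker_service.uri)

# The coordinator can now reach the worker through that URI.
results = coordinator.dispatch('./t 2 1000000')
puts results.inspect

worker_service.stop_service
coordinator_service.stop_service
```

Only the coordinator's address has to be known in advance; the worker's address travels with its registration call.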

Client.rb:

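require 'drb'
serveraddress = 'druby://192.168.0.12:9000'

# Stub worker. The method names match what Server.rb calls on each client.
class Client
  def doWork(assignwork)
  end
  def postwork
  end
  def exit_client(message)
  end
end

# Start a local service so the server can call back into this client
DRb.start_service(nil, Client.new)
# The port DRb picked for this client, taken from the end of its URI
port = DRb.uri[/:(\d+)\z/, 1].to_i
#now register with the server
obj = DRbObject.new(nil,serveraddress)
# Now use obj. Server#register expects the client's port and an
# efficiency rating; the value 1 here is only a placeholder.
obj.register(port, 1)
DRb.thread.join 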

 

As seen above, we have declared a Client class that is only a stub and does nothing; it can be omitted if the server never needs to call back into the client. What is interesting is the call to DRb.start_service(), which starts the client as a service.

The client then reaches an object residing on the server using DRbObject.new. The first argument to DRbObject.new() is the local object we want to create the stub for, which is not needed in our case, so it is nil. The second argument is the URI of the remote object. The URI (Uniform Resource Identifier) in our case is a string containing the IP address of the machine running the server program and its port, in the format 'druby://IP-address:Port'.
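A minimal sketch of that call sequence follows. The Greeter class is a hypothetical stand-in for the article's Server, and the service is started in the same process on a free local port purely so the example can run on its own.

```ruby
require 'drb'

# Hypothetical front object standing in for the article's Server.
class Greeter
  def register
    'registered'
  end
end

# Serve it on any free local port (the article uses a fixed address).
DRb.start_service('druby://localhost:0', Greeter.new)
uri = DRb.uri

# First argument nil: there is no local object to wrap.
# Second argument: the URI of the object we want to reach.
remote = DRbObject.new(nil, uri)
reply = remote.register
puts reply

DRb.stop_service
```

DRbObject.new_with_uri(uri) builds the same kind of reference when there is no local object to pass.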

Once that is done, we can call the remote object's methods directly. In my case the method is register().

The last line, DRb.thread.join, keeps the client running until its DRb service stops. If this line is omitted, the client exits as soon as the call to obj.register returns.

Do note: the serveraddress line above holds the server address. You will need to change it for your setup; use an IP address or host name that is reachable from the client.

Server.rb:

#!/usr/bin/ruby
serveraddress = 'druby://192.168.0.12:9000'
require 'drb'

class Server
  include DRbUndumped
  def initialize
    @client_list = Hash.new
    @@minimum_number =3
    @@work_sent =false
    @mutex_registering = Mutex.new
    @work_list = Hash.new
    @work = ["./t 2 1000000","./t 1000000 2000000","./t 2000000 3000000"]
    @client_Execution=Array.new    
    @@finalwork = String.new
  end
 
  def register(port,efficiency)
    @mutex_registering.synchronize do
     #register this client in a array
     if @client_list.length <@@minimum_number
       ip = getIP
       addToList(ip,port,efficiency)  
       if @@work_sent ==false
         puts "Waiting for #{@@minimum_number-@client_list.length} more clients"
       else
         puts "Work already assigned"
       end
     else
       client = Thread.current['DRb']['client']
       a= Array.new
       a= client.peeraddr if client.respond_to?(:peeraddr)
       #a[1] is the remote port a[3] is the remote server ip
       puts "Incoming connection from #{a[3]} rejected due to excessive clients"
       obj = DRbObject.new(nil,"druby://#{a[3]}:#{port}")
       puts "Invoking druby://#{a[3]}:#{port} to kill the Client"
       obj.exit_client("Server called Exit too many clients")
     end     
     if @@work_sent ==false && @client_list.length >=@@minimum_number
       @@work_sent = true
       sendwork
     end
    end
  end

  def unregister(port)
    @mutex_registering.synchronize do
       client = Thread.current['DRb']['client']
       a= Array.new
       a= client.peeraddr if client.respond_to?(:peeraddr)
       client_uri = "#{a[3]}:#{port}"
       # Hash#delete returns nil when the key is absent
       if @client_list.delete(client_uri)
           puts @client_list
       end
       puts "#{client_uri} went away :-("
       #now check whether the client had a work assigned or not
       if @work_list.has_key?(client_uri)
       end
       if @client_list.length >=1
          puts "now we have #{@client_list} left"
       else
          puts "No-one's left ;-)"
       end
    end
  end

  def getIP
     client = Thread.current['DRb']['client']
     a= Array.new
     a= client.peeraddr if client.respond_to?(:peeraddr)
     puts "Incoming connection from #{a[3]}"
     #return the ip; puts on its own would make this method return nil
     a[3]
  end
 
  def addToList(ip,port,efficiency)
    #add only if the client is not already counted in
    if @client_list.has_key?("#{ip}:#{port}")
      return
    end
    @client_list["#{ip}:#{port}"] = efficiency.to_i   
  end
 

  def printToList
    @client_list
  end   

  def exit_clients
    @client_list.each_key { |clientaddress|
      obj = DRbObject.new(nil,"druby://#{clientaddress}")
      puts "druby://#{clientaddress}"
      obj.exit_client("Client can be exited now")
    }
  end
 

  def sendwork
    puts "Assigning work"
    threads = []
    @client_list1= @client_list.sort{|a,b| a[1]<=>b[1]}
    @client_list = {}
    #from http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/76424
    @client_list1.each do |elt|
    raise TypeError unless elt.is_a? Array
        key, value = elt[0..1]
        @client_list[key] = value
        @client_Execution.push(key)
    end
    
    while !@work.empty?
       #for each client pop and choose a work
       work = @work.pop
       #then it will check if there is something left in the client_execution list
       #if none that means that we had spawned enough threads. its now time to join
       #if there is something left pop the client and spawn a thread
       if @client_Execution.empty?
         #every known client is busy, so wait for the running threads
         threads.each { |aThread| aThread.join }
         threads = []
       end
       #finished clients push themselves back, so pop one after waiting too
       client1 = @client_Execution.pop
       #pass work into the thread so later loop iterations cannot change it
       threads << Thread.new(client1, work) { |client, job|
          obj = DRbObject.new(nil,"druby://#{client}")
          if !obj.doWork(job)
            @work.push(job)
          else
            #fetch the result once and reuse it
            result = obj.postwork
            puts result
            @@finalwork << result
            @client_Execution.push(client)
          end
       }
    end
    threads.each { |aThread| aThread.join }
    puts "Done"
  end
    
  def print_clientlist
    @client_list.each_key { |clientaddress|
       puts "#{clientaddress} - #{@client_list[clientaddress]}"
    }
  end

end
 

aServerObject = Server.new
DRb.start_service(serveraddress, aServerObject)

puts DRb.uri
at_exit { aServerObject.exit_clients; puts "Goodbye"; aServerObject.print_clientlist;}

DRb.thread.join # Don't exit just yet!


As seen above, we declare a Server class. The methods of interest to the client are mainly register and unregister: register() adds the client to @client_list, whereas unregister() removes it. The server object waits until 'n' (@@minimum_number) clients have connected, then assigns work to those n clients from @work. Each entry in @work is a command line to be executed on a client; it can be anything you like. As clients finish, the remaining work is handed out until @work is empty. Also note that Hash#sort returns an array of [key, value] pairs rather than a Hash, which is why sendwork rebuilds @client_list after sorting.
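On current Ruby versions, where a Hash keeps its insertion order and Array#to_h is available, the sort-and-rebuild step can be written more compactly. This is a sketch only; the addresses and efficiency values below are made up for illustration.

```ruby
# Hypothetical client list: "ip:port" => efficiency
client_list = {
  '10.0.0.3:7001' => 5,
  '10.0.0.1:7000' => 2,
  '10.0.0.2:7002' => 9
}

# sort_by yields [key, value] pairs; to_h turns the sorted pairs back
# into a Hash whose iteration order follows the sort.
sorted = client_list.sort_by { |_address, efficiency| efficiency }.to_h

# The keys, in ascending order of efficiency.
execution_order = sorted.keys
puts execution_order.inspect
```

Because sendwork takes clients with pop, the last entry in this order (the highest efficiency value) would be handed work first.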


TODOs: The work to be done should be arbitrary Ruby code. Use a better scheduler for work execution. Add the ability to kill clients on exit.
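One possible direction for the scheduler item, offered only as a sketch and not as what Server.rb does: put the work into a thread-safe Queue and let one thread per client pull from it until it is empty. FakeClient is a hypothetical local stand-in; in the real program each client would be a DRbObject.

```ruby
# Hypothetical stand-in for a remote client.
class FakeClient
  def initialize(name)
    @name = name
  end

  def doWork(command)
    "#{@name} ran #{command}"
  end
end

# Thread-safe queues for pending work and collected results.
work = Queue.new
['./t 2 1000000', './t 1000000 2000000', './t 2000000 3000000'].each { |w| work << w }
results = Queue.new

clients = [FakeClient.new('a'), FakeClient.new('b')]

# One thread per client; each keeps pulling work until none is left.
threads = clients.map do |client|
  Thread.new do
    loop do
      command = begin
        work.pop(true) # non-blocking pop raises ThreadError when empty
      rescue ThreadError
        break
      end
      results << client.doWork(command)
    end
  end
end
threads.each(&:join)

finished = Array.new(results.size) { results.pop }
puts finished.inspect
```

With this shape a fast client simply takes more items, and no work item can be handed to two clients at once.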

References:

1. http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/76424

2. http://www.chadfowler.com/ruby/drb.html

3. http://www.rubycentral.com/articles/drb.html

4. http://segment7.net/projects/ruby/drb/introduction.html
