|
Background: How to use Ruby to write fast a Distributed Application.
Environment: Multiple Clients Available, Test program or the Program to execute is present on each machine and called 't' with parameters 'start' and 'end'. Such a Test Program could be able to take some other parameters in your case also.
Difficulty Level: Intermediate
Solution:Ruby - Drb is a way to implement Distributed Programming using Ruby Language.
Ruby - Drb is a way to implement Distributed Programming using Ruby Language
Drb runs always a service. Thus for both client and the server the code that is most similar would be declaring itself as a Service. This means that the idea of a client and a server is somewhat blurried.
[Do note: code is italicized and bolded]
Also required is the inclusion of the library api implementing Drb.
require 'drb'
A service is started using the following command.
DRb.start_service()
This differs for both the server and the client.
The reason being? Client 'need not' announce its IP and Port to everyone, But Server needs to announce its IP and port. Let's say its NOT a Requirement. Both can have just DRb.start_service().
In our example the Server is known to all and the Clients need not be known to all except the Server.
Client.rb:
require 'drb' serveraddress = 'druby://192.168.0.12:9000' class Client def do_work(assignwork) end def exit end end DRb.start_service() #now register with the server obj = DRbObject.new(nil,serveraddress) # Now use obj obj.register DRb.thread.join
As Seen above We have declared a Client obj which is rather a stub and doing nothing. It can be omitted if needed. What is interesting is the usage of DRb.start_service() which will start the Client as a service.
Than it will call an Object residing on the Server using DRbObject.new. The first argument for DRbObject.new() is the Local object we want to create the stub for, which is not needed in our case. While the second argument is the URI of the remote object. The URI or Uniform Resource Identifier in our case is a special ruby string with the IP address of the remote server program's machine and the port address. It is done by using the following format 'druby://IP-address:Port'.
Once done we can directly call the remote object's method. In my case it is object method register().
The last line DRb.theread.join will make the client running till it doesn't quit. If this line is omitted than the client will exit after the call to Obj.register returns.
Do Note: The line in RED above is the server address. You will need to change that in your case. Do make it the IP address or the host name that is accessible from the Client.
Server.rb
#!/usr/bin/ruby serveraddress = 'druby://192.168.0.12:9000' require 'drb' class Server include DRbUndumped def initialize @client_list = Hash.new @@minimum_number =3 @@work_sent =false @mutex_registering = Mutex.new @work_list = Hash.new @work = ["./t 2 1000000","./t 1000000 2000000","./t 2000000 3000000"] @client_Execution=Array.new @@finalwork = String.new end def register(port,efficiency) @mutex_registering.synchronize do #register this client in a array if @client_list.length <@@minimum_number ip = getIP addToList(ip,port,efficiency) if @@work_sent ==false puts "Waiting for #{@@minimum_number-@client_list.length} more clients" else puts "Work already assigned" end else client = Thread.current['DRb']['client'] a= Array.new a= client.peeraddr if client.respond_to?(:peeraddr) #a[1] is the remote port a[3] is the remote server ip puts "Incoming connection from #{a[3]} rejected due to excessive clients" obj = DRbObject.new(nil,"druby://#{a[3]}:#{port}") puts "Invoking druby://#{a[3]}:#{port} to kill the Client" obj.exit_client("Server called Exit too many clients") end if @@work_sent ==false && @client_list.length >=@@minimum_number @@work_sent = true sendwork end end end def unregister(port) @mutex_registering.synchronize do client = Thread.current['DRb']['client'] a= Array.new a= client.peeraddr if client.respond_to?(:peeraddr) client_uri = "#{a[3]}:#{port}" @client_list.each_key { |clientaddress| if clientaddress == client_uri @client_list.delete(clientaddress) puts @client_list end } puts "#{client_uri} went away :-(" #now check whether the client had a work assigned or not if @work_list.has_key?(client_uri) end if @client_list.length >=1 puts "now we have #{@client_list} left" else puts "No-one's left ;-)" end end end def getIP client = Thread.current['DRb']['client'] a= Array.new a= client.peeraddr if client.respond_to?(:peeraddr) puts "Incoming connection from #{a[3]}" end def addToList(ip,port,efficiency) #add only if the client is not already counted in if @client_list.has_key?("#{ip}:#{port}") return end @client_list["#{ip}:#{port}"] = efficiency.to_i end
def printToList @client_list end
def exit_clients @client_list.each_key { |clientaddress| obj = DRbObject.new(nil,"druby://#{clientaddress}") puts "druby://#{clientaddress}" obj.exit_client("Client can be exited now") } end
def sendwork puts "Assigning work" threads = [] @client_list1= @client_list.sort{|a,b| a[1]<=>b[1]} @client_list = {} #from http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/76424 @client_list1.each do |elt| raise TypeError unless elt.is_a? Array key, value = elt[0..1] @client_list[key] = value @client_Execution.push(key) end while !@work.empty? #for each client pop and choose a work work = @work.pop #then it will check if there is something left in the client_execution list #if none that means that we had spawned enough threads. its now time to join #if there is something left pop the client and spawn a thread if !@client_Execution.empty? client1 = @client_Execution.pop else threads.each { |aThread| aThread.join } #i guess as we are joining all the threads they can now be emptied threads = [] end threads << Thread.new(client1) { |client| obj = DRbObject.new(nil,"druby://#{client}") if !obj.doWork(work) @work.push(work) else puts obj.postwork @@finalwork << obj.postwork @client_Execution.push(client) end } end threads.each { |aThread| aThread.join } puts "Done" end def print_clientlist @client_list.each_key { |clientaddress| puts "#{clientaddress} - #{@client_list[clientaddress]}" } end end
aServerObject = Server.new DRb.start_service(serveraddress, aServerObject) puts DRb.uri at_exit { aServerObject.exit_clients; puts "Goodbye"; aServerObject.print_clientlist;} DRb.thread.join # Don't exit just yet!
As seen above we are declaring a Server Class. The methods of interest for the Client are mainly Register and Unregister. Register() will register the Client to the Server, Whereas Unregister() will unregister the client. When a client is registered it is added to a Client_list. The server object will wait till 'n' (@@minimum_number) of clients connect. Then it will assign the work to all the n clients on basis of @work. @work over here is a command that has to be called on the command line and executed. It can be anything to your liking. Once all the work is done more work is sent until all work is finished. Also note that there is no direct way to sort a Hash!!!
TODO's:Work to be done should arbitary Ruby code. Use a better scheduler for work execution. Ability to kill Clients on Exiting.
References:
1.http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/76424
2.http://www.chadfowler.com/ruby/drb.html
3.http://www.rubycentral.com/articles/drb.html
4.http://segment7.net/projects/ruby/drb/introduction.html
|