
More than a thousand humans drowned in the sea but apparently no one is to blame for this accident.
Apparently no one cares.
Are lives that cheap? Looks like some people believe so.
When my heart was hardened and my courses constrained I made my hope a stairway to Your forgiveness. My sin burdened me heavily, but when I measured it by Your forgiveness Lord, Your forgiveness was the greater.
Or 40% faster DB Access for your Ruby applications!
In a previous post I talked a bit about event-based programming for Ruby. I mentioned the EventMachine/Asymy combo as a means of doing asynchronous database operations, freeing up the Ruby runtime to do other things while it waits on database I/O. Better still, developers need not worry about adopting a different programming model: with the help of Ruby Fibers we can keep programming in the same old way while Fibers do all the twisted work underneath. Very promising indeed, but the big elephant in the room was the immaturity of the current solution. Asymy is still in its infancy and is based on the super slow pure Ruby MySQL driver, not to mention that it is fairly incomplete as well.
require 'pg'
# I have configured postgres to run in *trusted* mode
# so I don't need to supply a password
conn = PGconn.new({:host=>'localhost',:user=>'postgres',:dbname=>'evented'})
conn.setnonblocking(true)
This way our connection is ready for async operations. Now we need to start sending some SQL commands to our connection. To do that we would normally use the PGconn#exec method, but that method blocks while waiting on Postgres. So instead we will use the PGconn#send_query method, which returns immediately without waiting for Postgres to actually process the SQL command. Here's how we are going to use it:
conn.send_query("select * from users where name like '%am%'")
# the method will return immediately (or raise an exception in case of an error)
But wait, where are the results? Normally we expect the call to return with the data. Now where is my data? The results are being processed right now on the server side, and we can continue to do other things until they come. But how do we know when they arrive? It turns out that this is easy as well. The PGconn instance provides a method that returns the connection's socket descriptor, PGconn#socket. We retrieve that socket descriptor and wrap it in a Ruby IO object by calling
io = IO.new(conn.socket)
Now we have a nice IO object whose activity we can be notified of in a select call. For the uninitiated, event-based programming is done by having a tight loop that runs forever. Within this loop we check whether IO events have happened and, if so, we respond to them. One efficient way of doing so is the Ruby Kernel#select method (which is a wrapper around the UNIX select). The select method works like this: you provide it with three lists, one for sockets that you need to read from, one for sockets that you need to write to, and a third for sockets whose errors you are interested in. The call returns an array holding the sockets that are ready (grouped into the same three lists), or nil if none became ready before the timeout.
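The behaviour of select described above can be tried without a database. The following is a minimal sketch that assumes a plain pipe as a stand-in for the Postgres socket (the pipe and the variable names are illustrative and not part of the original setup). It shows the nil return on a timeout and the array of ready sockets once data has arrived.

```ruby
# A pipe stands in for the database socket (an assumption for illustration).
reader, writer = IO.pipe

# Nothing has been written yet, so select times out and returns nil.
idle = IO.select([reader], nil, nil, 0.01)

# Once data arrives, select returns three arrays:
# the readable, the writable and the errored IO objects.
writer.write("row data")
ready = IO.select([reader], nil, nil, 0.01)
readable = ready.first

# The socket is known to be readable, so this read will not block.
payload = readable.first.read_nonblock(1024)

puts idle.inspect
puts payload
```

The short timeout keeps the loop responsive; a timeout of 0 turns the call into a pure poll.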
# the method that will be called if input is ready
def process_command(conn)
  # we will detail the implementation soon
end
loop do
  # we supply a list of sockets we need to read from.
  # Only our io object in this case. We nullify
  # the other lists and we set a timeout
  res = select([io],nil,nil, 0.001)
  # of course this needs to be done in a cleaner way
  process_command(conn) unless res.nil?
end
This way, whenever there is data to read from the socket we will not get a nil (we will get an array, actually), so we can call process_command. When process_command gets called it knows that there is data in the connection to be read, so it calls the PGconn#consume_input method, after which it checks whether the connection is busy. If it is still busy, it does nothing (the work will be done on a later event). If, on the other hand, the connection is not busy, we start calling the PGconn#get_result method and append what we get to the results collected so far. We keep doing that until we get a nil result, which indicates the end of the command and the readiness of the connection to accept further commands. Here is what the method will look like:
def process_command(conn)
  conn.consume_input
  unless conn.is_busy
    res, data = 0, []
    while res != nil
      res = conn.get_result
      res.each {|d| data.push d } unless res.nil?
    end
    # we are done, we need to put this data somewhere
  end
end
Several things should be noted. First, one cannot process several commands on the same connection at once; you need several connections to achieve parallel command processing. Second, the model described above works in the twisted way; to get things working the normal way you can use Ruby Fibers (or continuations, but they apparently leak memory).
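One possible way to handle the "put this data somewhere" step is to hand the collected rows to a block. The sketch below assumes a hypothetical FakeConn class that only mimics the three calls used above (consume_input, is_busy, get_result); it is not the pg driver, and the two-chunk reply is invented purely to exercise the busy branch.

```ruby
# FakeConn is a hypothetical stand-in, not the real pg connection.
class FakeConn
  def initialize(result_sets)
    @pending = result_sets.dup   # result sets still to be handed out
    @chunks_needed = 2           # pretend the reply arrives in two chunks
  end

  def consume_input
    @chunks_needed -= 1 if @chunks_needed > 0
  end

  def is_busy
    @chunks_needed > 0
  end

  # Returns the next result set, or nil once the command is finished.
  def get_result
    @pending.shift
  end
end

# Same shape as process_command above, plus a block that receives the data.
# Returns false while the connection is still busy, true once it has finished.
def process_command(conn)
  conn.consume_input
  return false if conn.is_busy   # reply incomplete; wait for the next event
  data = []
  while (res = conn.get_result)
    res.each { |row| data.push(row) }
  end
  yield data
  true
end

conn = FakeConn.new([[{ "name" => "sam" }, { "name" => "pam" }]])
collected = nil
first_event  = process_command(conn) { |rows| collected = rows }
second_event = process_command(conn) { |rows| collected = rows }
```

Here the first event finds the connection still busy and does nothing, while the second one drains the results and passes them to the block.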
require 'fiber_pool'
require 'fibered_connection_pool'
options = {:host=>'localhost',:user=>'postgres',:dbname=>'evented'}
cpool = FiberedConnectionPool.new(options, 12)
# second param is the number of connections to spawn, defaults to 8
# note that one more connection than those will be spawned. This one
# will be used for processing blocking requests.
fpool = FiberPool.new(100)
# the number of fibers to spawn, defaults to 50
100.times do
  fpool.spawn do
    cpool.exec(some_sql_command, true) # true means async
    cpool.exec(some_other_sql_command, true)
    cpool.exec(yet_another_sql_command, true)
  end
end
# our event loop
loop do
  res = select(cpool.sockets,nil,nil,0) # check for something to read
  # IO is monkey patched to be able to hold a reference to the connection
  res.first.each{ |s| s.connection.process_command } if res
end
This works as follows: once a fiber calls cpool.exec, the query is sent to the pool for processing and the fiber is paused, giving way for another one to start processing. The other one will pause as well once it hits a cpool.exec. Later, during the event loop, you get notifications of completed queries (in any order) and resume the fiber associated with each finished query. Note that commands issued in the same fiber run sequentially, while those issued from different fibers interleave. This is effectively what threading achieves, but without its costs.
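The pause-and-resume cycle described above can be reproduced with nothing but the standard library. The sketch below assumes a hypothetical TinyPool class (it is not the FiberedConnectionPool used above): each "query" gets its own pipe, and the reply is written straight away to simulate a server answering, so the focus stays on how the event loop wakes the right fiber.

```ruby
require 'fiber'

# TinyPool is a hypothetical, heavily simplified stand-in for a fibered pool.
class TinyPool
  def initialize
    @waiting = {}   # reader IO => fiber paused on it
  end

  def exec(sql)
    reader, writer = IO.pipe
    writer.write("result of #{sql}")   # simulated server reply
    writer.close
    @waiting[reader] = Fiber.current
    Fiber.yield                        # pause until the event loop resumes us
  end

  def sockets
    @waiting.keys
  end

  # Read the reply on a ready socket and resume the fiber that asked for it.
  def process(reader)
    fiber = @waiting.delete(reader)
    reply = reader.read
    reader.close
    fiber.resume(reply)
  end
end

pool = TinyPool.new
log  = []

%w[A B].each do |name|
  Fiber.new do
    log << pool.exec("#{name}1")   # the fiber pauses here...
    log << pool.exec("#{name}2")   # ...and again here
  end.resume
end

# The event loop: runs until no fiber is waiting any more.
until pool.sockets.empty?
  ready = IO.select(pool.sockets, nil, nil, 0.1)
  ready.first.each { |io| pool.process(io) } if ready
end

puts log
```

Within each fiber the second query is only issued after the first has returned, while the two fibers take turns, which is the sequential-inside, interleaved-across behaviour noted above.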

I tested for 10, 50 and 100 long queries with the following multipliers (1, 2, 5, 10, 50, 100). The graph shows the performance gain for each number of queries vs the multiplier. For example, 50 long queries with a multiplier of 10 (i.e. 500 short queries) achieve a 39.6% reduction in query execution time. I repeated many of the tests several times (not all of them, too lazy to do that). The repeated tests showed consistent results, so I am pretty confident of the presented results.
The area I would like to focus on for performance tuning is the size of the fiber pool. The test is a bit sensitive to it, so I believe I can gain a bit more performance at insane query counts if I optimize my fiber pool a bit. Setting the initial size very high certainly helps, but it eats too much memory to be usable.
         Queries           Mode
Ratio    Long    Short     Blocking   Non Blocking   Advantage
1/2      10      20        0.56       0.5            10.27%
         50      100       2.55       2.26           11.19%
         100     200       5.15       4.46           13.53%
1/5      10      50        0.55       0.4            27.04%
         50      250       2.72       1.83           32.82%
         100     500       5.45       3.63           33.39%
1/10     10      100       0.6        0.4            33.76%
         50      500       3.01       1.82           39.67%
         100     1000      5.9        3.65           38.13%
1/20     10      200       0.72       0.45           38.12%
         50      1000      3.43       2.1            38.73%
         100     2000      6.83       4.33           36.53%
1/50     10      500       0.98       0.62           36.57%
         50      2500      4.78       3.23           32.36%
         100     5000      9.74       8.68           10.93%
1/100    10      1000      1.46       0.94           35.40%
         50      5000      7.42       5.17           30.31%
         100     10000     14.27      12.68          11.15%
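The Advantage column appears to be the reduction in execution time relative to the blocking run; this is an inference from the numbers, not something stated with the table. A small sketch of that calculation (the method name advantage is illustrative):

```ruby
# Relative reduction in execution time, as a percentage of the blocking time.
def advantage(blocking, non_blocking)
  ((blocking - non_blocking) / blocking.to_f * 100).round(2)
end

# 100 long and 500 short queries: 5.45 blocking vs 3.63 non-blocking.
puts advantage(5.45, 3.63)
```

For this row the calculation gives 33.39, matching the table. Most other rows differ slightly in the decimals when recomputed, presumably because the published times are rounded.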
def show
  @user = User.find(params[:id]) # db access
  @events = Events.find(:all) # another db access
  render :action => :show # rendering
end
connection.execute('SELECT * from events') do |headers,data|
  # do something with headers and data
  pp headers
  pp data
end
Asymy is still at a very early stage: the performance is horrible (as it is based on the darn slow pure Ruby MySQL driver) and it comes with many rough edges (I was not able to run INSERTs and UPDATEs without hacking it, and I am still not able to run the callbacks for those). Nevertheless, this is a formidable achievement on the road to a very fast single threaded implementation.
# this is probably wrong but it can illustrate
# the twisted nature of evented programming
def show
  User.find(params[:id]) do |result_set|
    @user = result_set
    Events.find(:all) do |result_set|
      @events = result_set
      @events.each do |event|
        event.owner = @user
        if event != @events.last
          event.save
        else
          event.save do |ev|
            render :action => :show
          end
        end
      end
    end
  end
end
We had to twist the function flow to be able to make use of the evented nature of the new driver. Instead of passing normally, the flow is scattered across the different callbacks. This is one of the areas where event-based programming makes you change the way you think about program flow: a hurdle for many developers and a show stopper for some. No wonder the event library for Python is called Twisted.
def show
  @user = User.find(params[:id])
  @events = Events.find(:all).each do |event|
    event.owner = @user
    event.save
  end
end
Huh? This is the normal action code we are used to. Well, using fibers we can write this and still do things under the hood in an evented way.
require 'fiber'
fiber = Fiber.new do
  # do something
  Fiber.yield another_thing
  # do yet another thing
end
yielded = fiber.resume # => runs the fiber till the yield,
                       #    returns the yielded value
                       #    and pauses the fiber where it is
fiber.resume # => re-runs the fiber from the point it was paused.
fiber.resume # => no more statements to run, raises an exception
Let's see how this can be useful for dispatching controller actions (this code would preferably be in the server itself):
Fiber.new do
  Dispatcher.dispatch(controller,action,req,res)
  send_response res
end.resume
Inside the action we call the find method repeatedly. This method could be implemented like this:
class DataStore
  def find(*args)
    query = construct_query(*args)
    fiber = Fiber.current # grab the current fiber
    conn.execute(query) do |headers, data|
      fiber.resume convert_to_objects(data)
    end
    Fiber.yield # pause this fiber until the callback resumes it
  end
end
This way, whenever the code reaches a find method it passes the query to the db driver, returns immediately and pauses, giving room for other requests to be processed. Once the data comes back from the db server the callback is run and it resumes the fiber (passing it the result of the query). The result gets passed back to the caller of the function and the original action method continues until completion (or until it is paused again by another find method).
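To watch this flow end to end without a database, here is a minimal sketch. It assumes a hypothetical FakeDriver whose execute only queues the query and its callback, and whose deliver plays the part of the event loop noticing a reply; the DataStore here is a cut-down version written for the illustration and takes a ready-made query string.

```ruby
require 'fiber'

# FakeDriver is a hypothetical stand-in for an asynchronous db driver.
class FakeDriver
  def initialize
    @pending = []
  end

  # Queue the query; the callback fires later, when deliver is called.
  def execute(query, &callback)
    @pending << [query, callback]
  end

  # Simulates the server's reply arriving for the oldest pending query.
  def deliver
    query, callback = @pending.shift
    callback.call(["row for #{query}"])
  end

  def idle?
    @pending.empty?
  end
end

class DataStore
  def initialize(conn)
    @conn = conn
  end

  def find(query)
    fiber = Fiber.current
    @conn.execute(query) { |data| fiber.resume(data) }
    Fiber.yield   # paused here; returns whatever the callback passes to resume
  end
end

driver = FakeDriver.new
store  = DataStore.new(driver)
trace  = []

# The action reads top to bottom, exactly like blocking code.
action = Fiber.new do
  trace << store.find("user 1")
  trace << store.find("events")
  :rendered
end

action.resume                       # runs until the first find pauses the fiber
trace << :other_work                # the main flow is free while the query is pending
driver.deliver until driver.idle?   # each reply resumes the action where it left off

p trace
```

The :other_work entry lands in the trace before either query result, which shows that the caller was not blocked while the first query was outstanding.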