Thread Safe Connection Pool in Ruby
26 May 2020To demonstrate a thread safe implementation of a connection pool, we will use a class instance variable, a Mutex, and the connection_pool gem.
Class Instance Variables
Class instance variables work like regular class variables except for two main differences.
- Class instance variables are available only to class methods and not to instance methods.
- Class instance variables are not shared with sub-classes. They belong exclusively to the class itself.
We will use class instance variables (mainly for reason 1) to share a connection pool which all threads will be able to access. Here is a quick example of what class instance variables can do for us.
class Example
@my_value = 1
class << self
# class methods
def my_value
@my_value
end
def my_value=(value)
@my_value = value
end
end
# instance methods
def my_value
@my_value
end
def my_value=(value)
@my_value = value
end
end
# class method
puts Example.my_value # my_value = 1
Example.my_value = 2
puts Example.my_value # my_value = 2
# instance method
a = Example.new
puts a.my_value # my_value is not initialized at the instance level
a.my_value = 3
puts a.my_value # my_value = 3
# class method
puts Example.my_value # my_value is still 2 at the class level
When my_value
was accessed by class methods, its value was only accessed and updated at the class level. When accessed as an instance method, it was accessed and updated in a separate memory space at the instance level.
If you want to learn more about class instance variables, take a look at this article and this post.
Now that we have a way to access the same connection pool across threads, we need to actually create the pool so threads can use it.
Connection Pool
The connection_pool gem will allow us to create a pool of connections which we can use to grab a connection and return it after we’re done using it. This allows us to specify a static number of connections and forces our threads to share those connections.
class ConnectionManager
@pool = nil
class << self
def connection_pool
@pool || create_pool
end
private
def create_pool
@pool = ConnectionPool.new(size: 10) do
Dalli::Client.new
end
end
end
end
With this, we can now access the shared connection pool across threads by calling ConnectionManager.connection_pool
. To use one of the connections in the pool, we use with
which will yield a connection.
ConnectionManager.connection_pool.with do |conn|
# do work with the connection
conn.get(...)
end
However, there is a slight problem. Multiple threads could access connection_pool
at the same time, see that @pool
hasn’t been initialized, and then each one of those threads would create another connection pool. This would cause us to create extra pools (and extra connections) which we don’t want. We only want the first thread that accesses connection_pool
to initialize @pool
and not the consequent ones.
To demonstrate the race condition, we will slightly modify the class above to show how that’s possible.
class ConnectionManager
@my_var = nil
class << self
def connection_pool
@my_var || create_pool
end
private
def create_pool
return @my_var if @my_var
# if this was thread safe, @my_var would only be updated once
puts "updating my_var"
@my_var = 1
end
end
end
threads = []
10.times {
threads << Thread.new {
puts "[ThreadId=#{Thread.current.object_id}] my_values=#{ConnectionManager.connection_pool}"
}
}
threads.each(&:join)
Output
updating my_var
[ThreadId=70347336477020] my_values=1
updating my_var
[ThreadId=70347336475740] my_values=1
[ThreadId=70347315666620] my_values=1
[ThreadId=70347315666480] my_values=1
[ThreadId=70347315666340] my_values=1
[ThreadId=70347315666200] my_values=1
[ThreadId=70347315666060] my_values=1
[ThreadId=70347336475320] my_values=1
updating my_var
[ThreadId=70347336476360] my_values=1
updating my_var
[ThreadId=70347315665900] my_values=1
As you can see, create_pool
ends up being accessed multiple times, even when we have return @my_var if @my_var
to check for initialization. There is a race condition between threads that ends up updating @my_var
multiple times. We only want @my_var
to be updated once no matter how many times create_pool
is called.
We can use a Mutex
to solve this.
Mutex
In the ruby documentation, the Mutex
class has the following description
Mutex implements a simple semaphore that can be used to coordinate access to shared data from multiple concurrent threads.¹
We can use a mutex to avoid creating multiple pools when threads initially access ConnectionManager.connection_pool
.
class ConnectionManager
@pool = nil
@mutex = Mutex.new
class << self
def connection_pool
@pool || create_pool
end
private
def create_pool
# allows only one thread to access the mutex at a time which
# avoids creating multiple pools
@mutex.synchronize do
return @pool if @pool # in case threads after the first enter the mutex
@pool = ConnectionPool.new(size: 10) do
Dalli::Client.new
end
end
end
end
end
We’ve introduced a mutex in create_pool
to only allow one thread to create the pool at a time. Additionally, if threads are waiting for the mutex to be free while the pool is initially being created, return @pool if @pool
will stop those threads from creating a new pool when they get a chance to access the mutex.
At this point, we should be able to use connection_pool
safely across our threads.
In Action
So we can see this in action, we will again slightly modify the above class so we can run it in many threads and see what the output is.
With a Mutex
class ConnectionManager
@my_var = nil
@mutex = Mutex.new
class << self
def connection_pool
@my_var || create_pool
end
private
def create_pool
@mutex.synchronize do
return @my_var if @my_var
puts "updating my_var"
@my_var = 1
end
end
end
end
threads = []
10.times {
threads << Thread.new {
puts "[ThreadId=#{Thread.current.object_id}] my_values=#{ConnectionManager.connection_pool}"
}
}
threads.each(&:join)
Output
updating my_var
[ThreadId=70279845930440] my_values=1
[ThreadId=70279850352420] my_values=1
[ThreadId=70279850352280] my_values=1
[ThreadId=70279850352140] my_values=1
[ThreadId=70279850352000] my_values=1
[ThreadId=70279850351840] my_values=1
[ThreadId=70279850351700] my_values=1
[ThreadId=70279850351540] my_values=1
[ThreadId=70279845930020] my_values=1
[ThreadId=70279845929580] my_values=1
Now, @my_var
is only initialized once by the first thread and returned in consequent calls and threads.
There you have it! How to a create thread safe connection pool using class instance variables, a Mutex, and the connection_pool gem.