In today's post, we are going to implement a naive background processing system for fun! We might learn some things along the way as a peek into the internals of popular background processing systems like Sidekiq. The product of this fun is by no means intended for production use.
Let’s imagine we have a task in our application that loads one or more websites and extracts their titles. As we don’t have any influence on the performance of these websites, we’d like to perform the task outside our main thread (or the current request—if we’re building a web application), but in the background.
Encapsulating a Task
Before we get into background processing, let’s build a service object to perform the task at hand. We’ll use OpenURI and Nokogiri to extract the contents of the title tag.
require 'open-uri'
require 'nokogiri'

class TitleExtractorService
  def call(url)
    # URI.open is needed on Ruby 3.0+, where Kernel#open no longer opens URLs.
    document = Nokogiri::HTML(URI.open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue StandardError
    puts "Unable to find a title for #{url}"
  end
end
Calling the service prints the title of the given URL.
TitleExtractorService.new.call('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
This works as expected, but let’s see if we can improve the syntax a little to make it look and feel a bit more like other background processing systems. By creating a Magique::Worker module, we can add some syntactic sugar to the service object.
module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end

    module ClassMethods
      def perform_now(*args)
        new.perform(*args)
      end
    end

    def perform(*)
      raise NotImplementedError
    end
  end
end
The module adds a perform method to the worker instance and a perform_now method to the worker class to make the invocation a bit more convenient.
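As a quick check of how the module behaves, the sketch below includes Magique::Worker in two hypothetical classes, EchoWorker and EmptyWorker (neither is part of the article's code). It illustrates that perform_now builds an instance and forwards its arguments, and that a worker without its own perform raises NotImplementedError.

```ruby
module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end

    module ClassMethods
      def perform_now(*args)
        new.perform(*args)
      end
    end

    def perform(*)
      raise NotImplementedError
    end
  end
end

# Hypothetical worker that overrides perform.
class EchoWorker
  include Magique::Worker

  def perform(message)
    "echo: #{message}"
  end
end

# Hypothetical worker that forgets to define perform.
class EmptyWorker
  include Magique::Worker
end

puts EchoWorker.perform_now('hello')

begin
  EmptyWorker.perform_now('hello')
rescue NotImplementedError => e
  puts "EmptyWorker raised #{e.class}"
end
```

Note that NotImplementedError descends from ScriptError rather than StandardError, so a bare rescue does not catch it. That is why the example rescues it by name.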
Let’s include the module into our service object. While we’re at it, let’s also rename it to TitleExtractorWorker and change the call method to perform.
class TitleExtractorWorker
  include Magique::Worker

  def perform(url)
    # URI.open is needed on Ruby 3.0+, where Kernel#open no longer opens URLs.
    document = Nokogiri::HTML(URI.open(url))
    title = document.css('html > head > title').first.content
    puts title.gsub(/[[:space:]]+/, ' ').strip
  rescue StandardError
    puts "Unable to find a title for #{url}"
  end
end
The invocation still has the same result, but it’s a bit clearer what's going on.
TitleExtractorWorker.perform_now('https://appsignal.com')
# AppSignal: Application Performance Monitoring for Ruby on Rails and Elixir
Implementing Asynchronous Processing
Now that we have the title extraction working, we can grab all titles from past Ruby Magic articles. To do this, let’s assume we have a RUBYMAGIC constant with a list of all the URLs of past articles.
RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_now(url)
end

# Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# Bindings and Lexical Scope in Ruby | AppSignal Blog
# Building a Ruby C Extension From Scratch | AppSignal Blog
# Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
We get the titles of past articles, but it takes a while to extract them all. That’s because we wait until each request is completed before moving on to the next one.
Let’s improve that by introducing a perform_async method to our worker module. To speed things up, it creates a new thread for each URL.
module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Thread.new { new.perform(*args) }
      end
    end
  end
end
After changing the invocation to TitleExtractorWorker.perform_async(url), we get all the titles almost at once. However, this also means that we’re opening more than 20 connections to the Ruby Magic blog at once. (Sorry for messing with your blog, folks! 😅)
If you’re following along with your own implementation and testing this outside of a long-running process (like a web server), don’t forget to add something like loop { sleep 1 } to the end of your script to make sure the process doesn’t immediately terminate.
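Since this version of perform_async returns the Thread it creates, a script can also wait for the work to finish instead of sleeping forever. The following is a minimal sketch of that alternative. SleepyWorker and its RESULTS queue are hypothetical stand-ins, so the example runs without network access.

```ruby
module Magique
  module Worker
    def self.included(base)
      base.extend(ClassMethods)
    end

    module ClassMethods
      # Same as in the article: one new thread per invocation.
      def perform_async(*args)
        Thread.new { new.perform(*args) }
      end
    end
  end
end

# Hypothetical worker that simulates a slow request.
class SleepyWorker
  include Magique::Worker

  RESULTS = Queue.new

  def perform(id)
    sleep 0.05
    RESULTS << id
  end
end

# Collect the threads and block until each one has finished.
threads = (1..5).map { |id| SleepyWorker.perform_async(id) }
threads.each(&:join)

puts "Finished #{SleepyWorker::RESULTS.size} tasks"
```

Joining only works while perform_async hands back a thread, so this applies to the thread-per-task version and not to a queue-based one.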
Queueing up Tasks
With the approach of creating a new thread for every invocation, we’ll eventually hit resource limits (both on our side and on the websites we are accessing). As we’d like to be nice citizens, let’s change the implementation to something that is asynchronous but doesn’t feel like a denial-of-service attack.
A common way to solve this problem is to use the producer/consumer pattern. One or more producers push tasks onto a queue while one or more consumers take tasks from the queue and process them.
A queue is basically a list of elements. In theory, a simple array would do the job. However, as we’re dealing with concurrency, we need to make sure that only one producer or consumer can access the queue at a time. If we aren’t careful about this, things will end in chaos—just like two people trying to squeeze through a door at once.
This problem is known as the producer-consumer problem and there are multiple solutions to it. Luckily, it is a very common problem and Ruby ships with a proper Queue implementation that we can use without having to worry about thread synchronization.
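To make the thread-safety point concrete, here is a small sketch that uses only Ruby's built-in Queue. Several producer threads push numbers while a single consumer pops them. Because pop blocks while the queue is empty, no manual locking is needed. The helper name run_queue_demo, the producer count, and the :done marker are choices made for this example only.

```ruby
# Runs several producers that push numbers and one consumer that pops them.
# Returns the list of items the consumer received.
def run_queue_demo(producers: 4, items_per_producer: 25)
  queue = Queue.new
  consumed = []

  consumer = Thread.new do
    loop do
      item = queue.pop # blocks while the queue is empty
      break if item == :done

      consumed << item
    end
  end

  producer_threads = producers.times.map do |producer|
    Thread.new do
      items_per_producer.times do |i|
        queue.push(producer * items_per_producer + i)
      end
    end
  end

  producer_threads.each(&:join)
  queue.push(:done) # tell the consumer to stop
  consumer.join

  consumed
end

consumed = run_queue_demo
puts "Consumed #{consumed.size} items"
```

Every pushed item arrives exactly once, although the order in which different producers' items interleave can vary between runs.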
To use it, let’s make sure both producers and consumers can access the queue. We do this by adding a backend accessor to our Magique module and assigning an instance of Queue to it.
module Magique
  def self.backend
    @backend
  end

  def self.backend=(backend)
    @backend = backend
  end
end

Magique.backend = Queue.new
Next, we change our perform_async implementation to push a task onto the queue instead of creating its own new thread. A task is represented as a hash including a reference to the worker class as well as the arguments passed to the perform_async method.
module Magique
  module Worker
    module ClassMethods
      def perform_async(*args)
        Magique.backend.push(worker: self, args: args)
      end
    end
  end
end
With that, we’re done with the producer side of things. Next, let’s take a look at the consumer side.
Each consumer is a separate thread that takes tasks from the queue and performs them. Instead of stopping after one task, like the threads in our previous implementation, the consumer then takes another task from the queue and performs it, and so on. Here’s a basic implementation of a consumer called Magique::Processor. Each processor creates a new thread that loops infinitely. For every iteration, it tries to grab a new task from the queue, creates a new instance of the worker class, and calls its perform method with the given arguments.
module Magique
  class Processor
    def self.start(concurrency = 1)
      concurrency.times { |n| new("Processor #{n}") }
    end

    def initialize(name)
      thread = Thread.new do
        loop do
          payload = Magique.backend.pop
          worker_class = payload[:worker]
          worker_class.new.perform(*payload[:args])
        end
      end

      thread.name = name
    end
  end
end
In addition to the processing loop, we add a convenience method called Magique::Processor.start. This allows us to spin up multiple processors at once. While naming the thread isn’t really necessary, it will allow us to see if things are actually working as expected.
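One thing to keep in mind with a loop like this: if perform raises an exception, the processor's thread ends and stops taking tasks. The sketch below shows one possible way to guard against that. It is not part of the article's implementation, and both Magique::ResilientProcessor and FlakyWorker are hypothetical names introduced for illustration.

```ruby
module Magique
  class << self
    attr_accessor :backend
  end

  # Hypothetical variant of Magique::Processor that survives failing jobs.
  class ResilientProcessor
    def initialize(name)
      thread = Thread.new do
        loop do
          payload = Magique.backend.pop
          begin
            payload[:worker].new.perform(*payload[:args])
          rescue StandardError => e
            # Report the failure and keep looping instead of ending the thread.
            warn "[#{name}] #{payload[:worker]} failed: #{e.message}"
          end
        end
      end

      thread.name = name
    end
  end
end

# Hypothetical worker that fails for odd numbers.
class FlakyWorker
  DONE = Queue.new

  def perform(number)
    raise ArgumentError, "odd number #{number}" if number.odd?

    DONE << number
  end
end

Magique.backend = Queue.new
Magique::ResilientProcessor.new('Processor 0')

[1, 2, 3, 4].each do |number|
  Magique.backend.push(worker: FlakyWorker, args: [number])
end

# Wait for the two even numbers to be processed.
processed = [FlakyWorker::DONE.pop, FlakyWorker::DONE.pop]
puts "Processed: #{processed.inspect}"
```

A real system would likely do more than log, for example retrying the job, but that is beyond what this sketch tries to show.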
Let’s adjust the output of our TitleExtractorWorker to include the name of the current thread.
puts "[#{Thread.current.name}] #{title.gsub(/[[:space:]]+/, ' ').strip}"
To test our background processing setup, we first need to spin up a set of processors before enqueueing our tasks.
Magique.backend = Queue.new
Magique::Processor.start(5)

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end

# [Processor 3] Bindings and Lexical Scope in Ruby | AppSignal Blog
# [Processor 4] Building a Ruby C Extension From Scratch | AppSignal Blog
# [Processor 1] Unraveling Classes, Instances and Metaclasses in Ruby | AppSignal Blog
# [Processor 0] Ruby's Hidden Gems, StringScanner | AppSignal Blog
# [Processor 2] Fibers and Enumerators in Ruby: Turning Blocks Inside Out | AppSignal Blog
# [Processor 4] Closures in Ruby: Blocks, Procs and Lambdas | AppSignal Blog
# ...
When this is run, we still get the titles of all articles. While it’s not as fast as just using a separate thread for every task, it’s still faster than the initial implementation that had no background processing. Thanks to the added processor names, we can also confirm that all processors are working through the queue. By tweaking the number of concurrent processors, it’s possible to find a balance between processing speed and existing resource limitations.
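To get a feel for that trade-off without hitting real websites, the rough sketch below times a batch of simulated tasks with different numbers of consumer threads. The time_run helper, the task count, and the sleep duration are all assumptions made for this example. Real timings depend on the network and on the sites being fetched.

```ruby
require 'benchmark'

# Hypothetical helper: processes `tasks` simulated jobs with `concurrency`
# consumer threads and returns the elapsed time in seconds.
def time_run(concurrency, tasks: 10, task_duration: 0.05)
  queue = Queue.new
  done = Queue.new

  processors = concurrency.times.map do
    Thread.new do
      loop do
        queue.pop
        sleep task_duration # stands in for a slow HTTP request
        done << true
      end
    end
  end

  elapsed = Benchmark.realtime do
    tasks.times { |n| queue.push(n) }
    tasks.times { done.pop } # wait until every task has finished
  end

  processors.each(&:kill)
  elapsed
end

[1, 2, 5].each do |concurrency|
  puts format('%d processor(s): %.2fs', concurrency, time_run(concurrency))
end
```

With sleep standing in for I/O, more processors finish the batch sooner, up to the point where there are as many processors as tasks.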
Expanding to Multiple Processes and Machines
So far, the current implementation of our background processing system works well enough. It’s still limited to the same process, though. Resource-hungry tasks will still affect the performance of the entire process. As a final step, let’s look at distributing the workload across multiple processes and maybe even multiple machines.
The queue is the only connection between producers and consumers. Right now, it’s using an in-memory implementation. Let’s take more inspiration from Sidekiq and implement a queue using Redis.
Redis supports lists, which we can push tasks onto and fetch tasks from. Additionally, the Redis Ruby gem is thread-safe and the Redis commands that modify lists are atomic. These properties make it possible to use it for our asynchronous background processing system without running into synchronization problems.
Let’s create a Redis-backed queue that implements the push and shift methods just like the Queue we used previously.
require 'json'
require 'redis'

module Magique
  module Backend
    class Redis
      def initialize(connection = ::Redis.new)
        @connection = connection
      end

      def push(job)
        @connection.lpush('magique:queue', JSON.dump(job))
      end

      def shift
        _queue, job = @connection.brpop('magique:queue')
        payload = JSON.parse(job, symbolize_names: true)
        payload[:worker] = Object.const_get(payload[:worker])
        payload
      end

      # Magique::Processor calls pop, so expose shift under that name as well
      # (Ruby's Queue treats the two as aliases, too).
      alias pop shift
    end
  end
end
As Redis doesn’t know anything about Ruby objects, we have to serialize our tasks into JSON before storing them in the database using the lpush command, which adds an element to the front of the list.
To fetch a task from the queue, we’re using the brpop command, which removes and returns the last element of a list. If the list is empty, it’ll block until a new element is available. This is a nice way to pause our processors when no tasks are available. Finally, after getting a task out of Redis, we have to look up the real Ruby class based on the name of the worker using Object.const_get.
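The serialization round trip can be tried without a Redis server. In the sketch below, serialize and deserialize are hypothetical helpers that mirror what the backend does around its Redis calls, and GreetingWorker is a made-up worker used only for the demonstration.

```ruby
require 'json'

# Hypothetical worker used only to demonstrate the round trip.
class GreetingWorker
  def perform(name)
    "Hello, #{name}!"
  end
end

# Mirrors the backend's push: the worker class is written out as its name.
def serialize(job)
  JSON.dump(job)
end

# Mirrors the backend's shift: parse the JSON and look the class up again.
def deserialize(json)
  payload = JSON.parse(json, symbolize_names: true)
  payload[:worker] = Object.const_get(payload[:worker])
  payload
end

json = serialize({ worker: GreetingWorker, args: ['Ruby'] })
puts json

payload = deserialize(json)
puts payload[:worker].new.perform(*payload[:args])
```

One consequence of going through JSON is that only JSON-compatible arguments survive the trip unchanged. A symbol argument, for instance, comes back as a string.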
As a final step, let’s split things up into multiple processes. On the producer side of things, the only thing we have to do is change the backend to our newly implemented Redis queue.
# ...

Magique.backend = Magique::Backend::Redis.new

RUBYMAGIC.each do |url|
  TitleExtractorWorker.perform_async(url)
end
On the consumer side of things, we can get away with a few lines like this:
# ...

Magique.backend = Magique::Backend::Redis.new
Magique::Processor.start(5)

loop { sleep 1 }
When executed, the consumer process will wait for new work to arrive in the queue. Once we start the producer process that pushes tasks into the queue, we can see that they get processed immediately.
Enjoy Responsibly and Don’t Use This in Production
While we kept it far from a real-world setup you would use in production (so don't!), we took a few steps toward building a background processor. We started by wrapping a task in a worker object. Then we made it asynchronous and used Queue to solve the producer-consumer problem. Finally, we expanded from a single process to multiple processes or machines by using Redis rather than an in-memory implementation.
As mentioned before, this is a simplified implementation of a background processing system. There are a lot of things missing and not explicitly dealt with. These include (but are not limited to) error handling, multiple queues, scheduling, connection pooling, and signal handling.
Nonetheless, we had fun writing this and hope you enjoyed a peek under the hood of a background processing system. Perhaps you even took away a thing or two.