Scout – An SQS listener for Rails apps and Sidekiq
We recently built Scout, a tool for consuming SQS messages and enqueueing them as Sidekiq jobs. It’s becoming a standard piece of our toolkit here at Enova, and I wanted to share a bit about some of the design considerations and challenges that went into building it.
The Problem
For the last few years we’ve been moving more and more to a Service Oriented Architecture here at Enova, which means as we add more features and functionality to our platform we’re adding more services to handle them. While it’s been helpful to keep our problem domains separate, managing communication within our growing cloud is becoming increasingly difficult. Two of the technologies we’ve started to use to make this more manageable are Amazon’s SNS and SQS, which make it easier for our applications to broadcast information other applications might need. Any consumers who need the information can then subscribe to the topic and process that data from an SQS queue.
Before starting to use SNS/SQS, we already had many use cases for background processing in our Rails apps. Sidekiq (shout-out to Mike Perham) has become our preferred tool for processing scheduled or on-the-fly background jobs. Sidekiq has a lot of built-in reliability features such as configurable retry strategies and locking to ensure jobs are unique, so we wanted to be able to run our SNS/SQS based jobs with it as well. The issue is that we didn’t find anything that would specifically integrate Sidekiq with an SQS queue. There were some libraries that were built to emulate some of the features of Sidekiq while being hooked up to SQS, but we wanted to be able to seamlessly switch between SQS and non-SQS jobs.
The MVP solution we came up with to get SNS/SQS out the door was a rake task that we could run on the same server as our application. We used circuitry to loop and read from SQS and then enqueue the Sidekiq jobs. That task looked roughly like this:
    task :circuitry => :environment do
      Signal.trap('USR2') do
        # handle and exit
      end

      Circuitry.subscribe do |message, topic_name|
        case topic_name
        when :foo_worker
          FooWorker.perform_async(message)
        # ...
        end
      end
    end
The issue with the task is that we essentially had to copy-paste it to every Rails app that wanted to subscribe to SNS events. We didn’t want to add too much complicated handling for signals or errors since that would increase the amount of code we would need to share. We thought about making a gem, but it was relatively difficult to design a gem that could take its configuration from the app this way without relying on a lot of meta-programming.
The Solution
Processing an SQS queue is its own blocking loop, so we decided the best design would be to create a separate tool just to handle that work. We decided to build it in Go since we’ve had success in the past building small, efficient tools with it. The fact that a Go project compiles down to a single binary also makes it easy for us to install it on any particular machine.
The design of Scout itself is relatively straightforward. As a general pattern, our apps have a single SQS queue for incoming messages that may be subscribed to multiple topics. Scout polls the queue at a pre-defined interval, and for each SQS message it receives it enqueues a Sidekiq job in Redis. It takes its configuration from a YAML file that includes the AWS credentials, the Redis connection information, and a mapping from SNS topics to Sidekiq workers. It’s meant to be run in the background as a continually processing daemon alongside the app.
Signal Handling
One of the challenges of building Scout was the question of how to safely handle shutdown. When we (re)deploy our application we need to restart the daemon in case its configuration was changed, but that restart could come in the middle of handling any given message. If we delete the message before we’ve enqueued it in Redis and then get a stop signal, we could potentially drop the message entirely. On the other hand, if we delete the message after enqueueing it, we could get a stop signal after enqueueing and “forget” to delete the message that we already processed, so it would be delivered and processed a second time.
The Go standard library gives us some tools for dealing with signals properly. In particular, os/signal provides a function to register a channel to receive certain signals. We can also create a channel to send a notification on a frequency with time.Tick(). Once we have those two channels set up, we can define our Listen loop.
    func Listen(queue Queue, freq <-chan time.Time) {
        for {
            select {
            case <-signals:
                log.Info("Got TERM")
                queue.Semaphore().Wait()
                return
            case tick := <-freq:
                log.Debug("Polling at: ", tick)
                queue.Semaphore().Add(1)
                go queue.Poll()
            }
        }
    }
Here queue.Semaphore() is a WaitGroup that we can use to ensure that we’re not going to quit before all the work we started has finished. Before each call to Poll() we make sure to increment the WaitGroup, and the poll defers a call to Done(), which will decrement the WaitGroup. When we get a signal, the call to Wait() blocks so we won’t start any new work, and then once all the in-flight work is complete, we return and the program can safely exit.
The built-in concurrency primitives really make it easy to orchestrate our different processes and make sure we’re not dropping anything. I highly recommend the pattern of using a select statement over channels to interrupt a steady stream of work, which you can read more about here.
Testing
So how do we test a tool that needs to connect to a Redis instance as well as AWS? Calling out to those services every time we run a test would be expensive in both time and money, and would make it impossible to run the tests locally.
For ease of unit testing we set up interfaces for dealing with SQS and enqueueing workers that provide the small set of functions we need.
    // SQSClient is an interface for SQS
    type SQSClient interface {
        // Fetch returns the next batch of SQS messages
        Fetch() ([]Message, error)
        // Delete deletes a single message from SQS
        Delete(Message) error
    }

    // WorkerClient is an interface for enqueueing workers
    type WorkerClient interface {
        // Push pushes a worker onto the queue
        Push(class, args string) (string, error)
    }
All of the implementation details, external libraries, etc. are encapsulated behind that interface so the rest of the code can’t access them. Encapsulation like this is good design in general, but it also makes our lives much easier when we want to test. Because we have an interface, we can just swap one of our clients for a mocked client that fulfills the same interface. For example, here is the mocked worker client:
    type MockWorkerClient struct {
        Enqueued     [][]string
        EnqueuedJID  string
        EnqueueError error
    }

    func (m *MockWorkerClient) Push(class, args string) (string, error) {
        m.Enqueued = append(m.Enqueued, []string{class, args})
        return m.EnqueuedJID, m.EnqueueError
    }
This client doesn’t do anything with what we give it, but it keeps track of all the arguments it gets called with. We can also pre-configure its return values, so that we can test behavior like error cases and invalid data.
But what about testing the core of the client itself? For that we wrote tests to call out to the external services to validate our integration. However, we don’t want to run these expensive tests all the time, so we used Go’s build tags to only compile these tests if manually specified.
Check it out
Scout is open source, so if you want to use it for your own work or if you’re just interested in how it was written you should check us out on GitHub. Pull requests are welcome, so if you think there’s something we could have done better we’d love to add your contributions.