Supercharge Resque and Sidekiq With Go Part 2
In Part 1 we looked at a way to push jobs onto a queue with Ruby and have a Go program consume them. In Part 2 I want to explore some real world examples.
Tracking & Analytics
Your PM has asked you plug in another analytics service (on top of the 8 you
already have) to measure the usage of your application. Whenever a request
happens the relevant user details are logged. Email, user_id
, url, ip address,
etc.
There is an existing Sidekiq worker that looks similar to this
class TrackUserJob < ActiveJob::Base
queue_as :track
def perform(id, url, ip_address, controller, action)
user = User.find(id)
GenericService.track!(id: user.id, name: user.name, email: user.email, ip_address: ip_address, endpoint: "#{controller}/#{action}"
SomethingFromGoogle.track(id: user.id, name: user.name, email: user.email, url: url)
HotNewStartup.trendy_term_for_tracking("userId" => user.id, "userName" => user.name, "userEmail" => user.email)
#...
#...
#...
#...
end
end
This sort of job is an excellent candidate to switch over to Go. There are no application side-effects. Fire off a request and forget about it. It is also probably high volume - a million plus a month.
There is one blocker to remove before we can switch - the user database lookup
User.find(id)
. We could look up the user in our Go worker but I would
discourage that approach. Doing so duplicates knowledge of the database schema.
If a column changes, two programs now need to be updated.
Furthermore, a method like #name
by may not be a database column. It could be
a method on the ActiveRecord model which concatenates first_name
and
last_name
. Again we would be forced to duplicate that logic.
It is much easier to change the signature of the method. Rewrite it to just take all the string values it needs.
# From
def perform(id, url, ip_address, controller, action)
# To
def perform(id, name, email, url, ip_address, controller, action)
Now we have no need to query the database.
Pushing the Job onto the Queue
There are a few options on how to push the job onto the queue.
-
Change every place the method is called to use
Sidekiq::Client.push
instead ofTrackUserJob.perform_async(..args)
-
Keep the
TrackUserJob
class but make it almost empty. Remove everything except forqueue_as
Although Option 1 might involve a bit of shotgun surgery I tend to prefer it over having an empty and somewhat confusing class.
# somewhere in a controller
MyRubyJob.perform_async(current_user.id, current_user.name, current_user.email, current_path, request.ip_address, controller_name, action_name)
# becomes
Sidekiq::Client.push(
"queue" => "track",
"class" => "TrackUserJob",
"args" => [current_user.id, current_user.name, current_user.email, current_path, request.ip_address, controller_name, action_name]
)
Go Worker
Here is the code from Part 1. The only change so far
has been renaming the Worker function to TrackUserJob
.
package main
import (
"fmt"
"github.com/jrallison/go-workers"
)
func TrackUserJob(message *workers.Msg) {
}
func main() {
workers.Configure(map[string]string{
// location of redis instance
"server": "localhost:6379",
// instance of the database
"database": "1",
// number of connections to keep open with redis
"pool": "10",
// unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
"process": "1",
})
// pull messages from "track" queue with concurrency of 1000
workers.Process("track", TrackUserJob, 1000)
// Blocks until process is told to exit via unix signal
workers.Run()
}
To access the arguments for the job the Msg
type provides an
Args
method which
returns a wrapper object around
simplejson. To extract the arguments
array use the
Array
method from simplejson
.
func TrackUserJob(message *workers.Msg) {
args, _ := message.Args().Array()
// args looks like [1, "patrick", "[email protected]", .....]
// Omitted
// Do stuff with args
// Post to services
}
The arguments will come in the same order they were put in. If working with an
array feels clumsy here is an example of using hashes. In the Ruby code switch
the args
to a hash.
Sidekiq::Client.push(
"queue" => "track",
"class" => "TrackUserJob",
"args" => [{
id: current_user.id,
name: current_user.name,
email: current_user.email,
...
}]
)
Redis will serialize that hash into a JSON string. There are two ways to work with the data. The first is to continue to use simplejson. The data is going to be a one element array with a map as the only value.
[{"id":5,"name":"Patrick","email":"[email protected]"}]
Use the
GetIndex
method to pull out the first and only element. Then use other simplejson methods
to extract out values.
func TrackUserJob(message *workers.Msg) {
args, _ := message.Args().GetIndex(0)
id, _ := args.Get("id").Int()
name, _ := args.Get("name").String()
email, _ := args.Get("email").String()
}
It is also possible to unmarshal the JSON into a struct.
type Args struct {
Id string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
func TrackUserJob(message *workers.Msg) {
var payload []Args
json.Unmarshal(message.Args().ToJson(), &payload)
args := payload[0]
// args.Id => 5
// args.Name => "Patrick"
// args.Email => "[email protected]"
}
With the arguments in hand all that is left is to write the HTTP requests. Becasuse they are not very interesting I have omitted the requests from the examples.
That is all there is to this simple example. The Go Worker pulls off jobs, runs them, and exits. The next example will show how to post some data back to the Ruby process.
Computation Heavy
This example will demonstrate how to offload some computationally heavy work to Go. Once finished it will return the results back to our Ruby program.
Because I like using Sudoku as an example, the Ruby program will be a Sudoku solving service. Users will post a puzzle as a string and the program will solve it in the background and notify the user when finished.
However, imagine the algorithm we use to solve the puzzles is not very fast (short on time so it is an inelegant brute force solver).
Instead of having Ruby inefficiently try to finish the puzzles it will offload
the work to Go. Each unsolved puzzle will get pushed onto a unsolved_puzzles
queue. Go Workers will pick up the jobs, compute the unsolved puzzle and push
the result onto a solved
queue. Ruby workers will then pull off the solved
jobs and update the database record with the solved solution.
Here is the Ruby code
# A Rails controller somewhere
class SudokuController < ApplicationController
def create
@sudoku = Sudoku.new(sudoku_params)
if @sudoku.save
Sidekiq::Client.push(
"queue" => "unsolved_puzzles",
"class" => "SudokuSolver",
"args" => [@sudoku.id.to_s, @sudoku.unsolved_puzzle]
)
else
render :new
end
end
private
def sudoku_params
params.require(:sudoku).permit(:unsolved_puzzle)
end
end
Corresponding Go Code.
package main
import (
"fmt"
"github.com/jrallison/go-workers"
)
func Solve(puzzle string) string{
// Code to solve a Sudoku puzzle
// Returns the solved puzzle as a string
}
func SudokuSolver(message *workers.Msg) {
args, _ := message.Args().Array()
id := args[0]
puzzle := args[1]
solvedPuzzle := Solve(puzzle)
// Add a job to the solved queue. Args need to be an array
workers.Enqueue("solved", "SolvedPuzzleJob", []string{id, solvedPuzzle})
}
func main() {
workers.Configure(map[string]string{
// Omitted see above
})
// pull messages from "unsolved_puzzle" queue with concurrency of 100
workers.Process("unsolved_puzzles", SudokuSolver, 100)
// Blocks until process is told to exit via unix signal
workers.Run()
}
The Go Worker pushes a job onto the solved queue with workers.Enqueue
. The
first argument is the queue, the second is the class name and the third is the
array of arguments.
And a Sidekiq Job to save the result. Its class name must match the second
argument of workers.Enqueue
from above.
class SolvedPuzzleJob < ActiveJob::Base
queue_as :solved
def perform(id, solved_puzzle)
sudoku = Sudoku.find(id)
sudoku.update(solved_puzzle: solved_puzzle)
sudoku.notify_user!
end
end
Things to consider
-
Redis is not meant to serialize large objects.
If you try to avoid database lookups by dumping an entire object (or set of objects) into the arguments lists you may run into some performance issues. At that point either keep the job in Ruby or have it hit an API for the data instead.
-
Do not mix and match workers on the same queue
Do not have both Sidekiq / Resque and Go working on the same queue. Not sure what will happen, but I cannot image it ending well.