
Introducing Goblero: a Go Embedded Job Queue

Adil H.

15 Feb 2019

6 min read


Gophers Meet Badgers

Over the years I’ve used a number of Job Queues such as Sidekiq, Kue, or Faktory, and they are often a central piece of many apps I build. As embedded databases are so popular in the Go world, I thought I’d try to create an open source embedded job queue for Go apps: Goblero.

What is a Job Queue?

Job Queues are also known as Background Job Queues, Task Queues or Work Servers. They can be very handy for handling asynchronous or long-running tasks that don’t belong in a web request handler, for example. Most Job Queues are backed by a database such as Redis, RabbitMQ or Kafka that provides fast job storage/retrieval and sometimes Pub/Sub capabilities as well.

Web App Flow: Client -> HTTP Handler -> Job Queue

The diagram above demonstrates a use case for a Job Queue:

  • The HTTP handler receives an HTTP request from a web client, containing for example a file that needs some heavy processing.
  • The HTTP handler responds as fast as possible and delegates the heavy lifting to the job queue (a minimal sketch of this flow follows the list).
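To make that flow concrete, here is a minimal sketch of an HTTP handler delegating work to the job queue. The /upload route, the "process-upload" job name and the handler logic are made up for illustration; the queue calls (New, Start, RegisterProcessorFunc, EnqueueJob, Stop) are the Goblero API shown in the demo later in this article.

package main

import (
	"io/ioutil"
	"log"
	"net/http"

	"github.com/didil/goblero/pkg/blero"
)

func main() {
	// Open the embedded queue and start processing
	bl := blero.New("db/")
	if err := bl.Start(); err != nil {
		log.Fatal(err)
	}
	defer bl.Stop()

	// Register a processor that will pick up and run the enqueued jobs
	bl.RegisterProcessorFunc(func(j *blero.Job) error {
		log.Printf("processing %v (%d bytes)", j.Name, len(j.Data))
		// ... heavy processing goes here ...
		return nil
	})

	http.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) {
		data, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// Delegate the heavy lifting to the job queue and respond right away
		if _, err := bl.EnqueueJob("process-upload", data); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}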

Why an Embedded Job Queue?

While Job Queues such as Sidekiq or Celery are geared more towards distributed processing by multiple workers, the idea behind an Embedded Job Queue is that it runs within the same process that creates the jobs. The main benefit is that we do not need to run a separate process for job processing, nor manage an external database. An Embedded Job Queue can be useful for small, local tasks where simple persistence is needed.

For example, let’s imagine a Go CLI app that receives as input a text file with 10,000 urls, fetches some data from another source for each url and then sends a POST request to the url. Imagine that it processes 5 urls at a time, and then crashes after 2,000 urls. How do we know where to restart? Which urls were already processed? Of course you could create an ad-hoc solution for your app to keep track of the processing state, but the idea behind a job queue is to let you track the processing state in a simple and reliable manner, with some data durability guarantees.
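Here is a hedged sketch of how that scenario could look with a persistent job queue: each url becomes a job, so after a crash the remaining pending jobs are simply picked up again on restart. The fetchAndPost function, the "post-url" job name and the file handling are placeholders I made up; the queue calls are the Goblero API demonstrated later in this article.

package main

import (
	"bufio"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/didil/goblero/pkg/blero"
)

// fetchAndPost is a stand-in for the real per-url work:
// fetch data from another source, then send a POST request to the url.
func fetchAndPost(url string) error {
	// ...
	return nil
}

func main() {
	bl := blero.New("db/")
	if err := bl.Start(); err != nil {
		log.Fatal(err)
	}
	defer bl.Stop()

	// 5 processors -> up to 5 urls processed at a time
	for i := 0; i < 5; i++ {
		bl.RegisterProcessorFunc(func(j *blero.Job) error {
			return fetchAndPost(string(j.Data))
		})
	}

	// On the first run, enqueue one persistent job per url from the input file.
	// After a crash, restart without the file argument: the pending jobs are still in the db.
	if len(os.Args) > 1 {
		f, err := os.Open(os.Args[1])
		if err != nil {
			log.Fatal(err)
		}
		defer f.Close()
		sc := bufio.NewScanner(f)
		for sc.Scan() {
			if _, err := bl.EnqueueJob("post-url", []byte(sc.Text())); err != nil {
				log.Fatal(err)
			}
		}
	}

	// Wait for Ctrl-C, then stop the queue cleanly
	exitCh := make(chan os.Signal, 1)
	signal.Notify(exitCh, syscall.SIGINT, syscall.SIGTERM)
	<-exitCh
}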

Enter Goblero

Goblero aims to be a Simple, Fast, Embedded, Persistent Job Queue for Go Apps. It’s still in alpha and not production ready at all, but most of the basic functionality is already there. The idea is to keep the feature set simple, and you can already start playing with it in your Go side projects.
I chose BadgerDB as the backing embedded database, as it seemed to have some interesting properties (a short usage sketch follows the list):

  • Key Value Store
  • Pure Go
  • Sorted KV access
  • ACID Concurrent Transactions
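To give an idea of what those properties look like in practice, here is a tiny Badger example: a write inside an ACID transaction followed by a read. Note that I’m using the DefaultOptions(path) constructor from recent Badger releases; the options API has changed between versions, so treat this as an illustration of the KV API rather than a drop-in snippet.

package main

import (
	"fmt"
	"log"

	badger "github.com/dgraph-io/badger/v2"
)

func main() {
	// Open (or create) the database directory
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger-demo"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Writes happen inside an ACID read-write transaction
	err = db.Update(func(txn *badger.Txn) error {
		return txn.Set([]byte("job:1"), []byte("pending"))
	})
	if err != nil {
		log.Fatal(err)
	}

	// Reads use a read-only transaction
	err = db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte("job:1"))
		if err != nil {
			return err
		}
		val, err := item.ValueCopy(nil)
		if err != nil {
			return err
		}
		fmt.Printf("job:1 = %s\n", val)
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}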

The source code in the repo should be pretty accessible if you want to learn about the library internals. The design is of course experimental at the moment. Issues, suggestions and pull requests are welcome!

In this article I wanted to focus on demonstrating its usage. We’ll use the Goblero Demo GitHub repo, which contains a simple working app that makes use of a Goblero Job Queue.

To get started with the demo, you will need to have Go installed (https://golang.org/dl/). Then you can clone the repo using:

git clone https://github.com/didil/goblero-demo.git

Get the package:

go get -u github.com/didil/goblero/pkg/blero

Build the demo app:

go build .

Here is the code that we’ll be running:

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/didil/goblero/pkg/blero"
)

func main() {
	// Parse flags
	n := flag.Int("n", 1, "number of processors")
	flag.Parse()

	// Create a new Blero backend
	bl := blero.New("db/")
	// Start Blero
	err := bl.Start()
	if err != nil {
		log.Fatal(err)
	}

	// Register processor(s)
	for i := 1; i <= *n; i++ {
		pI := i
		fmt.Printf("Registering Processor %v ...\n", pI)
		bl.RegisterProcessorFunc(func(j *blero.Job) error {
			fmt.Printf("[Processor %v] Processing job: %v - data: %v\n", pI, j.Name, string(j.Data))
			// Simulate processing
			time.Sleep(2 * time.Second)
			fmt.Printf("[Processor %v] Done Processing job: %v\n", pI, j.Name)

			return nil
		})
	}

	// Enqueue jobs
	if len(os.Args) > 1 && os.Args[1] == "enqueue" {
		fmt.Println("Enqueuing jobs ...")
		for i := 1; i <= 50; i++ {
			jobName := fmt.Sprintf("Job #%v", i)
			jobData := []byte(fmt.Sprintf("Job Data #%v", i))
			_, err := bl.EnqueueJob(jobName, jobData)
			if err != nil {
				log.Fatal(err)
			}
		}
	}

	// Wait for SIGTERM or SIGINT to stop Blero and exit
	exitCh := make(chan os.Signal, 1)
	signal.Notify(exitCh, syscall.SIGINT)
	signal.Notify(exitCh, syscall.SIGTERM)
	s := <-exitCh
	fmt.Printf("Caught signal %v. Exiting ...\n", s)

	// Stop Blero
	bl.Stop()
	os.Exit(0)
}

To start the app we’ll run:

./goblero-demo enqueue

This command:

  • starts the demo app
  • creates a new Blero backend
  • creates a Badger database in the db folder of the repo
  • registers a processor, a function that will run the jobs (it simulates some task processing by sleeping for 2 seconds, then returning)
  • creates and enqueues 50 jobs

Your output should look something like this:

$ ./goblero-demo enqueue
Starting Blero ...
Registering Processor 1 ...
Enqueuing jobs ...
[Processor 1] Processing job: Job #1 - data: Job Data #1
[Processor 1] Done Processing job: Job #1
[Processor 1] Processing job: Job #2 - data: Job Data #2
[Processor 1] Done Processing job: Job #2
[Processor 1] Processing job: Job #3 - data: Job Data #3
[Processor 1] Done Processing job: Job #3
[Processor 1] Processing job: Job #4 - data: Job Data #4
[Processor 1] Done Processing job: Job #4
[Processor 1] Processing job: Job #5 - data: Job Data #5
^CCaught signal interrupt. Exiting ...
Stopping Blero ...

I deliberately stopped the app after Job #5 started. Let’s restart the app without the “enqueue” argument:

$ ./goblero-demo
Starting Blero ...
Registering Processor 1 ...
[Processor 1] Processing job: Job #6 - data: Job Data #6
[Processor 1] Done Processing job: Job #6
[Processor 1] Processing job: Job #7 - data: Job Data #7
[Processor 1] Done Processing job: Job #7
[Processor 1] Processing job: Job #8 - data: Job Data #8
^CCaught signal interrupt. Exiting ...
Stopping Blero ...

It continues right where it left off! That’s the idea. You might notice that Job #5 is now in an ambiguous state: is it done? The library does not yet support resuming work on incomplete jobs, but that feature should be added soon.

Let’s now try to add more job processors and see what happens. We can do that by running the demo with the -n flag to change the number of processors (e.g. ./goblero-demo -n 3):

Starting Blero ...
Registering Processor 1 ...
Registering Processor 2 ...
Registering Processor 3 ...
[Processor 3] Processing job: Job #9 - data: Job Data #9
[Processor 1] Processing job: Job #10 - data: Job Data #10
[Processor 2] Processing job: Job #11 - data: Job Data #11
[Processor 2] Done Processing job: Job #11
[Processor 3] Done Processing job: Job #9
[Processor 1] Done Processing job: Job #10
[Processor 2] Processing job: Job #12 - data: Job Data #12
[Processor 3] Processing job: Job #13 - data: Job Data #13
[Processor 1] Processing job: Job #14 - data: Job Data #14
[Processor 2] Done Processing job: Job #12
[Processor 1] Done Processing job: Job #14
[Processor 2] Processing job: Job #15 - data: Job Data #15
[Processor 1] Processing job: Job #16 - data: Job Data #16
[Processor 3] Done Processing job: Job #13
[Processor 3] Processing job: Job #17 - data: Job Data #17
^CCaught signal interrupt. Exiting ...
Stopping Blero ...

The work continues and we’re now processing 3 jobs in parallel! Goblero internally distributes the jobs across goroutines, but you don’t have to deal with channels, signaling, etc. There is no support for timeouts yet, but that’s a planned feature through Go contexts.
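Until contexts land in the library, nothing stops a processor function from enforcing its own deadline. Here is a minimal, user-level sketch (not a Goblero feature) that could replace the simple processor in the demo’s main, with "context" added to the imports; doWork stands in for your actual processing function.

	bl.RegisterProcessorFunc(func(j *blero.Job) error {
		// Give each job at most 30 seconds
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		done := make(chan error, 1)
		go func() { done <- doWork(ctx, j) }() // doWork: your own processing function

		select {
		case err := <-done:
			return err
		case <-ctx.Done():
			// Note: the goroutine keeps running in the background; we just stop waiting for it
			return fmt.Errorf("job %v timed out: %v", j.Name, ctx.Err())
		}
	})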

Quick Benchmark

There is still a lot of optimisation to be done on the internals, data storage model, locking, etc., but I’ve added a couple of benchmarks to the test files:

# Core i5 laptop / 8GB RAM / SSD
make bench
BenchmarkEnqueue/EnqueueJob-4    50000     159942 ns/op   (~6250 ops/s)
BenchmarkEnqueue/dequeueJob-4     5000    2767260 ns/op   (~361 ops/s)

You’ll notice that dequeuing is somewhat slow at the moment (~360 ops/second on my test laptop). But for a quick first version of something that’s mainly built to process long running jobs in a single process: not too bad!
I think the slow dequeues are mostly due to the fact that we’re saving to disk after each operation to avoid losing data on crashes, and to the fact that the dequeue currently takes multiple steps (sketched after the list below):

  • DB Seek to find the next pending job
  • DB Read + Deserialize the data to a Job struct
  • DB Delete the job from the pending Queue
  • DB Set the job on the active Queue
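To make those steps concrete, here is a conceptual sketch of a dequeue written directly against Badger, all inside one transaction. This is not the actual Goblero implementation (see the repo for that), and the key prefixes are made up, but it shows why each dequeue costs a seek, a read, a delete and a write; db is assumed to be an open *badger.DB.

err := db.Update(func(txn *badger.Txn) error {
	prefix := []byte("q:pending:")

	// 1. DB Seek: find the next pending job
	it := txn.NewIterator(badger.DefaultIteratorOptions)
	it.Seek(prefix)
	if !it.ValidForPrefix(prefix) {
		it.Close()
		return nil // nothing to dequeue
	}

	// 2. DB Read: copy the entry (Goblero would also deserialize it into a Job struct here)
	item := it.Item()
	key := item.KeyCopy(nil)
	val, err := item.ValueCopy(nil)
	it.Close()
	if err != nil {
		return err
	}

	// 3. DB Delete: remove the job from the pending queue
	if err := txn.Delete(key); err != nil {
		return err
	}

	// 4. DB Set: add the job to the active queue
	activeKey := append([]byte("q:active:"), key[len(prefix):]...)
	return txn.Set(activeKey, val)
})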

Todo

Some tasks that still need work:

  • Restart interrupted jobs after app restart/crashes
  • Sweep completed jobs from the “complete” queue
  • Failed Jobs retry options
  • Allow batch enqueuing
  • Add support for Go contexts
  • Test in real conditions under high load
  • Expose Prometheus metrics in an HTTP handler
  • Optimize performance / Locking

I hope you will find the Goblero library useful, and please reach out if you have any ideas, suggestions or thoughts. Happy Go hacking!
