Cristian Castiblanco
11 Jun 2018 · 7 min read
TLDR: we run several applications consuming multiple real-time streams. These applications implement idempotence using Redis sets. In this post I describe our journey moving to bloom filters (using the ReBloom module), which brought down our memory usage by almost 10x.
Let's briefly define idempotence:
An operation is said to be idempotent when applying it multiple times has the same effect.
There are some reasons why applications consuming a real-time stream must have an idempotence layer, e.g.:
Streaming systems support parallel reads by multiple consumers of the same application by partitioning the stream into shards. This allows the work to be distributed among multiple workers, so there must be a shared data store to keep track of the progress; Redis is a good candidate.
The applications also need to survive both software and hardware crashes (i.e., the application, and the hardware it runs on, are assumed to be ephemeral), so implementing idempotence in memory or storing the data locally is not an option.
The simplest way to provide an idempotence layer is to save all elements observed into a set. Before processing an event, one would check whether it is already there.
This approach is obviously memory intensive and will break once the volume of data is large enough (say a couple of million events per hour).
Before diving into the meat of this post, let's briefly look at our current implementation, which we have been using for years and which we are replacing with bloom filters.
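For illustration, here's roughly what that naive layer looks like in Python with redis-py (the key name and connection settings are made up):

import redis

r = redis.Redis()  # hypothetical connection settings

def seen_before(event_id: str) -> bool:
    # SADD returns 1 when the member is new and 0 when it already exists,
    # so the check and the insert happen in a single atomic command.
    return r.sadd("idempotence:all_events", event_id) == 0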
Every event in the stream is associated with an ID, which we hash with MD5 to shorten it. Then we check/add (SISMEMBER / SADD) the ID in a set whose key name is derived from the current day plus the first two bytes of the MD5-hashed ID (i.e., every day of idempotence uses 65536 sets). We then save the ID (minus its first two bytes) as raw bytes.
This is a nice trick that reduces storage usage a little bit: since all IDs starting with bytes XX go into the same set, we can drop that prefix when inserting them. Think of a single-level trie.
We also keep the previous day of idempotence around for 24 hours (using EXPIRE), and when checking whether an element has already been seen, we do so against both days.
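Here's a minimal sketch of that scheme (the key naming, TTL, and client setup are illustrative assumptions, not our production code):

import hashlib
from datetime import datetime, timedelta, timezone

import redis

r = redis.Redis()  # hypothetical connection settings

def check_and_add(event_id: str) -> bool:
    """Return True if the event was already seen today or yesterday."""
    digest = hashlib.md5(event_id.encode()).digest()      # 16 raw bytes
    prefix, rest = digest[:2], digest[2:]                 # 2-byte prefix -> 65536 sets per day
    now = datetime.now(timezone.utc)
    days = [now.strftime("%Y%m%d"), (now - timedelta(days=1)).strftime("%Y%m%d")]
    keys = [f"idempotence:{day}:{prefix.hex()}" for day in days]
    if any(r.sismember(key, rest) for key in keys):
        return True
    r.sadd(keys[0], rest)                                 # store only the remaining 14 bytes
    r.expire(keys[0], 2 * 24 * 3600)                      # today's sets survive one extra day
    return False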
A bloom filter is a space-efficient probabilistic data structure that allows you to check whether an item is a member of a set. There is a catch, though: a query returns either "definitely not in the set" or "possibly in the set" (i.e., false positives are possible).
Another caveat is that you can't list the members of a set (like you would do on a Redis set using SMEMBERS), because the items themselves are not saved, just their fingerprints. This is what makes them space-efficient.
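To make those semantics concrete, here is a toy bloom filter in Python; the sizing is arbitrary, and real implementations such as ReBloom derive the bit and hash counts from the target error rate:

import hashlib

class ToyBloomFilter:
    def __init__(self, num_bits: int = 1 << 20, num_hashes: int = 7):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a plain int used as a bit array

    def _positions(self, item: str):
        # Derive k hash positions by salting the item with the hash index.
        for i in range(self.num_hashes):
            h = hashlib.md5(f"{i}:{item}".encode()).hexdigest()
            yield int(h, 16) % self.num_bits

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item: str) -> bool:
        # False means "definitely not in the set";
        # True only means "possibly in the set" (false positives happen).
        return all(self.bits >> pos & 1 for pos in self._positions(item))

bf = ToyBloomFilter()
bf.add("foo")
print(bf.might_contain("foo"))  # True
print(bf.might_contain("bar"))  # almost certainly False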
We had already used bloom filters before to deduplicate events for our real-time dashboards. We tried implementing them using Lua on Redis, but the performance was not production-grade. So we turned our attention to bloomd, a bloom filters server written in C.
We used bloomd for a while, and its performance is OK. But it has some drawbacks:
In 2016 Salvatore announced the Redis Modules API at RedisConf; I remember fantasizing about implementing bloom filters as a module. But bloomd was already working, and the modules API still felt experimental, so we waited.
A few months ago we learned about ReBloom, a Redis module that provides two probabilistic datatypes: scalable bloom filters and cuckoo filters. It is developed by Redis Labs, our Redis provider, which is encouraging.
ReBloom brings to the table several benefits over bloomd. Since it sits on top of Redis, there are lots of things that come for free:
Here's an example of the API usage:
# Assuming you are running a redis server with rebloom loaded...
# Use BF.RESERVE to create a filter with the specified
# error rate and capacity
> BF.RESERVE your_filter 0.00001 50000000
OK
# Use BF.ADD to add elements to the filter (returns 1 when the item is added)
> BF.ADD your_filter foo
1
# Use BF.EXISTS to check whether an element is in the filter
> BF.EXISTS your_filter foo
1
> BF.EXISTS your_filter bar
0
It is also possible to add or check multiple items in bulk. And using the BF.INSERT command you can also insert items and specify the error rate/capacity to use if the filter is not created yet.
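For instance, here's a sketch using the BF.MADD / BF.MEXISTS / BF.INSERT commands through redis-py's generic execute_command (the client setup, key names, and values are made up for illustration):

import redis

r = redis.Redis()  # hypothetical connection settings

# Add and check several items in one round trip.
print(r.execute_command("BF.MADD", "your_filter", "alpha", "beta", "gamma"))  # e.g. [1, 1, 1]
print(r.execute_command("BF.MEXISTS", "your_filter", "alpha", "nope"))        # e.g. [1, 0]

# BF.INSERT adds items and, if the filter does not exist yet, creates it
# with the given capacity and error rate.
print(r.execute_command(
    "BF.INSERT", "another_filter",
    "CAPACITY", "50000000", "ERROR", "0.00001",
    "ITEMS", "qux", "quux",
))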
Our current ReBloom setup, which came to be after lots of manual tuning and analysis, is almost ten times more space-efficient than using sets at peak time.
The first thing that stands out is that the space taken by the bloom filters is constant. The reason behind this is that the space used by a bloom filter depends on its initial configuration (capacity and error rate), and it is allocated at the moment of creation.
ReBloom implements Scalable Bloom Filters, which means that once the maximum capacity is reached, a new filter is created on top of the original one, with higher capacity and error rate.
In the graph above there was no scaling, and it is not a coincidence: we tuned the configuration to prevent the filter from scaling, as this has two undesirable effects:
Tuning the initial configuration of your filters is paramount to get the most out of them. It requires lots of experimentation and understanding of your data.
The parameters with which new filters are created, capacity and error rate, determine their initial size and are immutable.
If you reserve too little capacity, your filter will probably scale multiple times, and every time it scales it becomes twice as big, i.e., it grows exponentially. On the other hand, if you are too generous, odds are you will waste space unnecessarily, making this whole journey pointless.
To make things more complicated, it's not easy to predict what the initial size of the bloom filter will be. Here are some examples:
# creating a filter with a capacity of 25M and a 1e-10 error rate
> BF.RESERVE foo 0.0000000001 25000000
OK
# let's inspect what we actually got...
> BF.DEBUG foo
1) "size:0"
2) "bytes:268435456 bits:2147483648 hashes:34 capacity:44808984 size:0 ratio:1e-10"
Interesting: the capacity is actually 44808984, way more than we wanted! What's going on? Well, it has to do with the math used to reserve the filter.
Let's try a different one, e.g.:
# creating a filter with a capacity of ~22.4M and a 1e-10 error rate
> BF.RESERVE bar 0.0000000001 22404492
OK
> BF.DEBUG bar
1) "size:0"
2) "bytes:134217728 bits:1073741824 hashes:34 capacity:22404492 size:0 ratio:1e-10"
That looks more like what we want. Now let's increase the error rate by a factor of 10:
# creating a filter with a capacity of ~22.4M and a 1e-09 error rate
> BF.RESERVE qux 0.000000001 22404492
OK
> BF.DEBUG qux
1) "size:0"
2) "bytes:134217728 bits:1073741824 hashes:30 capacity:24893880 size:0 ratio:1e-09"
Even more interesting: the size in bytes is identical to the previous example, even though we increased the error rate. There is a difference, though: our filter now has a more generous capacity (it went from 22404492 to 24893880) and uses fewer hashes. Again, this is due to the math used by the current implementation.
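Here is my reconstruction of that math (an assumption based on the outputs above, not ReBloom's actual source): apply the textbook bloom filter formulas, round the number of bits up to the next power of two, and recompute the capacity that the rounded size can hold at the requested error rate:

import math

def rebloom_like_sizing(capacity: int, error_rate: float):
    # Textbook formulas: bits = -n * ln(p) / (ln 2)^2, hashes = ceil(log2(1/p)).
    bits_needed = -capacity * math.log(error_rate) / (math.log(2) ** 2)
    bits = 2 ** math.ceil(math.log2(bits_needed))        # round up to a power of two
    hashes = math.ceil(math.log2(1 / error_rate))
    # The rounded size can hold more items at the same error rate.
    effective_capacity = int(bits * (math.log(2) ** 2) / -math.log(error_rate))
    return bits // 8, hashes, effective_capacity

print(rebloom_like_sizing(25_000_000, 1e-10))   # approx. (268435456, 34, ~44.8M) -- close to the first example
print(rebloom_like_sizing(22_404_492, 1e-10))   # approx. (134217728, 34, ~22.4M)
print(rebloom_like_sizing(22_404_492, 1e-09))   # approx. (134217728, 30, ~24.9M)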
Since changes to the initial capacity and error rate influence the resulting size so dramatically, I plotted it for different parameters in an interactive notebook, which you will find handy should you need to tune your filters:
Something to keep in mind is that you might lose data to false positives. This is a very important trade-off you need to make when using bloom filters, and it also limits the scenarios in which it is feasible to use them.
If every event in your pipeline is critical, and losing even one would impact business, then bloom filters are not for you.
More often than not, though, pipelines contain all kinds of events, with different degrees of importance. And in some applications, dropping an event is not that big of a deal (e.g., an app feeding a real-time dashboard: if you are summing large numbers of events, dropping one or two would go unnoticed).
There are several ways to mitigate this, though. For instance, you could use bloom filters with different configurations depending on the kind of event passing through them: high-priority events could have a minimal error probability (or even use plain old sets instead of a probabilistic data structure), while less important ones could use a more forgiving configuration.
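As a sketch (the priorities, thresholds, and key names are made up for illustration), the routing could be as simple as:

import redis

r = redis.Redis()  # hypothetical connection settings

# Hypothetical per-priority settings: (error_rate, capacity), or None for a plain set.
FILTER_CONFIG = {
    "high": None,                       # critical events: exact dedup with a Redis set
    "medium": (0.0000001, 50_000_000),
    "low": (0.0001, 200_000_000),
}

def already_seen(priority: str, event_id: str) -> bool:
    config = FILTER_CONFIG[priority]
    if config is None:
        # SADD returns 0 when the member already existed.
        return r.sadd(f"idempotence:set:{priority}", event_id) == 0
    error_rate, capacity = config
    # BF.INSERT creates the filter on first use with the given parameters,
    # and returns 0 for items that were (probably) already present.
    added = r.execute_command(
        "BF.INSERT", f"idempotence:bf:{priority}",
        "CAPACITY", capacity, "ERROR", error_rate,
        "ITEMS", event_id,
    )
    return added[0] == 0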