Marek Kadek
18 Mar 2021 • 16 min read
Scala often gets a reputation for how complicated it is to write and set up, and for how much boilerplate there is. The same goes for functional programming. My goal is to show you the opposite: how, even with limited knowledge of the following topics, you can productively use competing libraries in the same field, and how well they fit together. How you can easily treat Kafka topics as streams of data, how simply you can compose SQL statements into a bigger transaction, and overall how productive you can be in today's Scala ecosystem.
This article assumes only a very limited amount of knowledge of Scala, SBT, Kafka, Postgres, Docker and functional programming. If I did my homework right, you shouldn't feel intimidated at any point by any of these technologies. Before using anything, I'll try to provide the briefest description that I think is sufficient for you, dear reader. If you already know these technologies, go ahead and skip those brief descriptions, as I'll be hugely simplifying :-)
The overall program is short and concise, and I believe that no matter how experienced you are with these topics, you might discover something useful.
Following code on a blog post can get a bit confusing, so I recommend checking out and following the runnable code that can be found in my GitHub project.
We have bank accounts. Each bank account has the owner’s name and current balance in dollars. Our account owners want to transfer money between themselves. Peter might want to send 300$ to me. I might want to send some money to my brother, Marius. We issue these commands to a bank, and once they are processed, our balance changes. If something goes wrong during processing, neither sender nor receiver observes any changes to their bank account, at any time.
To have a concrete example, let's define how such a command could look:
{"from": "Peter", "to": "Marek", "amount": 300}
This means that Peter is sending 300$ to Marek, and thus if this gets processed, Peter's balance should be -300$ (how unfortunate for Peter), while Marek's would be +300$. We could designate the type of this command, such as type: "TransferBalance", but since we support only a single command we don't need to.
Once we issue the command, it is enqueued into Kafka. Kafka is a distributed queue, at least for this example ;-). You have probably used a Queue data structure before; think of a Kafka Topic as such a Queue, one that is externalized from your program, has more capabilities, scales much better, is resilient, and so on.
The bank has an application that consumes these events from Kafka, processes them, and updates the Postgres database with the new balances of each user. Postgres is a relational database: a place where we can store and manipulate our business data.
A money transfer will be two SQL statements — one to decrement the sender’s balance, another one to increment the receiver’s balance. We want these to happen in a single transaction - we don’t want to end up in a state where we removed money from one account, and never added it to the other account because our app crashed. Luckily Postgres (as pretty much every other relational database) supports transactions. So our users shouldn’t complain that money vanished from their account and was never received.
So if the processing of the balance transfer command crashes, we won't execute the actual transfer. It's good that users won't observe any discrepancies in their accounts, but they will see that even though they issued the command to transfer funds, nothing has happened. We need to tell Kafka (our queue) that we aren't done with this processing. Since we have these transactions (in our relational database) that designate the consistency of accounts, could we use them? Kafka internally stores what you have processed under a number called the offset. We can tell Kafka that we'll store this offset ourselves - in Postgres - and we'll update it in the same transaction in which we update the users' funds. Thus if anything crashes while processing messages, the offset will stay unchanged (because the transaction will fail) and the messages will be processed again (once the service restarts or we fix the bug).
During this example, we’ll create a process that issues these commands (the user’s side, so to speak), and a process that consumes them (the bank side).
We'll need some containers running. Let's start Kafka with Zookeeper and Postgres.
docker run --rm \
--name=kafka \
-p 2181:2181 \
-p 9092:9092 \
--env ADVERTISED_HOST=localhost \
--env ADVERTISED_PORT=9092 \
johnnypark/kafka-zookeeper:2.6.0
docker run --rm -p 5432:5432 --name my-pg -e POSTGRES_PASSWORD=mysecretpassword -d postgres
The overall app will be based on ZIO. ZIO is, first of all, a functional programming library for building asynchronous and concurrent programs, though it has a lot more goodies.
We'll use SBT as our build tool - the most used build tool within the Scala ecosystem. Think of it as Maven or Gradle, but for Scala.
We now need the means to communicate with Kafka - to consume and produce messages. There is a Java client we could use, but that would mean dealing with unsafe, impure code, and as I mentioned, we'd like to use ZIO. Thus we reach for ZIO Kafka as our library of choice. Don't be alarmed: underneath it uses that same Java library, but it wraps it nicely, exposing a functional-programming-friendly interface. Let's add it to our build.sbt file:
libraryDependencies += "dev.zio" %% "zio-kafka" % "0.14.0"
It transitively pulls in zio-core and zio-streams. Core contains the main components that you need any time you're building a ZIO app. ZIO Streams represents a stream of values that are pulled by some effectful computation - such as asking Kafka to give them to us :-). In my opinion, streams and stream-based applications are still very underused and underappreciated.
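If you haven't met ZIO Streams yet, here is a tiny self-contained sketch (not part of the bank app, and the object name is mine) just to give a feel for the API: a finite stream of numbers, transformed and then printed by an effect.
import zio._
import zio.console._
import zio.stream.ZStream

// Minimal ZIO Streams example (illustrative only, not from the bank app):
// a stream of values is transformed and consumed by an effectful sink.
object StreamIntro extends App {
  override def run(args: List[String]): URIO[ZEnv, ExitCode] =
    ZStream
      .fromIterable(1 to 5)               // a finite stream of Ints
      .map(_ * 2)                          // pure transformation
      .foreach(n => putStrLn(n.toString))  // effectful consumption
      .exitCode
}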
Let's be good citizens and, while we're dealing with the ZIO ecosystem, pull in its logging library.
libraryDependencies += "dev.zio" %% "zio-logging" % "0.5.7"
We could integrate it with various backends, such as slf4j, but we don't need any of that for this toy app; we'll just log to the console. Should we need it later, nice integrations already exist.
Let's serialize our data as JSON when we write it to Kafka, because it's such a common format after all. We'll use Circe.
val circeV = "0.13.0"
val circeDeps = Seq(
"io.circe" %% "circe-core" % circeV,
"io.circe" %% "circe-parser" % circeV,
"io.circe" %% "circe-generic" % circeV
)
libraryDependencies ++= circeDeps
scalacOptions += "-Ymacro-annotations"
We also added macro support to the compiler options, as we'll be generating JSON codecs from case classes (because we are too lazy to write them by hand).
When it comes to dependencies, we have covered the part that helps us integrate with Kafka using ZIO, while using Circe for serialization (and deserialization) of messages.
How do we talk to Postgres? At this moment, there is no native ZIO driver we could use. We could pick any of the existing Java or Scala libraries and wrap it with ZIO, but that's a bit of a hassle. On the other hand, there is a native Postgres library called Skunk. Skunk is based on Cats and fs2, other fantastic functional programming libraries that compete with ZIO.
Let's add Skunk to our dependencies:
libraryDependencies += "org.tpolecat" %% "skunk-core" % "0.0.24"
Now we have two competing libraries (ZIO and Cats). Let them become friends via interoperability layer:
libraryDependencies += "dev.zio" %% "zio-interop-cats" % "2.3.1.0"
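Putting the dependency pieces together, the relevant part of build.sbt would look roughly like this (the scalaVersion is my assumption; any recent 2.13 should work):
scalaVersion := "2.13.4" // assumption; the article's project may differ

val circeV = "0.13.0"

libraryDependencies ++= Seq(
  "dev.zio"      %% "zio-kafka"        % "0.14.0",
  "dev.zio"      %% "zio-logging"      % "0.5.7",
  "dev.zio"      %% "zio-interop-cats" % "2.3.1.0",
  "org.tpolecat" %% "skunk-core"       % "0.0.24",
  "io.circe"     %% "circe-core"       % circeV,
  "io.circe"     %% "circe-parser"     % circeV,
  "io.circe"     %% "circe-generic"    % circeV
)

scalacOptions += "-Ymacro-annotations" // needed for @JsonCodec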
That's it - we have everything we need. Let's tie it all together.
Let's create a producer that issues a command to transfer some money from one person to another every second.
We can define our command as follows:
import io.circe.generic.JsonCodec
@JsonCodec
sealed trait Command
final case class TransferBalance(from: String, to: String, amount: Int) extends Command
There is only one possible command: TransferBalance. I encourage you to implement more commands (e.g. CloseAccount) and see how the compiler guides you; a small sketch of this follows below.
You get automatic JSON format derivation via the @JsonCodec annotation.
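As a small taste of the exercise above, here is a hypothetical second command (not part of the project). The moment it exists, the compiler warns about every non-exhaustive match on Command, pointing you to each place that needs to handle the new case:
// Hypothetical extra command, purely illustrative.
final case class CloseAccount(owner: String) extends Command

// Any existing pattern match now has to deal with it, e.g.:
def describe(command: Command): String = command match {
  case TransferBalance(from, to, amount) => s"$from sends $amount$$ to $to"
  case CloseAccount(owner)               => s"$owner closes their account"
}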
This is the data that will be stored in Kafka's topic. Let's give the topic a name, define some people, and means to select a random person.
import zio.{ZIO, random}
import zio.kafka.serde.Serde

package object app {
  val TopicName = "atopic"

  private val People = Array("Peter", "Trevor", "Alina", "Marek", "Marius")

  def randomPerson =
    random.nextIntBetween(0, People.length).map(People.apply)
}
Even though we have defined a JSON representation for our command, we need to tell Kafka about it.
We'll use a Circe Printer to print it to a String and store that in Kafka. Kafka already has support for String serialization and deserialization (called a Serde), which the ZIO Kafka library conveniently wraps in zio.kafka.serde.Serde.string. We can inmap over it: essentially we tell it how to go from String to JSON (for deserialization), and how to go from JSON to String (for serialization).
import io.circe.syntax._
import io.circe.{Printer, parser}

//... inside package object app
private val commandSerdePrinter = Printer.noSpaces

val CommandSerde: Serde[Any, Command] = zio.kafka.serde.Serde.string
  .inmapM(x => ZIO.fromEither(parser.parse(x).flatMap(_.as[Command])))(x =>
    ZIO.succeed(x.asJson.printWith(commandSerdePrinter))
  )
The first function of inmapM, x => ZIO.fromEither(parser.parse(x).flatMap(_.as[Command])), instructs how to read the String stored in Kafka. We parse it into JSON (parser.parse(x)) and decode it into our Command (.flatMap(_.as[Command])). There can be errors in parsing to JSON (the actual string is not JSON) and in decoding into our Command structure (the JSON we read does not match our Command), and we want to fail if that happens. This error is represented in an Either, which we can lift into ZIO with the ZIO.fromEither method.
The second function of inmapM, x => ZIO.succeed(x.asJson.printWith(commandSerdePrinter)), first encodes our Command into JSON using Circe (x.asJson). Nothing can go wrong here; we can definitely encode every instance of Command into JSON. The second part (.printWith(commandSerdePrinter)) prints it to a String. Again, no error can happen, and thus we lift it into a successful ZIO.
Now we can implement the actual producer in 30ish lines of code.
import zio._
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde.Serde
import zio.logging.{Logging, log}
import zio.duration._

// Produces a random transfer command to the Kafka topic every second.
object CommandProducerApp extends App {
  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    val producerSettings = ProducerSettings(List("localhost:9092"))
    val zio = for {
      _ <- log.info("Starting to produce values")
      _ <- Producer
        .make(producerSettings, Serde.int, CommandSerde)
        .use { p: Producer.Service[Any, Int, Command] =>
          val produce = for {
            amount <- random.nextIntBetween(0, 100)
            from   <- randomPerson
            to     <- randomPerson
            event   = TransferBalance(from, to, amount)
            _      <- log.info(s"Appending $event")
            _      <- p.produce(TopicName, event.hashCode(), event)
          } yield ()
          (produce *> ZIO.sleep(1.second)).forever
        }
    } yield ()

    val loggingLayer = Logging.console()

    zio
      .provideCustomLayer(loggingLayer)
      .exitCode
  }
}
We defined some ProducerSettings, then created a Kafka Producer with our Serde via Producer.make(producerSettings, Serde.int, CommandSerde).
We generate a command with some random values and append it to our topic with p.produce(TopicName, event.hashCode(), event). Don't worry about the hashCode part; it relates to the Kafka key and is not relevant for this example.
We want this program to run forever, producing a value every second: (produce *> ZIO.sleep(1.second)).forever.
Because we added a few logging statements, we need to provide the Logging layer (from the zio-logging library) via .provideCustomLayer(loggingLayer). You can learn more about ZIO Layers in the ZIO documentation.
That's it - running this app will continuously send events into Kafka:
sbt "runMain app.CommandProducerApp"
Appending TransferBalance(Marek,Trevor,78)
Appending TransferBalance(Trevor,Peter,9)
Appending TransferBalance(Peter,Peter,74)
Appending TransferBalance(Trevor,Marek,76)
Appending TransferBalance(Marius,Trevor,62)
We want to store two structures in our database: account balances and topic partition offsets. Let's start with account balances:
object Sql {
  final case class Balance(name: String, balance: Int)
}
Balance holds the account owner's name and their current balance. We'll update this balance based on the commands we process.
Let's prepare a DDL statement for creating the table that represents our Balance:
val balanceTableDdl =
sql"""CREATE TABLE IF NOT EXISTS balances (
name VARCHAR PRIMARY KEY,
balance INT NOT NULL)""".command
We use Skunk's sql interpolator followed by .command to turn this into a Skunk Command. So far we are not executing anything; we're simply preparing all the SQL we need.
The balance will be updated during processing, so we need a statement that inserts the balance if it does not exist, or otherwise increments/decrements it by the desired amount. We prepare a parametrized command that takes two arguments: the name, and the amount by which we want to change the balance.
def changeBalanceCommand =
sql"""INSERT INTO balances (name, balance) VALUES ($varchar, $int4)
ON CONFLICT (name) DO UPDATE
SET balance=EXCLUDED.balance + balances.balance""".command
We also want to store topic partitions in our relational database, so let's prepare the representation and SQL statements.
import org.apache.kafka.common.TopicPartition

object Sql {
  // ... rest of code from previous section ...
  final case class TopicPartitionOffset(topic: TopicPartition, offset: Long)
}
In our table, we can store TopicPartition as varchar, with the format of $topicname-$partition
. Our DDL statement would be:
import skunk.codec.all._
import skunk._
import skunk.implicits._
val partitionTableDdl =
sql"""CREATE TABLE IF NOT EXISTS offsets (
topic VARCHAR PRIMARY KEY,
"offset" BIGINT NOT NULL)""".command
Let's see how we could query these offsets for some partitions. We create a parametrized query:
def selectPartitionSql(topicPartitions: List[String]) = {
  sql"""SELECT topic, "offset" FROM offsets
        WHERE topic IN (${varchar.list(topicPartitions.size)})"""
    .query(varchar ~ int8)
}
We are creating a query that takes a variable-length list of strings and returns a tuple of String and Long (the topic and offset from our database). Our case class holds a TopicPartition, not a String, and not every String we get from the database necessarily represents a topic partition. Let's parse them and pick only valid topic partitions.
private def topicPartitionFromString(s: String): Option[TopicPartition] =
  s.split("-").toList match {
    // the second component is the partition number, e.g. "atopic-0"
    case topic :: partition :: Nil =>
      partition.toIntOption.map(p => new TopicPartition(topic, p))
    case _ => None
  }

def selectPartitionSql(topicPartitions: List[String]) = {
  sql"""SELECT topic, "offset"
        FROM offsets
        WHERE topic IN (${varchar.list(topicPartitions.size)})"""
    .query(varchar ~ int8)
    .map { case t ~ o =>
      topicPartitionFromString(t).map(topic => TopicPartitionOffset(topic, o))
    }
}
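To make the format concrete, here is how a couple of inputs would behave (illustrative values only). Note that this simple scheme assumes topic names that don't themselves contain a dash:
topicPartitionFromString("atopic-0") // Some(new TopicPartition("atopic", 0))
topicPartitionFromString("garbage")  // None: there is no "-"-separated partition number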
The last missing piece is SQL to upsert partition offsets. If they are missing, we will insert them, otherwise, we'll update the existing offset to the new one.
def updatePartitionCommand(partitions: List[(String, Long)]) = {
  val enc = (varchar ~ int8).values.list(partitions)
  sql"""INSERT INTO offsets (topic, "offset")
        VALUES $enc
        ON CONFLICT (topic) DO UPDATE
        SET "offset"=EXCLUDED."offset"""".command
}
We have prepared SQL statements for creating the offsets table, for querying topic partition offsets, and for upserting them.
Let's set expectations for our consumer's implementation. We expect it to consume commands from the Kafka topic, update account balances in Postgres accordingly, and store the Kafka offsets in Postgres within the same transaction.
Let's start by defining our ZIO app, and preparing our Postgres session. We'll represent it as ZIO Layer.
import zio._
import zio.interop.catz._
import skunk._
import natchez.Trace.Implicits.noop

object CommandProcessorApp extends App {
  type SessionTask = Session[Task]

  val dbSessionLayer: ZLayer[Any, Throwable, Has[SessionTask]] = ZManaged.runtime
    .flatMap { implicit r: Runtime[Any] =>
      Session.single(
        host = "localhost",
        port = 5432,
        user = "postgres",
        database = "postgres",
        password = Some("mysecretpassword")
      ).toManaged
    }.toLayer

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    ???
  }
}
What's going on here? We introduce the type alias type SessionTask = Session[Task] for clarity. It represents a Session that can execute statements within ZIO's Task.
The Session.single call creates a Session, which comes from the Skunk library. It is wrapped in a Resource to ensure that we both acquire it and close it properly. But that is a Cats construct, so we convert it to its ZIO equivalent, ZManaged, via the .toManaged call. For that we need the help of import zio.interop.catz._ as well as an implicit ZIO runtime in scope. We can get one from ZManaged.runtime, over which we flatMap to introduce it into the current scope: ZManaged.runtime.flatMap { implicit r: Runtime[Any] => ... }. Lastly, we convert the ZManaged into a layer via the toLayer call.
Don't worry about import natchez.Trace.Implicits.noop too much; it's just our tracing instance.
Overall, the whole integration of ZIO and Cats took a single import and bringing an implicit Runtime into scope. That's pretty simple.
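To isolate the trick, this is roughly what the interop boils down to: any Cats Resource over Task can become a ZManaged once an implicit Runtime is in scope. A sketch, assuming zio-interop-cats 2.x where the extension is called .toManaged (the helper name is mine):
import zio._
import zio.interop.catz._
import cats.effect.Resource

// Sketch: turn a cats-effect Resource[Task, A] into a ZManaged, given an
// implicit ZIO Runtime in scope (obtained here from ZManaged.runtime).
def resourceToManaged[A](resource: Resource[Task, A]): ZManaged[Any, Throwable, A] =
  ZManaged.runtime.flatMap { implicit r: Runtime[Any] =>
    resource.toManaged
  }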
Let's define our program. We want to create a Kafka consumer with manual offset retrieval (from Postgres).
def partitionToString(topic: TopicPartition): String =
  s"${topic.topic()}-${topic.partition()}"

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
  val zio = for {
    session <- ZIO.service[SessionTask]
    _ <- ZIO.runtime[Any].flatMap { implicit r: Runtime[Any] =>
      val manualOffsetRetrieval = Consumer.OffsetRetrieval.Manual { partitions =>
        val list = partitions.map(partitionToString).toList
        val query = Sql.selectPartitionSql(list)
        session
          .prepare(query).toManaged
          .use(_.stream(list, 64).compile.toVector)
          .map(xs =>
            xs.collect { case Some(t) => t }
              .map(x => x.topic -> x.offset)
              .toMap
          )
      }
      val consumerSettings = ConsumerSettings(List("localhost:9092"))
        .withGroupId("my.group")
        .withOffsetRetrieval(manualOffsetRetrieval)
      val consumerM = Consumer.make(consumerSettings)
      ??? // to be continued
    }
  } yield ()
  ??? // to be continued
}
We again introduce an implicit runtime into scope, as we're working with the Cats-based Session. We specify manual offset retrieval with Consumer.OffsetRetrieval.Manual { partitions => ... }, in which we describe how to fetch offsets for a given set of partitions. We first need to convert the partitions into the string representation that we use in the database ($topic-$partition), via partitions.map(partitionToString).toList. We then prepare the query for selecting offsets (the one we created in the previous section) with session.prepare(query).toManaged, and use it with our list of partitions to stream all the results: .use(_.stream(list, 64).compile.toVector). A query that fetches offsets for many partitions may return many rows, and having to think in terms of streams helps us not forget about that. Not every query you run against Postgres has the nice characteristic of returning only a few rows.
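As an aside, if the table were huge we wouldn't have to compile the stream into a Vector; we could stay in fs2 and handle rows incrementally. A rough sketch under the same names as above (illustrative only):
// Process rows as they arrive instead of collecting them all into memory.
session
  .prepare(query).toManaged
  .use(
    _.stream(list, 64)
      .evalMap(row => Task(println(row))) // do something with each row
      .compile
      .drain
  )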
If you remember, in the previous section we mentioned that not everything stored in the database may be a valid TopicPartition string representation. We handled that in our SQL definition (the .map on the query), so here we only need to collect the valid ones: .map(xs => xs.collect { case Some(t) => t }). Finally, we convert this into the structure that Consumer.OffsetRetrieval.Manual expects, which is a map from TopicPartition to Long (the offset), via .map(x => x.topic -> x.offset).toMap.
In the last step, we create the Consumer based on these settings (with manual offset retrieval), connecting to our local Kafka instance.
So far we've just been preparing things: SQL statements, the Kafka Consumer, the Postgres Session. Nothing has really happened yet. Let's create the tables and actually consume the Kafka topic to process events.
Building on previous code:
// ... previous code
val consumerM = Consumer.make(consumerSettings)
for {
_ <- session.execute(Sql.partitionTableDdl)
_ <- session.execute(Sql.balanceTableDdl)
_ <- consumerM.use { consumer =>
consumer.subscribeAnd(Subscription.topics(TopicName))
.plainStream(Serde.int, CommandSerde)
.tap(x => log.info(x.value.toString))
.runDrain
}
} yield ()
We use the Postgres Session to execute the DDL statements that set up our database, and we subscribe to the Kafka topic with consumer.subscribeAnd(Subscription.topics(TopicName)). This is the same topic that we produce messages into, and we use the same serializer-deserializer, CommandSerde. We tap into the stream and log every message with .tap(x => log.info(x.value.toString)), and we run the stream to exhaustion, which will never happen, as the stream of events from Kafka is infinite: at any time, the producer can append a new event to it.
Let's add another operation to the stream, where we actually process the messages and handle offset management in a single transaction. This is the beefy part, and it's way simpler than it looks.
.tap(x => log.info(x.value.toString))
.foreachChunk { chunk =>
  val offsetBatch = OffsetBatch(chunk.map(_.offset))
  val offsets = offsetBatch.offsets
  val offsetsList = offsets
    .map { case (topicPartition, offset) => partitionToString(topicPartition) -> offset }
    .toList
  val commands = chunk.map(_.value)

  session.transaction.toManaged.use { transaction =>
    for {
      _ <- ZIO.foreach_(commands) {
        case TransferBalance(from, to, amount) =>
          for {
            _ <- session
              .prepare(Sql.changeBalanceCommand)
              .toManaged
              .use(_.execute(from ~ -amount)) // notice the minus sign
            _ <- session
              .prepare(Sql.changeBalanceCommand)
              .toManaged
              .use(_.execute(to ~ amount))
          } yield ()
      }
      _ <- session
        .prepare(Sql.updatePartitionCommand(offsetsList))
        .toManaged
        .use(_.execute(offsetsList))
      _ <- transaction.commit
    } yield ()
  }
}
We want to process the stream in chunks of Kafka records (think of it as pulling a bunch of messages at once in an efficient manner), via .foreachChunk { chunk => ... }.
From the chunk's offsets we create an OffsetBatch, an instance that merges the offsets into a single data structure, via OffsetBatch(chunk.map(_.offset)). We turn these offsets into a list for our offset-update command, offsets.map { case (topicPartition, offset) => ... }; it's the argument that Sql.updatePartitionCommand needs. Mapping over the chunk gives us the actual commands, the ones our producer pushed to Kafka: chunk.map(_.value).
We open and use a transaction with session.transaction.toManaged.use { transaction => ... }. Inside the transaction we want to issue two SQL statements for every command: one to withdraw funds from the sender's account, and another to deposit them into the receiver's account. For each command in our chunk, we execute these statements using ZIO.foreach_(commands) { ... }.
Executing a statement is a matter of taking the SQL we wrote, preparing it, and feeding it the values from the command we are processing.
session
  .prepare(Sql.changeBalanceCommand)
  .toManaged
  .use(_.execute(to ~ amount)) // deposits `amount`
After all the balance statements have been executed within the transaction, we also execute the statement that stores the Kafka offsets, and then we commit the transaction:
_ <- session
.prepare(Sql.updatePartitionCommand(offsetsList))
.toManaged
.use(_.execute(offsetsList))
_ <- transaction.commit
That's it - that's our main zio computation. We feed it our Logging and Session layers and run it:
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
  val zio = ??? // what we implemented above
  val loggingLayer = Logging.console() ++ dbSessionLayer
  zio
    .provideCustomLayer(loggingLayer)
    .exitCode
}
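With the producer from the earlier section still running, start the consumer in a second terminal:
sbt "runMain app.CommandProcessorApp"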
If we run the producer and the consumer, we should see messages being both produced and consumed. If we check our Postgres instance, we can find our offsets (yours will differ):
SELECT * FROM offsets
topic |offset|
--------|------|
atopic-0| 83|
Checking the balances
SELECT * FROM balances
name |balance|
------|-------|
Alina | -60|
Peter | 3|
Marek | 34|
Trevor| 216|
Marius| -193|
Because everyone started with 0$, and we handle every transfer in a transaction, at any given time the sum of these accounts should equal 0, even if you introduce random failures into the pipeline.
SELECT sum(balance) FROM balances
sum|
---|
0|
If you want to have some more fun or test it further, try extending the example: implement more commands (such as CloseAccount), or introduce random failures into the processing pipeline and verify that the balances stay consistent. A small sketch of the latter follows.
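For example, a crude way to simulate failures (illustrative only; maybeCrash is my own name) is to randomly fail inside the transactional block, before transaction.commit. The transaction never commits, the offset never advances, and the chunk is simply reprocessed after a restart:
// Fails roughly one time in ten. Drop it into the for-comprehension that runs
// inside session.transaction.toManaged.use { ... }, before transaction.commit.
val maybeCrash: ZIO[random.Random, Throwable, Unit] =
  random.nextIntBetween(0, 10).flatMap { n =>
    ZIO.fail(new RuntimeException("simulated crash")).when(n == 0)
  }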
We successfully made libraries from competing ecosystems play together with just a few lines. We were able to stay within pure functional Scala at all times, composing simple programs into bigger, more complex ones.
We saw two kinds of streams. The first (fs2, Cats-based) came from the Skunk library, when querying a table for entries (possibly millions of rows, should the table's use case be different). The second (ZIO Stream) represented our subscription to the Kafka topic, to which our producer pushed events. Our consumer program processed them in chunks, transformed them into a single transaction, and committed them together with the offsets of our Kafka topic.
Both effect and streaming libraries played together very nicely and allowed us to focus on the domain problem, which shows the lengths to which Scala library authors are willing to go to provide good ergonomics for end users.