Like a Kafka Producer that optimizes writes to Kafka, a Consumer is used for optimal consumption of Kafka data. The primary role of a Kafka consumer is to take Kafka connection and consumer properties to read records from the appropriate Kafka broker. Complexities of concurrent application consumption, offset management, delivery semantics, and a lot more are taken care of by Consumer APIs.
Some of the consumer properties are: bootstrap.servers, fetch.min.bytes, max.partition.fetch.bytes, fetch.max.bytes, enable.auto.commit, and many more. We will discuss some of these properties in the next part of this article series.
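As a rough illustration, the properties named above could be collected as follows. The broker address and group name are placeholders invented for this sketch, and the numeric values are what I understand to be the Java client's documented defaults, so check them against your client version.

```python
# Illustrative consumer settings; the broker address and group id are placeholders.
consumer_properties = {
    "bootstrap.servers": "localhost:9092",   # hypothetical broker address
    "group.id": "example-app",               # hypothetical consumer group name
    "fetch.min.bytes": 1,                    # answer a fetch as soon as any data exists
    "max.partition.fetch.bytes": 1_048_576,  # cap per partition per fetch (1 MiB)
    "fetch.max.bytes": 52_428_800,           # cap per fetch response (50 MiB)
    "enable.auto.commit": True,              # commit offsets in the background
}


def to_properties_text(props):
    """Render the dict as sorted key=value lines, the layout of a .properties file."""
    lines = []
    for key in sorted(props):
        value = props[key]
        if isinstance(value, bool):
            value = str(value).lower()  # Java properties use true/false
        lines.append(f"{key}={value}")
    return "\n".join(lines)


print(to_properties_text(consumer_properties))
```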
Role of Kafka Consumers
Multiple applications can consume records from the same Kafka topic, as shown in the diagram below. Each application that consumes data from Kafka gets its own copy and can read at its own speed. In other words, the offsets consumed by one application can differ from those consumed by another. Kafka keeps track of the offsets consumed by each application in an internal __consumer_offsets topic.
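To make the idea of independent offsets concrete, here is a minimal in-memory sketch. The class OffsetStore, the topic name "orders", and the two application names are hypothetical; the real bookkeeping lives in Kafka's internal offsets topic, not in a Python dict.

```python
from collections import defaultdict


class OffsetStore:
    """Toy stand-in for the committed-offset bookkeeping Kafka keeps per group."""

    def __init__(self):
        # (group, topic, partition) -> next offset to read
        self._committed = defaultdict(int)

    def commit(self, group, topic, partition, next_offset):
        self._committed[(group, topic, partition)] = next_offset

    def committed(self, group, topic, partition):
        return self._committed[(group, topic, partition)]


log = ["r0", "r1", "r2", "r3", "r4"]  # one partition of a hypothetical topic "orders"
store = OffsetStore()


def consume(group, max_records):
    """Read up to max_records from where this group left off, then commit."""
    start = store.committed(group, "orders", 0)
    batch = log[start:start + max_records]
    store.commit(group, "orders", 0, start + len(batch))
    return batch


fast = consume("billing-app", 4)  # reads four records
slow = consume("audit-app", 1)    # reads one record, from the same beginning
```

Both applications start from the first record, and each one's progress is tracked separately, which is why a slow reader never holds back a fast one.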
Consumer Group and Consumer
Each application consuming data from Kafka is treated as a consumer group. For example, if two applications are consuming the same topic from Kafka, then, internally, Kafka creates two consumer groups. Each consumer group can have one or more consumers. If a topic has three partitions and an application consumes it, then a consumer group would be created and a consumer in the consumer group will consume all partitions of the topic. The diagram below depicts a consumer group with a single consumer.
Kafka multi-partition single consumer
When an application wants to increase the speed of processing and process partitions in parallel, it can add more consumers to the consumer group. Kafka takes care of keeping track of offsets consumed per consumer in a consumer group, rebalancing consumers in the consumer group when a consumer is added or removed, and a lot more.
Kafka multi-partition multi-consumer
When there are multiple consumers in a consumer group, each consumer in the group is assigned one or more partitions. Each consumer in the group will process records in parallel from each leader partition of the brokers. A consumer can read from more than one partition.
It’s very important to understand that no single partition will be assigned to two consumers in the same consumer group; in other words, the same partition will not be processed by two consumers as shown in the diagram below.
Kafka same partition multiple-consumer
When a consumer group has more consumers than the topic has partitions, the surplus consumers are not assigned any partition and sit unused.
Kafka unused consumer
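The assignment rules described above (one consumer takes everything, several consumers share, surplus consumers idle) can be sketched with a simple round-robin split. This is an illustration only: Kafka ships several assignment strategies, and the function assign_partitions is a hypothetical helper, not the client's implementation.

```python
def assign_partitions(partitions, consumers):
    """Round-robin assignment: every partition goes to exactly one consumer."""
    assignment = {consumer: [] for consumer in consumers}
    if not consumers:
        return assignment
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = [0, 1, 2]
single = assign_partitions(partitions, ["c1"])
shared = assign_partitions(partitions, ["c1", "c2"])
surplus = assign_partitions(partitions, ["c1", "c2", "c3", "c4"])
print(single, shared, surplus, sep="\n")
```

With four consumers and three partitions, c4 ends up with an empty list, which is the "unused consumer" case. No partition ever appears under two consumers.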
When you have multiple topics and multiple applications consuming the data, consumer groups and consumers of Kafka will look similar to the diagram shown below.
Multiple application and multiple Kafka topic
Coordinator and Leader Discovery
In order to manage the handshake between Kafka and the application that forms the consumer group and consumer, a coordinator on the Kafka side and a leader (one of the consumers in the consumer group) are elected. The first consumer that initiates the process is automatically elected as leader of the consumer group. As explained in the diagram below, for a consumer to join a consumer group, the following handshake steps take place:
Kafka consumer and coordinator protocol
In order to create or join a group, a consumer has to first find the coordinator on the Kafka side that manages the consumer group. The consumer makes a “find coordinator” request to one of the bootstrap servers. If a coordinator doesn’t already exist, it is identified based on a hashing formula and returned in the response to the “find coordinator” request.
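The article does not spell out the hashing formula. My understanding, which you should treat as an assumption, is that the broker hashes the group id, takes the result modulo the number of partitions of the internal offsets topic (50 by default), and the broker leading that partition acts as coordinator. A sketch of that calculation, reproducing Java's string hash in Python:

```python
def java_string_hash(text):
    """Replicate Java's String.hashCode() for text in the Basic Multilingual Plane."""
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # keep 32 bits, as a Java int would
    # Reinterpret the unsigned 32-bit value as a signed Java int.
    return h - 0x100000000 if h >= 0x80000000 else h


def coordinator_partition(group_id, offsets_topic_partitions=50):
    """Pick the offsets-topic partition whose leader would coordinate the group."""
    h = java_string_hash(group_id)
    # Assumed behavior: Integer.MIN_VALUE maps to 0, otherwise the absolute value.
    non_negative = 0 if h == -0x80000000 else abs(h)
    return non_negative % offsets_topic_partitions


print(coordinator_partition("abc"))  # hash 96354, so partition 96354 % 50 = 4
```

Because the result depends only on the group id and the partition count, every broker computes the same answer without any extra coordination.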
Once the coordinator is identified, the consumer makes a “join group” request to the coordinator. The coordinator returns the consumer group leader and metadata details. If a leader doesn’t already exist, the first consumer of the group is elected as leader. The consuming application can also control the leader elected by the coordinator node.
Kafka consumer join group
After leader details are received for the join group request, the consumer makes a “sync group” request to the coordinator. This request triggers the rebalancing process across consumers in the consumer group, as the partitions assigned to the consumers will change after the “sync group” request.
Kafka consumer sync group
All consumers in the consumer group will receive updated partition assignments that they need to consume when a consumer is added or removed, or when a “sync group” request is sent. Data consumption by all consumers in the consumer group will be halted until the rebalance process is complete.
Kafka consumer rebalance group
Each consumer in the consumer group periodically sends a heartbeat signal to its group coordinator. In the case of heartbeat timeout, the consumer is considered lost and rebalancing is initiated by the coordinator.
Kafka consumer heartbeat
A consumer can choose to leave the group anytime by sending a “leave group” request. The coordinator will acknowledge the request and initiate a rebalance. In case the leader node leaves the group, a new leader is elected from the group and a rebalance is initiated.
Kafka consumer leave group
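The join, heartbeat, and leave steps above can be modeled with a small in-memory toy. The class GroupCoordinator and its method names are hypothetical, and the model simplifies the real protocol: here the coordinator computes the assignment itself, and time is passed in as a plain number.

```python
class GroupCoordinator:
    """Toy model of join / heartbeat / leave handling for one consumer group."""

    def __init__(self, partitions, session_timeout):
        self.partitions = list(partitions)
        self.session_timeout = session_timeout
        self.last_heartbeat = {}  # member id -> time of last heartbeat
        self.leader = None
        self.generation = 0       # bumped on every rebalance
        self.assignment = {}

    def _rebalance(self):
        members = list(self.last_heartbeat)  # dicts keep join order
        if self.leader not in self.last_heartbeat:
            # Leader is missing (or never set): pick the earliest remaining member.
            self.leader = members[0] if members else None
        self.generation += 1
        self.assignment = {member: [] for member in members}
        for i, partition in enumerate(self.partitions):
            if members:
                self.assignment[members[i % len(members)]].append(partition)

    def join(self, member, now):
        self.last_heartbeat[member] = now
        self._rebalance()

    def heartbeat(self, member, now):
        if member in self.last_heartbeat:
            self.last_heartbeat[member] = now

    def leave(self, member):
        if self.last_heartbeat.pop(member, None) is not None:
            self._rebalance()

    def expire(self, now):
        """Drop members whose last heartbeat is older than the session timeout."""
        lost = [m for m, t in self.last_heartbeat.items()
                if now - t > self.session_timeout]
        for member in lost:
            del self.last_heartbeat[member]
        if lost:
            self._rebalance()
        return lost


group = GroupCoordinator(partitions=[0, 1, 2], session_timeout=10)
group.join("c1", now=0)      # first member becomes leader
group.join("c2", now=1)      # second join triggers a rebalance
group.heartbeat("c2", now=9)
lost = group.expire(now=12)  # c1 was last seen at 0, so it times out
```

After the timeout, c2 becomes leader and owns all three partitions, mirroring the case where the leader is lost and a new one is elected.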
As explained in Part 1 of this series, partitions are units of parallelism. Because the number of useful consumers in a consumer group is limited by the number of partitions in a topic, it’s very important to decide your partition count based on the SLA and scale your consumers accordingly. Consumer offsets are managed and stored by Kafka in an internal __consumer_offsets topic. Each consumer in a consumer group follows the find coordinator, join group, sync group, heartbeat, and leave group protocols. In the next article in this series, we’ll look into Kafka consumer properties and delivery semantics.
In this article series, we will learn Kafka basics, Kafka delivery semantics, and configuration to achieve different semantics, Spark Kafka integration, and optimization. In Part 1 of this series we’ll look at Kafka basics.
The following could be some of the problem statements:
Many sources and target systems to integrate. Generally, integration of many systems involves complexities like dealing with many protocols, messaging formats, etc.
Message systems handle high volume streams.
Some of the use cases include:
Tracking user activity, log aggregation, etc.
What Is Kafka?
Kafka is a horizontally scalable, fault tolerant, and fast messaging system. It’s a pub-sub model in which various producers and consumers can write and read. It decouples source and target systems. Some of the key features are:
Scale to 100s of nodes.
Can handle millions of messages per second.
Real-time processing (~10ms).
Topic, Partitions, and Offsets
A topic is a specific stream of data. It is very similar to a table in a NoSQL database. Like tables in a NoSQL database, the topic is split into partitions that enable topics to be distributed across various nodes. Like primary keys in tables, topics have offsets per partition. You can uniquely identify a message using its topic, partition, and offset.
Partitions enable topics to be distributed across the cluster. Partitions are a unit of parallelism for horizontal scalability. One topic can have more than one partition scaling across nodes.
Messages are assigned to partitions based on partition keys; if a message has no key, the producer chooses the partition itself (randomly or round-robin, depending on the client). It’s important to choose a key that spreads messages evenly to avoid hotspots.
Each message in a partition is assigned an incremental id called an offset. Offsets are unique per partition and messages are ordered only within a partition. Messages written to partitions are immutable.
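Here is a small in-memory sketch of how keys, partitions, and offsets fit together. The class Topic is hypothetical, zlib.crc32 is only a stand-in for the hash the real producer uses, and keyless messages are simply spread in turn across partitions for the purpose of the example.

```python
import zlib


class Topic:
    """Toy topic: a list of append-only partitions, each with its own offsets."""

    def __init__(self, name, num_partitions):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]
        self._next_keyless = 0

    def append(self, value, key=None):
        if key is None:
            # No key: spread messages across partitions in turn (an assumption).
            partition = self._next_keyless % len(self.partitions)
            self._next_keyless += 1
        else:
            # Same key always hashes to the same partition.
            partition = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        log = self.partitions[partition]
        log.append(value)
        return partition, len(log) - 1  # the offset is the position in that partition

    def read(self, partition, offset):
        return self.partitions[partition][offset]


clicks = Topic("clicks", num_partitions=3)
p1, o1 = clicks.append("login", key="user-1")
p2, o2 = clicks.append("logout", key="user-1")
print(p1 == p2, o1, o2)
```

Both messages for user-1 land in the same partition with consecutive offsets, so their relative order is preserved. Messages with other keys may land elsewhere and carry no ordering guarantee relative to these two.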
The diagram below shows the architecture of Kafka.
ZooKeeper is a centralized service for managing distributed systems. It offers a hierarchical key-value store, configuration, synchronization, and name registry services to the distributed system it manages. ZooKeeper acts as an ensemble layer (it ties things together) and ensures high availability of the Kafka cluster. Kafka nodes are also called brokers. It’s important to understand that Kafka cannot work without ZooKeeper.
From the list of ZooKeeper nodes, one of the nodes is elected as a leader and the rest of the nodes follow the leader. In the case of a ZooKeeper node failure, one of the followers is elected as leader. More than 1 node is strongly recommended for high availability and more than 7 is not recommended.
ZooKeeper stores metadata and the current state of the Kafka cluster. For example, details such as topic names, the number of partitions, replication, partition leader details, and consumer group details are stored in ZooKeeper. You can think of ZooKeeper as a project manager who manages resources in the project and remembers the state of the project.
Key things to remember:
Manages list of brokers.
Elects broker leaders when a broker goes down.
Sends notifications on a new broker, new topic, deleted topic, lost brokers, etc.
From Kafka 0.10 on, consumer offsets are not stored in ZooKeeper; only the metadata of the cluster is stored in ZooKeeper.
The leader in ZooKeeper handles all writes, and follower ZooKeeper nodes handle only reads.
A broker is a single Kafka node that is managed by ZooKeeper. A set of brokers forms a Kafka cluster. Topics that are created in Kafka are distributed across brokers based on the partition, replication, and other factors. When a broker node fails, the cluster is automatically rebalanced based on the state stored in ZooKeeper, and if a leader partition is lost, one of the follower partitions is elected as the new leader.
You can think of a broker as a team leader who takes care of the assigned tasks. If a team lead isn’t available, the manager takes care of assigning tasks to other team members.
Replication means making a copy of a partition available on another broker. Replication enables Kafka to be fault tolerant. When a partition of a topic is available on multiple brokers, one of the copies is elected as leader and the remaining replicas of the partition are followers.
Replication enables Kafka to be fault tolerant even when a broker is down. For example, Topic B partition 0 is stored on both broker 0 and broker 1. Both producers and consumers are served only by the leader. In case of a broker failure, the partition replica on another broker is elected as leader and starts serving the producers and consumer groups. Replica partitions that are in sync with the leader are flagged as ISR (In Sync Replica).
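A minimal sketch of leader failover among in-sync replicas, using the Topic B partition 0 example. The class PartitionReplicas is hypothetical and ignores details such as how replicas fall out of sync or rejoin. It only shows the rule that a new leader is picked from the replicas that are still in sync.

```python
class PartitionReplicas:
    """Toy model of one partition's replica set, its ISR list, and its leader."""

    def __init__(self, replicas):
        self.replicas = list(replicas)  # broker ids holding a copy
        self.isr = list(replicas)       # replicas currently in sync
        self.leader = self.replicas[0]

    def mark_out_of_sync(self, broker):
        # The leader is always in sync with itself, so only followers can drop out.
        if broker != self.leader and broker in self.isr:
            self.isr.remove(broker)

    def broker_failed(self, broker):
        if broker in self.isr:
            self.isr.remove(broker)
        if broker == self.leader:
            # Promote the first surviving in-sync replica, if there is one.
            self.leader = self.isr[0] if self.isr else None
        return self.leader


topic_b_p0 = PartitionReplicas(replicas=[0, 1])  # stored on broker 0 and broker 1
new_leader = topic_b_p0.broker_failed(0)
print(new_leader)  # broker 1 takes over
```

If no in-sync replica survives, the sketch leaves the partition without a leader rather than promoting a stale copy.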
IT Team and Kafka Cluster Analogy
The diagram below depicts an analogy of an IT team and Kafka cluster.
Below is the summary of core components in Kafka.
ZooKeeper manages Kafka brokers and their metadata.
Brokers are horizontally scalable Kafka nodes that contain topics and their replicas.
Topics are message streams with one or more partitions.
Partitions contain messages with unique offsets per partition.
Replication enables Kafka to be fault tolerant using follower partitions.
Kafka is a word that gets heard a lot nowadays… A lot of leading digital companies seem to use it as well. But what is it actually?
Kafka was originally developed at LinkedIn in 2011 and has improved a lot since then. Nowadays it is a whole platform, allowing you to redundantly store absurd amounts of data, have a message bus with huge throughput (millions/sec) and use real-time stream processing on the data that goes through it all at once.
This is all well and great, but stripped down to its core, Kafka is a distributed, horizontally-scalable, fault-tolerant, commit log.
Those were some fancy words, let’s go at them one by one and see what they mean. Afterwards, we will dive deep into how it works.
A distributed system is one which is split into multiple running machines, all of which work together in a cluster to appear as one single node to the end user. Kafka is distributed in the sense that it stores, receives and sends messages on different nodes (called brokers).
The benefits to this approach are high scalability and fault-tolerance.
Let’s define the term vertical scalability first. Say, for instance, you have a traditional database server which is starting to get overloaded. The way to get this solved is to simply increase the resources (CPU, RAM, SSD) on the server. This is called vertical scaling — where you add more resources to the machine. There are two big disadvantages to scaling upwards:
There are limits defined by the hardware. You cannot scale upwards indefinitely.
It usually requires downtime, something which big corporations cannot afford.
Horizontal scalability is solving the same problem by throwing more machines at it. Adding a new machine does not require downtime, nor are there any limits to the number of machines you can have in your cluster. The catch is that not all systems support horizontal scalability, as they are not designed to work in a cluster, and those that are designed for it are usually more complex to work with.
Something that emerges in non-distributed systems is that they have a single point of failure (SPoF). If your single database server fails (as machines do) for whatever reason, you’re screwed.
Distributed systems are designed in such a way to accommodate failures in a configurable way. In a 5-node Kafka cluster, you can have it continue working even if 2 of the nodes are down. It is worth noting that fault-tolerance is at a direct tradeoff with performance, as in the more fault-tolerant your system is, the less performant it is.
A commit log (also referred to as write-ahead log, transaction log) is a persistent ordered data structure which only supports appends. You cannot modify nor delete records from it. It is read from left to right and guarantees item ordering.
– Are you telling me that Kafka is such a simple data structure?
In many ways, yes. This structure is at the heart of Kafka and is invaluable, as it provides ordering, which in turn provides deterministic processing. Both of which are non-trivial problems in distributed systems.
Kafka actually stores all of its messages to disk (more on that later) and having them ordered in the structure lets it take advantage of sequential disk reads.
Reads and writes are constant time, O(1), when the record ID is known. Compared with the O(log N) disk operations of other structures, this is a huge advantage, as each disk seek is expensive.
Reads and writes do not affect one another. Writing does not lock reading and vice versa (as opposed to balanced trees).
These two points have huge performance benefits, since the data size is completely decoupled from performance. Kafka has the same performance whether you have 100KB or 100TB of data on your server.
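The commit log described above can be sketched as an append-only list. The class CommitLog is a hypothetical in-memory illustration; the real log lives in files on disk, but the access pattern (append at the end, read by offset) is the same idea.

```python
class CommitLog:
    """Append-only log: records get consecutive offsets and are never changed."""

    def __init__(self):
        self._records = []

    def append(self, record):
        """Add a record at the end and return its offset."""
        self._records.append(record)
        return len(self._records) - 1

    def read(self, offset):
        """Fetch one record by offset; list indexing is a constant-time lookup."""
        return self._records[offset]

    def read_from(self, offset, max_records):
        """Fetch a run of records in log order, starting at the given offset."""
        return self._records[offset:offset + max_records]

    def __len__(self):
        return len(self._records)


log = CommitLog()
offsets = [log.append(event) for event in ["created", "paid", "shipped"]]
print(offsets, log.read(1), log.read_from(1, 5))
```

There is deliberately no update or delete method: ordering is guaranteed because the only way to change the log is to add to its end.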
How does it work?
Applications (producers) send messages (records) to a Kafka node (broker) and said messages are processed by other applications called consumers. Said messages get stored in a topic and consumers subscribe to the topic to receive new messages.
As topics can get quite big, they get split into partitions of a smaller size for better performance and scalability. (For example, if you were storing user login requests, you could split them by the first character of the user’s username.) Kafka guarantees that all messages inside a partition are ordered in the sequence they came in. The way you distinguish a specific message is through its offset, which you could look at as a normal array index, a sequence number which is incremented for each new message in a partition.
Kafka follows the principle of a dumb broker and smart consumer. This means that Kafka does not keep track of which records have been read by the consumer and then delete them; instead, it stores them for a set amount of time (e.g. one day) or until some size threshold is met. Consumers themselves poll Kafka for new messages and say what records they want to read. This allows them to increment/decrement the offset they’re at as they wish, thus being able to replay and reprocess events.
It is worth noting that consumers are actually consumer groups which have one or more consumer processes inside. In order to avoid two processes reading the same message twice, each partition is tied to only one consumer process per group.
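The dumb-broker, smart-consumer split can be illustrated with a toy log that only enforces time-based retention and a consumer that owns its own position. RetainedLog and Consumer are hypothetical classes, and timestamps are plain numbers passed in by the caller.

```python
class RetainedLog:
    """Broker side: keeps records for a fixed time, regardless of who has read them."""

    def __init__(self, retention_seconds):
        self.retention_seconds = retention_seconds
        self._records = []  # (offset, timestamp, value)
        self._next_offset = 0

    def append(self, value, now):
        self._records.append((self._next_offset, now, value))
        self._next_offset += 1

    def enforce_retention(self, now):
        self._records = [r for r in self._records
                         if now - r[1] <= self.retention_seconds]

    def poll(self, offset, max_records=10):
        return [(o, v) for o, _, v in self._records if o >= offset][:max_records]


class Consumer:
    """Consumer side: tracks its own position and may move it freely."""

    def __init__(self, log):
        self.log = log
        self.position = 0

    def poll(self, max_records=10):
        batch = self.log.poll(self.position, max_records)
        if batch:
            self.position = batch[-1][0] + 1
        return [value for _, value in batch]

    def seek(self, offset):
        self.position = offset


log = RetainedLog(retention_seconds=86_400)  # one day
for t, value in [(0, "a"), (10, "b"), (20, "c")]:
    log.append(value, now=t)

consumer = Consumer(log)
first_pass = consumer.poll()       # reads everything
consumer.seek(1)                   # rewind to replay from offset 1
replayed = consumer.poll()
log.enforce_retention(now=86_405)  # "a" is now older than one day
consumer.seek(0)
after_retention = consumer.poll()
```

Reading never removes anything. Only the retention pass does, which is why the replay works and why the oldest record eventually disappears even for a consumer that rewinds to offset 0.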
Persistence to Disk
As I mentioned earlier, Kafka actually stores all of its records to disk and does not keep anything in RAM. You might be wondering how this is in the slightest way a sane choice. There are numerous optimizations behind this that make it feasible:
Kafka has a protocol which groups messages together. This lets network requests carry batches of messages and reduces network overhead; the server in turn persists a chunk of messages in one go, and consumers fetch large linear chunks at once.
Linear reads/writes on a disk are fast. The notion that modern disks are slow comes from disk seeks, which are not an issue in big linear operations.
Said linear operations are heavily optimized by the OS, via read-ahead (prefetching large block multiples) and write-behind (grouping small logical writes into big physical writes) techniques.
Modern OSes cache the disk in free RAM. This is called pagecache.
Since Kafka stores messages in a standardized binary format unmodified throughout the whole flow (producer->broker->consumer), it can make use of the zero-copy optimization. That is when the OS copies data from the pagecache directly to a socket, effectively bypassing the Kafka broker application entirely.
All of these optimizations allow Kafka to deliver messages at near network speed.
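Of the optimizations listed above, message grouping is the easiest to sketch. The helper batch_records below is hypothetical and only shows the idea of packing records into size-bounded batches so that fewer, larger requests are sent; it is not how the Kafka client is implemented.

```python
def batch_records(records, max_batch_bytes):
    """Group byte records into batches whose total size stays within the limit.

    A single record larger than the limit still gets a batch of its own.
    """
    batches, current, size = [], [], 0
    for record in records:
        if current and size + len(record) > max_batch_bytes:
            batches.append(current)  # current batch is full, start a new one
            current, size = [], 0
        current.append(record)
        size += len(record)
    if current:
        batches.append(current)
    return batches


records = [b"aaaa", b"bb", b"cccc", b"d"]
batches = batch_records(records, max_batch_bytes=6)
print(len(records), "records sent as", len(batches), "batches")
```

Four records become two requests here. With real traffic the saving is far larger, since per-request overhead is paid once per batch instead of once per message.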
Data Distribution & Replication
Let’s talk about how Kafka achieves fault-tolerance and how it distributes data between nodes.
Partition data is replicated across multiple brokers in order to preserve the data in case one broker dies.
At all times, one broker “owns” a partition and is the node through which applications write/read from the partition. This is called a partition leader. It replicates the data it receives to N other brokers, called followers. They store the data as well and are ready to be elected as leader in case the leader node dies.
This helps you configure the guarantee that any successfully published message will not be lost. Having the option to change the replication factor lets you trade performance for stronger durability guarantees, depending on the criticality of the data.
In this way, if a leader ever fails, a follower can take its place.
You may be asking, though:
– How does a producer/consumer know who the leader of a partition is?
For a producer/consumer to write/read from a partition, they need to know its leader, right? This information needs to be available from somewhere.
Kafka stores such metadata in a service called Zookeeper.
What is Zookeeper?
Zookeeper is a distributed key-value store. It is highly-optimized for reads but writes are slower. It is most commonly used to store metadata and handle the mechanics of clustering (heartbeats, distributing updates/configurations, etc).
It allows clients of the service (the Kafka brokers) to subscribe and have changes sent to them once they happen. This is how brokers know when to switch partition leaders. Zookeeper is also extremely fault-tolerant and it ought to be, as Kafka heavily depends on it.
It is used for storing all sorts of metadata, to mention some:
Consumer group’s offset per partition (although modern clients store offsets in a separate Kafka topic)
ACL (Access Control Lists) — used for limiting access/authorization
How does a producer/consumer know who the leader of a partition is?
Producer and Consumers used to directly connect and talk to Zookeeper to get this (and other) information. Kafka has been moving away from this coupling and since versions 0.8 and 0.9 respectively, clients fetch metadata information from Kafka brokers directly, who themselves talk to Zookeeper.
In Kafka, a stream processor is anything that takes continual streams of data from input topics, performs some processing on this input and produces a stream of data to output topics (or external services, databases, the trash bin, wherever really…)
It is possible to do simple processing directly with the producer/consumer APIs; however, for more complex transformations like joining streams together, Kafka provides an integrated Streams API library.
This API is intended to be used within your own codebase; it does not run on a broker. It works similarly to the consumer API and helps you scale out the stream processing work over multiple applications (similar to consumer groups).
A stateless processing of a stream is deterministic processing that does not depend on anything external. You know that for any given data you will always produce the same output independent of anything else. An example for that would be simple data transformation — appending something to a string "Hello" -> "Hello, World!".
It is important to recognize that streams and tables are essentially the same. A stream can be interpreted as a table and a table can be interpreted as a stream.
Stream as a Table
If you look at how synchronous database replication is achieved, you’ll see that it is through the so-called streaming replication, where each change in a table is sent to a replica server. A Kafka stream can be interpreted in the same way — as a stream of updates for data, in which the aggregate is the final result of the table. Such streams get saved in a local RocksDB (by default) and are called a KTable.
Table as a Stream
A table can be looked at as a snapshot of the latest value for each key in a stream. In the same way stream records can produce a table, table updates can produce a changelog stream.
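The duality can be shown in a few lines of plain Python. The class Table and the function table_from_stream are hypothetical names for this sketch and have nothing to do with the actual Streams API.

```python
class Table:
    """A table whose every update is also recorded as a changelog stream."""

    def __init__(self):
        self.rows = {}
        self.changelog = []  # every update, in order: the stream view of the table

    def put(self, key, value):
        self.rows[key] = value
        self.changelog.append((key, value))


def table_from_stream(stream):
    """Fold a stream of (key, value) updates into the latest value per key."""
    rows = {}
    for key, value in stream:
        rows[key] = value  # later records overwrite earlier ones
    return rows


balances = Table()
balances.put("alice", 1)
balances.put("bob", 5)
balances.put("alice", 2)

rebuilt = table_from_stream(balances.changelog)
print(rebuilt == balances.rows)
```

Updating the table produced a stream, and folding that stream reproduced the table, which is the round trip the two paragraphs above describe.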
Some simple operations like map() or filter() are stateless and do not require you to keep any data regarding the processing. However, in real life, most operations you’ll do will be stateful (e.g. count()) and as such will require you to store the currently accumulated state.
The problem with maintaining state on stream processors is that the stream processors can fail! Where would you need to keep this state in order to be fault-tolerant?
A naive approach is to simply store all state in a remote database and join over the network to that store. The problem with this is that there is no locality of data and lots of network round-trips, both of which will significantly slow down your application. A more subtle but important problem is that your stream processing job’s uptime would be tightly coupled to the remote database and the job will not be self-contained (a change in the database from another team might break your processing).
So what’s a better approach?
Recall the duality of tables and streams. This allows us to convert streams into tables that are co-located with our processing. It also provides us with a mechanism for handling fault tolerance — by storing the streams in a Kafka broker.
A stream processor can keep its state in a local table (e.g. RocksDB), which will be updated from an input stream (after perhaps some arbitrary transformation). When the process fails, it can restore its data by replaying the stream.
You could even have a remote database be the producer of the stream, effectively broadcasting a changelog with which you rebuild the table locally.
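A sketch of this recovery pattern for a stateful count. The class CountingProcessor is hypothetical: a Python list stands in for the changelog kept in Kafka, and a dict stands in for the local store.

```python
class CountingProcessor:
    """Counts occurrences per key and logs every state change for recovery."""

    def __init__(self, changelog):
        self.changelog = changelog  # stand-in for a changelog topic in Kafka
        self.counts = {}            # stand-in for the local store (e.g. RocksDB)

    def restore(self):
        """Rebuild local state by replaying the changelog from the start."""
        self.counts = {}
        for key, count in self.changelog:
            self.counts[key] = count

    def process(self, key):
        count = self.counts.get(key, 0) + 1
        self.counts[key] = count
        self.changelog.append((key, count))  # persist the new state remotely
        return count


changelog = []
original = CountingProcessor(changelog)
for word in ["a", "b", "a"]:
    original.process(word)

# The original instance "fails"; a replacement starts with empty local state.
replacement = CountingProcessor(changelog)
replacement.restore()
next_count = replacement.process("a")
print(replacement.counts, next_count)
```

The replacement picks up exactly where the failed instance stopped, and all reads during normal processing stay local. The network is only involved when writing the changelog and when restoring.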
Normally, you’d be forced to write your stream processing in a JVM language, as that is where the only official Kafka Streams API client is.
Currently in a developer preview, KSQL is a new feature which allows you to write your simple streaming jobs in a familiar SQL-like language.
You set up a KSQL server and interactively query it through a CLI to manage the processing. It works with the same abstractions (KStream & KTable), guarantees the same benefits of the Streams API (scalability, fault-tolerance) and greatly simplifies work with streams.
Kafka Streams is a perfect mix of power and simplicity. It arguably has the best capabilities for stream jobs on the market and it integrates with Kafka far more easily than other stream processing alternatives (Storm, Samza, Spark, Wallaroo).
The problem with most other stream processing frameworks is that they are complex to work with and deploy. A batch processing framework like Spark needs to:
Control a large number of jobs over a pool of machines and efficiently distribute them across the cluster.
To achieve this it has to dynamically package up your code and physically deploy it to the nodes that will execute it. (along with configuration, libraries, etc.)
Unfortunately tackling these problems makes the frameworks pretty invasive. They want to control many aspects of how code is deployed, configured, monitored, and packaged.
The underlying motivation of Kafka Streams is to enable all your applications to do stream processing without the operational complexity of running and maintaining yet another cluster. The only potential downside is that it is tightly coupled with Kafka, but in the modern world where most if not all real-time processing is powered by Kafka that may not be a big disadvantage.
When would you use Kafka?
As we already covered, Kafka allows you to have a huge amount of messages go through a centralized medium and store them without worrying about things like performance or data loss.
This means it is perfect for use as the heart of your system’s architecture, acting as a centralized medium that connects different applications. Kafka can be the center piece of an event-driven architecture and allows you to truly decouple applications from one another.
Kafka allows you to easily decouple communication between different (micro)services. With the Streams API, it is now easier than ever to write business logic which enriches Kafka topic data for service consumption. The possibilities are huge and I urge you to explore how companies are using Kafka.
Apache Kafka is a distributed streaming platform capable of handling trillions of events a day. Kafka provides low-latency, high-throughput, fault-tolerant publish and subscribe pipelines and is able to process streams of events.
We went over its basic semantics (producer, broker, consumer, topic), learned about some of its optimizations (pagecache), learned how it’s fault-tolerant by replicating data and were introduced to its powerful streaming abilities.
Kafka has seen large adoption at thousands of companies worldwide, including a third of the Fortune 500. With the continual improvement of Kafka and the recently released first major version 1.0 (1st November, 2017), there are predictions that this Streaming Platform is going to be as big and central of a data platform as relational databases are.
I hope that this introduction helped familiarize you with Apache Kafka and its potential.
Further Reading Resources & Things I did not mention
The rabbit hole goes deeper than this article was able to cover. Here are some features I did not get the chance to mention but are nevertheless important to know:
Connector API — API helping you connect various services to Kafka as a source or sink (PostgreSQL, Redis, ElasticSearch)
Log Compaction — An optimization which reduces log size. Extremely useful in changelog streams