The Server Engineering team at LotusFlare has been using Kafka for seven years. If there is one thing we have learned, it is that you never stop learning Kafka.
Kafka is an extremely useful, fast, and reliable distributed streaming platform. However, depending on the implementation, you could end up with significant issues, many of which result in data loss.
There are different variables that could cause data loss in Kafka including data offsets, consumer auto-commit configuration, producer acknowledgements, replication, etc.
Today, I’ll describe a first-hand experience we had with Kafka. It’s the story of how we almost lost our Kafka data which led us to the restructuring of our Kafka offsets. I hope the tips here will help you do the same.
We recently experienced temporary data loss in one of our systems. Not to stress you out, in the end we managed to fully recover the data but this incident led to the re-evaluation of how we were defining retention policies and managing Kafka offsets. To give you more background on the system itself, the service in question consumes data from Kafka, and inserts it into a corresponding table in a data warehouse. We refer to this service as the Ingestion Service. It is built on top of akka streams and is deployed with horizontal scaling to achieve high throughput.
What Was the Problem?
At a certain time, we experienced a significant lag in the Ingestion Service due to the sub-optimal Kafka configuration of batch sizes and virtual machine resources (CPU units). The rate of data consumption from certain Kafka topics was much slower than the rate of message production going to those same topics.
This is normal during peak traffic times, or right after a maintenance window. In the vast majority of cases the system will adjust and eventually catch up with all other incoming messages. What was special in this particular incident was that the total ingestion lag exceeded the Kafka retention policy we had set-up in our environments, meaning that data was no longer available in Kafka. At first, we thought that all the messages had been lost, but that was not exactly true. There was a subset of messages in the topics that were available (associated with older events), but the service was not loading them. And yet, ingestion was still running, and data for the most recent events were ingested into the data warehouse.
You may ask, how is this possible? This is where Kafka offset management comes in. In Kafka, an offset represents the current position of a consumer when reading messages from a topic. As the consumer reads and processes messages, it will typically commit those offsets back to Kafka, so that any new instance that joins the consumer group can be told from which offset in the topic to start reading messages from. Kafka provides the auto.offset.reset policy that allows a Kafka cluster to tell a consumer where to read from when there is no committed position available. As per Confluent, this happens when:
1. The consumer group is initialized for the first time
2. Any previously available offset is out of range
Point 2 above can in fact happen if the retention policy has been exceeded, and the oldest record with an offset that was present in the Kafka cluster is no longer available. Kafka allows for auto.offset.reset to take the value of:
- latest - the default value, which represents the most recent offset available in a topic- earliest - the oldest offset available in a topic (ie. the oldest message)
And this is where the problem was. Since we had a value of latest assigned as the reset policy, data loss due to the retention policy meant that the next offset available to the consumer group became the “newest" offset available in the topic. Consumers would only read messages that arrived after they rejoined the group. With a retention policy of 3 days, this meant we lost the offset for the record that was 3 days old.Instead of resuming from the next available offset (which would have been roughly 48-72 hours old), Kafka assigned the latest record to the consumer, which in practice represented the latest record published to the topic within the last 10 minutes. All other events in between were never loaded for this reason.
We were able to recover lost events since the source of data lives in a persistence layer in our stack, thereby triggering the producers to resend events that were eventually made available in our data warehouse. In terms of Ingestion latency, an increase in CPU resources and fine tuning batch sizes resulted in a significant increase in throughput, which eventually reduced the total lag from several hours to few minutes, well below an SLA of 24 hours.
Most importantly, we made a few Kafka related changes:
1. We updated our Kafka offset reset policy to earliest in several applications. In the event that an ingestion lag is observed again (due to extended Maintenance Windows or spikes in data traffic), we reduce the blast radius associated with data loss by attempting to load the oldest available record in Kafka. This is particularly beneficial in services where auto commits are not enabled and idempotency is accounted for.
2. We added several days to our Kafka retention policy to provide a bit of buffer room for systems to re-stabilize, as well as for development teams to have enough time to work on potential fixes to issues without an unnecessary loss of data.
While this change may sound arbitrary, it was very significant for us.
With a retention policy period of three days, any potential issues that could have started as early as Saturday morning had to be resolved (whether automatically or with intervention) by Monday. This puts a lot of stress on the team where human resources may be limited in some aspects.
Kafka is reliable, but here and there, ingestion lags will happen, whether due to extended Maintenance Windows or spikes in data traffic, and that could endanger your data. While there may be no foolproof way to prevent that from happening, it’s imperative that you do everything you can to minimize these risks.
What’s your current Kafka offset policy? How would your system be affected in the case of an ingestion lag? Are auto commits enabled? And what about idempotency? How many developers do you have on hand?
These are some of the important questions that you have to answer to come to the right decision in regards to defining retention policies and managing Kafka offsets. The solutions above worked for us, but I am not saying they would be right for you. Still, just like anything in our industry, you can take the framework and adapt it to your own case. It should help!
About the authors:
Julio V. is a Software Engineer, working in the Platform Team at LotusFlare. Julio specializes in building data pipelines and distributed web applications to power reporting solutions and business insights. His past experience covers projects in the acoustics, noise, and vibration space, as well as retail data analytics. He holds Bachelor's and Master's degrees in Applied Science (Engineering) from the University of Toronto (2013, 2015), and has published several research papers on search and rescue deployment methodologies.
Ivan I. is an Engineering Lead of the Platform Team at LotusFlare. Ivan is passionate about designing and developing cutting edge, reliable and scalable distributed systems. He is the go-to person for JVM performance, GC tuning and troubleshooting. Previously, he was System Architect at a large Telecom group. He holds a degree from the University of Belgrade.