Apache Kafka is a high-performance, highly scalable event streaming platform. To unlock Kafka’s full potential, you need to carefully consider the design of your application. It’s all too easy to write Kafka applications that perform poorly or eventually hit a scalability brick wall. Since 2015, IBM has provided the IBM Event Streams service, which is a fully-managed Apache Kafka service running on IBM Cloud®. Since then, the service has helped many customers, as well as teams within IBM, resolve scalability and performance problems with the Kafka applications they have written.
This article describes some of the common problems of Apache Kafka and provides some recommendations for how you can avoid running into scalability problems with your applications.
1. Minimize waiting for network round-trips
Certain Kafka operations work by the client sending data to the broker and waiting for a response. A whole round-trip might take 10 milliseconds, which sounds speedy, but limits you to at most 100 operations per second. For this reason, it’s recommended that you try to avoid these kinds of operations whenever possible. Fortunately, Kafka clients provide ways for you to avoid waiting on these round-trip times. You just need to ensure that you’re taking advantage of them.
Tips to maximize throughput:
- Don’t check every message sent if it succeeded. Kafka’s API allows you to decouple sending a message from checking if the message was successfully received by the broker. Waiting for confirmation that a message was received can introduce network round-trip latency into your application, so aim to minimize this where possible. This could mean sending as many messages as possible, before checking to confirm they were all received. Or it could mean delegating the check for successful message delivery to another thread of execution within your application so it can run in parallel with you sending more messages.
- Don’t follow the processing of each message with an offset commit. Committing offsets (synchronously) is implemented as a network round-trip with the server. Either commit offsets less frequently, or use the asynchronous offset commit function to avoid paying the price for this round-trip for every message you process. Just be aware that committing offsets less frequently can mean that more data needs to be re-processed if your application fails.
If you read the above and thought, “Uh oh, won’t that make my application more complex?” — the answer is yes, it likely will. There is a trade-off between throughput and application complexity. What makes network round-trip time a particularly insidious pitfall is that once you hit this limit, it can require extensive application changes to achieve further throughput improvements.
2. Don’t let increased processing times be mistaken for consumer failures
One helpful feature of Kafka is that it monitors the “liveness” of consuming applications and disconnects any that might have failed. This works by having the broker track when each consuming client last called “poll” (Kafka’s terminology for asking for more messages). If a client doesn’t poll frequently enough, the broker to which it is connected concludes that it must have failed and disconnects it. This is designed to allow the clients that are not experiencing problems to step in and pick up work from the failed client.
Unfortunately, with this scheme the Kafka broker can’t distinguish between a client that is taking a long time to process the messages it received and a client that has actually failed. Consider a consuming application that loops: 1) Calls poll and gets back a batch of messages; or 2) processes each message in the batch, taking 1 second to process each message.
If this consumer is receiving batches of 10 messages, then it’ll be approximately 10 seconds between calls to poll. By default, Kafka will allow up to 300 seconds (5 minutes) between polls before disconnecting the client — so everything would work fine in this scenario. But what happens on a really busy day when a backlog of messages starts to build up on the topic that the application is consuming from? Rather than just getting 10 messages back from each poll call, your application gets 500 messages (by default this is the maximum number of records that can be returned by a call to poll). That would result in enough processing time for Kafka to decide the application instance has failed and disconnect it. This is bad news.
You’ll be delighted to learn that it can get worse. It is possible for a kind of feedback loop to occur. As Kafka starts to disconnect clients because they aren’t calling poll frequently enough, there are less instances of the application to process messages. The likelihood of there being a large backlog of messages on the topic increases, leading to an increased likelihood that more clients will get large batches of messages and take too long to process them. Eventually all the instances of the consuming application get into a restart loop, and no useful work is done.
What steps can you take to avoid this happening to you?
- The maximum amount of time between poll calls can be configured using the Kafka consumer “max.poll.interval.ms” configuration. The maximum number of messages that can be returned by any single poll is also configurable using the “max.poll.records” configuration. As a rule of thumb, aim to reduce the “max.poll.records” in preferences to increasing “max.poll.interval.ms” because setting a large maximum poll interval will make Kafka take longer to identify consumers that really have failed.
- Kafka consumers can also be instructed to pause and resume the flow of messages. Pausing consumption prevents the poll method from returning any messages, but still resets the timer used to determine if the client has failed. Pausing and resuming is a useful tactic if you both: a) expect that individual messages will potentially take a long time to process; and b) want Kafka to be able to detect a client failure part way through processing an individual message.
- Don’t overlook the usefulness of the Kafka client metrics. The topic of metrics could fill a whole article in its own right, but in this context the consumer exposes metrics for both the average and maximum time between polls. Monitoring these metrics can help identify situations where a downstream system is the reason that each message received from Kafka is taking longer than expected to process.
We’ll return to the topic of consumer failures later in this article, when we look at how they can trigger consumer group re-balancing and the disruptive effect this can have.
3. Minimize the cost of idle consumers
Under the hood, the protocol used by the Kafka consumer to receive messages works by sending a “fetch” request to a Kafka broker. As part of this request the client indicates what the broker should do if there aren’t any messages to hand back, including how long the broker should wait before sending an empty response. By default, Kafka consumers instruct the brokers to wait up to 500 milliseconds (controlled by the “fetch.max.wait.ms” consumer configuration) for at least 1 byte of message data to become available (controlled with the “fetch.min.bytes” configuration).
Waiting for 500 milliseconds doesn’t sound unreasonable, but if your application has consumers that are mostly idle, and scales to say 5,000 instances, that’s potentially 2,500 requests per second to do absolutely nothing. Each of these requests takes CPU time on the broker to process, and at the extreme can impact the performance and stability of the Kafka clients that are want to do useful work.
Normally Kafka’s approach to scaling is to add more brokers, and then evenly re-balance topic partitions across all the brokers, both old and new. Unfortunately, this approach might not help if your clients are bombarding Kafka with needless fetch requests. Each client will send fetch requests to every broker leading a topic partition that the client is consuming messages from. So it is possible that even after scaling the Kafka cluster, and re-distributing partitions, most of your clients will be sending fetch requests to most of the brokers.
So, what can you do?
- Changing the Kafka consumer configuration can help reduce this effect. If you want to receive messages as soon as they arrive, the “fetch.min.bytes” must remain at its default of 1; however, the “fetch.max.wait.ms” setting can be increased to a larger value and doing so will reduce the number of requests made by idle consumers.
- At a broader scope, does your application need to have potentially thousands of instances, each of which consumes very infrequently from Kafka? There may be very good reasons why it does, but perhaps there are ways that it could be designed to make more efficient use of Kafka. We’ll touch on some of these considerations in the next section.
4. Choose appropriate numbers of topics and partitions
If you come to Kafka from a background with other publish–subscribe systems (for example Message Queuing Telemetry Transport, or MQTT for short) then you might expect Kafka topics to be very lightweight, almost ephemeral. They are not. Kafka is much more comfortable with a number of topics measured in thousands. Kafka topics are also expected to be relatively long lived. Practices such as creating a topic to receive a single reply message, then deleting the topic, are uncommon with Kafka and do not play to Kafka’s strengths.
Instead, plan for topics that are long lived. Perhaps they share the lifetime of an application or an activity. Also aim to limit the number of topics to the hundreds or perhaps low thousands. This might require taking a different perspective on what messages are interleaved on a particular topic.
A related question that often arises is, “How many partitions should my topic have?” Traditionally, the advice is to overestimate, because adding partitions after a topic has been created doesn’t change the partitioning of existing data held on the topic (and hence can affect consumers that rely on partitioning to offer message ordering within a partition). This is good advice; however, we’d like to suggest a few additional considerations:
- For topics that can expect a throughput measured in MB/second, or where throughput could grow as you scale up your application—we strongly recommend having more than one partition, so that the load can be spread across multiple brokers. The Event Streams service always runs Kafka with a multiple of 3 brokers. At the time of writing, it has a maximum of up to 9 brokers, but perhaps this will be increased in the future. If you pick a multiple of 3 for the number of partitions in your topic then it can be balanced evenly across all the brokers.
- The number of partitions in a topic is the limit to how many Kafka consumers can usefully share consuming messages from the topic with Kafka consumer groups (more on these later). If you add more consumers to a consumer group than there are partitions in the topic, some consumers will sit idle not consuming message data.
- There’s nothing inherently wrong with having single-partition topics as long as you’re absolutely sure they’ll never receive significant messaging traffic, or you won’t be relying on ordering within a topic and are happy to add more partitions later.
5. Consumer group re-balancing can be surprisingly disruptive
Most Kafka applications that consume messages take advantage of Kafka’s consumer group capabilities to coordinate which clients consume from which topic partitions. If your recollection of consumer groups is a little hazy, here’s a quick refresher on the key points:
- Consumer groups coordinate a group of Kafka clients such that only one client is receiving messages from a particular topic partition at any given time. This is useful if you need to share out the messages on a topic among a number of instances of an application.
- When a Kafka client joins a consumer group or leaves a consumer group that it has previously joined, the consumer group is re-balanced. Commonly, clients join a consumer group when the application they are part of is started, and leave because the application is shutdown, restarted or crashes.
- When a group re-balances, topic partitions are re-distributed among the members of the group. So for example, if a client joins a group, some of the clients that are already in the group might have topic partitions taken away from them (or “revoked” in Kafka’s terminology) to give to the newly joining client. The reverse is also true: when a client leaves a group, the topic partitions assigned to it are re-distributed amongst the remaining members.
As Kafka has matured, increasingly sophisticated re-balancing algorithms have (and continue to be) devised. In early versions of Kafka, when a consumer group re-balanced, all the clients in the group had to stop consuming, the topic partitions would be redistributed amongst the group’s new members and all the clients would start consuming again. This approach has two drawbacks (don’t worry, these have since been improved):
- All the clients in the group stop consuming messages while the re-balance occurs. This has obvious repercussions for throughput.
- Kafka clients typically try to keep a buffer of messages that have yet to be delivered to the application and fetch more messages from the broker before the buffer is drained. The intent is to prevent message delivery to the application stalling while more messages are fetched from the Kafka broker (yes, as per earlier in this article, the Kafka client is also trying to avoid waiting on network round-trips). Unfortunately, when a re-balance causes partitions to be revoked from a client then any buffered data for the partition has to be discarded. Likewise, when re-balancing causes a new partition to be assigned to a client, the client will start to buffer data starting from the last committed offset for the partition, potentially causing a spike in network throughput from broker to client. This is caused by the client to which the partition has been newly assigned re-reading message data that had previously been buffered by the client from which the partition was revoked.
More recent re-balance algorithms have made significant improvements by, to use Kafka’s terminology, adding “stickiness” and “cooperation”:
- “Sticky” algorithms try to ensure that after a re-balance, as many group members as possible keep the same partitions they had prior to the re-balance. This minimizes the amount of buffered message data that is discarded or re-read from Kafka when the re-balance occurs.
- “Cooperative” algorithms allow clients to keep consuming messages while a re-balance occurs. When a client has a partition assigned to it prior to a re-balance and keeps the partition after the re-balance has occurred, it can keep consuming from uninterrupted partitions by the re-balance. This is synergistic with “stickiness,” which acts to keep partitions assigned to the same client.
Despite these enhancements to more recent re-balancing algorithms, if your applications is frequently subject to consumer group re-balances, you will still see an impact on overall messaging throughput and be wasting network bandwidth as clients discard and re-fetch buffered message data. Here are some suggestions about what you can do:
- Ensure you can spot when re-balancing is occurring. At scale, collecting and visualizing metrics is your best option. This is a situation where a breadth of metric sources helps build the complete picture. The Kafka broker has metrics for both the amount of bytes of data sent to clients, and also the number of consumer groups re-balancing. If you’re gathering metrics from your application, or its runtime, that show when re-starts occur, then correlating this with the broker metrics can provide further confirmation that re-balancing is an issue for you.
- Avoid unnecessary application restarts when, for example, an application crashes. If you are experiencing stability issues with your application then this can lead to much more frequent re-balancing than anticipated. Searching application logs for common error messages emitted by an application crash, for example stack traces, can help identify how frequently problems are occurring and provide information helpful for debugging the underlying issue.
- Are you using the best re-balancing algorithm for your application? At the time of writing, the gold standard is the “CooperativeStickyAssignor”; however, the default (as of Kafka 3.0) is to use the “RangeAssignor” (and earlier assignment algorithm) in preference to the cooperative sticky assignor. The Kafka documentation describes the migration steps required for your clients to pick up the cooperative sticky assignor. It is also worth noting that while the cooperative sticky assignor is a good all round choice, there are other assignors tailored to specific use cases.
- Are the members for a consumer group fixed? For example, perhaps you always run 4 highly available and distinct instances of an application. You might be able to take advantage of Kafka’s static group membership feature. By assigning unique IDs to each instance of your application, static group membership allows you to side-step re-balancing altogether.
- Commit the current offset when a partition is revoked from your application instance. Kafka’s consumer client provides a listener for re-balance events. If an instance of your application is about to have a partition revoked from it, the listener provides the opportunity to commit an offset for the partition that is about to be taken away. The advantage of committing an offset at the point the partition is revoked is that it ensures whichever group member is assigned the partition picks up from this point—rather than potentially re-processing some of the messages from the partition.
You’re now an expert in scaling Kafka applications. You’re invited to put these points into practice and try out the fully-managed Kafka offering on IBM Cloud. For any challenges in set up, see the Getting Started Guide and FAQs.
The post Five scalability pitfalls to avoid with your Kafka application appeared first on IBM Blog.