Apache Kafka

Apache Kafka Rebalance Protocol for the Cloud: Static Membership

Boyang Chen
Last Updated: 

Static Membership is an enhancement to the current rebalance protocol that aims to reduce the downtime caused by excessive and unnecessary rebalances for general Apache Kafka® client implementations. This applies to Kafka consumers, Kafka Connect, and Kafka Streams. To get a better grasp on the rebalance protocol, we’ll examine this concept in depth and explain what it means. If you already know what a Kafka rebalance is, feel free to jump directly to the following section to save time: When do we trigger an unnecessary rebalance?

What does “rebalance” mean when it comes to Kafka?

A Kafka rebalance is a distributed protocol for client-side applications to process a common set of resources in a dynamic group. Two primary goals for this protocol are:

  1. Group resource assignment
  2. Membership change capture

Take a Kafka consumer, for example. A group of Kafka consumers read input data from Kafka through subscriptions, and topic partitions are their shared unit of tasks. Three consumers (C1, C2, and C3), two topics (T1 and T2) with three partitions each, and subscriptions would appear as follows:

C1: T1, T2
C2: T2
C3: T1

The rebalance protocol ensures that C1 and C2 take non-overlapping assignments from topic T2*, and the same goes for C1 and C3 from T1. A valid assignment looks like this:

C1: t1-p1, t2-p1
C2: t2-p2, t2-p3
C3: t1-p2, t1-p3

*Note that the consumer does not check if the assignment returned from the assignor respects these rules. If your customized assignor assigns partitions to multiple owners, it would still be silently accepted and cause double fetching. Strictly speaking, only built-in rebalance assignors obey this rule for resource isolation

However, the assignment below is not allowed, as it introduces overlapping assignments:

C1: t1-p1, t2-p1
C2: t2-p1, t2-p2, t2-p3
C3: t1-p2, t1-p3

The rebalance protocol also needs to properly handle membership changes. For the above case, if a new member C4 subscribing to T2 joins, the rebalance protocol will try to adjust the load within the group:

C1: t1-p1, t2-p1
C2: t2-p3
C3: t1-p2, t1-p3
C4: t2-p2

In summary, the rebalance protocol needs to “balance” the load within a client group as it scales, while making the task ownership safe at the same time. Similar to most distributed consensus algorithms, Kafka takes a two-phase approach. For simplicity, we’ll stick to the Kafka consumer for now.

Consumer rebalance demo

The endpoint that consumers commit progress to is called a group coordinator, which is hosted on a designated broker. It also serves as the centralized manager of group rebalances. When the group starts rebalancing, the group coordinator first switches its state to rebalance so that all interacting consumers are notified to rejoin the group. Until all the members rejoin or the coordinator waits long enough and reaches the rebalance timeout, the group proceeds to another stage called sync, which officially announces the formation of a valid consumer group. To distinguish members who fall out of the group during this process, each successful rebalance increments a counter called generation ID and propagates its value to all the joined members, so that out-of-generation members can be fenced.

In the sync stage, the group coordinator replies to all members with the latest generation information. Specifically, it nominates one of the members as the leader and replies to the leader with encoded membership and subscription metadata.

The leader shall complete the assignment based on membership and topic metadata information, and reply to the coordinator with the assignment information. During this period, all the followers are required to send a sync group request to get their actual assignments and go into a wait pool until the leader finishes transmitting the assignment to the coordinator. Upon receiving the assignment, the coordinator transitions the group from sync to stable. All pending and upcoming follower sync requests will be answered with individual assignment.

Here, we describe two demo cases: one is an actual rebalance walkthrough, and the other is the high-level state machine. Note that in the sync stage, we can always fall back to rebalance mode if rebalance conditions are triggered, such as adding a new member, topic partition expansion, etc.

Rebalance Demo

State Machine View: Two-Phase Protocol

The rebalance protocol is very effective at balancing task processing load in real time and letting users freely scale their applications, but it is a rather heavy operation as well, requiring the entire consumer group to stop working temporarily. Members are expected to revoke ongoing assignments and initialize new assignments at the start and end of each rebalance. Such operations take overhead, especially for stateful operations where the task needs to first restore a local state from its backup topic before serving.

Essentially, a rebalance kicks in when following conditions are met:

  1. Group membership changes, such as a new member joining
  2. Member subscription changes, such as one consumer changing the subscribed topics
  3. Resource changes, such as adding more partitions to the subscribed topic

When do we trigger an unnecessary rebalance?

In the real world, there are many scenarios where a group coordinator triggers unnecessary rebalances that are detrimental to application performance. The first case is transient member timeout. To understand this, we need to first introduce two concepts: consumer heartbeat and session timeout.

Consumer heartbeat and session timeout

A Kafka consumer maintains a background thread to periodically send heartbeat requests to the coordinator to indicate its liveness. The consumer configuration called session.timeout.ms defines how long the coordinator waits after the member’s last heartbeat before it assuming the member failed. When this value is set too low, a network jitter or a long garbage collection (GC) might fail the liveness check, causing the group coordinator to remove this member and begin rebalancing. The solution is simple: instead of using the default 10-second session timeout, set it to a larger value to greatly reduce transient failure-caused rebalances.

Note that the longer you set the session timeout to, the longer partial unavailability you will have when a consumer actually fails. We will explain how to choose this value in a later section on how to opt into Static Membership.

Rolling bounce procedure

From time to time, we need to restart our application, deploy new code, or perform a rollback, etc. These operations in the worst case may cause a lot of rebalances. When a consumer instance shuts down, it sends a leave group request to the group coordinator, letting itself be removed from the group and triggering another rebalance afterwards. When that consumer resumes after a bounce, it sends a join group request to the group coordinator, triggering another rebalance.

During a rolling bounce procedure, consecutive rebalances are triggered as instances that are shut down and resumed, and partitions are reassigned back and forth. The final assignment result is purely random and incurs a large cost to pay for task shuffling and reinitialization.

How about letting members choose not to leave the group? Not an option either. To understand why, we need to talk about the member ID for a moment.

Consumer member ID

When a new member joins the group, the request contains no membership information. The group coordinator will assign a universally unique identifier (UUID) to this member as its member ID, put the ID in the cache, and embed this information in its response to the member. Within this consumer’s lifecycle, it could reuse the same member ID without the coordinator triggering a rebalance when it rejoins, except in edge cases such as leader rejoining.

Going back to the rolling bounce scenario, a restarted member will erase in-memory membership information and rejoin the group without member ID or generation ID. Since the rejoining consumer would be recognized as a completely new member of the group, the group coordinator does not guarantee that its old assignment will be assigned back. As you can see, a member leaving the group is not the root cause for unnecessary task shuffling—the loss of identity is.

What is Static Membership?

Static Membership, unlike Dynamic Membership, aims to persist member identity across multiple generations of the group. The goal here is to reuse the same subscription information and make the old members “recognizable” to the coordinator. Static Membership introduces a new consumer configuration called group.instance.id, which is configured by users to uniquely identify their consumer instances. Although the coordinator-assigned member ID gets lost during restart, the coordinator will still recognize this member based on the provided group instance ID in the join request. Therefore, the same assignment is guaranteed.

Static Membership is extremely friendly with cloud application setups, because nowadays deployment technologies such as Kubernetes are very self-contained for managing the health of applications. To heal a dead or ill-performing consumer, Kubernetes could easily bring down the relevant instance and spin up a new one using the same instance ID. With a cloud management framework, the group coordinator’s client health check is ongoing.

Below is a quick demo of how Static Membership works.

Static Membership Demo

How to opt into Static Membership

Since the Apache Kafka 2.3 release, Static Membership has become generally available for the community. Here are the instructions if you want to be an alpha user:

  1. Upgrade your broker to 2.3 or higher. Specifically, you need to upgrade inter.broker.protocol.version to 2.3 or higher in order to enable this feature.
  2. On the client side:
    • Upgrade your client library to 2.3 or higher.
    • Define a longer and reasonable session timeout. As stated before, a tight session timeout value could make the group unstable as members are kicked out of it spuriously due to missing a single heartbeat. You should set the session timeout to a reasonable value based on the business tolerance of partial unavailability. For example, setting a session timeout to 10 minutes for a business that could tolerate 15 minutes of unavailability is reasonable, whereas setting it to five seconds is not.
    • Set the group.instance.id configuration to a unique ID for your consumer. If you are a Kafka Streams user, use the same configuration for your stream instance.
  3. Deploy the new code to your application. Static Membership will take effect in your next rolling bounce.

Static Membership only works as expected if these instructions are followed. We have nonetheless made some preventative efforts to reduce the potential risk of human error.

Error handling

Sometimes a user can forget to upgrade a broker. When the consumer first gets started, it acquires the API version of the designated broker. If the client is configured with group instance ID and the broker is on older version, the application will crash immediately as the broker has no support for Static Membership yet.

If a user fails to configure the group instance ID uniquely, meaning that there are two or more members configured with the same instance ID, a fencing logic comes into play. When a known static member rejoins without a member ID, the coordinator generates a new UUID to reply to this member as its new member ID. At the same time, the group coordinator maintains a mapping from the instance ID to the latest assigned member ID. If a known static member rejoins with a valid member ID that doesn’t match with the cached ID, it immediately gets fenced by the coordinator response. This eliminates the risk of concurrent processing for duplicate static members.

In this very first version, we expect bugs that may invalidate the processing semantics or hinder the fencing logic. Some of them have been addressed in the trunk, such as KAFKA-8715, and we are still actively working on finding more issues.

Feedback is really appreciated! If you detect any issues with Static Membership, please file a JIRA or put a question on the dev mailing list to get our attention.

Want to know more?

There are still many details we haven’t covered in this blog post, like how this effort compares with Incremental Cooperative Rebalancing, how Static Membership helps with a non-sticky assignment strategy, and tooling support around the new protocol. If you’re interested, I cover all this and more in my session with Liquan Pei at Kafka Summit San Francisco titled Static Membership: Rebalance Strategy Designed for the Cloud.

This work has been ongoing for over a year, with many iterations and huge support from my colleagues, past colleagues, and community friends. I owe a big thank you to all of you, especially Guozhang Wang, Jason Gustafson, Liquan Pei, Yu Yang, Shawn Nguyen, Matthias J. Sax, John Roesler, Mayuresh Gharat, Dong Lin, and Mike Freyberger.

Boyang Chen is an infrastructure engineer at Confluent, where he works on the Kafka Streams Team to build the next-generation event streaming platform on top of Apache Kafka. Previously, Boyang worked at Pinterest as a software engineer on the Ads Infrastructure Team, where he tackled various ads real-time challenges and rebuilt the whole budgeting and pacing pipeline, making it fast and robust with concrete revenue gain and business impact.

Subscribe to the Confluent Blog

S'abonner

More Articles Like This

Providing Timely, Reliable, and Consistent Travel Information to Millions of Deutsche Bahn Passengers with Apache Kafka and Confluent Platform
Axel Löhn

Providing Timely, Reliable, and Consistent Travel Information to Millions of Deutsche Bahn Passengers with Apache Kafka and Confluent Platform

Axel Löhn

Every day, about 5.7 million rail passengers rely on Deutsche Bahn (DB) to get to their destination. Virtually every one of these passengers needs access to vital trip information, including […]

Kafka Streams and ksqlDB Compared – How to Choose
Dani Traphagen

Kafka Streams and ksqlDB Compared – How to Choose

Dani Traphagen

ksqlDB is a new kind of database purpose-built for stream processing apps, allowing users to build stream processing applications against data in Apache Kafka® and enhancing developer productivity. ksqlDB simplifies […]

Kafka Summit San Francisco 2019 Session Videos
Tim Berglund

Kafka Summit San Francisco 2019 Session Videos

Tim Berglund

Last week, the Kafka Summit hosted nearly 2,000 people from 40 different countries and 595 companies—the largest Summit yet. By the numbers, we got to enjoy four keynote speakers, 56 […]

Fully managed Apache Kafka as a Service!

Try Free