What's New in Apache Kafka 2.4
On behalf of the Apache Kafka® community, it is my pleasure to announce the release of Apache Kafka 2.4.0. This release includes a number of key new features and improvements that we will highlight in this blog post. For the full list, please see the release notes.
What’s new with the Kafka broker, producer, and consumer
KIP-392: Allow consumers to fetch from closest replica
Historically, consumers were only allowed to fetch from leaders. In multi-datacenter deployments, this often means that consumers are forced to incur expensive cross-datacenter network costs in order to fetch from the leader. With KIP-392, Kafka now supports reading from follower replicas. This gives the broker the ability to redirect consumers to nearby replicas in order to save costs.
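To make the idea concrete, here is a minimal sketch of rack-aware replica selection. It is not Kafka's implementation: the `Replica` record and the `select_replica` helper are hypothetical names used only for illustration. In Kafka itself, the behavior is driven by the consumer's `client.rack` setting and the broker's `replica.selector.class` setting.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Replica:
    # Hypothetical record describing one replica of a partition.
    broker_id: int
    rack: str
    is_leader: bool
    in_sync: bool


def select_replica(replicas: List[Replica], client_rack: Optional[str]) -> Replica:
    """Prefer an in-sync replica in the client's rack; otherwise use the leader."""
    leader = next(r for r in replicas if r.is_leader)
    if client_rack is None:
        return leader
    for replica in replicas:
        if replica.in_sync and replica.rack == client_rack:
            return replica
    return leader


replicas = [
    Replica(broker_id=1, rack="dc-east", is_leader=True, in_sync=True),
    Replica(broker_id=2, rack="dc-west", is_leader=False, in_sync=True),
    Replica(broker_id=3, rack="dc-west", is_leader=False, in_sync=False),
]
chosen = select_replica(replicas, "dc-west")
```

In this sketch, a consumer in `dc-west` is directed to broker 2, while a consumer with no rack configured (or in a rack with no in-sync replica) keeps reading from the leader.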
KIP-429: Kafka Consumer Incremental Rebalance Protocol
KIP-429 adds Incremental Cooperative Rebalancing to the consumer rebalance protocol in addition to the original eager rebalance protocol. Unlike the eager protocol, which always revokes all assigned partitions prior to a rebalance and then tries to reassign them all together, the incremental protocol tries to minimize the partition migration between members of a consumer group by letting consumers retain their partitions during a rebalance event. As a result, end-to-end rebalance times triggered by scaling out/down operations as well as rolling bounces are shorter, benefitting heavy, stateful consumers, such as Kafka Streams applications.
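The difference between the two protocols can be sketched by counting how many partitions each one revokes for the same change in group membership. This is a toy model, not the consumer's actual code; the helper names and the example assignments are made up for illustration.

```python
from typing import Dict, Set

Assignment = Dict[str, Set[int]]


def revoked_eager(old: Assignment, new: Assignment) -> Assignment:
    # Eager: every member gives up everything it owned before the rebalance.
    return {member: set(parts) for member, parts in old.items()}


def revoked_cooperative(old: Assignment, new: Assignment) -> Assignment:
    # Cooperative: a member only gives up partitions that move to another member.
    return {member: parts - new.get(member, set()) for member, parts in old.items()}


# A third consumer (c3) joins a group of two.
old = {"c1": {0, 1, 2}, "c2": {3, 4, 5}}
new = {"c1": {0, 1}, "c2": {3, 4}, "c3": {2, 5}}

eager_count = sum(len(p) for p in revoked_eager(old, new).values())
coop_count = sum(len(p) for p in revoked_cooperative(old, new).values())
```

With these example assignments, the eager model revokes all six partitions, while the cooperative model revokes only the two that actually move to the new member.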
KIP-455: Create an Administrative API for Replica Reassignment
As a replacement for the existing ZooKeeper-based API, the new API supports incremental replica reassignments and cancellation of ongoing reassignments. This also addresses the current limitations in the ZooKeeper-based API like security enforcement and auditability. The new API is exposed via the AdminClient.
See KIP-455 for more details.
KIP-480: Sticky Partitioner
Before this release, when neither a partition nor a key was specified, the producer's default partitioner assigned records to partitions in round-robin fashion. This resulted in more, smaller batches, which led to more requests and queuing as well as higher latency.
KIP-480 implements a new partitioner that, when no partition or key is present, sticks to a single partition and switches to a different one only when the current batch is full. Using the sticky partitioner helps improve message batching, decrease latency, and reduce the load for the broker. Some of the benchmarks which Justine Olshan discusses on the KIP show up to a 50% reduction in latency and 5–15% reduction in CPU utilization.
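A toy model shows why sticking to one partition produces fewer batches. It assumes a burst of keyless records that are all sent before any batch is flushed, and the function names and numbers are illustrative only; it is not a model of the real producer.

```python
import math


def batches_round_robin(burst_size: int, num_partitions: int, batch_capacity: int) -> int:
    # Records are spread evenly, so every partition that receives
    # any record ends up with at least one (often tiny) batch.
    per_partition = [
        burst_size // num_partitions + (1 if i < burst_size % num_partitions else 0)
        for i in range(num_partitions)
    ]
    return sum(math.ceil(n / batch_capacity) for n in per_partition if n > 0)


def batches_sticky(burst_size: int, batch_capacity: int) -> int:
    # Records fill one batch completely before moving on to the next partition.
    return math.ceil(burst_size / batch_capacity)


# Assumed numbers: 12 records, 6 partitions, batches that hold 10 records.
round_robin_batches = batches_round_robin(12, 6, 10)
sticky_batches = batches_sticky(12, 10)
```

Under these assumptions, round-robin yields six batches of two records each, while the sticky approach yields two batches (one full, one partial).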
KIP-482: The Kafka Protocol should Support Optional Tagged Fields
The Kafka remote procedure call (RPC) protocol has its own serialization format for binary data. The Kafka protocol currently does not support optional fields, nor does it support attaching an extra field to a message in a manner that is orthogonal to the versioning scheme.
In order to support these scenarios, KIP-482 adds optional tagged fields to the Kafka serialization format. Tagged fields are always optional. KIP-482 also implements more efficient serialization for variable-length objects.
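As a rough illustration, the sketch below encodes an unsigned variable-length integer and then a block of tagged fields. It assumes the layout described in the KIP (a field count, followed by a tag, a size, and the raw payload for each field, in ascending tag order). The function names are ours, and this is a simplified sketch rather than the serializer Kafka ships.

```python
from typing import Dict


def encode_unsigned_varint(value: int) -> bytes:
    """Encode a non-negative integer using 7 data bits per byte, low bits first."""
    if value < 0:
        raise ValueError("unsigned varint cannot encode a negative value")
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_tagged_fields(fields: Dict[int, bytes]) -> bytes:
    """Encode {tag: payload} as: count, then (tag, size, payload) per field."""
    out = bytearray(encode_unsigned_varint(len(fields)))
    for tag in sorted(fields):
        payload = fields[tag]
        out += encode_unsigned_varint(tag)
        out += encode_unsigned_varint(len(payload))
        out += payload
    return bytes(out)


empty_section = encode_tagged_fields({})
one_field = encode_tagged_fields({0: b"hi"})
```

A message with no tagged fields costs a single zero byte in this scheme, which is why making every tagged field optional adds so little overhead.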
See KIP-482 for more details.
KIP-504: Add new Java Authorizer Interface
This KIP defines a Java authorizer API that is consistent with other pluggable interfaces in the broker. Several limitations in the current Scala authorizer API that could not be fixed without breaking compatibility have been addressed in the new API. Additional request context is now provided to authorizers to support authorization based on the security protocol or listener.
The API also supports asynchronous ACL updates with batching. The new pluggable authorizer API only requires a dependency on the clients JAR. A new out-of-the-box authorizer has been added, leveraging features supported by the new API. The additional context provided to the authorizer has been used to improve audit logging. Batched updates enhance the efficiency of ACL updates using the new authorizer when multiple ACLs are added for a resource. An asynchronous startup and updated APIs will enable Kafka to be used as the storage backend for ACLs once ZooKeeper is removed under KIP-500. In addition, authorizer implementations can now enable dynamic reconfiguration without broker restarts.
See KIP-504 for more details.
KIP-525: Return topic metadata and configs in CreateTopics response
Before, the CreateTopics API response only returned a success or failure status along with any errors. With KIP-525, the API response returns additional metadata, including the actual configuration of the topic that was created. This removes the need for additional requests to obtain topic configuration after creating the topic.
Furthermore, this KIP enables users to obtain default broker configs for topic creation using CreateTopics with validateOnly=true. This is useful for displaying default configs in management tools used to create topics.
See KIP-525 for more details.
KAFKA-7548: KafkaConsumer should not throw away already fetched data for paused partitions.
When a partition is paused by the user in the consumer, the partition is considered "unfetchable." When the consumer has already fetched data for a partition and the partition is paused, then in the next consumer poll all data from "unfetchable" partitions will be discarded. In use cases where pausing and resuming partitions are common during regular operation of the consumer, this can result in discarding pre-fetched data when it's not necessary.
Once the partition is resumed, new fetch requests will be generated and sent to the broker to get the same partition data again. Depending on the frequency of pausing and resuming of partitions, this can impact different aspects of consumer polling, including broker/consumer throughput, number of consumer fetch requests, and NIO-related garbage collection (GC) concerns for regularly dereferenced byte buffers of partition data. This issue is now resolved by retaining completed fetch data for partitions that are paused so that it may be returned in a future consumer poll once the partition is resumed by the user.
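The change in behavior can be sketched with a simplified model of the consumer's buffer of completed fetches. The `poll` helper and its `retain_paused` switch are hypothetical and exist only to contrast the old and new behavior side by side.

```python
from typing import Dict, List, Set


def poll(buffered: Dict[str, List[str]], paused: Set[str], retain_paused: bool) -> List[str]:
    """Return buffered records for unpaused partitions, mutating `buffered` in place."""
    returned: List[str] = []
    for partition in list(buffered):
        if partition in paused:
            if not retain_paused:
                del buffered[partition]  # old behavior: prefetched data is thrown away
            continue  # new behavior: data stays buffered until the partition resumes
        returned.extend(buffered.pop(partition))
    return returned


# Old behavior: data for the paused partition t-1 is lost and must be refetched.
old_buffer = {"t-0": ["a"], "t-1": ["b"]}
old_result = poll(old_buffer, paused={"t-1"}, retain_paused=False)

# New behavior: data for t-1 survives the poll and is returned after resume.
new_buffer = {"t-0": ["a"], "t-1": ["b"]}
new_result = poll(new_buffer, paused={"t-1"}, retain_paused=True)
after_resume = poll(new_buffer, paused=set(), retain_paused=True)
```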
See KAFKA-7548 for more details.
What’s new in Kafka Connect
KIP-382: MirrorMaker 2.0
KIP-382 implements MirrorMaker 2.0 (MM2), a new multi-cluster, cross-datacenter replication engine based on the Connect framework. This tool includes several features designed for disaster recovery, including cross-cluster consumer checkpoints and offset syncs. Automatic topic renaming and cycle detection enable bidirectional active-active replication and other complex topologies.
A new RemoteClusterUtils class enables clients to interpret checkpoints, heartbeats, and "remote topics" from other clusters.
See KIP-382 for more details.
KIP-440: Extend Connect Converter to support headers
KIP-440 adds header support to Kafka Connect. This enables the use of Kafka Connect together with Kafka producers and consumers that rely on headers for serialization/deserialization.
See KIP-440 for more details.
KIP-507: Securing Internal Connect REST Endpoints
KIP-507 brings out-of-the-box authentication and authorization to an internal REST endpoint used by Connect workers to relay task configurations to the leader. If left unsecured, this endpoint could be used to write arbitrary task configurations to a Connect cluster.
However, after KIP-507, the endpoint is automatically secured as long as the other attack surfaces of a Connect cluster (such as its public REST API and the underlying Kafka cluster used to host internal topics and perform group coordination) are also secure.
See KIP-507 for more details.
KIP-481: SerDe Improvements for Connect Decimal type in JSON
KIP-481 adds a decimal.format option to the JSON converter for serializing Connect’s DECIMAL logical type values as number literals rather than base64 string literals. This new option defaults to BASE64 to maintain the previous behavior, but it can be changed to NUMERIC to serialize decimal values as normal JSON numbers. The JSON converter automatically deserializes either format, so make sure to upgrade consumer applications and sink connectors before changing source connector converters to use the numeric format.
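To see what the two representations look like, here is a small sketch that encodes the same decimal both ways. It assumes that the base64 form carries the big-endian two's-complement bytes of the unscaled value, with the scale kept in the schema. The helper names are ours, and this is not the converter's code.

```python
import base64
import json
from decimal import Decimal


def unscaled_bytes(value: Decimal, scale: int) -> bytes:
    """Big-endian two's-complement bytes of value * 10**scale."""
    unscaled = int(value.scaleb(scale))
    magnitude = unscaled if unscaled >= 0 else ~unscaled
    length = magnitude.bit_length() // 8 + 1  # leaves room for the sign bit
    return unscaled.to_bytes(length, "big", signed=True)


def encode_decimal(value: Decimal, scale: int, numeric: bool) -> str:
    """Return the JSON literal for a decimal in either representation."""
    if numeric:
        return str(value)  # plain JSON number literal
    encoded = base64.b64encode(unscaled_bytes(value, scale)).decode("ascii")
    return json.dumps(encoded)  # JSON string literal holding base64 text


price = Decimal("12.34")
as_base64 = encode_decimal(price, 2, numeric=False)
as_number = encode_decimal(price, 2, numeric=True)
```

The numeric form is readable by any JSON consumer, whereas the base64 form can only be interpreted by a reader that knows the scale.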
See KIP-481 for more details.
What’s New in Kafka Streams
KIP-213: Support non-key joining in KTable
Previously, the Streams domain-specific language (DSL) only allowed table-table joins based on the primary key of the joining KTables. Now, a KTable (left input) can be joined with another KTable (right input) on a foreign key extracted from the left table’s value. The join result is a new KTable keyed on the left KTable’s original key, and it is updated in response to changes on either side of the join.
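The semantics of the join result can be sketched over two point-in-time table snapshots. This is only an illustration of an inner foreign-key join on plain dictionaries, with made-up table contents and helper names. The real operator is incremental and emits updates as either table changes.

```python
from typing import Any, Callable, Dict


def foreign_key_join(
    left: Dict[str, Any],
    right: Dict[str, Any],
    extract_fk: Callable[[Any], str],
    joiner: Callable[[Any, Any], Any],
) -> Dict[str, Any]:
    """Join each left row to the right row named by its foreign key."""
    result = {}
    for key, value in left.items():
        fk = extract_fk(value)
        if fk in right:
            result[key] = joiner(value, right[fk])  # keyed on the left table's key
    return result


orders = {
    "o1": {"customer": "c1", "total": 30},
    "o2": {"customer": "c2", "total": 12},
    "o3": {"customer": "c9", "total": 5},  # no matching customer
}
customers = {"c1": "Alice", "c2": "Bob"}

joined = foreign_key_join(
    orders,
    customers,
    extract_fk=lambda order: order["customer"],
    joiner=lambda order, name: (name, order["total"]),
)
```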
See KIP-213 for more details.
KIP-307: Allow to define custom processor names with KStreams DSL
Prior to this release, while building a new topology through the Kafka Streams DSL, the processors were automatically named. A complex topology with dozens of operations can be hard to understand if the processor names are not relevant. This KIP allows users to set more meaningful processor names.
See KIP-307 for more details.
KIP-470: TopologyTestDriver test input and output usability improvements
The TopologyTestDriver allows you to test Kafka Streams logic. This is a lot faster than utilizing actual producers and consumers and makes it possible to simulate different timing scenarios. Kafka 2.4.0 introduces TestInputTopic and TestOutputTopic classes to simplify the test interface.
Metrics, monitoring, and operational improvements
- KIP-412 adds support to dynamically alter a broker's log levels using the Admin API.
- KIP-495 allows users to dynamically alter log levels in the Connect framework.
- KIP-521 changes Connect to also send log messages to a file and rolls that file every day.
- KIP-460 modifies the PreferredLeaderElection RPC to support unclean leader election in addition to preferred leader election.
- KIP-464 allows you to leverage num.partitions and default.replication.factor from the AdminClient#createTopics API.
- KIP-492 supports the security provider config, which can be used to configure custom security algorithms.
- KIP-496 adds an API to delete consumer offsets and exposes it via the AdminClient.
- KIP-503 adds metrics to monitor the number of topics/replicas marked for deletion.
- KIP-475 adds metrics to measure the number of tasks on a connector.
- KIP-471 exposes a subset of RocksDB's statistics in Kafka Streams metrics, which enables users to find bottlenecks and tune RocksDB accordingly.
- KIP-484 adds new metrics for the group and transaction metadata loading duration.
- KIP-444 adds a few new metrics at the Streams instance level such as static version/commit-id as well as dynamic state.
ZooKeeper upgrade to 3.5.x
ZooKeeper has been upgraded to 3.5.x. Support for TLS encryption was added in ZooKeeper 3.5.x, which enables us to configure TLS encryption between Kafka brokers and ZooKeeper.
Scala 2.13 support
Apache Kafka 2.4.0 now supports Scala 2.13 while also remaining compatible with Scala 2.12 and 2.11.
We want to take this opportunity to thank everyone who has contributed to this release!
What's New in Apache Kafka 2.3
It’s official: Apache Kafka® 2.3 has been released! Here is a selection of some of the most interesting and important features we added in the new release.
KIP-351 and KIP-427: Improved monitoring for partitions which have lost replicas
In order to keep your data safe, Kafka creates several replicas of it on different brokers. Kafka will not allow writes to proceed unless the partition has a minimum number of in-sync replicas. This is called the “minimum ISR.”
Kafka already had metrics showing the partitions that had fewer than the minimum number of in-sync replicas. In this release, KIP-427 adds additional metrics showing partitions that have exactly the minimum number of in-sync replicas. By monitoring these metrics, users can see partitions that are on the verge of becoming under-replicated.
Additionally, KIP-351 adds the --under-min-isr command line flag to the kafka-topics command. This allows users to easily see which topics have fewer than the minimum number of in-sync replicas.
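The three states that these metrics and the new flag distinguish can be summarized in a few lines. The `classify` helper and its labels are hypothetical and are not the names of Kafka's metrics.

```python
def classify(isr_size: int, min_isr: int) -> str:
    """Classify a partition by comparing its in-sync replica count to the minimum ISR."""
    if isr_size < min_isr:
        return "under-min-isr"  # writes that require the minimum ISR are rejected
    if isr_size == min_isr:
        return "at-min-isr"  # one more lost replica would block those writes
    return "healthy"


# Example: replication factor 3 with a minimum ISR of 2.
states = [classify(isr_size, min_isr=2) for isr_size in (3, 2, 1)]
```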
KIP-354: Add a Maximum Log Compaction Lag
To a first-order approximation, previous values of a key in a compacted topic get compacted some time after a newer value for that key is written. Only the most recent value is retained; previous values are not. It has always been possible to set the minimum amount of time a value would stick around before being compacted, so we don’t lose the old value too quickly. Now, with KIP-354, it’s also possible to set the maximum amount of time an old value will stick around. The new parameter max.compaction.lag.ms specifies how long an old value may live in a compacted topic. This can help in complying with data retention regulations such as the GDPR.
KIP-402: Improve fairness in SocketServer processors
Previously, Kafka would prioritize opening new TCP connections over handling existing connections. This could cause problems if clients attempted to create many new connections within a short time period. KIP-402 prioritizes existing connections over new ones, which improves the broker’s resilience to connection storms. The KIP also adds a per-broker max.connections setting.
KIP-461: Improve failure handling in the Replica Fetcher
In order to keep replicas up to date, each broker maintains a pool of replica fetcher threads. Each thread in the pool is responsible for fetching replicas for some number of follower partitions. Previously, if one of those partitions failed, the whole thread would fail with it, causing under-replication on potentially hundreds of partitions. With this KIP, if a single partition managed by a given replica fetcher thread fails, the thread continues handling the remainder of its partitions.
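The isolation idea can be sketched as a fetch loop that marks a failing partition and carries on with the rest. The helper names and the simulated error are hypothetical. This is an illustration of the idea, not the broker's fetcher code.

```python
from typing import Callable, Iterable, List, Set


def run_fetch_round(
    partitions: Iterable[str],
    fetch: Callable[[str], None],
    failed: Set[str],
) -> List[str]:
    """Fetch every healthy partition; record failures instead of aborting the round."""
    fetched = []
    for partition in partitions:
        if partition in failed:
            continue  # skip partitions that already failed
        try:
            fetch(partition)
            fetched.append(partition)
        except Exception:
            failed.add(partition)  # isolate the failure to this one partition
    return fetched


def flaky_fetch(partition: str) -> None:
    # Simulated fetch that always fails for one partition.
    if partition == "t-1":
        raise IOError("simulated storage error")


failed_partitions: Set[str] = set()
first_round = run_fetch_round(["t-0", "t-1", "t-2"], flaky_fetch, failed_partitions)
second_round = run_fetch_round(["t-0", "t-1", "t-2"], flaky_fetch, failed_partitions)
```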
KAFKA-7283: Reduce the amount of time the broker spends scanning log files when starting up
When the broker starts up after an unclean shutdown, it checks the logs to make sure they have not been corrupted. This JIRA optimizes that process so that Kafka only checks log segments that haven't been explicitly flushed to disk. Now, the time required for log recovery is no longer proportional to the number of logs. Instead, it is proportional to the number of unflushed log segments. Some of the benchmarks which Zhanxiang Huang discusses on the JIRA show up to a 50% reduction in broker startup time.
KIP-415: Incremental Cooperative Rebalancing in Kafka Connect
In Kafka Connect, worker tasks are distributed among the available worker nodes. When a connector is reconfigured or a new connector is deployed, as well as when a worker is added or removed, the tasks must be rebalanced across the Connect cluster. This helps ensure that all of the worker nodes are doing a fair share of the Connect work. In 2.2 and earlier, a Connect rebalance caused all worker threads to pause while the rebalance proceeded. As of KIP-415, rebalancing is no longer a stop-the-world affair, making configuration changes a more pleasant thing.
KIP-449: Add connector contexts to Connect worker logs
A running Connect cluster contains several different thread pools. Each of these threads emits its own logging, as one might expect. However, this makes it difficult to untangle the sequence of events involved in a single logical operation, since the parts of that operation are running asynchronously in their various threads across the cluster. This KIP adds some context to each Connect log message, making it much easier to make sense of the state of a single connector over time.
For more details, take a look at Robin Moffatt’s blog post.
KIP-258: Allow Users to Store Record Timestamps in RocksDB
Prior to this KIP, message timestamps were not stored in the Streams state store. Only the key and value were there. With this KIP, timestamps are now included in the state store. This KIP lays the groundwork to enable future features like handling out-of-order messages in KTables and implementing TTLs for KTables.
KIP-428: Add in-memory window store / KIP-445: Add in-memory Session Store
These KIPs add in-memory implementations for the Kafka Streams window store and session store. Previously, the only component with an in-memory implementation was the key-value store. The in-memory implementations provide higher performance, in exchange for lack of persistence to disk. In many cases, this can be a very good tradeoff.
KIP-313: Add KStream.flatTransform and KStream.flatTransformValues
The first half of this KIP, the flatTransform() method, was delivered in Kafka 2.2. The flatTransform() method is very similar to flatMap(), in that it takes a single input record and produces one or more output records. flatMap() does this in a type-safe way but without access to the ProcessorContext and the state store. We’ve been able to use the Processor API to perform this same kind of operation with access to the ProcessorContext and the state store, but without the type safety of flatMap(). flatTransform() gave us the best of both worlds: processor API access, plus compile-time type checking.
flatTransformValues(), just introduced in the completed KIP-313 in Kafka 2.3, is to flatTransform() as flatMapValues() is to flatMap(). It lets us do processor-API-aware computations that return multiple records for each input record without changing the message key and causing a repartition.
Apache Kafka Supports 200K Partitions Per Cluster
In Kafka, a topic can have multiple partitions to which records are distributed. Partitions are the unit of parallelism. In general, more partitions lead to higher throughput. However, there are some factors that one should consider when having more partitions in a Kafka cluster. I am happy to report that the recent Apache Kafka 1.1.0 release has significantly increased the number of partitions that a single Kafka cluster can support from both the deployment and the availability perspectives.
To understand the improvement, it’s useful to first revisit some of the basics about partition leaders and the controller. First, each partition can have multiple replicas for higher availability and durability. One of the replicas is designated as the leader and all the client requests are served from this lead replica. Second, one of the brokers in the cluster acts as the controller that manages the whole cluster. If a broker fails, the controller is responsible for selecting new leaders for partitions on the failed broker.
The Kafka broker does a controlled shutdown by default to minimize service disruption to clients. A controlled shutdown has the following steps. (1) A SIGTERM signal is sent to the broker to be shut down. (2) The broker sends a request to the controller to indicate that it’s about to shut down. (3) The controller then changes the partition leaders on this broker to other brokers and persists that information in ZooKeeper. (4) The controller sends the new leader to other brokers. (5) The controller sends a successful reply to the shutting down broker, which finally terminates its process. At this point, there is no impact to the clients since their traffic has already been moved to other brokers. This process is depicted in Figure 1 below. Note that steps (4) and (5) can happen in parallel.
Figure 1. (1) shutdown initiated on broker 1; (2) broker 1 sends controlled shutdown request to controller on broker 0; (3) controller writes new leaders in ZooKeeper; (4) controller sends new leaders to broker 2; (5) controller sends successful reply to broker 1.
Before Kafka 1.1.0, during the controlled shutdown, the controller moved the leaders one partition at a time. For each partition, the controller selected a new leader, wrote it to ZooKeeper synchronously and communicated the new leader to other brokers through a remote request. This process had a couple of inefficiencies. First, the synchronous writes to ZooKeeper have higher latency, which slowed down the controlled shutdown process. Second, communicating the new leader one partition at a time added many small remote requests to every broker, which could cause the processing of the new leaders to be delayed.
In Kafka 1.1.0, we made significant improvements in the controller to speed up the controlled shutdown. The first improvement is using the asynchronous API when writing to ZooKeeper. Instead of writing the leader for one partition, waiting for it to complete and then writing another one, the controller submits the leader for multiple partitions to ZooKeeper asynchronously and then waits for them to complete at the end. This allows for request pipelining between the Kafka broker and the ZooKeeper server and reduces the overall latency. The second improvement is that the communication of the new leaders is batched. Instead of one remote request per partition, the controller sends a single remote request with the leader from all affected partitions.
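A back-of-the-envelope model shows why pipelining helps. The round-trip and per-request costs below are assumed values chosen for illustration, not measurements from our tests, and the model ignores server-side queuing.

```python
def sequential_write_time_us(num_writes: int, round_trip_us: int) -> int:
    # Synchronous API: each write waits a full round trip before the next one starts.
    return num_writes * round_trip_us


def pipelined_write_time_us(num_writes: int, round_trip_us: int, send_cost_us: int) -> int:
    # Asynchronous API: writes are issued back to back, and the last
    # response arrives one round trip after the last request is sent.
    return num_writes * send_cost_us + round_trip_us


# Assumed: 10,000 leader changes, 2 ms round trip, 50 microseconds to issue a request.
sequential_us = sequential_write_time_us(10_000, 2_000)
pipelined_us = pipelined_write_time_us(10_000, 2_000, 50)
```

Under these assumptions, the sequential writes take 20 seconds in total while the pipelined writes take about half a second, because the round-trip latency is paid once rather than once per partition.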
We also made significant improvements in the controller failover time. If the controller goes down, the Kafka cluster automatically elects another broker as the new controller. Before being able to elect the partition leaders, the newly-elected controller has to first reload the state of all partitions in the cluster from ZooKeeper. If the controller has a hard failure, the window in which a partition is unavailable can be as long as the ZooKeeper session expiration time plus the controller state reloading time. So reducing the state reloading time improves the availability in this rare event. Prior to Kafka 1.1.0, the reloading used the synchronous ZooKeeper API. In Kafka 1.1.0, this is changed to also use the asynchronous API for better latency.
We executed tests to evaluate the performance improvement of the controlled shutdown time and the controller reloading time. For both tests, we set up a 5 node ZooKeeper ensemble on different server racks.
In the first test, we set up a Kafka cluster with 5 brokers on different racks. In that cluster, we created 25,000 topics, each with a single partition and 2 replicas, for a total of 50,000 partitions. So, each broker has 10,000 partitions. We then measured the time to do a controlled shutdown of a broker. The results are shown in the table below.
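The per-broker figure follows from counting every replica of every partition, assuming the replicas are spread evenly across the brokers:

```python
topics = 25_000
partitions_per_topic = 1
replication_factor = 2
brokers = 5

# Each replica of a partition is hosted by some broker, so count replicas.
total_replicas = topics * partitions_per_topic * replication_factor
replicas_per_broker = total_replicas // brokers  # assumes an even spread
```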
| |Kafka 1.0.0|Kafka 1.1.0|
|---|---|---|
|controlled shutdown time|6.5 minutes|3 seconds|
A big part of the improvement comes from fixing a logging overhead, which unnecessarily logs all partitions in the cluster every time the leader of a single partition changes. By just fixing the logging overhead, the controlled shutdown time was reduced from 6.5 minutes to 30 seconds. The asynchronous ZooKeeper API change reduced this time further to 3 seconds. These improvements significantly reduce the time to restart a Kafka cluster.
In the second test, we set up another Kafka cluster with 5 brokers and created 2,000 topics, each with 50 partitions and 1 replica. This makes a total of 100,000 partitions in the whole cluster. We then measured the state reloading time of the controller and observed a 50% reduction (the reloading time dropped from 28 seconds in Kafka 1.0.0 to 14 seconds in Kafka 1.1.0).
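For readers who want to compare the two tests on the same scale, the measured times can be converted into a percentage reduction and a speedup factor. Halving a time is a 50% reduction, or equivalently a 2x speedup. The helper names here are ours.

```python
def percent_reduction(before: float, after: float) -> float:
    """Share of the original time that was eliminated, in percent."""
    return (before - after) / before * 100


def speedup(before: float, after: float) -> float:
    """How many times faster the new measurement is."""
    return before / after


# Controller state reloading: 28 seconds down to 14 seconds.
reload_reduction = percent_reduction(28, 14)
reload_speedup = speedup(28, 14)

# Controlled shutdown: 6.5 minutes down to 3 seconds.
shutdown_speedup = speedup(6.5 * 60, 3)
```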
With those improvements, how many partitions can one expect to support in Kafka? The exact number depends on factors such as the tolerable unavailability window, ZooKeeper latency, broker storage type, etc. As a rule of thumb, we recommend that each broker have up to 4,000 partitions and each cluster have up to 200,000 partitions. The main reason for the latter cluster-wide limit is to accommodate the rare event of a hard failure of the controller, as we explained earlier. Note that other considerations related to partitions still apply and one may need some additional configuration tuning with more partitions.
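As a quick sanity check, the rule of thumb can be turned into a small helper. The function name is hypothetical, and the two limits are simply the guideline figures from this post, not hard limits enforced by Kafka.

```python
import math

MAX_PARTITIONS_PER_BROKER = 4_000
MAX_PARTITIONS_PER_CLUSTER = 200_000


def min_brokers(total_partitions: int) -> int:
    """Smallest broker count that keeps every broker within the per-broker guideline."""
    if total_partitions > MAX_PARTITIONS_PER_CLUSTER:
        raise ValueError("exceeds the cluster-wide guideline of 200,000 partitions")
    return math.ceil(total_partitions / MAX_PARTITIONS_PER_BROKER)


brokers_at_limit = min_brokers(200_000)
brokers_for_10k = min_brokers(10_000)
```

A cluster at the 200,000-partition guideline would therefore need at least 50 brokers to stay within the per-broker guideline.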
The improvement that we made in 1.1.0 is just one step towards our ultimate goal of making Kafka infinitely scalable. We have also made improvements on latency with more partitions in 1.1.0 and will discuss that in a separate blog. In the near future, we plan to make further improvements to support millions of partitions in a Kafka cluster.
The controller improvement work in Kafka 1.1.0 is a true community effort. Over a period of 9 months, people from 6 different organizations helped out and made this happen. Onur Karaman led the original design, did the core implementation and conducted the performance evaluation. Manikumar Reddy, Prasanna Gautam, Ismael Juma, Mickael Maison, Sandor Murakozi, Rajini Sivaram and Ted Yu each contributed some additional implementation or bug fixes. More details can be found in KAFKA-5642 and KAFKA-5027. A big thank you to all the contributors!