Monday October 03, 2022

What’s New in Apache Kafka 3.3

We are proud to announce the release of Apache Kafka® 3.3 on behalf of the Apache Kafka community. The 3.3 release contains many new features and improvements. This blog post will highlight some of the more prominent features. For a full list of changes, be sure to check the 3.3.0 and 3.3.1 release notes.

For several years, the Apache Kafka community has been developing a new way to run Apache Kafka with self-managed metadata. This new mode, called KRaft mode, improves partition scalability and resiliency while simplifying deployments of Apache Kafka. It also eliminates the need to run an Apache ZooKeeper™ cluster alongside every Apache Kafka cluster.

The 3.3 release now marks KRaft mode as production ready for new clusters only. There are some features that are currently supported by Apache ZooKeeper (ZK) mode that are not yet supported by KRaft mode. For more information on these features and proposed KRaft timelines, read KIP-833.

Kafka Broker, Controller, Producer, Consumer and Admin Client

KIP-833: Mark KRaft as Production Ready

KIP-833 marks KRaft as production-ready for new clusters in the Apache Kafka 3.3 release. KIP-833 also marks 3.5.0 as the bridge release. The bridge release is the release that would allow the migration of Apache Kafka clusters from ZK mode to KRaft mode.

KIP-778: KRaft to KRaft upgrades

KIP-778 allows the upgrade of KRaft clusters without the need for the infamous “double roll”. In order to facilitate upgrades of Apache Kafka in KRaft mode, we need the ability to upgrade controllers and brokers while holding back the use of new RPCs and record formats until the whole cluster has been upgraded.

KIP-841: Fenced replicas should not be allowed to join the ISR in KRaft

KIP-841 improves the topic partitions’ availability during clean shutdown. It does this by enforcing the following invariants: 1) a fenced or in-controlled-shutdown replica is not eligible to be in the ISR; and 2) a fenced or in-controlled-shutdown replica is not eligible to become leader.
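The two invariants can be read as a single eligibility check. The following is a minimal sketch of that reading, not the controller's implementation; the `BrokerState` enum and both helper methods are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustrative model only: these types are hypothetical, not Kafka classes.
final class ReplicaEligibility {
    enum BrokerState { ACTIVE, FENCED, IN_CONTROLLED_SHUTDOWN }

    // Both invariants reduce to the same test: a fenced or
    // in-controlled-shutdown replica may neither be in the ISR nor lead.
    static boolean eligible(BrokerState state) {
        return state == BrokerState.ACTIVE;
    }

    // Returns the replica ids (in ascending order) that may stay in the ISR.
    static List<Integer> eligibleReplicas(Map<Integer, BrokerState> replicas) {
        return new TreeMap<>(replicas).entrySet().stream()
                .filter(e -> eligible(e.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Integer, BrokerState> replicas = Map.of(
                1, BrokerState.ACTIVE,
                2, BrokerState.FENCED,
                3, BrokerState.IN_CONTROLLED_SHUTDOWN);
        System.out.println(eligibleReplicas(replicas));
    }
}
```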

KIP-836: Expose replication information of the cluster metadata

KIP-836 exposes the DescribeQuorum API to the Admin client and adds two new fields per replica to the response. This information can be used to query the availability and lag of the cluster metadata for the controllers and brokers in a KRaft cluster.

KIP-835: Monitor KRaft Controller Quorum health

With KRaft mode, Apache Kafka added a new controller quorum to the cluster. These controllers need to be able to commit records for Apache Kafka to be available. KIP-835 measures availability by periodically causing the high-watermark and the last committed offset to increase. Monitoring services can check that these last committed offsets are advancing. They can also use these metrics to check that the offsets of all brokers and controllers stay relatively close to each other.
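A monitoring service might implement the two checks roughly as follows. This is a sketch under assumptions: the node names and offsets are made-up sample values, the helper names are hypothetical, and collecting the samples from the actual metrics is left out.

```java
import java.util.Map;

// Hypothetical helper for a monitoring service; not part of Kafka.
final class QuorumHealthCheck {
    // True when every node in the current sample reports a committed offset
    // strictly greater than the one it reported in the previous sample.
    static boolean allAdvancing(Map<String, Long> previous, Map<String, Long> current) {
        for (Map.Entry<String, Long> e : current.entrySet()) {
            Long before = previous.get(e.getKey());
            if (before == null || e.getValue() <= before) {
                return false;
            }
        }
        return true;
    }

    // Largest gap between any two nodes within one sample.
    static long maxSpread(Map<String, Long> sample) {
        if (sample.isEmpty()) {
            return 0L;
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long offset : sample.values()) {
            min = Math.min(min, offset);
            max = Math.max(max, offset);
        }
        return max - min;
    }

    public static void main(String[] args) {
        Map<String, Long> previous = Map.of("controller-1", 100L, "broker-1", 98L);
        Map<String, Long> current = Map.of("controller-1", 110L, "broker-1", 109L);
        System.out.println("advancing=" + allAdvancing(previous, current)
                + " spread=" + maxSpread(current));
    }
}
```

An alert could fire when `allAdvancing` is false for several consecutive samples, or when `maxSpread` exceeds a threshold chosen for the cluster.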

KIP-859: Add metadata log processing error related metrics

With KRaft mode, the cluster metadata replicated log is the source of metadata related information for all servers in the cluster. Any errors that occur while processing this log could lead to the in-memory state for the server becoming inconsistent. It is important that such errors are made visible. KIP-859 exposes metrics that can be monitored so that affected servers can be discovered.

KIP-794: Strictly Uniform Sticky Partitioner

KIP-794 improves the default partitioner to distribute non-keyed data evenly in batches among healthy brokers and less data to unhealthy brokers. For example, the p99 latency for a producer workload with abnormal behavior was reduced from 11s to 154ms.
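To give a feel for what distributing data "evenly in batches" means, here is a deliberately simplified sketch: it stays on one partition until a byte budget has been produced there, then moves on. It is not the producer's partitioner. The class name is hypothetical, the byte budget is an assumption based on our reading of KIP-794, and the sketch rotates partitions in order and ignores broker health, which the real partitioner does take into account.

```java
// Simplified illustration only; not the Kafka producer's partitioner.
final class StickyByBytes {
    private final int numPartitions;
    private final int switchAfterBytes;
    private int current = 0;
    private int bytesOnCurrent = 0;

    StickyByBytes(int numPartitions, int switchAfterBytes) {
        this.numPartitions = numPartitions;
        this.switchAfterBytes = switchAfterBytes;
    }

    // Returns the partition for a record of the given size. Once the byte
    // budget for the current partition is used up, the next record moves on.
    int partitionFor(int recordBytes) {
        if (bytesOnCurrent >= switchAfterBytes) {
            current = (current + 1) % numPartitions; // real partitioner does not simply rotate
            bytesOnCurrent = 0;
        }
        bytesOnCurrent += recordBytes;
        return current;
    }

    public static void main(String[] args) {
        StickyByBytes partitioner = new StickyByBytes(3, 100);
        for (int i = 0; i < 7; i++) {
            System.out.println("record " + i + " -> partition " + partitioner.partitionFor(60));
        }
    }
}
```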

KIP-373: Allow users to create delegation tokens for other users

KIP-373 allows users to create delegation tokens for other users. This allows the following use cases: 1) a designated superuser can create tokens without requiring individual user credentials; and 2) a designated superuser can run Kafka clients on behalf of another user.

KIP-831: Add metric for log recovery progress

Log recovery is a process triggered when a Kafka server starts up, if it had a previous unclean shutdown. It is used to make sure the log is in a good state and is not corrupted. KIP-831 exposes metrics to allow users to monitor the progress of log recovery.

KIP-709: Extend OffsetFetch RPC to accept multiple group ids

KIP-709 streamlines the process of fetching offsets from consumer groups so that a single request can be made to fetch offsets for multiple groups. This carries the following advantages: 1) it reduces request overhead; and 2) it simplifies client side code.

KIP-827: Expose log dirs total and usable space via Kafka API

KIP-827 exposes an RPC for querying the disk total size and disk usage size per log directory. This is useful for tools that are interested in querying this information without relying on the exposed metrics.

KIP-851: Add requireStable flag into ListConsumerGroupOffsetsOptions

KIP-851 adds the option in the Admin client for querying the committed offsets when using exactly once semantics.

KIP-843: Adding addMetricIfAbsent method to Metrics

KIP-843 allows the metrics API to atomically query a metric if present or create a metric if absent.

KIP-824: Allowing dumping segment logs limiting the batches in the output

KIP-824 allows the kafka-dump-log tool to be configured to only scan and print the records at the start of the log segment instead of the entire log segment.

Kafka Streams

KIP-846: Source/sink node metrics for consumed/produced throughput in Streams

With the metrics available today in the plain consumer it is possible for users of Kafka Streams to derive the consumed throughput of their applications at the subtopology level, but the same is not true for the produced throughput.

KIP-846 fills in this gap and gives end users a way to compute the production rate of each subtopology by introducing two new metrics for the throughput at sink nodes. Even though it is possible to derive the consumed throughput with existing client level metrics, KIP-846 also adds two new metrics for the throughput at source nodes to provide an equally fine-grained metrics scope as for the newly added metrics at the sink nodes and to simplify the user experience.

KIP-834: Pause/resume KafkaStreams topologies

KIP-834 adds the ability to pause and resume topologies. This can be used to reduce resources used or modify data pipelines. Paused topologies skip processing, punctuation, and standby tasks. For distributed Streams applications, each instance will need to be paused and resumed separately.

KIP-820: Consolidate KStream transform() and process() methods

KIP-820 generalizes the KStream API to consolidate Transformers (which could forward results) and Processors (which could not). The change makes use of the new type-safe Processor API. This simplifies Kafka Streams, making it easier to use and learn.

KIP-812: Introduce another form of the KafkaStreams.close() API that forces the member to leave the consumer group

KIP-812 introduces a form of the KafkaStreams.close() API that can efficiently close the stream permanently by forcing the member to leave the consumer group.

Kafka Connect

KIP-618: Exactly once support for source connectors

KIP-618 adds exactly-once semantics support to source connectors. The Connect framework was expanded to atomically write source records and their source offsets to Apache Kafka, and to prevent zombie tasks from producing data to Apache Kafka.


In addition to all of the KIPs listed above, Apache Kafka 3.3 is packed with fixes and improvements. To learn more:

This was a community effort, so thank you to everyone who contributed to this release, including all our users and our 116 authors:

Akhilesh C, Akhilesh Chaganti, Alan Sheinberg, Aleksandr Sorokoumov, Alex Sorokoumov, Alok Nikhil, Alyssa Huang, Aman Singh, Amir M. Saeid, Anastasia Vela, András Csáki, Andrew Borley, Andrew Dean, andymg3, Aneesh Garg, Artem Livshits, A. Sophie Blee-Goldman, Bill Bejeck, Bounkong Khamphousone, bozhao12, Bruno Cadonna, Chase Thomas, chern, Chris Egerton, Christo Lolov, Christopher L. Shannon, CHUN-HAO TANG, Clara Fang, Clay Johnson, Colin Patrick McCabe, David Arthur, David Jacot, David Mao, Dejan Maric, dengziming, Derek Troy-West, Divij Vaidya, Edoardo Comar, Edwin, Eugene Tolbakov, Federico Valeri, Guozhang Wang, Hao Li, Hongten, Idan Kamara, Ismael Juma, Jacklee, James Hughes, Jason Gustafson, JK-Wang, jnewhouse, Joel Hamill, John Roesler, Jorge Esteban Quilcate Otoya, José Armando García Sancio, jparag, Justine Olshan, K8sCat, Kirk True, Konstantine Karantasis, Kvicii, Lee Dongjin, Levani Kokhreidze, Liam Clarke-Hutchinson, Lucas Bradstreet, Lucas Wang, Luke Chen, Manikumar Reddy, Marco Aurelio Lotz, Matthew de Detrich, Matthias J. Sax, Mickael Maison, Mike Lothian, Mike Tobola, Milind Mantri, nicolasguyomar, Niket, Niket Goel, Nikolay, Okada Haruki, Philip Nee, Prashanth Joseph Babu, Rajani Karuturi, Rajini Sivaram, Randall Hauch, Richard Joerger, Rittika Adhikari, RivenSun, Rohan, Ron Dagostino, ruanliang, runom, Sanjana Kaundinya, Sayantanu Dey, SC, sciclon2, Shawn, sunshujie1990, Thomas Cooper, Tim Patterson, Tom Bentley, Tom Kaszuba, Tomonari Yamashita, vamossagar12, Viktor Somogyi-Vass, Walker Carlson, Xavier Léauté, Xiaobing Fang, Xiaoyue Xue, xjin-Confluent, xuexiaoyue, Yang Yu, Yash Mayya, Yu, yun-yun

Tuesday May 17, 2022

What's New in Apache Kafka 3.2.0

I’m proud to announce the release of Apache Kafka 3.2.0 on behalf of the Apache Kafka® community. The 3.2.0 release contains many new features and improvements. This blog will highlight some of the most prominent new features. For the full list of changes, be sure to see the release notes. You can also watch the release video for a summary of what’s new in Apache Kafka 3.2.0.

While KRaft mode is not yet recommended for production, we have introduced a KRaft-based authorizer along with several fixes and improvements. In addition, a proposal for marking KRaft mode as production ready in Apache Kafka 3.3 is being discussed by the community.

Since log4j 1.x has known security vulnerabilities and is not maintained anymore, we replaced it with reload4j. reload4j is a drop-in replacement with fixes for the known security vulnerabilities. We plan to migrate to log4j 2.x in the next major release of Apache Kafka (see KIP-653).

Kafka broker, Producer, Consumer and AdminClient

KIP-801: StandardAuthorizer for KRaft

KIP-801 introduces a built-in authorizer, StandardAuthorizer, that does not depend on ZooKeeper. This means you can now run a secure Kafka cluster without ZooKeeper! StandardAuthorizer stores its ACLs in the __cluster_metadata topic and it is used by default in KRaft clusters. StandardAuthorizer does all of the same things that AclAuthorizer does for ZooKeeper-dependent clusters.

KIP-704: Send a hint to the partition leader to recover the partition

With KIP-704, the controller is now able to communicate to a newly elected topic partition leader whether it was elected using the unclean leader election strategy. This information tells the new topic partition leader that it needs to recover its state. As an example, this will be used in the future to clean up transaction state, which may be left inconsistent following an unclean election.

KIP-764: Configurable backlog size for creating Acceptor

When there are many large clients, preferred leader election can cause many clients to open connections within a very short window of time. This can cause the SYN backlog for TCP’s acceptor sockets to be filled up resulting in retries being delayed or producers being slowed down.

KIP-764 introduces a new configuration socket.listen.backlog.size that allows setting the size of the SYN backlog for TCP’s acceptor sockets on the brokers. Increasing this configuration can mitigate the issues resulting from many open connections.

KIP-784: Add top-level error code field to DescribeLogDirsResponse

KIP-784 adds an error code to the response of the DescribeLogDirs API. In previous releases DescribeLogDirs returned an empty response if users did not have the necessary authorization for the request. Clients had to interpret the empty response as a CLUSTER_AUTHORIZATION_FAILED error. KIP-784 makes DescribeLogDirs API consistent with other APIs and allows returning other errors besides CLUSTER_AUTHORIZATION_FAILED.

KIP-788: Allow configuring num.network.threads per listener

On Kafka brokers it is common to define multiple listeners. Each listener has its own pool of network threads. In many cases, some listeners handle a lot less traffic than others and typically do not need the same number of threads as the listeners that need to handle more traffic.

KIP-788 allows setting the pool size of network threads individually per listener. This allows for fine-tuning of the number of network threads to dynamically accommodate traffic spikes or slightly reduce memory usage when using listeners with different traffic loads. For this purpose, the existing configuration num.network.threads is updated to support being set on specific listeners via listener.name.<name of the listener>.num.network.threads.
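As a rough illustration, the per-listener key can be built from the listener name. The listener names EXTERNAL and INTERNAL and the thread counts below are hypothetical, and lower-casing the listener name in the key is an assumption based on how other listener-prefixed broker settings are usually written.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper that assembles broker settings; not a Kafka class.
final class PerListenerThreads {
    // Builds listener.name.<name of the listener>.num.network.threads.
    // Lower-casing the listener name is an assumption of this sketch.
    static String key(String listenerName) {
        return "listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".num.network.threads";
    }

    static Properties brokerOverrides() {
        Properties props = new Properties();
        props.setProperty("num.network.threads", "3"); // broker-wide value (example number)
        props.setProperty(key("EXTERNAL"), "8");       // busier listener gets a larger pool
        props.setProperty(key("INTERNAL"), "2");       // quieter listener gets a smaller pool
        return props;
    }

    public static void main(String[] args) {
        brokerOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```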

KIP-798 and KIP-810: kafka-console-producer now can write headers and null values

The kafka-console-producer is an important debugging tool. KIP-798 provides a way to add headers to a record that is written to a topic. KIP-810 allows writing records with value null to a topic. That means the kafka-console-producer can now produce tombstone records to a compacted topic. Both of these features improve debugging with the kafka-console-producer.

KIP-800: Add reason to JoinGroupRequest and LeaveGroupRequest

When a consumer leaves or joins a consumer group, it logs the reason locally. Until this release, the brokers did not have any information on the reasons a consumer joined or left the consumer group. That made rebalances triggered by LeaveGroupRequest and JoinGroupRequest hard to troubleshoot. KIP-800 propagates the reasons for leaving and joining a consumer group to the brokers, making troubleshooting rebalance issues easier.

KIP-814: Static membership protocol should let the leader skip assignment

Since Apache Kafka 2.4.0 when static membership was introduced, consumers can rejoin a consumer group after a brief absence without triggering a rebalance. If the leader of the consumer group had a brief absence and then rejoined, it would remain the leader. However, there was no way to let the rejoined consumer know that it was still the leader without triggering another rebalance. Ultimately this can cause the group to miss some metadata changes, such as partition increases. With KIP-814 the rejoining leader is informed about its leadership without computing a new assignment.

Kafka Streams

KIP-708: Rack awareness for Kafka Streams

Starting with Apache Kafka 3.2.0, Kafka Streams can distribute its standby replicas over distinct “racks” with KIP-708. To form a “rack”, Kafka Streams uses tags in the application configuration. For example, Kafka Streams clients might be tagged with the cluster or the cloud region they are running in. Users can specify the tags that should be used for the rack-aware distribution of the standby replicas by setting the configuration rack.aware.assignment.tags. During task assignment, Kafka Streams will try its best to distribute the standby replicas over different tag dimensions. Rack-aware standby assignment improves fault tolerance in case of the failure of an entire “rack”. This can be used, for example, to ensure that replicas are distributed over different availability zones in a cloud hosting provider.
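A configuration for one Streams instance might look roughly like the sketch below. Only rack.aware.assignment.tags is named in this post; the client.tag.<key> prefix for tagging an instance comes from our reading of KIP-708, and the tag keys, tag values, application id, and bootstrap address are all hypothetical.

```java
import java.util.Properties;

// Hypothetical helper that assembles a Kafka Streams configuration.
final class RackAwareStandbys {
    static Properties streamsConfig(String zone, String cluster) {
        Properties props = new Properties();
        props.setProperty("application.id", "rack-aware-demo");    // hypothetical id
        props.setProperty("bootstrap.servers", "localhost:9092");  // hypothetical address
        props.setProperty("num.standby.replicas", "1");
        // Tag this instance (prefix taken from KIP-708, keys are made up).
        props.setProperty("client.tag.zone", zone);
        props.setProperty("client.tag.cluster", cluster);
        // Ask the assignor to spread standbys across these tags.
        props.setProperty("rack.aware.assignment.tags", "zone,cluster");
        return props;
    }

    public static void main(String[] args) {
        streamsConfig("eu-central-1a", "k8s-cluster-1")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Each instance would set its own tag values, while rack.aware.assignment.tags stays the same across the application.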

KIP-796, KIP-805, KIP-806: Interactive Query v2

KIP-796 specifies an improved interface for Interactive Queries in Kafka Streams (IQv2). The new interface aims to make querying the state store simpler and faster as well as to reduce the maintenance cost when modifying existing state stores and when adding new state stores. KIP-796 describes the generic interface for querying state stores with Interactive Queries. Specific query types can be added to Interactive Query v2 by implementing the Query interface. KIP-796 also defines the KeyQuery class to allow users to evaluate a key/value lookup via IQv2.

KIP-805 adds the RangeQuery class to Interactive Query v2. The RangeQuery class is an implementation of the Query interface that allows querying state stores over a range specified by upper or lower key bounds or scanning all records of a state store when no bounds are provided.

KIP-806 adds two implementations of the Query interface – the WindowKeyQuery class and the WindowRangeQuery class. The former allows scanning over windows with a given key within a given time range and the latter allows scanning over windows within a given time range independently of the keys of the windows.

KIP-796 is a long-term project that will be extended with new query types in future releases. As of Apache Kafka 3.2.0, IQv2 is in preview. The public documentation site has not been updated, and the interfaces of IQv2 are marked as @Evolving (meaning that they may break compatibility in minor releases without a deprecation period if preview users find significant flaws in the current API). A future release will remove the @Evolving annotation and designate IQv2 as stable.

KIP-791: Add record metadata to state store context

KIP-791 adds method recordMetadata() to the StateStoreContext, providing access to the topic, partition, and offset of the record currently being processed. Exposing the current context in this way allows state stores to track their current offset in each input partition, allowing them to implement the consistency mechanisms introduced in KIP-796.

Kafka Connect

KIP-769: Connect APIs to list all connector plugins and retrieve their configuration definitions

KIP-769 extends the GET /connector-plugins endpoint with a new query parameter connectorsOnly which when set to false lists all the available plugins and not just connectors. The new query parameter helps users to verify what plugins are available without the need to know how the Connect runtime is set up. Usage of the new parameter is GET /connector-plugins?connectorsOnly=false. By default connectorsOnly is set to true for compatibility with previous behavior.

Additionally, KIP-769 adds a new endpoint that will return the configurations of a given plugin. The new endpoint is used as follows: GET /connector-plugins/<plugin>/config. The new endpoint works with all plugins returned by GET /connector-plugins.
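The two request URLs can be assembled as in the sketch below. The base URL http://localhost:8083 and the helper names are assumptions for illustration, and the sketch only builds the URIs; it does not send any request.

```java
import java.net.URI;

// Hypothetical helper for building Connect REST URLs; not part of Kafka Connect.
final class ConnectPluginUrls {
    // GET this URI to list all plugins, not just connectors.
    static URI listAllPlugins(String baseUrl) {
        return URI.create(baseUrl + "/connector-plugins?connectorsOnly=false");
    }

    // GET this URI to retrieve the configuration definitions of one plugin.
    static URI pluginConfig(String baseUrl, String plugin) {
        return URI.create(baseUrl + "/connector-plugins/" + plugin + "/config");
    }

    public static void main(String[] args) {
        String baseUrl = "http://localhost:8083"; // assumed worker address
        System.out.println(listAllPlugins(baseUrl));
        System.out.println(pluginConfig(baseUrl, "org.apache.kafka.connect.file.FileStreamSourceConnector"));
    }
}
```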

KIP-808: Add support for different Unix time precisions in TimestampConverter SMT

KIP-808 introduces a new optional configuration field unix.precision for the TimestampConverter SMT that allows the user to define a desired precision for the SMT. Valid values for this new field are seconds, milliseconds, microseconds, and nanoseconds. This addition is motivated by the fact that in external systems Unix time is represented with different precisions.
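To see why the precision matters, consider the same instant expressed in each of the four precisions. The sketch below is plain Java and not the SMT's implementation; the class and method names are hypothetical, and only the four precision strings come from the KIP.

```java
import java.time.Instant;

// Illustration of interpreting a Unix timestamp at different precisions.
final class UnixPrecision {
    static Instant toInstant(long value, String precision) {
        switch (precision) {
            case "seconds":
                return Instant.ofEpochSecond(value);
            case "milliseconds":
                return Instant.ofEpochMilli(value);
            case "microseconds":
                return Instant.ofEpochSecond(
                        Math.floorDiv(value, 1_000_000L),
                        Math.floorMod(value, 1_000_000L) * 1_000L);
            case "nanoseconds":
                return Instant.ofEpochSecond(
                        Math.floorDiv(value, 1_000_000_000L),
                        Math.floorMod(value, 1_000_000_000L));
            default:
                throw new IllegalArgumentException("unknown precision: " + precision);
        }
    }

    public static void main(String[] args) {
        // The same number means very different instants depending on the precision.
        System.out.println(toInstant(1_600_000_000L, "seconds"));
        System.out.println(toInstant(1_600_000_000L, "milliseconds"));
    }
}
```

Reading a value with the wrong precision silently shifts it by a factor of a thousand or more, which is the kind of mismatch the new field lets users avoid.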

KIP-779: Allow source tasks to handle producer exceptions

KIP-779 makes source connectors resilient to producer exceptions. Since source connectors ingest data from systems users do not have control over, it might happen that received messages are too large or unprocessable for the configured Connect worker, Kafka broker, and other ecosystem components. Previously such an error always killed the connector.

With KIP-779 the WorkerSourceTask checks for the configured errors.tolerance when sending the message fails. If errors.tolerance is set to all, the WorkerSourceTask will ignore the exception, allow the connector to acknowledge its source system and continue processing. If errors.tolerance is not set to all, the source connector will fail.

A note on compatibility: Existing source connectors that set errors.tolerance to all and expect to die on producer failure will need to be updated as described in the KIP. Source connectors that do not set errors.tolerance to all will not be affected by this change and be killed in the event of a producer failure.
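The decision can be pictured with a small simulation. This is a sketch of the behavior described above and not the WorkerSourceTask code; the class, the method, and the way a failed send is simulated with a predicate are all hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

// Simulation of the tolerance decision; not Kafka Connect code.
final class ErrorsToleranceSketch {
    // Returns how many records were sent. A failed send is skipped when
    // errors.tolerance is "all"; with any other value the task fails.
    static int sendAll(List<String> records, Predicate<String> sendFails, String errorsTolerance) {
        int sent = 0;
        for (String record : records) {
            if (sendFails.test(record)) {
                if ("all".equals(errorsTolerance)) {
                    continue; // ignore the failure and keep processing
                }
                throw new IllegalStateException("send failed for: " + record);
            }
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> records = List.of("ok-1", "too-large", "ok-2");
        System.out.println(sendAll(records, r -> r.equals("too-large"), "all"));
    }
}
```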


In addition to all the KIPs listed above, Apache Kafka 3.2.0 is packed with fixes and other improvements.

This was a huge community effort, so thank you to everyone who contributed to this release, including all our users and our 144 authors and reviewers:

A. Sophie Blee-Goldman, Adam Kotwasinski, Aleksandr Sorokoumov, Alexander Stohr, Alexandre Garnier, Alok Nikhil, Andras Katona, Andrew Eugene Choi, Antony Stubbs, Artem Livshits, Bill Bejeck, Bounkong Khamphousone, Boyang Chen, Bruno Cadonna, Chang, Chia-Ping Tsai, Chris Egerton, Colin P. McCabe, Cong Ding, David Arthur, David Jacot, David Mao, Ed B, Edwin, GauthamM-official, GuoPhilipse, Guozhang Wang, Gwen Shapira, Hao Li, Haoze Wu, Idan Kamara, Igor Soarez, Ismael Juma, Israel Ekpo, James Galasyn, James Hughes, Jason Gustafson, Jason Koch, Jeff Kim, Joel Hamill, John Roesler, Jonathan Albrecht, Jorge Esteban Quilcate Otoya, Josep Prat, Joseph (Ting-Chou) Lin, José Armando García Sancio, Jr, Jules Ivanic, Julien Chanaud, Jun Rao, Justin Lee, Justine Olshan, Kamal Chandraprakash, Kate Stanley, Kirk True, Knowles Atchison, Konstantine Karantasis, Kowshik Prakasam, Kurt Ostfeld, Kvicii, Leah Thomas, Lee Dongjin, Levani Kokhreidze, Liam Clarke-Hutchinson, Lucas Bradstreet, Ludovic DEHON, Luizfrf3, Luke Chen, Manikumar Reddy, Marc Löhe, Matthew Wong, Matthias J. Sax, Michal T, Mickael Maison, Mike Lothian, Márton Sigmond, Nick Telford, Nigel Liang, Niket, Okada Haruki, Omnia G H Ibrahim, Paolo Patierno, Patrick Stuedi, Philip Nee, Prateek Agarwal, Rajini Sivaram, Randall Hauch, Ricardo Brasil, Richard, RivenSun, Rob Leland, Ron Dagostino, Sayantanu Dey, Sean Li, Sergio Peña, Sherzod Mamadaliev, Shylaja Kokoori, Stanislav Vodetskyi, Tamara Skokova, Tim Patterson, Tolga H. Dur, Tom Bentley, Tomas Forsman, Tomonari Yamashita, Vicky Papavasileiou, Victoria Xia, Vijay Krishna, Vincent Jiang, Vladimir Sitnikov, Walker Carlson, Wenhao Ji, Wenjun Ruan, Xiaobing Fang, Xiaoyue Xue, YEONCHEOL JANG, Yang Yu, YeonCheol Jang, Yu, Zhang Hongyi, aSemy, bozhao12, defhacks, dengziming, florin-akermann, gf13871, jiangyuan, jiangyuan04, keashem, kurtostfeld, lhunyady, liym, loboxu, loboya~, mkandaswamy, prince-mahajan, sunshujie1990, vamossagar12, wangyap, xuexiaoyue, yasar03, zhonghou3, zzccctv, 工业废水, 彭小漪

Monday January 24, 2022

What's New in Apache Kafka 3.1.0

On behalf of the Apache Kafka® community, it is my pleasure to announce the release of Apache Kafka 3.1.0. The 3.1.0 release contains many improvements and new features. We’ll highlight some of the more prominent features in this blog post, but see the release notes for the full list of changes.

While KRaft is still not recommended for production (known gaps), we have fixed multiple bugs and we have continued to add missing features.

Tiered Storage work continues, with the goal of unlocking infinite scaling and faster rebalance times.

You can also watch the release video for a summary of what’s new in Apache Kafka 3.1.0.

Kafka broker, Producer, Consumer and AdminClient

KIP-516: Topic identifiers

Starting from Apache Kafka 3.1, the FetchRequest supports topic IDs. Topic IDs provide a safer way to fetch data from topics without any chance of incorrectly interacting with stale topics with the same name. They also improve the efficiency of the fetch protocol, because a UUID sent on the wire is generally smaller than a topic name sent as a String. This is a major step forward in the development of KIP-516.

KIP-773: Differentiate consistently metric latency measured in millis and nanos

KIP-773 enhances naming consistency for three new client metrics with millis and nanos. For example, io-waittime-total is reintroduced as io-wait-time-ns-total. The previously introduced metrics without ns will be deprecated but available for backward compatibility.

KIP-768: Extend SASL/OAUTHBEARER with support for OIDC

KIP-768 provides a built-in and production-grade implementation of the interfaces defined in KIP-255 to allow Kafka to connect to an OpenID identity provider (e.g., Okta, Auth0, and Microsoft Azure) for authentication and token retrieval.

KIP-748: Add broker count metrics

KIP-748 introduces two new metrics that are exposed by both the ZooKeeper and KRaft controller: ActiveBrokerCount and FencedBrokerCount. They respectively expose the number of active brokers in the cluster known by the controller and the number of fenced brokers known by the controller.

Kafka Streams

KAFKA-13439: The eager rebalance protocol is deprecated

The cooperative rebalancing protocol has been the default since Kafka 2.4, but we have continued to support the eager rebalancing protocol to provide an upgrade path from earlier client versions. This support will be dropped in a future release, so any users still on the eager protocol should prepare to finish upgrading their applications to the cooperative protocol in version 3.1. See KAFKA-13439 for more details.

KIP-783: Add TaskId field to StreamsException

KIP-783 guarantees that every exception thrown up to the uncaught exception handler, whether that be the new StreamsUncaughtExceptionHandler or the old generic UncaughtExceptionHandler, is wrapped as a StreamsException. The KIP also introduces a new TaskId field to the StreamsException class, with a getter API to expose it. This field is set for any exception that originates from, or is tied to, a specific task.

KIP-775: Custom partitioners in foreign-key joins

Today, foreign-key (FK) joins in Kafka Streams only work if both tables being joined (the primary table and the foreign-key table) use the default partitioner.

This limitation is due to the subscription and response topics in the implementation being hardwired to use the default partitioner. If the foreign-key table is not co-partitioned with the subscription topic, then foreign-key lookups may be routed to a Streams instance that does not have state for the foreign-key table, resulting in missing join records. Similarly, if the primary table is not co-partitioned with the response topic, then subscription responses may be routed to an instance that does not contain the original (triggering) record, resulting in a failed hash comparison and a dropped join result.

KIP-775 introduces support for foreign-key joins on tables with custom partitioners, by extending the foreign-key join interface to allow custom partitioners to be passed in.

KIP-766: fetch/findSessions queries with open endpoints for SessionStore/WindowStore

KIP-766 extends the semantics of the existing range interfaces in the ReadOnlySessionStore and the ReadOnlyWindowStore to support unbounded ranges. Specifically, the interfaces now support the use of null values as a way to represent unbounded ranges.

KIP-763: Range queries with open endpoints

KIP-763 extends the semantics of the existing range and reverseRange interfaces in the ReadOnlyKeyValueStore to support unbounded ranges. Specifically, the interfaces now support the use of null values as a way to represent unbounded ranges.
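The null-as-unbounded convention can be illustrated with a plain java.util.TreeMap standing in for a state store. This is an analogy only and does not use the Kafka Streams API; the helper name is hypothetical, and treating both bounds as inclusive is an assumption of this sketch.

```java
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

// Analogy using a TreeMap as a stand-in for a key-value store.
final class OpenEndedRange {
    // A null bound means "no bound on that side"; non-null bounds are inclusive here.
    static <K, V> SortedMap<K, V> range(NavigableMap<K, V> store, K from, K to) {
        if (from == null && to == null) {
            return store;                        // full scan
        }
        if (from == null) {
            return store.headMap(to, true);      // everything up to "to"
        }
        if (to == null) {
            return store.tailMap(from, true);    // everything from "from" onwards
        }
        return store.subMap(from, true, to, true);
    }

    public static void main(String[] args) {
        NavigableMap<String, Integer> store = new TreeMap<>();
        store.put("a", 1);
        store.put("b", 2);
        store.put("c", 3);
        store.put("d", 4);
        System.out.println(range(store, null, "b").keySet());
        System.out.println(range(store, "c", null).keySet());
    }
}
```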

KIP-761: Add total blocked time metric to Streams

KIP-761 introduces a new metric called blocked-time-total that measures the total time a Kafka Streams thread has spent blocked on Kafka since it was started. Users can sample this metric periodically and use the difference between samples to measure time blocked during an interval. This is very useful to debug Kafka Streams application performance as it gives the proportion of time the application was blocked on Kafka vs. processing records.
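The sampling arithmetic is small enough to sketch. The helper below is hypothetical and assumes both the metric samples and the wall-clock timestamps are expressed in the same time unit; reading the metric itself is left out.

```java
// Hypothetical helper for turning two metric samples into a blocked-time ratio.
final class BlockedTimeRatio {
    // Fraction of the sampling interval the thread spent blocked on Kafka.
    static double blockedFraction(long blockedAtStart, long blockedAtEnd,
                                  long wallClockStart, long wallClockEnd) {
        long elapsed = wallClockEnd - wallClockStart;
        if (elapsed <= 0) {
            throw new IllegalArgumentException("sampling interval must be positive");
        }
        return (double) (blockedAtEnd - blockedAtStart) / elapsed;
    }

    public static void main(String[] args) {
        // Metric grew by 600 units over a 2000-unit interval.
        System.out.println(blockedFraction(1_000L, 1_600L, 0L, 2_000L));
    }
}
```

The remainder of the interval, one minus this fraction, approximates the share of time the thread spent processing records.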


KIP-690: Add additional configuration to control MirrorMaker2 internal topics naming convention

MirrorMaker2 (MM2) internal topic names (heartbeats, checkpoints, and offset syncs) are hardcoded in the source code, which makes it hard to run MM2 with any Kafka cluster that has rules around topic naming conventions and doesn’t allow auto-creation of topics. In this case, you need to create these internal topics manually up front and make sure they follow the cluster’s rules and guidance for topic creation. MM2 should therefore give you the flexibility to override the names of its internal topics so that it uses the ones you create.

KIP-690 introduces new methods to ReplicationPolicy that define how MM2 internal topics are named based on some new configuration.


Apache Kafka 3.1 has a lot of great fixes and improvements in addition to the KIPs listed here. To learn more:

This was a huge community effort, so thank you to everyone who contributed to this release, including all our users and our 114 authors and reviewers:

A. Sophie Blee-Goldman, Alexander Iskuskov, Alexander Stohr, Almog Gavra, Andras Katona, Andrew Patterson, Andy Chambers, Andy Lapidas, Anna Sophie Blee-Goldman, Antony Stubbs, Arjun Satish, Bill Bejeck, Boyang Chen, Bruno Cadonna, CHUN-HAO TANG, Cheng Tan, Chia-Ping Tsai, Chris Egerton, Christo Lolov, Colin P. McCabe, Cong Ding, Daniel Urban, David Arthur, David Jacot, David Mao, Dmitriy Fishman, Edoardo Comar, Ewen Cheslack-Postava, Greg Harris, Guozhang Wang, Igor Soarez, Ismael Juma, Israel Ekpo, Ivan Ponomarev, Jakub Scholz, James Galasyn, Jason Gustafson, Jeff Kim, Jim Galasyn, JoeCqupt, Joel Hamill, John Gray, John Roesler, Jongho Jeon, Jorge Esteban Quilcate Otoya, Jose Sancio, Josep Prat, José Armando García Sancio, Jun Rao, Justine Olshan, Kalpesh Patel, Kamal Chandraprakash, Kevin Zhang, Kirk True, Konstantine Karantasis, Kowshik Prakasam, Leah Thomas, Lee Dongjin, Lucas Bradstreet, Luke Chen, Manikumar Reddy, Matthew Wong, Matthias J. Sax, Michael Carter, Mickael Maison, Nigel Liang, Niket, Niket Goel, Oliver Hutchison, Omnia G H Ibrahim, Patrick Stuedi, Phil Hardwick, Prateek Agarwal, Rajini Sivaram, Randall Hauch, René Kerner, Richard Yu, Rohan, Ron Dagostino, Ryan Dielhenn, Sanjana Kaundinya, Satish Duggana, Sergio Peña, Sherzod Mamadaliev, Stanislav Vodetskyi, Ted Yu, Tom Bentley, Tomas Forsman, Tomer Wizman, Uwe Eisele, Victoria Xia, Viktor Somogyi-Vass, Vincent Jiang, Walker Carlson, Weisheng Yang, Xavier Léauté, Yanwen(Jason) Lin, Yi Ding, Zara Lim, andy0x01, dengziming, feyman2016, ik, ik.lim, jem, jiangyuan, kpatelatwork, leah, loboya~, lujiefsi, sebbASF, singingMan, vamossagar12, wenbingshen

Tuesday September 21, 2021

What's New in Apache Kafka 3.0.0

I'm pleased to announce the release of Apache Kafka 3.0 on behalf of the Apache Kafka® community. Apache Kafka 3.0 is a major release in more ways than one. Apache Kafka 3.0 introduces a variety of new features, breaking API changes, and improvements to KRaft—Apache Kafka’s built-in consensus mechanism that will replace Apache ZooKeeper™.

While KRaft is not yet recommended for production (list of known gaps), we have made many improvements to the KRaft metadata and APIs. Exactly-once and partition reassignment support are worth highlighting. We encourage you to check out KRaft's new features and to try it out in a development environment.

Starting with Apache Kafka 3.0, the Producer enables the strongest delivery guarantees by default (acks=all, enable.idempotence=true). This means that users now get ordering and durability by default.

Also, don’t miss the Kafka Connect task restart enhancements, Kafka Streams improvements in timestamp-based synchronization, and MirrorMaker2’s more flexible configuration options.

To review the full list of features and enhancements, be sure to read the release notes. You can also watch the release video for a summary of what’s new in Apache Kafka 3.0.0.

Universal changes

KIP-750 (Part I): Deprecate support for Java 8 in Kafka

Support for Java 8 is deprecated across all components of the Apache Kafka project in 3.0. This will give users time to adapt before the next major release (4.0), when Java 8 support is planned to be removed.

KIP-751 (Part I): Deprecate support for Scala 2.12 in Kafka

Support for Scala 2.12 is also deprecated everywhere in Apache Kafka 3.0. As with Java 8, we’re giving users time to adapt because support for Scala 2.12 is planned to be removed in the next major release (4.0).

Kafka Broker, Producer, Consumer and AdminClient

KIP-630: Kafka Raft Snapshot

A major feature that we are introducing with 3.0 is the ability for KRaft Controllers and KRaft Brokers to generate, replicate, and load snapshots for the metadata topic partition named __cluster_metadata. This topic is used by the Kafka Cluster to store and replicate metadata information about the cluster like Broker configuration, topic partition assignment, leadership, etc. As this state grows, Kafka Raft Snapshot provides an efficient way to store, load, and replicate this information.

KIP-746: Revise KRaft Metadata Records

Experience and continuous development since the first version of the Kafka Raft Controller have surfaced the need to revise a few of the metadata record types that are used when Kafka is configured to run without ZooKeeper (ZK).

KIP-730: Producer ID generation in KRaft mode

With 3.0 and KIP-730, the Kafka Controller takes over full responsibility for generating Kafka Producer IDs, in both ZK and KRaft modes. This takes us closer to the bridge release, which will allow users to transition from Kafka deployments that use ZK to new deployments that use KRaft.

KIP-679: Producer will enable the strongest delivery guarantee by default

Starting with 3.0, the Kafka Producer enables idempotence and requires acknowledgement of delivery from all in-sync replicas by default. This makes record delivery guarantees stronger out of the box.
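As a rough illustration of what the changed defaults mean for an existing application, the sketch below layers user overrides on top of the old and new defaults. It is plain Python over dictionaries, not a real client; the helper name effective_config and the localhost address are hypothetical, and only the two settings named in this post are modeled (the pre-3.0 values shown are the earlier defaults, acks=1 and enable.idempotence=false).

```python
# Producer defaults before and after Apache Kafka 3.0 (KIP-679).
# Only the two settings discussed in this post are modeled.
PRE_3_0_DEFAULTS = {"acks": "1", "enable.idempotence": "false"}
DEFAULTS_3_0 = {"acks": "all", "enable.idempotence": "true"}


def effective_config(defaults, overrides):
    """Return the settings a producer would use: defaults first, then user overrides."""
    merged = dict(defaults)
    merged.update(overrides)
    return merged


# A hypothetical application that only sets bootstrap.servers
# picks up the stronger guarantees after upgrading.
app_overrides = {"bootstrap.servers": "localhost:9092"}
before = effective_config(PRE_3_0_DEFAULTS, app_overrides)
after = effective_config(DEFAULTS_3_0, app_overrides)
print(before["acks"], "->", after["acks"])
```

Applications that set acks or enable.idempotence explicitly keep their own values, which is why it can be worth reviewing existing overrides when upgrading.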

KIP-735: Increase default consumer session timeout

The default value of the Kafka Consumer’s configuration property session.timeout.ms is increased from 10 seconds to 45 seconds. This will allow the Consumer to adapt better by default to transient network failures and avoid consecutive rebalances when a Consumer appears to leave the group only temporarily.

KIP-709: Extend OffsetFetch requests to accept multiple group ids

Requesting the current offsets of a Kafka Consumer group has been possible for quite some time. But fetching the offsets of multiple consumer groups requires an individual request for each group. In 3.0 and with KIP-709, the fetch and AdminClient APIs are extended to support reading the offsets of multiple consumer groups at the same time within a single request/response.

KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

Supporting operations that can be applied to multiple consumer groups at the same time in an efficient way heavily depends on the ability of the clients to discover the coordinators of these groups efficiently. This becomes possible with KIP-699, which adds support for discovering the coordinators for multiple groups with one request. Kafka clients have been updated to use this optimization when talking to new Kafka Brokers that support this request.

KIP-724: Drop support for message formats v0 and v1

Message format v2 has been the default since its introduction with Kafka 0.11.0 in June 2017, four years ago. Thus, with enough water (or streams, if you will) having flowed under the bridge, the major release of 3.0 gives us a good opportunity to deprecate the older message formats, namely v0 and v1. These formats are rarely in use today. With 3.0, users will get a warning if they configure their Brokers to use the message formats v0 or v1. This option will be removed in Kafka 4.0 (see KIP-724 for details and implications from the deprecation of v0 and v1 message formats).

KIP-707: The future of KafkaFuture

When the KafkaFuture type was introduced to facilitate the implementation of the Kafka AdminClient, pre-Java 8 versions were still in widespread use and Java 7 was officially supported by Kafka. Fast forward to a few years later and now Kafka runs on Java versions that support the CompletionStage and CompletableFuture class types. With KIP-707, KafkaFuture adds a method to return a CompletionStage object and in that way enhances the usability of KafkaFuture in a backwards compatible way.

KIP-466: Add support for List<T> serialization and deserialization

KIP-466 adds new classes and methods for the serialization and deserialization of generic lists—a feature useful to Kafka clients and Kafka Streams alike.

KIP-734: Improve AdminClient.listOffsets to return timestamp and offset for the record with the largest timestamp

The ability to list offsets of Kafka topic partitions has been extended. With KIP-734, users can now ask the AdminClient to return the offset and timestamp of the record with the highest timestamp in a topic partition. (This is not to be confused with what the AdminClient already returns as the latest offset, which is the offset of the next record to be written in the topic partition.) This extension to the existing ListOffsets API allows users to probe the liveness of a partition by asking for the offset of the record with the highest timestamp and what that timestamp is.
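The difference between the two answers can be sketched with a toy in-memory log. This is not the AdminClient API (in the Java client the request is expressed through OffsetSpec); the Record type and both helper functions are hypothetical, and tie handling here simply follows Python's max rather than anything the broker guarantees.

```python
from typing import List, NamedTuple


class Record(NamedTuple):
    offset: int
    timestamp_ms: int


def latest_offset(log: List[Record]) -> int:
    """Offset of the next record to be written (assumes a non-empty log)."""
    return log[-1].offset + 1


def max_timestamp_record(log: List[Record]) -> Record:
    """Record carrying the highest timestamp in the log."""
    return max(log, key=lambda r: r.timestamp_ms)


# Timestamps need not be monotonic: the record at offset 1 is the "newest" by time.
log = [Record(0, 1_000), Record(1, 5_000), Record(2, 3_000)]
print(latest_offset(log), max_timestamp_record(log))
```

In this toy log the latest offset is 3, while the record with the highest timestamp sits at offset 1.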

Kafka Connect

KIP-745: Connect API to restart connector and tasks

In Kafka Connect, a connector is represented at runtime as a group consisting of a Connector class instance and one or more Task class instances, and most operations on connectors available through the Connect REST API can be applied to the group as a whole. A notable exception since the beginning has been the restart endpoints for the Connector and Task instances. To restart the connector as a whole, users had to make individual calls to restart the Connector instance and the Task instances. In 3.0, KIP-745 lets users restart either all of a connector's Connector and Task instances or only the failed ones with a single call. This feature is an add-on capability, and the previous behavior of the restart REST API remains unchanged.
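A minimal sketch of how such a single restart call could be addressed, assuming the includeTasks and onlyFailed query parameters that KIP-745 adds to the existing restart endpoint. The worker address and connector name are hypothetical, and the helper only builds the URL; it does not send the POST request.

```python
from urllib.parse import quote, urlencode


def restart_url(base_url, connector, include_tasks=False, only_failed=False):
    """Build the URL for POST /connectors/{name}/restart with the restart options."""
    query = urlencode({
        "includeTasks": str(include_tasks).lower(),
        "onlyFailed": str(only_failed).lower(),
    })
    name = quote(connector, safe="")  # escape characters that are unsafe in a path
    return f"{base_url.rstrip('/')}/connectors/{name}/restart?{query}"


# Hypothetical worker and connector: restart only the failed Connector/Task instances.
url = restart_url("http://localhost:8083", "my-connector",
                  include_tasks=True, only_failed=True)
print(url)
```

Leaving both options at false corresponds to the earlier behavior, where only the Connector instance is restarted.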

KIP-738: Removal of Connect's internal converter properties

Following their deprecation in the previous major release (Apache Kafka 2.0), internal.key.converter and internal.value.converter are removed as configuration properties and prefixes in the Connect worker’s configuration. Moving forward, internal Connect topics will exclusively use the JsonConverter to store records without embedded schemas. Any existing Connect clusters that used different converters will have to port their internal topics to the new format (see KIP-738 for details on the upgrade path).

KIP-722: Enable connector client overrides by default

Since Apache Kafka 2.3.0, a Connect worker can be configured to allow connector configurations to override the Kafka client properties used by the connector. This has been a widely used feature and now with the opportunity of a major release the ability to override connector client properties is enabled by default (connector.client.config.override.policy is set to All by default).

KIP-721: Enable connector log contexts in Connect Log4j configuration

Another feature that was introduced back in 2.3.0 but hasn’t been enabled by default up to this point is connector log contexts. This is changing in 3.0 and the connector context is added by default in the pattern of log4j logs of the Connect worker. An upgrade to 3.0 from a previous release will change the format of log lines exported by log4j by adding the connector context, where appropriate.

Kafka Streams

KIP-695: Further improve Kafka Streams timestamp synchronization

KIP-695 enhances the semantics of how Streams tasks choose to fetch records, and extends the meaning and the available values of the configuration property max.task.idle.ms. This change required a new method in the Kafka Consumer API, currentLag, that is able to return the consumer lag of a specific partition if it is known locally and without contacting the Kafka Broker.

KIP-715: Expose committed offset in streams

Starting with 3.0, three new methods are added to the TaskMetadata interface: committedOffsets, endOffsets, and timeCurrentIdlingStarted. These methods allow Streams applications to keep track of the progress and health of their tasks.

KIP-740: Clean up public API in TaskId

KIP-740 represents a significant renovation of the TaskId class. Several methods and all internal fields are deprecated, with new subtopology() and partition() getters replacing the old topicGroupId and partition fields (see also KIP-744 for relevant changes and an amendment to KIP-740).

KIP-744: Migrate TaskMetadata and ThreadMetadata to an interface with internal implementation

KIP-744 takes the changes proposed by KIP-740 one step further and separates the implementation from the public API of a number of classes. To accomplish this, the new interfaces TaskMetadata, ThreadMetadata, and StreamsMetadata are introduced while the existing classes with the same names are deprecated.

KIP-666: Add Instant-based methods to ReadOnlySessionStore

The Interactive Queries API is extended with a new set of methods in the ReadOnlySessionStore and SessionStore interfaces that accept arguments of the Instant data type. This change will affect any custom read-only Interactive Query session store implementations that will need to implement the new methods.

KIP-622: Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

The ProcessorContext adds two new methods in 3.0, currentSystemTimeMs and currentStreamTimeMs. The new methods give users the ability to query the cached system time and the streams time respectively, and they can be used in a uniform way in production and test code.

KIP-743: Remove config value 0.10.0-2.4 of Streams built-in metrics version config

Support for the legacy structure of the built-in metrics in Streams is removed in 3.0. KIP-743 removes the value 0.10.0-2.4 from the configuration property built.in.metrics.version. That leaves latest as the only valid value of this property at the moment (it has been the default value since 2.5).

KIP-741: Change default SerDe to be null

The prior default value of the default SerDe properties is removed. Streams used to default to the ByteArraySerde. Starting with 3.0, there is no default, and users are required to either set their SerDe as needed in the API or set a default via DEFAULT_KEY_SERDE_CLASS_CONFIG and DEFAULT_VALUE_SERDE_CLASS_CONFIG in their Streams configuration. The prior default was almost always not applicable to real applications and caused more confusion than convenience.

KIP-733: Change Kafka Streams default replication factor config

With the opportunity of a major release, the default value of the Streams configuration property replication.factor changes from 1 to -1. This allows new Streams applications to use the default replication factor defined at the Kafka Broker, so they are no longer required to set this configuration value when they move to production. Note that Kafka Brokers version 2.5 or above are required for the new default value.

KIP-732: Deprecate eos-alpha and replace eos-beta with eos-v2

Another Streams configuration value that is deprecated in 3.0 is exactly_once as a value of the property processing.guarantee. The value exactly_once corresponds to the original implementation of Exactly Once Semantics (EOS), available to any Streams application that connects to a Kafka cluster version 0.11.0 or newer. This first implementation of EOS has been superseded by the second implementation of EOS in Streams, which was represented by the value exactly_once_beta in the processing.guarantee property. Moving forward, the name exactly_once_beta is also deprecated and replaced by the new name exactly_once_v2. In the next major version (4.0), both exactly_once and exactly_once_beta will be removed, leaving exactly_once_v2 as the only option for EOS delivery guarantees.
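One way to spot affected applications is a small configuration check like the sketch below. It is a hypothetical helper over a plain dictionary, not part of Kafka Streams, and it assumes at_least_once is used when processing.guarantee is unset. Moving from exactly_once to exactly_once_v2 is more than a rename, so the message only suggests the replacement; KIP-732 describes the actual upgrade path.

```python
# processing.guarantee values deprecated in Apache Kafka 3.0 (KIP-732),
# mapped to the value that replaces them.
DEPRECATED_GUARANTEES = {
    "exactly_once": "exactly_once_v2",
    "exactly_once_beta": "exactly_once_v2",
}


def check_processing_guarantee(streams_config):
    """Return a warning string if the config uses a deprecated EOS value, else None."""
    value = streams_config.get("processing.guarantee", "at_least_once")
    replacement = DEPRECATED_GUARANTEES.get(value)
    if replacement is None:
        return None
    return f"processing.guarantee={value} is deprecated; consider {replacement}"


print(check_processing_guarantee({"processing.guarantee": "exactly_once_beta"}))
```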

KIP-725: Streamlining configurations for WindowedSerializer and WindowedDeserializer

The configuration properties default.windowed.key.serde.inner and default.windowed.value.serde.inner are deprecated in favor of a single new property, windowed.inner.class.serde, for use by the Kafka Consumer. Kafka Streams users are advised to configure their windowed SerDe by passing the inner SerDe to the SerDe constructor instead, and then supplying the SerDe wherever it is used in the topology.

KIP-633: Deprecate 24 hour default for the grace period in Streams

In Kafka Streams, windowed operations are allowed to process records outside of their window according to a configuration property that is called the grace period. Previously, this configuration was optional and easy to miss, leading to the default of 24 hours. This was a frequent source of confusion for users of the Suppression operator, since it would buffer records until the grace period had elapsed and therefore add a 24 hour latency. In 3.0, Windows classes are enhanced with factory methods that require them to be constructed with a custom grace period or no grace period at all. The old factory methods that applied a default grace period of 24 hours have been deprecated, along with the corresponding grace() APIs which are incompatible with the new factory methods that already set this config.
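The latency effect of the old default can be worked through with a little arithmetic. The sketch below is an assumption-laden model, not Streams code: it treats a window as closed once stream time reaches the window end plus the grace period, and the function name window_close_ms is hypothetical.

```python
from datetime import timedelta

ONE_MS = timedelta(milliseconds=1)


def window_close_ms(window_start_ms, size, grace):
    """Stream time (ms) after which a window no longer accepts late records."""
    return window_start_ms + size // ONE_MS + grace // ONE_MS


size = timedelta(minutes=5)
legacy_close = window_close_ms(0, size, timedelta(hours=24))  # old implicit default
no_grace_close = window_close_ms(0, size, timedelta(0))       # explicit "no grace"
print(legacy_close - no_grace_close)
```

For a five-minute window, the implicit 24-hour grace period delays the close (and with it any suppressed result) by 86,400,000 ms of stream time compared with no grace period, which is the confusion the new factory methods are meant to avoid.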

KIP-623: Add "internal-topics" option to streams application reset tool

The Streams use of the application reset tool kafka-streams-application-reset becomes more flexible with the addition of a new command-line parameter: --internal-topics. The new parameter accepts a list of comma-separated topic names that correspond to internal topics that can be scheduled for deletion with this application tool. Combining this new parameter with the existing parameter --dry-run allows users to confirm which topics will be deleted and specify a subset of them if necessary before actually performing the deletion operation.


KIP-720: Deprecate MirrorMaker v1

With 3.0, the first version of MirrorMaker is being deprecated. Going forward, development of new features and major improvements will focus on MirrorMaker2 (MM2).

KIP-716: Allow configuring the location of the offset-syncs topic with MirrorMaker2

With 3.0, users can now configure where MirrorMaker2 creates and stores its internal topic that it uses to convert consumer group offsets. This will allow users of MirrorMaker2 to maintain the source Kafka cluster as a strictly read-only cluster and use a different Kafka cluster to store offset records (that being the target Kafka cluster or even a third cluster beyond the source and target clusters).


Apache Kafka 3.0 is a major step forward for the Apache Kafka project.

This was a huge community effort, so thank you to everyone who contributed to this release, including all our users and our 141 authors and reviewers:

A. Sophie Blee-Goldman, Adil Houmadi, Akhilesh Dubey, Alec Thomas, Alexander Iskuskov, Almog Gavra, Alok Nikhil, Alok Thatikunta, Andrew Lee, Bill Bejeck, Boyang Chen, Bruno Cadonna, CHUN-HAO TANG, Cao Manh Dat, Cheng Tan, Chia-Ping Tsai, Chris Egerton, Colin P. McCabe, Cong Ding, Daniel Urban, Daniyar Yeralin, David Arthur, David Christle, David Jacot, David Mao, David Osvath, Davor Poldrugo, Dejan Stojadinović, Dhruvil Shah, Diego Erdody, Dong Lin, Dongjoon Hyun, Dániel Urbán, Edoardo Comar, Edwin Hobor, Eric Beaudet, Ewen Cheslack-Postava, Gardner Vickers, Gasparina Damien, Geordie, Greg Harris, Gunnar Morling, Guozhang Wang, Gwen (Chen) Shapira, Ignacio Acuña Frías, Igor Soarez, Ismael Juma, Israel Ekpo, Ivan Ponomarev, Ivan Yurchenko, Jason Gustafson, Jeff Kim, Jim Galasyn, Jim Hurne, JoelWee, John Gray, John Roesler, Jorge Esteban Quilcate Otoya, Josep Prat, José Armando García Sancio, Juan Gonzalez-Zurita, Jun Rao, Justin Mclean, Justine Olshan, Kahn Cheny, Kalpesh Patel, Kamal Chandraprakash, Konstantine Karantasis, Kowshik Prakasam, Leah Thomas, Lee Dongjin, Lev Zemlyanov, Liu Qiang, Lucas Bradstreet, Luke Chen, Manikumar Reddy, Marco Aurelio Lotz, Matthew de Detrich, Matthias J. Sax, Michael G. Noll, Michael Noll, Mickael Maison, Nathan Lincoln, Niket Goel, Nikhil Bhatia, Omnia G H Ibrahim, Peng Lei, Phil Hardwick, Rajini Sivaram, Randall Hauch, Rohan Desai, Rohit Deshpande, Rohit Sachan, Ron Dagostino, Ryan Dielhenn, Ryanne Dolan, Sanjana Kaundinya, Sarwar Bhuiyan, Satish Duggana, Scott Hendricks, Sergio Peña, Shao Yang Hong, Shay Elkin, Stanislav Vodetskyi, Sven Erik Knop, Tom Bentley, UnityLung, Uwe Eisele, Vahid Hashemian, Valery Kokorev, Victoria Xia, Viktor Somogyi-Vass, Viswanathan Ranganathan, Vito Jeng, Walker Carlson, Warren Zhu, Xavier Léauté, YiDing-Duke, Zara Lim, Zhao Haiyuan, bmaidics, cyc, dengziming, feyman2016, high.lee, iamgd67, iczellion, ketulgupta1995, lamberken, loboya~, nicolasguyomar, prince-mahajan, runom, shenwenbing, thomaskwscott, tinawenqiao, vamossagar12, wenbingshen, wycccccc, xjin-Confluent, zhaohaidao

Monday April 19, 2021

What’s New in Apache Kafka 2.8.0

I'm proud to announce the release of Apache Kafka 2.8.0 on behalf of the Apache Kafka® community. The 2.8.0 release contains many new features and improvements. This blog post highlights some of the more prominent ones. Be sure to see the release notes for the full list of changes. You can also watch the release video for a summary of what's new.

This release offers an early access version of KIP-500, which allows you to run Kafka brokers without Apache ZooKeeper, instead depending on an internal Raft implementation. This highly anticipated architectural improvement enables support for more partitions per cluster, simpler operation, and tighter security.

Kafka broker, producer, and consumer

KIP-500: Replace ZooKeeper with a self-managed quorum

We are excited to announce that 2.8 introduces an early-access look at Kafka without ZooKeeper! The implementation is not yet feature complete and should not be used in production, but it is possible to start new clusters without ZooKeeper and go through basic produce and consume use cases.

At a high level, KIP-500 works by moving topic metadata and configurations out of ZooKeeper and into a new internal topic named @metadata. This topic is managed by an internal Raft quorum of "controllers" and is replicated to all brokers in the cluster. The leader of the Raft quorum serves the same role as the controller in clusters today. A node in the KIP-500 world can serve as a controller, a broker, or both, depending on the new process.roles configuration. See the README for quickstart instructions and additional details.

This release has been a massive effort by the community over the past year, and this will continue over the course of this year. We expect significant improvements when it comes to feature completeness and hardening by the mid-year and end of year releases. Here is a quick look at the most significant KIPs that have been merged:

  • KIP-500: lays out the vision for an event-driven model for managing metadata with a replicated log managed with the Raft protocol
  • KIP-595: specifies the Raft protocol, which is used for the @metadata topic
  • KIP-631: specifies the event-driven controller model, including the new broker lifecycle and the metadata record schemas
  • KIP-590: specifies a new protocol to allow forwarding client requests from brokers to the controller

KIP-700: Add Describe Cluster API

The Kafka AdminClient has historically used the broker's Metadata API to get information about the cluster. However, the Metadata API is primarily focused on supporting the consumer and producer client, which follow different patterns than the AdminClient. KIP-700 decouples the AdminClient from the Metadata API by adding a new API to directly query the brokers for information about the cluster. This change enables the addition of new admin features in the future without disruption to the producer and consumer.

KIP-684: Support mutual TLS authentication on SASL_SSL listeners

Historically, Kafka disabled TLS client authentication (also known as mutual TLS authentication) for SASL_SSL listeners even if ssl.client.auth was configured. This behaviour was introduced at a time when this configuration option could only be configured broker-wide. In the common case where SASL_SSL used SASL authentication without requiring key store distribution, enforcing TLS client authentication for SASL_SSL clients was not desirable.

KIP-684 allows you to combine TLS authentication with SASL-based client identity on a per-listener basis. This is important for organizations where mutual TLS authentication is mandatory. KIP-684 builds on the listener-prefixed configuration options introduced by KIP-103.

KIP-676: Respect logging hierarchy

Log4j uses a hierarchical model for configuring loggers within an application. Each logger's name is delimited by periods (.), which are treated as levels in the logger hierarchy. Individual loggers and intermediate hierarchy levels can both be configured (for example, to enable debug logging). If an individual logger is not explicitly configured, it inherits the configuration of its nearest ancestor, all the way up to the root logger, which is the common ancestor of all loggers in the system. Historically, the Kafka broker's APIs for viewing log levels did not respect this hierarchy, instead reporting only the root logger's configuration for any unconfigured individual logger. KIP-676 corrects this behavior by instead resolving the logger configurations the same way that the logging framework does.
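The nearest-ancestor lookup described above can be sketched in a few lines. This is a simplified model of hierarchical resolution, not the broker's or Log4j's actual code; the function name and the example logger names are used for illustration only.

```python
def resolve_level(logger_name, configured, root_level):
    """Walk up the dotted logger name until a configured ancestor is found."""
    name = logger_name
    while name:
        if name in configured:
            return configured[name]
        # Drop the last dotted segment to move to the parent logger.
        name = name.rpartition(".")[0]
    # No ancestor is configured, so the root logger's level applies.
    return root_level


configured = {"kafka.server": "DEBUG"}
print(resolve_level("kafka.server.ReplicaManager", configured, "INFO"))
print(resolve_level("kafka.network.Processor", configured, "INFO"))
```

Here the first logger inherits DEBUG from its configured parent, while the second falls through to the root level. Before KIP-676, the reporting API would have shown the root level for both.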

KIP-673: Emit JSONs with new auto-generated schema

Kafka brokers offer debug-level request/response logs. Previously, they were semi-structured logs produced by the request/response classes' toString override. KIP-673 adjusts these logs to be JSON structured so that they can more easily be parsed and used by logging toolchains.

KIP-612: Limit broker connection creation rate

Creating a new connection adds overhead to the broker. KIP-306 mitigated the issue of connection storms due to unauthorized connections. However, connection storms may also come from authorized clients. To make it easier for you to ensure the stability of the brokers, KIP-612 adds the ability to set a limit on the rate at which the broker accepts new connections, both overall and per IP address.

KIP-516: Topic identifiers

Previously, topics in Kafka were identified solely by their name. KIP-516 introduces topic IDs to uniquely identify topics. Topic IDs are unique throughout their lifetime, even beyond deletion of the corresponding topic. They are also a more efficient representation on the wire and in memory compared to topic names.

Starting in 2.8.0, existing and new topics will be given topic IDs. Clients can now receive topic IDs through metadata responses. Work for this KIP is still ongoing, and future releases will include support for more efficient deletes, as well as adding topic IDs to other requests.

Kafka Connect

KIP-661: Expose task configurations in Connect REST API

Kafka Connect exposes a REST API allowing callers to view the configuration of running connectors. This is very useful, as it allows you to determine the workload of connectors. In Connect, connectors process the configuration before actually beginning execution and in some cases specialize and map this configuration to each individual task that will perform the actual work of transferring data to or from Kafka. You have historically been able to view the nominal configuration, but not the actual resolved configurations used by the running tasks. KIP-661 adds a new API endpoint and method to allow callers to retrieve the actual runtime configuration of a connector's tasks. This can be used for debugging but also for understanding the impact of failures (for example, a task crashing).

Kafka Streams

KIP-696: Update Streams FSM to clarify ERROR state meaning

Kafka Streams exposes a state machine to help you reason about the state of your applications in logs and metrics, as well as to trigger user-defined behavior on state transitions. This state machine contains an "ERROR" state that has historically meant that all threads have died. Until KIP-663, there was no way to replace dead threads, so ERROR was a terminal state. However, with the addition of recent resilience improvements (KIP-663 and KIP-671 below), having no running threads no longer clearly indicates an ERROR, nor is it terminal. Regardless, applications can still experience fatal errors, and you still need to know when this happens. KIP-696 updates the state machine to grant ERROR a more specific meaning, namely that it is a terminal state indicating that the application has experienced a fatal error.

KIP-689: Extend StreamJoined to allow more store configs

Kafka Streams offers the StreamJoined config object to set various configuration options for join operations. KIP-689 adds the ability to control the settings of the changelog topics that make the join state durable. The default configuration is still appropriate for most applications, but advanced operators need the ability to tune the configurations of the internal topics that support stream processing.

KIP-680: TopologyTestDriver should not require a properties argument

Kafka Streams offers the TopologyTestDriver runtime, which supports testing entire Streams applications in a fast, single-threaded, deterministic environment without running any extra components (like brokers or ZooKeeper). The constructor of TopologyTestDriver was designed to mirror the constructor of KafkaStreams (the main runtime): It takes the application itself (a topology) as one argument and the configuration as a second argument.

Historically, TopologyTestDriver enforced the same required configs as KafkaStreams, including a broker connection string and an application identifier, even though these configurations are meaningless for TopologyTestDriver. Starting in 2.8.0, these boilerplate configurations are no longer required, and KIP-680 simplifies the common case by adding new constructor overloads that do not require a configuration argument at all. The main constructor is still available, so your current tests will continue to work, and you can still use the main constructor if you need to specify extra configuration options.

KIP-671: Introduce Kafka-Streams-specific uncaught exception handler

Kafka Streams encapsulates complex logic, including both user- and system-defined code, I/O operations, multi-threading, etc., all of which offer any number of opportunities to encounter an unexpected exception. Before, Kafka Streams adopted the safe and simple approach of throwing the exceptions up to the top level, which would ultimately kill the relevant execution thread. For visibility, Streams exposed the native Java thread's ability to register an UncaughtExceptionHandler. In practice, many use cases require more than just visibility when a thread dies.

KIP-671 adds a new handler (StreamsUncaughtExceptionHandler), which offers the same level of visibility while also providing a mechanism to replace the dead thread (if you desire more resilience) or shut down the system (either all threads in the current instance or all instances in the cluster), in case you prefer to fail fast. The handler allows the selection of different actions, depending on the actual exception.
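The idea of choosing an action per exception can be modeled as below. This is a Python stand-in rather than the Java handler interface: the Response names mirror the three actions described above, while TransientBackendError and the policy inside handle are purely hypothetical choices an application might make.

```python
from enum import Enum


class Response(Enum):
    # One member per action described above.
    REPLACE_THREAD = "replace the dead thread"
    SHUTDOWN_CLIENT = "stop all threads in this instance"
    SHUTDOWN_APPLICATION = "stop all instances in the cluster"


class TransientBackendError(Exception):
    """Hypothetical error that this application considers recoverable."""


def handle(exception):
    """Pick an action based on the exception type (the policy is an assumption)."""
    if isinstance(exception, TransientBackendError):
        return Response.REPLACE_THREAD
    if isinstance(exception, MemoryError):
        return Response.SHUTDOWN_CLIENT
    # Anything unexpected: fail fast everywhere.
    return Response.SHUTDOWN_APPLICATION


print(handle(TransientBackendError("lookup service unavailable")).name)
```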

KIP-663: API to start and shut down Streams threads

Kafka Streams applications are structured as a cluster of instances, each with some number of execution threads. The number of threads is configured at startup. Under heavy load, you may wish to experiment with increasing or decreasing the number of threads on an instance in order to better utilize system resources or reduce bottlenecks. Previously, this involved stopping, reconfiguring, and restarting each instance. KIP-663 adds new methods to the KafkaStreams interface, which allows you to individually add and remove processing threads without disrupting the other threads running on the same instance.

KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

One of the operations that Kafka Streams provides is the ability to window an input record stream for aggregation. For example, when computing the number of updates for each key per hour, the window size is one hour. The window size is defined as part of the stream processing logic in the Streams DSL, and Kafka Streams automatically configures the serializer and deserializer necessary to store and retrieve these windows from local storage and Kafka topics. Because the window size itself is fixed and known for a particular operation, the serializer and deserializer contain a space optimization, storing only the window's start timestamp (as the end can be computed by adding the window size to the start time).

Occasionally, you need to directly load serialized records, for example, when debugging an application or verifying an intermediate processing phase. To support these use cases, KIP-659 gives callers a way to directly configure the deserializer (TimeWindowedDeserializer) with the window size, in much the same way that Streams configures its own internal deserializer for the same data.
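The space optimization, and why a standalone deserializer needs to be told the window size, can be sketched as follows. The byte layout (a single 8-byte big-endian start timestamp) is an assumption for illustration and omits the record key; the function names are hypothetical.

```python
import struct


def serialize_window_start(start_ms):
    """Store only the window start as an 8-byte big-endian long."""
    return struct.pack(">q", start_ms)


def deserialize_window(data, window_size_ms):
    """Rebuild (start, end) from the stored start and an externally supplied size."""
    (start_ms,) = struct.unpack(">q", data)
    return start_ms, start_ms + window_size_ms


HOUR_MS = 3_600_000
data = serialize_window_start(2 * HOUR_MS)
print(deserialize_window(data, HOUR_MS))
```

Without the window size, the end of the window cannot be recovered from the stored bytes, which is the gap KIP-659 closes for callers outside of Streams.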

KIP-572: Improve timeouts and retries in Kafka Streams

KIP-572 was partially implemented in Apache Kafka 2.7.0 and completed in 2.8.0. This KIP adds a new retry behavior to fill an important resilience gap in running Kafka Streams applications. Many of Streams' functions rely on remote calls, for example, to Kafka brokers. As with any network call, these operations are subject to arbitrary errors and delays.

The Kafka client libraries that Streams relies on have their own resilience settings, which can help to smooth out minor network disruptions, but setting the clients to be too resilient means that any client API call may block for a long time, which affects the overall stability of the application. On the other hand, setting these client timeouts too short would lead to applications crashing during minor network outages. KIP-572 adds a higher-level retry loop. Now, when Streams encounters a timeout exception while processing a task, it will attempt to make progress on other tasks before retrying the failed one.
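The shape of such a higher-level retry loop might look like the sketch below. It is a simplified model, not the Streams implementation: tasks are plain callables, the helper run_tasks is hypothetical, and retries are bounded by an attempt count (max_attempts) purely to keep the example short.

```python
from collections import deque


def run_tasks(tasks, max_attempts=3):
    """Process tasks in turn; a task that times out goes to the back of the queue."""
    queue = deque((name, fn, 1) for name, fn in tasks)
    completed, failed = [], []
    while queue:
        name, fn, attempt = queue.popleft()
        try:
            fn()
            completed.append(name)
        except TimeoutError:
            if attempt < max_attempts:
                # Defer the retry so other tasks can make progress first.
                queue.append((name, fn, attempt + 1))
            else:
                failed.append(name)
    return completed, failed


calls = {"count": 0}


def flaky():
    """Times out on the first call only, simulating a brief network disruption."""
    calls["count"] += 1
    if calls["count"] == 1:
        raise TimeoutError("broker did not respond in time")


def steady():
    pass


completed, failed = run_tasks([("task-0", flaky), ("task-1", steady)])
print(completed, failed)
```

In this run, task-1 completes while task-0 is waiting for its retry, and task-0 then succeeds on its second attempt.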


Apache Kafka 2.8.0 has a lot of great fixes and improvements in addition to the KIPs listed here.

This was a huge community effort, so thank you to everyone who contributed to this release, including all our users and our 151 authors and reviewers:

17hao, abc863377, Adem Efe Gencer, akumar, Alexander Iskuskov, Alexandre Dupriez, Almog Gavra, Alok Nikhil, Anastasia Vela, Andrew Choi, Andrey Bozhko, Andrey Falko, Andy Coates, Andy Wilkinson, Ankit Kumar, Anna Povzner, Anna Sophie Blee-Goldman, APaMio, Arjun Satish, ArunParthiban-ST, Attila Sasvari, Benoit Maggi, bertber, Bill Bejeck, Bob Barrett, Boyang Chen, Brajesh Kumar, Brian Byrne, Bruno Cadonna, Cheng Tan, Chia-Ping Tsai, Chris Egerton, CHUN-HAO TANG, Colin Patrick McCabe, Cyrus Vafadari, David Arthur, David Jacot, David Mao, dengziming, Dhruvil Shah, Dima Reznik, Dongjoon Hyun, Dongxu Wang, Edoardo Comar, Emre Hasegeli, Ewen Cheslack-Postava, feyman2016, fml2, Gardner Vickers, Geordie, Govinda Sakhare, Greg Harris, Guozhang Wang, Gwen Shapira, Hamza Slama, high.lee, huxi, Igor Soarez, Ilya Ganelin, Ismael Juma, Israel Ekpo, Ivan Ponomarev, Ivan Yurchenko, jackyoh, Jakob Homan, James Cheng, James Yuzawa, Jason Gustafson, Jesse Gorzinski, Jim Galasyn, John Roesler, Jorge Esteban Quilcate Otoya, José Armando García Sancio, Jose Sancio, Julien Chanaud, Julien Jean Paul Sirocchi, Jun Rao, Justine Olshan, Kengo Seki, Konstantine Karantasis, Kowshik Prakasam, leah, Leah Thomas, Lee Dongjin, Levani Kokhreidze, Lev Zemlyanov, Liju John, limengmonty, Lincong Li, Lucas Bradstreet, Luke Chen, Manikumar Reddy, Marco Aurelio Lotz, mathieu, Matthew Wong, Matthias J. Sax, Matthias Merdes, Michael Bingham, Michael G. Noll, Mickael Maison, Montyleo, mowczare, Nigel Liang, Nikhil Bhatia, Nikolay Izhikov, Ning Zhang, Nitesh Mor, notifygd, Okada Haruki, Oliver Dineen, panguncle, parafiend, Patrick Dignan, Prateek Agarwal, Prithvi, Rajini Sivaram, Raman Verma, Ramesh Krishnan M, Rameshkrishnan Muthusamy, Randall Hauch, Richard Fussenegger, Rohan Desai, Rohit Deshpande, Ron Dagostino, Ryanne Dolan, Samuel Cantero, Sanjana Kaundinya, Sanket Fajage, Satish Duggana, Scott Hendricks, Scott Sugar, Shao Yang Hong, shenwenbing, ssugar, Stanislav Kozlovski, Stanislav Vodetskyi, Taisiia Goltseva, tang7526, Thorsten Hake, Tom Bentley, vamossagar12, Victoria Xia, Viktor Somogyi-Vass, voffcheg109, Walker Carlson, wenbingshen, William Hammond, wycccccc, xakassi, Xavier Léauté, Yilong Chang, zhangyue19921010

Monday December 21, 2020

What’s New in Apache Kafka 2.7.0

I'm proud to announce the release of Apache Kafka 2.7.0 on behalf of the Apache Kafka® community. The 2.7.0 release contains many new features and improvements. This blog post highlights some of the more prominent ones. Be sure to see the release notes for the full list of changes. You can also watch the release video for a summary of what’s new.

In this release, we’ve continued steady progress toward the task of replacing ZooKeeper in Kafka with KIP-497, which adds a new inter-broker API for altering the in-sync replica (ISR). This release also provides the addition of the Core Raft Implementation as part of KIP-595. Now there is a separate "raft" module containing the core consensus protocol. Until integration with the controller (the broker in the Kafka cluster responsible for managing the state of partitions and replicas) is complete, there is a standalone server that you can use for testing the performance of the Raft implementation.

Of course, there are additional efforts underway toward replacing ZooKeeper, with seven KIPs in active development to provide support for more partitions per cluster, simpler operation, and tighter security.

Tiered Storage work also continues under KIP-405, which aims to unlock effectively unlimited scaling and faster rebalance times.

Kafka broker, producer, and consumer

KIP-654: Aborted transaction with non-flushed data should throw a non-fatal exception

When a Java client producer aborts a transaction with any non-flushed (pending) data, a fatal exception is thrown. But aborting a transaction with pending data is in fact a normal situation. The exception should only notify you that records aren’t being sent, not signal that the application is in an unrecoverable state. KIP-654 introduces a new exception, TransactionAbortedException, allowing you to retry if desired.

KIP-651: Support PEM format for SSL certificates and private key

Currently, Kafka only supports JKS or PKCS12 file-based key and trust stores when using SSL. Although it is no longer a standard for email, Privacy-Enhanced Mail (PEM) is a standard format for storing and distributing cryptographic keys and certificates. KIP-651 adds support for PEM files for key and trust stores, allowing the use of third-party providers that rely on the PEM format.
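As a rough illustration, a PEM-based setup might look like the fragment below. The property names are the ones we understand KIP-651 to use; the file paths and the password placeholder are hypothetical, and the assumption here is that a file-based PEM key store holds an encrypted private key whose password is supplied through ssl.key.password.

```properties
# Key store: PEM file containing the private key and certificate chain (hypothetical path)
ssl.keystore.type=PEM
ssl.keystore.location=/etc/kafka/secrets/server.pem
ssl.key.password=<key-password>

# Trust store: PEM file containing the trusted CA certificates (hypothetical path)
ssl.truststore.type=PEM
ssl.truststore.location=/etc/kafka/secrets/ca.pem
```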

KIP-612: Ability to limit connection creation rate on brokers

Creating connections adds CPU overhead to the broker. Connection storms can come from seemingly well-behaved clients and can stop the broker from performing other useful work. There is now a way of enforcing broker-wide and per-listener connection creation rates. The 2.7.0 release contains the first part of KIP-612, with per-IP connection rate limits expected to come in the 2.8.0 release.

KIP-599: Throttle create topic, create partition, and delete topic operations

The APIs to create topics, create partitions, and delete topics are operations that have a direct impact on the overall load in the Kafka controller. To prevent a cluster from being overwhelmed due to high concurrent topic and partition creations or topic deletions, there is a new quota limiting these operations. See KIP-599 for more details.

KIP-584: Versioning scheme for features

Apart from broker-client compatibility (where Kafka has a strong record of keeping the two compatible), two main questions arise when new features become available in Kafka:

  1. How do Kafka clients become aware of broker capabilities?

  2. How does the broker decide which features to enable?

KIP-584 provides a flexible and operationally friendly solution for client discovery, feature gating, and rolling upgrades using a single restart.

KIP-554: Add broker-side SCRAM configuration API

With KIP-554, SCRAM credentials can be managed via the Kafka protocol, and the kafka-configs tool was updated to use the newly introduced protocol APIs. This is another important step towards KIP-500, where ZooKeeper is replaced by a built-in quorum.

KIP-497: Add inter-broker API to alter ISR

Currently, Kafka partition leader and ISR information is stored in ZooKeeper. Either the controller or a partition leader may update this information. Because either can update this state, there needs to be a mechanism for sharing this information, which can cause delays in reflecting ISR changes. The impact of these delays means that metadata requests may receive stale information.

In the 2.7.0 release, there is a new AlterIsr API, which gives the controller the exclusive ability to update the state of partition leaders and ISR. The chief benefit of this new API is that metadata requests will always reflect the latest state.

The addition of this API is a significant step forward in the process of removing ZooKeeper and the completion of KIP-500. For more information, see KIP-497.

KIP-431: Print additional fields from records with the ConsoleConsumer

Now you can print the headers on a ConsumerRecord with the ConsoleConsumer. See KIP-431 for more details.

Kafka Connect

KIP-632: Add DirectoryConfigProvider

KIP-632 adds a DirectoryConfigProvider class to support users needing to provide secrets for keys stored in a container filesystem, such as a Kubernetes environment.

Kafka Streams

KIP-662: Throw exception when source topics of Kafka Streams application is deleted

Today, if a user deletes the source topic of a running Kafka Streams application, the embedded consumer clients gracefully shut down. This client shutdown triggers rebalancing until all StreamThreads of the Streams application gracefully exit, leaving the application completely shut down without any chance to respond to the error. With the addition of KIP-662, when a user deletes a source topic from a running Streams application, the app throws a MissingSourceTopicException, allowing you to react to the error.

KIP-648: Renaming getter method for interactive queries

KIP-648 changes the getter methods for interactive query objects to follow the Kafka format of not using the get prefix.

KIP-617: Allow Kafka Streams state stores to be iterated backwards

Currently, when using an iterator over a Kafka Streams state store, you can only traverse elements from oldest to newest. If you iterate over a windowed state store and want only the latest N records, there is no choice but to use the inefficient approach of traversing all the oldest records before getting to the desired newer records. KIP-617 adds support for iteration over a state store in reverse. Iterating in reverse makes retrieving the latest N records much more efficient.
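To make the difference concrete, here is a small model in plain Java. It does not use the Kafka Streams API: a TreeMap keyed by timestamp stands in for the state store, and the class and method names are made up for this illustration. The forward version has to visit every entry, while the reverse version stops after N.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model only: a TreeMap keyed by timestamp stands in for a state store.
final class ReverseIterationSketch {

    // Forward scan: copies every value, then keeps the last n (newest first).
    static List<String> latestByForwardScan(TreeMap<Long, String> store, int n) {
        List<String> all = new ArrayList<>(store.values());
        int from = Math.max(0, all.size() - n);
        List<String> tail = new ArrayList<>(all.subList(from, all.size()));
        Collections.reverse(tail);
        return tail;
    }

    // Reverse scan: starts at the newest entry and stops after n entries.
    static List<String> latestByReverseScan(TreeMap<Long, String> store, int n) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Long, String> entry : store.descendingMap().entrySet()) {
            if (result.size() == n) {
                break;
            }
            result.add(entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> store = new TreeMap<>();
        for (long ts = 1; ts <= 5; ts++) {
            store.put(ts, "v" + ts);
        }
        System.out.println(latestByReverseScan(store, 2)); // prints [v5, v4]
    }
}
```

Both methods return the same records; only the amount of work differs, which is the gain the reverse iterators are meant to provide.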

KIP-616: Rename implicit SerDes instances in kafka-streams-scala

Kafka Streams now has better Scala implicit Serdes support with KIP-616.

KIP-613: Add end-to-end latency metrics to Kafka Streams

Currently, the actual end-to-end latency of a record flowing through Kafka Streams is difficult to gauge at best. Kafka Streams now exposes end-to-end metrics, which will be a great help for enabling users to make design choices. See KIP-613 for more information.

KIP-607: Add metrics to Kafka Streams to report properties of RocksDB

The current metrics exposed by Kafka Streams for RocksDB do not include information on memory or disk usage. Now in 2.7.0, Kafka Streams reports properties RocksDB exposes by default. See KIP-607 for more details.

KIP-450: Sliding window aggregations in the DSL

Kafka Streams implements session windows, tumbling windows, and hopping windows as windowed aggregation methods. While hopping windows with a small advance time can imitate the behavior of a sliding window, this implementation’s performance is poor because it results in many overlapping and often redundant windows that require expensive calculations. With the addition of sliding windows via KIP-450, Kafka Streams now provides an efficient way to perform sliding aggregations.
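To see why imitating a sliding window with hopping windows is costly, the sketch below counts how many hopping windows contain a single record. It is plain arithmetic rather than Kafka Streams code; the class and method names are invented for the example, and it assumes windows of the form [start, start + size) whose start times are non-negative multiples of the advance.

```java
// Illustrative arithmetic only; not part of the Kafka Streams API.
final class HoppingWindowCostSketch {

    // Counts the hopping windows [start, start + sizeMs) that contain timestampMs,
    // where start is a non-negative multiple of advanceMs.
    static long windowsContaining(long timestampMs, long sizeMs, long advanceMs) {
        long lastStart = (timestampMs / advanceMs) * advanceMs; // latest start <= timestamp
        long count = 0;
        for (long start = lastStart; start >= 0 && start + sizeMs > timestampMs; start -= advanceMs) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // A 10-second window advancing by 1 ms: every record updates 10,000 windows.
        System.out.println(windowsContaining(60_000, 10_000, 1)); // prints 10000
        // The same window used as a tumbling window: every record updates one window.
        System.out.println(windowsContaining(60_000, 10_000, 10_000)); // prints 1
    }
}
```

Each of those overlapping windows needs its own aggregate update, which is the redundant work a dedicated sliding window implementation avoids.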


To learn more about what’s new in Apache Kafka 2.7 and to see all the KIPs included in this release, be sure to check out the release notes and the highlights in the release video.

To download Apache Kafka 2.7.0, visit the project's download page.

Of course this release would not be possible without a huge effort from the community. A big thank you to everyone involved in this release, including the following 116 people (according to git shortlog) who contributed either code or documentation:

A. Sophie Blee-Goldman, Chia-Ping Tsai, John Roesler, David Jacot, Jason Gustafson, Matthias J. Sax, Bruno Cadonna, Ismael Juma, Guozhang Wang, Rajini Sivaram, Luke Chen, Boyang Chen, Tom Bentley, showuon, leah, Bill Bejeck, Chris Egerton, Ron Dagostino, Randall Hauch, Xavier Léauté, Kowshik Prakasam, Konstantine Karantasis, David Arthur, Mickael Maison, Colin Patrick McCabe, huxi, Nikolay, Manikumar Reddy, Jorge Esteban Quilcate Otoya, Vito Jeng, bill, Bob Barrett, vinoth chandar, feyman2016, Jim Galasyn, Greg Harris, khairy, Sanjana Kaundinya, Ning Zhang, Aakash Shah, Andras Katona, Andre Araujo, Andy Coates, Anna Povzner, Badai Aqrandista, Brian Byrne, Dima Reznik, Jeff Kim, John Thomas, Justine Olshan, Lee Dongjin, Leonard Ge, Lucas Bradstreet, Mario Molina, Michael Bingham, Rens Groothuijsen, Stanislav Kozlovski, Yuriy Badalyantc, Levani Kokhreidze, Lucent-Wong, Gokul Srinivas, Mandar Tillu, Gal Margalit, tswstarplanet, Evelyn Bayes, Micah Paul Ramos, vamossagar12, Ego, Navina Ramesh, Nikhil Bhatia, Edoardo Comar, Nikolay Izhikov, Dhruvil Shah, Nitesh Mor, Noa Resare, David Mao, Raman Verma, Cheng Tan, Adam Bellemare, Richard Fussenegger, Rob Meng, Rohan, Can Cecen, Benoit Maggi, Sasaki Toru, Shaik Zakir Hussain, Shailesh Panwar, Sharath Bhat, voffcheg109, Thorsten Hake, Auston, Vikas Singh, Ashish Roy, Arjun Satish, xakassi, Zach Zhang, albert02lowis, Antony Stubbs, Ankit Kumar, gnkoshelev, high.lee, huangyiming, Andrew Egelhofer, jeff kim, jiameixie, Andrew Choi, JoelWee, Jesse Gorzinski, Alex Diachenko, Ivan Yurchenko, manijndl7, Igor Soarez, Gonzalo Muñoz, sbellapu, serjchebotarev, Adem Efe Gencer

Thursday August 06, 2020

What’s New in Apache Kafka 2.6.0

On behalf of the Apache Kafka® community, it is my pleasure to announce the release of Apache Kafka 2.6.0. The community has created another exciting release with many new features and improvements. We’ll highlight some of the more prominent features in this blog post, but see the release notes for the full list of changes.

We’ve made quite a few significant performance improvements in this release, particularly when the broker has larger partition counts. Broker shutdown performance is significantly improved, and performance is dramatically improved when producers use compression. Various aspects of ACL usage are faster and require less memory. And we’ve reduced memory allocations in several other places within the broker.

This release also adds support for Java 14. And over the past few releases, the community has switched to using Scala 2.13 by default and now recommends using Scala 2.13 for production.

Finally, these accomplishments are only one part of a larger active roadmap in the run-up to Apache Kafka 3.0, which may be one of the most significant releases in the project’s history. The work to replace ZooKeeper with built-in Raft-based consensus is well underway with eight KIPs in active development. Kafka’s new Raft protocol for the metadata quorum is already available for review. Tiered Storage unlocks infinite scaling and faster rebalance times via KIP-405, and is up and running in internal clusters at Uber.

Kafka broker, producer, and consumer

KIP-546: Add Client Quota APIs to the Admin Client

Managing quotas today in Kafka can be challenging because they can map to any combination of user and client. This feature adds a native API for managing quotas, making the process more intuitive and less error prone. A new kafka-client-quotas.sh command line tool lets users describe existing quotas, resolve the effective quotas for an entity with contextual information about how those quotas were derived, and modify a quota configuration entry by specifying which entries to add, update, and/or remove. For example:

$ /bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 \
                              --alter --names=client-id=my-client \
                              --defaults=user \
                              --add=consumer_byte_rate=2000000

See KIP-546 for more details.

KIP-551: Expose disk read and write metrics

Disk access on the Kafka broker machines may impact latency and throughput. This change adds metrics that track how many bytes Kafka is reading from and writing to the disk.

See KIP-551 for more details.

KIP-568: Explicit rebalance triggering on the Consumer

The Kafka consumer coordinates which topic partitions are assigned to each client in the same consumer group. This feature allows applications using the consumer to explicitly trigger a rebalance, such as if an application uses some system condition to determine whether it is ready to receive partitions.

See KIP-568 for more details.

KIP-573: Enable TLSv1.3 by default

TLS 1.3 is now the default TLS protocol when using Java 11 or higher, and TLS 1.2 remains the default for earlier Java versions. As with Apache Kafka 2.5.0, TLS 1.0 and 1.1 are disabled by default due to known security vulnerabilities, though users can still enable them if required.

See KIP-573 for more details.

KIP-574: CLI Dynamic Configuration with file input

Kafka configs for the most part are defined by a single value that maps to a config name. Before this change, it was hard to set configs that are better defined by more complex structures such as nested lists or JSON. Kafka now supports using the kafka-configs.sh command line tool to set configs defined in a file. For example:

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 \
                       --entity-type brokers --entity-default \
                       --alter --add-config-file new.properties

See KIP-574 for more details.

KIP-602: Change default value for client.dns.lookup

Apache Kafka 2.1.0 and KIP-302 introduced the use_all_dns_ips option for the client.dns.lookup client property. With this change, the use_all_dns_ips option is now the default, so the client will attempt to connect to the broker using all of the possible IP addresses of a hostname. The new default will reduce connection failure rates and is especially important in cloud and containerized environments where a single hostname may resolve to multiple IP addresses.

See KIP-602 for more details.

Kafka Connect

KIP-158: Kafka Connect should allow source connectors to set topic-specific settings for new topics

This widely requested feature allows Kafka Connect to automatically create Kafka topics for source connectors that write records, if those topics do not yet exist. This is enabled by default but does require connector configurations to define the rules used by Connect when creating these topics. For example, simply including the following will cause Connect to create any missing topics with 5 partitions and a replication factor of 3:
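A minimal connector configuration fragment consistent with that description, assuming the default topic creation group defined in KIP-158, would be:

```properties
# Rules applied to any topic the source connector needs that does not yet exist
topic.creation.default.replication.factor=3
topic.creation.default.partitions=5
```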

Additional rules with topic matching expressions and topic-specific settings can be defined, making this a powerful and useful feature, especially when Kafka brokers have disabled topic auto creation.

See KIP-158 for more details.

KIP-605: Expand Connect Worker Internal Topic Settings

Speaking of creating topics, the Connect worker configuration can now specify additional topic settings, including using the Kafka broker defaults for partition count and replication factor, for the internal topics used for connector configurations, offsets, and status.

See KIP-605 for more details.

KIP-610: Error Reporting in Sink Connectors

Kafka Connect already had the ability to write records to a dead letter queue (DLQ) topic if those records could not be serialized or deserialized, or when a Single Message Transform (SMT) failed. Now Connect gives sink connectors the ability to send individual records to the DLQ if the connector deems the records to be invalid or problematic. Sink connectors need to explicitly make use of this feature, but doing so will allow sink connectors to continue operating even if some records in the consumed topics are somehow incompatible with the sink connector.

See KIP-610 for more details.

KIP-585: Filter and Conditional SMTs

Defining SMTs for connectors that use multiple topics can be challenging, since not every SMT may apply for every record on every topic. With this feature, each SMT can define a predicate with the conditions when that SMT should be applied. It also defines a “filter” SMT that works with the predicates to drop records that match certain conditions.
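As a sketch of how this fits together, the connector configuration fragment below drops every record that arrives on a topic matching a pattern. The Filter and TopicNameMatches class names are the ones we understand KIP-585 to introduce; the aliases dropFoo and isFoo and the pattern itself are hypothetical.

```properties
# Declare one transformation and one predicate (aliases are hypothetical)
transforms=dropFoo
predicates=isFoo

# The predicate is true for records from topics whose name matches the regex
predicates.isFoo.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.isFoo.pattern=foo-.*

# The Filter SMT drops records, but only when the predicate is true
transforms.dropFoo.type=org.apache.kafka.connect.transforms.Filter
transforms.dropFoo.predicate=isFoo
```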

See KIP-585 for more details.

KIP-577: Allow HTTP Response Headers to be Configured for Kafka Connect

It is now possible to add custom headers to all Kafka Connect REST API responses. This allows users to ensure REST API responses comply with corporate security policies.

See KIP-577 for more details.

Kafka Streams

KIP-441: Smooth Scaling Out of Kafka Streams

Prior to this change, when Kafka Streams assigned a stateful task, the new owner had to catch up to the head of the task's changelog before beginning to process it. This feature avoids stop-the-world rebalances by allowing the prior owner of a stateful task to keep it even if the assignment is unbalanced, until the new owner gets caught up, then changing ownership after the catch-up phase.

See KIP-441 for more details.

KIP-444: Augment metrics for Kafka Streams

This feature adds more out-of-the-box metrics and removes some that are not useful. It also improves the APIs that Streams applications use to register custom metrics.

See KIP-444 for more details.

KIP-447: Producer scalability for exactly once semantics

This release adds additional work on this KIP to simplify the API for applications that read from and write to Kafka transactionally. Previously, this use case typically required separate producer instances for each input partition, but now there is no special requirement. This makes it much easier to build exactly-once semantics (EOS) applications that consume large numbers of partitions. This is foundational for a similar improvement in Kafka Streams in the next release.

See KIP-447 for more details.

KIP-557: Add emit on change support for Kafka Streams

This change adds an emit-on-change processing option to Kafka Streams and complements the existing emit-on-update and emit-on-window-close options. This new option drops idempotent updates where the prior and updated record have identical byte arrays. This feature helps eliminate high numbers of identical operations that forward an enormous number of unnecessary results down the topology.
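The idea can be modeled in a few lines of plain Java. This is not how Kafka Streams implements it; the class below is a hypothetical stand-in that remembers the latest serialized value per key and forwards an update only when the bytes differ. It assumes non-null values.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of emit-on-change; names are invented for this example.
final class EmitOnChangeSketch {
    private final Map<String, byte[]> latest = new HashMap<>();
    private final List<String> forwardedKeys = new ArrayList<>();

    // Returns true if the update was forwarded downstream, false if it was dropped.
    boolean update(String key, byte[] value) {
        byte[] prior = latest.get(key);
        if (prior != null && Arrays.equals(prior, value)) {
            return false; // identical byte arrays: an idempotent update, nothing to emit
        }
        latest.put(key, value.clone());
        forwardedKeys.add(key);
        return true;
    }

    List<String> forwardedKeys() {
        return forwardedKeys;
    }

    public static void main(String[] args) {
        EmitOnChangeSketch table = new EmitOnChangeSketch();
        byte[] one = "1".getBytes(StandardCharsets.UTF_8);
        byte[] two = "2".getBytes(StandardCharsets.UTF_8);
        System.out.println(table.update("a", one)); // prints true
        System.out.println(table.update("a", one)); // prints false
        System.out.println(table.update("a", two)); // prints true
    }
}
```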

See KIP-557 for more details.


To learn more about what’s new in Apache Kafka 2.6 and to see all the KIPs included in this release, be sure to check out the release notes and highlights video.

To download Apache Kafka 2.6.0, visit the project's download page.

This was a huge community effort, so thank you to everyone who contributed to this release, including all of our users and the 127 people (according to git shortlog) that contributed code or documentation changes in this release:

17hao, A. Sophie Blee-Goldman, Aakash Shah, Adam Bellemare, Agam Brahma, Alaa Zbair, Alexandra Rodoni, Andras Katona, Andrew Olson, Andy Coates, Aneel Nazareth, Anna Povzner, Antony Stubbs, Arjun Satish, Auston, avalsa, Badai Aqrandista, belugabehr, Bill Bejeck, Bob Barrett, Boyang Chen, Brian Bushree, Brian Byrne, Bruno Cadonna, Charles Feduke, Chia-Ping Tsai, Chris Egerton, Colin Patrick McCabe, Daniel, Daniel Beskin, David Arthur, David Jacot, David Mao, dengziming, Dezhi “Andy” Fang, Dima Reznik, Dominic Evans, Ego, Eric Bolinger, Evelyn Bayes, Ewen Cheslack-Postava, fantayeneh, feyman2016, Florian Hussonnois, Gardner Vickers, Greg Harris, Gunnar Morling, Guozhang Wang, high.lee, Hossein Torabi, huxi, Ismael Juma, Jason Gustafson, Jeff Huang, jeff kim, Jeff Widman, Jeremy Custenborder, Jiamei Xie, jiameixie, jiao, Jim Galasyn, Joel Hamill, John Roesler, Jorge Esteban Quilcate Otoya, José Armando García Sancio, Konstantine Karantasis, Kowshik Prakasam, Kun Song, Lee Dongjin, Leonard Ge, Lev Zemlyanov, Levani Kokhreidze, Liam Clarke-Hutchinson, Lucas Bradstreet, Lucent-Wong, Magnus Edenhill, Manikumar Reddy, Mario Molina, Matthew Wong, Matthias J. Sax, maulin-vasavada, Michael Viamari, Michal T, Mickael Maison, Mitch, Navina Ramesh, Navinder Pal Singh Brar, nicolasguyomar, Nigel Liang, Nikolay, Okada Haruki, Paul, Piotr Fras, Radai Rosenblatt, Rajini Sivaram, Randall Hauch, Rens Groothuijsen, Richard Yu, Rigel Bezerra de Melo, Rob Meng, Rohan, Ron Dagostino, Sanjana Kaundinya, Scott, Scott Hendricks, sebwills, Shailesh Panwar, showuon, SoontaekLim, Stanislav Kozlovski, Steve Rodrigues, Svend Vanderveken, Sönke Liebau, THREE LEVEL HELMET, Tom Bentley, Tu V. Tran, Valeria, Vikas Singh, Viktor Somogyi, vinoth chandar, Vito Jeng, Xavier Léauté, xiaodongdu, Zach Zhang, zhaohaidao, zshuo, 阿洋

Thursday April 16, 2020

What's New in Apache Kafka 2.5.0

On behalf of the Apache Kafka® community, it is my pleasure to announce the release of Apache Kafka 2.5.0. The community has created another exciting release.

We are making progress on KIP-500 and have added new metrics and security features, among other improvements. This blog post goes into detail on some of the added functionality, but to get a full list of what’s new in this release, please see the release notes.

Kafka broker, producer, and consumer

KIP-500 update

In Apache Kafka 2.5, some preparatory work has been done towards the removal of Apache ZooKeeper™ (ZK).

  • KIP-555: details about the ZooKeeper deprecation process in admin tools
  • KIP-543: dynamic configs will not require ZooKeeper access

Exactly once semantics (EOS) – Foundational improvements

KIP-447: Producer scalability for exactly once semantics

This KIP simplifies the API for applications that read from and write to Kafka transactionally. Previously, this use case typically required separate producer instances for each input partition, but now there is no special requirement. This makes it much easier to build EOS applications that consume large numbers of partitions. This is foundational for a similar improvement in Kafka Streams in the next release.

See KIP-447 for more details.

KIP-360: Improve reliability of idempotent/transactional producer

This KIP addresses a problem with producer state retention on the broker, which is what makes the idempotence guarantee possible. Previously, when the log was truncated to enforce retention or truncated from a call to delete records, the broker dropped producer state, which led to UnknownProducerId errors. With this improvement, the broker instead retains producer state until expiration. This KIP also gives the producer a powerful way to recover from unexpected errors.

See KIP-360 for more details.

Metrics and operational improvements

KIP-515: Enable ZK client to use the new TLS supported authentication (ZK 3.5.7)

Apache Kafka 2.5 now ships ZooKeeper 3.5.7. One feature of note is the newly added ZooKeeper TLS support in ZooKeeper 3.5. When deploying a secure Kafka cluster, it’s critical to use TLS to encrypt communication in transit. Apache Kafka 2.4 already ships with ZooKeeper 3.5, which adds TLS support between the broker and ZooKeeper. However, configuration information has to be passed via system properties as -D command line options on the Java invocation of the broker or CLI tool (e.g., zookeeper-security-migration), which is not secure. KIP-515 introduces the necessary changes to enable the use of secure configuration values for using TLS with ZooKeeper.

ZooKeeper 3.5.7 supports both mutual TLS authentication via its ssl.clientAuth=required configuration value and TLS encryption without client certificate authentication via ssl.clientAuth=none.

See KIP-515 for more details.

KIP-511: Collect and Expose Client’s Name and Version in the Brokers

Previously, operators of Apache Kafka could only identify incoming clients using the clientId field set on the consumer and producer. As this field is typically used to identify different applications, it leaves a gap in operational insight regarding client software libraries and versions. KIP-511 introduces two new fields to the ApiVersionsRequest RPC: ClientSoftwareName and ClientSoftwareVersion.

These fields are captured by the broker and reported through a new set of metrics. The metric MBean pattern is:


For example, the Apache Kafka 2.4 Java client produces the following MBean on the broker:


See KIP-511 for more details.

KIP-559: Make the Kafka Protocol Friendlier with L7 Proxies

This KIP identifies and improves several parts of our protocol, which were not fully self-describing. Some of our APIs have generic bytes fields, which have implicit encoding. Additional context is needed to properly decode these fields. This KIP addresses this problem by adding the necessary context to the API so L7 proxies can fully decode our protocols.

See KIP-559 for more details.

KIP-541: Create a fetch.max.bytes configuration for the broker

Kafka consumers can choose the maximum number of bytes to fetch by setting the client-side configuration fetch.max.bytes. Too high of a value may degrade performance on the broker for other consumers. If the value is extremely high, the client request may time out. KIP-541 centralizes this configuration with a broker setting that puts an upper limit on the maximum number of bytes that the client can choose to fetch.

See KIP-541 for more details.

Kafka Connect

KIP-558: Track a connector’s active topics

During runtime, it’s not easy to know the topics a sink connector reads records from when a regex is used for topic selection. It’s also not possible to know which topics a source connector writes to. KIP-558 enables developers, operators, and applications to easily identify topics used by source and sink connectors.

$ curl -s 'http://localhost:8083/connectors/a-source-connector/topics'

The topic tracking is enabled by default but can also be disabled with topic.tracking.enable=false.

See KIP-558 for more details.

Kafka Streams

KIP-150: Add Cogroup to the DSL

In the past, aggregating multiple streams into one could be complicated and error prone. It generally requires you to group and aggregate all of the streams into tables, then make multiple outer join calls. The new co-group operator cleans up the syntax of your programs, reduces the number of state store invocations, and overall increases performance.

KTable<K, CG> cogrouped =
    grouped1
        .cogroup(aggregator1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate(initializer1, materialized1);

See KIP-150 for more details.

KIP-523: Add toTable() to the DSL

A powerful way to interpret a stream of events is as a changelog and to materialize a table over it. KIP-523 adds a toTable() function that can be applied to a stream and materializes the latest value per key. It’s important to note that any null values will be interpreted as deletes for a given key (tombstones).
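The semantics can be pictured with a small plain-Java model. It is not the Kafka Streams API: the class name is made up, and each event is simply a key/value pair of strings applied in order, with a null value acting as a tombstone.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model of changelog-to-table semantics; not Kafka Streams code.
final class ToTableSketch {

    // Applies events in order: a non-null value upserts the key, a null value deletes it.
    static Map<String, String> materialize(String[][] events) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] event : events) {
            String key = event[0];
            String value = event[1];
            if (value == null) {
                table.remove(key); // tombstone
            } else {
                table.put(key, value); // latest value wins
            }
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"a", "1"}, {"b", "2"}, {"a", "3"}, {"b", null}
        };
        System.out.println(materialize(events)); // prints {a=3}
    }
}
```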

See KIP-523 for more details.

KIP-535: Allow state stores to serve stale reads during rebalance

Previously, interactive queries (IQs) against state stores would fail during the time period when there is a rebalance in progress. This degraded the uptime of applications that depend on the ability to query Kafka Streams’ tables of state. KIP-535 gives applications the ability to query any replica of a state store and observe how far each replica is lagging behind the primary.

See KIP-535 and this blog post for more details.


We have dropped support for Scala 2.11 in Apache Kafka 2.5. Scala 2.12 and 2.13 are now the only supported versions.

TLS 1.2 is now the default SSL protocol. TLS 1.0 and 1.1 are still supported.


To learn more about what’s new in Apache Kafka 2.5 and to see all the KIPs included in this release, be sure to check out the release notes and highlights video.

Monday December 16, 2019

What's New in Apache Kafka 2.4

On behalf of the Apache Kafka® community, it is my pleasure to announce the release of Apache Kafka 2.4.0. This release includes a number of key new features and improvements that we will highlight in this blog post. For the full list, please see the release notes.

What’s new with the Kafka broker, producer, and consumer

KIP-392: Allow consumers to fetch from closest replica

Historically, consumers were only allowed to fetch from leaders. In multi-datacenter deployments, this often means that consumers are forced to incur expensive cross-datacenter network costs in order to fetch from the leader. With KIP-392, Kafka now supports reading from follower replicas. This gives the broker the ability to redirect consumers to nearby replicas in order to save costs.
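As a sketch, enabling this involves settings on both the broker and the consumer. The property names below are the ones we understand KIP-392 to rely on; the rack label is hypothetical and would normally be a datacenter or availability zone identifier.

```properties
# Broker (server.properties): label the broker and enable rack-aware replica selection
broker.rack=us-east-1a
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

# Consumer: tell the brokers where this client is running
client.rack=us-east-1a
```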

See KIP-392 and this blog post for more details.

KIP-429: Kafka Consumer Incremental Rebalance Protocol

KIP-429 adds Incremental Cooperative Rebalancing to the consumer rebalance protocol in addition to the original eager rebalance protocol. Unlike the eager protocol, which always revokes all assigned partitions prior to a rebalance and then tries to reassign them all together, the incremental protocol tries to minimize the partition migration between members of a consumer group by letting consumers retain their partitions during a rebalance event. As a result, end-to-end rebalance times triggered by scaling out/down operations as well as rolling bounces are shorter, benefitting heavy, stateful consumers, such as Kafka Streams applications.
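A toy comparison in plain Java shows the difference in how many partitions are interrupted. This is a simplified model, not the consumer's actual protocol: assignments are maps from partition number to owner, and the class and method names are invented for the example.

```java
import java.util.Map;

// Illustrative model: counts partitions that stop being processed during a rebalance.
final class RebalanceSketch {

    // Eager protocol: every assigned partition is revoked before reassignment.
    static int revokedEager(Map<Integer, String> before) {
        return before.size();
    }

    // Incremental protocol: only partitions whose owner changes are revoked.
    static int revokedCooperative(Map<Integer, String> before, Map<Integer, String> after) {
        int moved = 0;
        for (Map.Entry<Integer, String> entry : before.entrySet()) {
            if (!entry.getValue().equals(after.get(entry.getKey()))) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // Consumer A owns four partitions; consumer B joins and takes two of them.
        Map<Integer, String> before = Map.of(0, "A", 1, "A", 2, "A", 3, "A");
        Map<Integer, String> after = Map.of(0, "A", 1, "A", 2, "B", 3, "B");
        System.out.println(revokedEager(before)); // prints 4
        System.out.println(revokedCooperative(before, after)); // prints 2
    }
}
```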

See KIP-429 and this blog post for more details.

KIP-455: Create an Administrative API for Replica Reassignment

As a replacement for the existing ZooKeeper-based API, the new API supports incremental replica reassignments and cancellation of ongoing reassignments. This also addresses the current limitations in the ZooKeeper-based API like security enforcement and auditability. The new API is exposed via the AdminClient.

See KIP-455 for more details.

KIP-480: Sticky Partitioner

Currently, in the case where no partition and key are specified, a producer's default partitioner partitions records in a round-robin fashion. This results in more batches that are smaller in size and leads to more requests and queuing as well as higher latency. 

KIP-480 implements a new partitioner that, when no partition or key is present, sticks to a single partition until the current batch is full and then picks a new one. Using the sticky partitioner helps improve message batching, decrease latency, and reduce the load for the broker. Some of the benchmarks which Justine Olshan discusses on the KIP show up to a 50% reduction in latency and 5–15% reduction in CPU utilization.
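The batching effect can be estimated with simple arithmetic. The sketch below is not the producer's partitioner: it is a hypothetical model that assumes a burst of keyless records arrives within one linger interval, so any partially filled batch is sent at the end of the burst.

```java
// Illustrative batch-count model; names and assumptions are ours, not the producer's.
final class StickyPartitionerSketch {

    // Round-robin: records are spread evenly, so each partition gets its own small batch.
    static int roundRobinBatches(int records, int partitions, int batchCapacity) {
        int batches = 0;
        for (int p = 0; p < partitions; p++) {
            int perPartition = records / partitions + (p < records % partitions ? 1 : 0);
            batches += (perPartition + batchCapacity - 1) / batchCapacity; // ceiling division
        }
        return batches;
    }

    // Sticky: records fill one batch completely before moving to the next partition.
    static int stickyBatches(int records, int batchCapacity) {
        return (records + batchCapacity - 1) / batchCapacity; // ceiling division
    }

    public static void main(String[] args) {
        // 12 records, 6 partitions, batches that hold 10 records each.
        System.out.println(roundRobinBatches(12, 6, 10)); // prints 6
        System.out.println(stickyBatches(12, 10)); // prints 2
    }
}
```

Fewer, fuller batches mean fewer requests to the broker, which is where the latency and CPU savings come from.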

See KIP-480 and this blog post for more details.

KIP-482: The Kafka Protocol should Support Optional Tagged Fields

The Kafka remote procedure call (RPC) protocol has its own serialization format for binary data. The Kafka protocol currently does not support optional fields, nor does it support attaching an extra field to a message in a manner that is orthogonal to the versioning scheme. 

In order to support these scenarios, KIP-482 adds optional tagged fields to the Kafka serialization format. Tagged fields are always optional. KIP-482 also implements more efficient serialization for variable-length objects.
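To give a feel for why variable-length encoding is more compact, here is a sketch of the common base-128 unsigned varint scheme, which we assume is the style of encoding used for lengths and tags here. The class is written for this post and is not taken from the Kafka code base.

```java
import java.io.ByteArrayOutputStream;

// Illustrative base-128 unsigned varint codec; not Kafka's own implementation.
final class UnsignedVarintSketch {

    // Writes 7 bits per byte, least significant group first; the high bit marks "more bytes follow".
    static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    // Reads 7-bit groups until a byte without the continuation bit is found.
    static int decode(byte[] bytes) {
        int value = 0;
        int shift = 0;
        for (byte b : bytes) {
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(encode(5).length); // prints 1
        System.out.println(encode(300).length); // prints 2
        System.out.println(decode(encode(300))); // prints 300
    }
}
```

A small length such as 5 takes one byte instead of a fixed two or four, which adds up across the many short strings and arrays in a typical request.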

See KIP-482 for more details.

KIP-504: Add new Java Authorizer Interface

This KIP defines a Java authorizer API that is consistent with other pluggable interfaces in the broker. Several limitations in the current Scala authorizer API that could not be fixed without breaking compatibility have been addressed in the new API. Additional request context is now provided to authorizers to support authorization based on the security protocol or listener. 

The API also supports asynchronous ACL updates with batching. The new pluggable authorizer API only requires a dependency on the client’s JAR. A new out-of-the-box authorizer has been added, leveraging features supported by the new API. The additional context provided to the authorizer has been used to improve audit logging. Batched updates enhance the efficiency of ACL updates using the new authorizer when multiple ACLs are added for a resource. An asynchronous startup and updated APIs will enable Kafka to be used as the storage backend for ACLs once ZooKeeper is removed under KIP-500. In addition, authorizer implementations can now enable dynamic reconfiguration without broker restarts.

See KIP-504 for more details.

KIP-525: Return topic metadata and configs in CreateTopics response

Before, the CreateTopics API response only returned a success or failure status along with any errors. With KIP-525, the API response returns additional metadata, including the actual configuration of the topic that was created. This removes the need for additional requests to obtain topic configuration after creating the topic. 

Furthermore, this KIP enables users to obtain default broker configs for topic creation using CreateTopics with validateOnly=true. This is useful for displaying default configs in management tools used to create topics.

See KIP-525 for more details.

KAFKA-7548: KafkaConsumer should not throw away already fetched data for paused partitions.

When a partition is paused by the user in the consumer, the partition is considered "unfetchable." When the consumer has already fetched data for a partition and the partition is paused, then in the next consumer poll all data from "unfetchable" partitions will be discarded. In use cases where pausing and resuming partitions are common during regular operation of the consumer, this can result in discarding pre-fetched data when it's not necessary. 

Once the partition is resumed, new fetch requests will be generated and sent to the broker to get the same partition data again. Depending on the frequency of pausing and resuming of partitions, this can impact different aspects of consumer polling, including broker/consumer throughput, number of consumer fetch requests, and NIO-related garbage collection (GC) concerns for regularly dereferenced byte buffers of partition data. This issue is now resolved by retaining completed fetch data for partitions that are paused so that it may be returned in a future consumer poll once the partition is resumed by the user.

See KAFKA-7548 for more details.
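
The behavior after the fix can be pictured with a small stand-alone model. This is a sketch under simplifying assumptions, not the consumer's internal fetcher: PausedFetchBufferSketch is a hypothetical class, partitions are plain strings, and records are plain strings.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative model only: a hypothetical buffer, not KafkaConsumer's fetcher.
final class PausedFetchBufferSketch {
    private final Map<String, List<String>> completedFetches = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();

    void addCompletedFetch(String partition, List<String> records) {
        completedFetches.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(records);
    }

    void pause(String partition) {
        paused.add(partition);
    }

    void resume(String partition) {
        paused.remove(partition);
    }

    /** Returns buffered records for fetchable partitions; data for paused partitions stays buffered. */
    Map<String, List<String>> poll() {
        Map<String, List<String>> result = new LinkedHashMap<>();
        Iterator<Map.Entry<String, List<String>>> it = completedFetches.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<String>> entry = it.next();
            if (!paused.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
                it.remove(); // handed to the caller, so drop it from the buffer
            }
        }
        return result;
    }

    public static void main(String[] args) {
        PausedFetchBufferSketch buffer = new PausedFetchBufferSketch();
        List<String> records = new ArrayList<>();
        records.add("a");
        records.add("b");
        buffer.addCompletedFetch("topic-0", records);
        buffer.pause("topic-0");
        System.out.println("while paused: " + buffer.poll());
        buffer.resume("topic-0");
        System.out.println("after resume: " + buffer.poll());
    }
}
```

The first poll returns nothing for the paused partition, and the second returns the records that were fetched before the pause, with no second round trip to the broker.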

What’s new in Kafka Connect

KIP-382: MirrorMaker 2.0

KIP-382 implements MirrorMaker 2.0 (MM2), a new multi-cluster, cross-datacenter replication engine based on the Connect framework. This tool includes several features designed for disaster recovery, including cross-cluster consumer checkpoints and offset syncs. Automatic topic renaming and cycle detection enable bidirectional active-active replication and other complex topologies. 

A new RemoteClusterUtils class enables clients to interpret checkpoints, heartbeats, and "remote topics" from other clusters.

See KIP-382 for more details.

KIP-440: Extend Connect Converter to support headers

KIP-440 adds header support to Kafka Connect. This enables the use of Kafka Connect together with Kafka producers and consumers that rely on headers for serialization/deserialization.

See KIP-440 for more details.

KIP-507: Securing Internal Connect REST Endpoints

KIP-507 brings out-of-the-box authentication and authorization to an internal REST endpoint used by Connect workers to relay task configurations to the leader. If left unsecured, this endpoint could be used to write arbitrary task configurations to a Connect cluster. 

However, after KIP-507, the endpoint is automatically secured as long as the other attack surfaces of a Connect cluster (such as its public REST API and the underlying Kafka cluster used to host internal topics and perform group coordination) are also secure.

See KIP-507 for more details.

KIP-481: SerDe Improvements for Connect Decimal type in JSON

KIP-481 adds a decimal.format option to the JSON converter for serializing Connect’s DECIMAL logical type values as number literals rather than base64 string literals. This new option defaults to base64 to maintain the previous behavior, but it can be changed to number to serialize decimal values as normal JSON numbers. The JSON converter automatically deserializes using either format, so make sure to upgrade consumer applications and sink connectors before changing source connector converters to use the number format.

See KIP-481 for more details.
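
To see why the two formats look so different, the following sketch reproduces both representations with only the JDK. It assumes the usual encoding of Connect's DECIMAL type (the unscaled value as big-endian two's-complement bytes, with the scale carried in the schema). The class name and the "price" field are made up for illustration, and this is not the JSON converter's own code.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Illustrative sketch only: mimics the two output styles, not the JsonConverter implementation.
final class DecimalFormatSketch {

    /** base64 style: the unscaled value's bytes, base64 encoded (the scale lives in the schema). */
    static String toBase64(BigDecimal value) {
        return Base64.getEncoder().encodeToString(value.unscaledValue().toByteArray());
    }

    /** Reverses toBase64; the caller must supply the scale because it is not part of the payload. */
    static BigDecimal fromBase64(String encoded, int scale) {
        return new BigDecimal(new BigInteger(Base64.getDecoder().decode(encoded)), scale);
    }

    /** number style: a plain JSON number literal. */
    static String toNumber(BigDecimal value) {
        return value.toPlainString();
    }

    public static void main(String[] args) {
        BigDecimal price = new BigDecimal("10.2345");
        System.out.println("{\"price\": \"" + toBase64(price) + "\"}"); // base64 style
        System.out.println("{\"price\": " + toNumber(price) + "}");     // number style
    }
}
```

The base64 form is opaque to anything that does not know the schema's scale, while the number form is readable by ordinary JSON tooling. That difference is why downstream readers need to understand the new format before sources start emitting it.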

What’s New in Kafka Streams

KIP-213: Support non-key joining in KTable

Previously, the Streams domain-specific language (DSL) only allowed table-table joins based on the primary key of the joining KTables. Now, a KTable (left input) can join with another KTable (right input) based on a foreign key extracted from the left KTable’s value fields. The join result is a new KTable keyed on the left KTable’s original key, and it is updated by changes on either side of the join.

See KIP-213 for more details.
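
The semantics of a foreign-key join can be modeled with ordinary maps. The sketch below is not the Streams DSL and does not process updates incrementally; it simply recomputes an inner join from two in-memory "tables". The Order type, the key names, and the join helper are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustrative model of foreign-key join semantics using plain maps, not the Kafka Streams API.
final class ForeignKeyJoinSketch {

    /** Hypothetical left-side value: the customerId field acts as the foreign key. */
    static final class Order {
        final String customerId;
        final String item;

        Order(String customerId, String item) {
            this.customerId = customerId;
            this.item = item;
        }
    }

    /** Inner join: the result keeps the left table's key; the right row is found via the extracted foreign key. */
    static <K, V, FK, VO, R> Map<K, R> join(Map<K, V> left, Map<FK, VO> right,
                                            Function<V, FK> foreignKeyExtractor,
                                            BiFunction<V, VO, R> joiner) {
        Map<K, R> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> row : left.entrySet()) {
            VO rightValue = right.get(foreignKeyExtractor.apply(row.getValue()));
            if (rightValue != null) { // rows without a match are left out
                result.put(row.getKey(), joiner.apply(row.getValue(), rightValue));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Order> orders = new LinkedHashMap<>();
        orders.put("order-1", new Order("cust-1", "book"));
        orders.put("order-2", new Order("cust-2", "pen"));
        Map<String, String> customers = new LinkedHashMap<>();
        customers.put("cust-1", "Alice");
        customers.put("cust-2", "Bob");

        System.out.println(join(orders, customers, o -> o.customerId, (o, name) -> o.item + " for " + name));

        // A change on the right side is reflected in the joined rows that reference it.
        customers.put("cust-1", "Alicia");
        System.out.println(join(orders, customers, o -> o.customerId, (o, name) -> o.item + " for " + name));
    }
}
```

Note that the result is keyed by order ID, not by customer ID, which mirrors the KIP's choice of keeping the left table's key.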

KIP-307: Allow to define custom processor names with KStreams DSL

Prior to this release, while building a new topology through the Kafka Streams DSL, the processors were automatically named. A complex topology with dozens of operations can be hard to understand if the processor names are not relevant. This KIP allows users to set more meaningful processor names.

See KIP-307 for more details.

KIP-470: TopologyTestDriver test input and output usability improvements

The TopologyTestDriver allows you to test Kafka Streams logic. This is a lot faster than utilizing actual producers and consumers and makes it possible to simulate different timing scenarios. Kafka 2.4.0 introduces TestInputTopic and TestOutputTopic classes to simplify the test interface. 

See KIP-470 and this blog post for more details.

Metrics, monitoring, and operational improvements

  • KIP-412 adds support to dynamically alter a broker's log levels using the Admin API. 
  • KIP-495 allows users to dynamically alter log levels in the Connect framework.
  • KIP-521 changes Connect to also send log messages to a file and rolls that file every day.
  • KIP-460 modifies the PreferredLeaderElection RPC to support unclean leader election in addition to preferred leader election.
  • KIP-464 allows you to leverage num.partitions and default.replication.factor from the AdminClient#createTopics API.
  • KIP-492 supports the security provider config, which can be used to configure custom security algorithms.
  • KIP-496 adds an API to delete consumer offsets and exposes it via the AdminClient.
  • KIP-503 adds metrics to monitor the number of topics/replicas marked for deletion.
  • KIP-475 adds metrics to measure the number of tasks on a connector.
  • KIP-471 exposes a subset of RocksDB's statistics in Kafka Streams metrics, which enables users to find bottlenecks and tune RocksDB accordingly.
  • KIP-484 adds new metrics for the group and transaction metadata loading duration.
  • KIP-444 adds a few new metrics at the Streams instance level such as static version/commit-id as well as dynamic state.

ZooKeeper upgrade to 3.5.x

ZooKeeper has been upgraded to 3.5.x. Support for TLS encryption was added in ZooKeeper 3.5.x, which enables us to configure TLS encryption between Kafka brokers and ZooKeeper.

Scala 2.13 support

Apache Kafka 2.4.0 now supports Scala 2.13 while also remaining compatible with Scala 2.12 and 2.11.


We want to take this opportunity to thank everyone who has contributed to this release!

To learn more about what’s new in Apache Kafka 2.4, be sure to check out the release notes and highlights video.

Monday June 24, 2019

What's New in Apache Kafka 2.3

It’s official: Apache Kafka® 2.3 has been released! Here is a selection of some of the most interesting and important features we added in the new release.

Core Kafka

KIP-351 and KIP-427: Improved monitoring for partitions which have lost replicas
In order to keep your data safe, Kafka creates several replicas of it on different brokers. Kafka will not allow writes to proceed unless the partition has a minimum number of in-sync replicas. This is called the “minimum ISR.”

Kafka already had metrics showing the partitions that had fewer than the minimum number of in-sync replicas. In this release, KIP-427 adds additional metrics showing partitions that have exactly the minimum number of in-sync replicas. By monitoring these metrics, users can see partitions that are on the verge of becoming under-replicated.

Additionally, KIP-351 adds the --under-min-isr command line flag to the kafka-topics command. This allows users to easily see which topics have fewer than the minimum number of in-sync replicas.
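
The distinction between the two conditions is just a comparison against the configured minimum, as this small sketch shows. The enum and method names are hypothetical and do not correspond to Kafka's metric names.

```java
// Illustrative sketch only: hypothetical names, not Kafka's metric implementation.
final class MinIsrSketch {

    enum IsrState { UNDER_MIN_ISR, AT_MIN_ISR, ABOVE_MIN_ISR }

    /** Classifies a partition by comparing its in-sync replica count with the configured minimum. */
    static IsrState classify(int inSyncReplicas, int minInSyncReplicas) {
        if (inSyncReplicas < minInSyncReplicas) {
            return IsrState.UNDER_MIN_ISR;  // below the minimum ISR
        }
        if (inSyncReplicas == minInSyncReplicas) {
            return IsrState.AT_MIN_ISR;     // one more lost replica drops it below the minimum
        }
        return IsrState.ABOVE_MIN_ISR;
    }

    public static void main(String[] args) {
        int minIsr = 2;
        for (int isr = 1; isr <= 3; isr++) {
            System.out.println("ISR size " + isr + " -> " + classify(isr, minIsr));
        }
    }
}
```

Alerting on the "at minimum" state gives operators a warning while the partition still has exactly the minimum number of in-sync replicas.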

KIP-354: Add a Maximum Log Compaction Lag
To a first-order approximation, previous values of a key in a compacted topic get compacted some time after the latest value for that key is written. Only the most recent value is available, and previous values are not. However, it has always been possible to set the minimum amount of time a key would stick around before being compacted, so we don’t lose the old value too quickly. Now, with KIP-354, it’s possible to set the maximum amount of time an old value will stick around. The new parameter max.compaction.lag.ms specifies how long an old value may possibly live in a compacted topic. This can be used in complying with data retention regulations such as the GDPR.

KIP-402: Improve fairness in SocketServer processors
Previously, Kafka would prioritize opening new TCP connections over handling existing connections. This could cause problems if clients attempted to create many new connections within a short time period. KIP-402 prioritizes existing connections over new ones, which improves the broker’s resilience to connection storms. The KIP also adds a per-broker max.connections setting.

KIP-461: Improve failure handling in the Replica Fetcher
In order to keep replicas up to date, each broker maintains a pool of replica fetcher threads. Each thread in the pool is responsible for fetching replicas for some number of follower partitions. Previously, if one of those partitions failed, the whole thread would fail with it, causing under-replication on potentially hundreds of partitions. With this KIP, if a single partition managed by a given replica fetcher thread fails, the thread continues handling the remainder of its partitions.
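
The change amounts to catching a failure at the level of a single partition instead of letting it end the whole thread. Here is a minimal sketch of that pattern. The class, the fetchPass method, and the string partition names are hypothetical, and the real replica fetcher is considerably more involved.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Illustrative sketch only: hypothetical names, not the broker's replica fetcher thread.
final class FetcherIsolationSketch {

    /**
     * Runs one fetch pass over the thread's partitions. A partition whose fetch throws is
     * added to the failed set and skipped from then on; the remaining partitions keep going.
     */
    static Set<String> fetchPass(List<String> partitions, Set<String> failed, Consumer<String> fetchOne) {
        Set<String> fetched = new LinkedHashSet<>();
        for (String partition : partitions) {
            if (failed.contains(partition)) {
                continue; // already marked failed in an earlier pass
            }
            try {
                fetchOne.accept(partition);
                fetched.add(partition);
            } catch (RuntimeException e) {
                failed.add(partition); // isolate only this partition
            }
        }
        return fetched;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("topic-0", "topic-1", "topic-2");
        Set<String> failed = new HashSet<>();
        Set<String> fetched = fetchPass(partitions, failed, partition -> {
            if (partition.equals("topic-1")) {
                throw new IllegalStateException("simulated failure");
            }
        });
        System.out.println("fetched: " + fetched + ", failed: " + failed);
    }
}
```

With the old behavior, the exception from topic-1 would have propagated out of the loop and stopped fetching for topic-2 as well.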

KAFKA-7283: Reduce the amount of time the broker spends scanning log files when starting up
When the broker starts up after an unclean shutdown, it checks the logs to make sure they have not been corrupted. This JIRA optimizes that process so that Kafka only checks log segments that haven't been explicitly flushed to disk. Now, the time required for log recovery is no longer proportional to the number of logs. Instead, it is proportional to the number of unflushed log segments. Some of the benchmarks which Zhanxiang Huang discusses on the JIRA show up to a 50% reduction in broker startup time.

Kafka Connect

KIP-415: Incremental Cooperative Rebalancing in Kafka Connect
In Kafka Connect, worker tasks are distributed among the available worker nodes. When a connector is reconfigured or a new connector is deployed, or when a worker is added or removed, the tasks must be rebalanced across the Connect cluster. This helps ensure that all of the worker nodes are doing a fair share of the Connect work. In 2.2 and earlier, a Connect rebalance caused all worker threads to pause while the rebalance proceeded. As of KIP-415, rebalancing is no longer a stop-the-world affair, making configuration changes a more pleasant thing.

KIP-449: Add connector contexts to Connect worker logs
A running Connect cluster contains several different thread pools. Each of these threads emits its own logging, as one might expect. However, this makes it difficult to untangle the sequence of events involved in a single logical operation, since the parts of that operation are running asynchronously in their various threads across the cluster. This KIP adds some context to each Connect log message, making it much easier to make sense of the state of a single connector over time.

Take a look at Robin Moffatt’s blog post.

Kafka Streams

KIP-258: Allow Users to Store Record Timestamps in RocksDB
Prior to this KIP, message timestamps were not stored in the Streams state store. Only the key and value were there. With this KIP, timestamps are now included in the state store. This KIP lays the groundwork to enable future features like handling out-of-order messages in KTables and implementing TTLs for KTables.

KIP-428: Add in-memory window store / KIP-445: Add in-memory Session Store
These KIPs add in-memory implementations for the Kafka Streams window store and session store. Previously, the only component with an in-memory implementation was the state store. The in-memory implementations provide higher performance, in exchange for lack of persistence to disk. In many cases, this can be a very good tradeoff.

KIP-313: Add KStream.flatTransform and KStream.flatTransformValues
The first half of this KIP, the flatTransform() method, was delivered in Kafka 2.2. The flatTransform() method is very similar to flatMap(), in that it takes a single input record and produces zero or more output records. flatMap() does this in a type-safe way but without access to the ProcessorContext and the state store. We’ve been able to use the Processor API to perform this same kind of operation with access to the ProcessorContext and the state store, but without the type safety of flatMap(). flatTransform() gave us the best of both worlds: Processor API access, plus compile-time type checking.

flatTransformValues(), just introduced in the completed KIP-313 in Kafka 2.3, is to flatTransform() as flatMapValues() is to flatMap(). It lets us do processor-API-aware computations that return multiple records for each input record without changing the message key and causing a repartition.


Thanks for reading! Check out the release notes for more details about all the great stuff in Kafka 2.3. Also, check out this YouTube video on the highlights of the release!

Wednesday November 07, 2018

Apache Kafka Supports 200K Partitions Per Cluster

In Kafka, a topic can have multiple partitions to which records are distributed. Partitions are the unit of parallelism. In general, more partitions lead to higher throughput. However, there are some factors that one should consider when having more partitions in a Kafka cluster. I am happy to report that the recent Apache Kafka 1.1.0 release has significantly increased the number of partitions that a single Kafka cluster can support from the deployment and the availability perspective.

To understand the improvement, it’s useful to first revisit some of the basics about partition leaders and the controller. First, each partition can have multiple replicas for higher availability and durability. One of the replicas is designated as the leader and all the client requests are served from this lead replica. Second, one of the brokers in the cluster acts as the controller that manages the whole cluster. If a broker fails, the controller is responsible for selecting new leaders for partitions on the failed broker.

The Kafka broker does a controlled shutdown by default to minimize service disruption to clients. A controlled shutdown has the following steps. (1) A SIGTERM signal is sent to the broker to be shut down. (2) The broker sends a request to the controller to indicate that it’s about to shut down. (3) The controller then changes the partition leaders on this broker to other brokers and persists that information in ZooKeeper. (4) The controller sends the new leaders to other brokers. (5) The controller sends a successful reply to the shutting down broker, which finally terminates its process. At this point, there is no impact to the clients since their traffic has already been moved to other brokers. This process is depicted in Figure 1 below. Note that steps (4) and (5) can happen in parallel.

Figure 1. (1) shutdown initiated on broker 1; (2) broker 1 sends controlled shutdown request to controller on broker 0; (3) controller writes new leaders in ZooKeeper; (4) controller sends new leaders to broker 2; (5) controller sends successful reply to broker 1.

Before Kafka 1.1.0, during the controlled shutdown, the controller moved the leaders one partition at a time. For each partition, the controller selected a new leader, wrote it to ZooKeeper synchronously and communicated the new leader to other brokers through a remote request. This process had a couple of inefficiencies. First, the synchronous writes to ZooKeeper have higher latency, which slowed down the controlled shutdown process. Second, communicating the new leader one partition at a time added many small remote requests to every broker, which could cause the processing of the new leaders to be delayed.

In Kafka 1.1.0, we made significant improvements in the controller to speed up the controlled shutdown. The first improvement is using the asynchronous API when writing to ZooKeeper. Instead of writing the leader for one partition, waiting for it to complete and then writing another one, the controller submits the leaders for multiple partitions to ZooKeeper asynchronously and then waits for them to complete at the end. This allows for request pipelining between the Kafka broker and the ZooKeeper server and reduces the overall latency. The second improvement is that the communication of the new leaders is batched. Instead of one remote request per partition, the controller sends a single remote request with the leaders for all affected partitions.
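
The difference between the two write patterns can be sketched with CompletableFuture. Everything here is a stand-in: the in-memory map and single background thread play the role of ZooKeeper, and the class and method names are hypothetical. It is not the controller's code and does not use the ZooKeeper client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only: an in-memory stand-in for ZooKeeper, not the Kafka controller.
final class ControlledShutdownSketch {
    private final Map<String, Integer> store = new ConcurrentHashMap<>();
    private final ExecutorService zkThread = Executors.newSingleThreadExecutor();

    /** Hypothetical asynchronous write of one partition's new leader. */
    CompletableFuture<Void> writeLeaderAsync(String partition, int leader) {
        return CompletableFuture.runAsync(() -> store.put(partition, leader), zkThread);
    }

    /** Older pattern: write one leader, wait for it to finish, then write the next. */
    void writeSequentially(Map<String, Integer> newLeaders) {
        for (Map.Entry<String, Integer> e : newLeaders.entrySet()) {
            writeLeaderAsync(e.getKey(), e.getValue()).join();
        }
    }

    /** Pipelined pattern: submit every write first, then wait for all of them at the end. */
    void writePipelined(Map<String, Integer> newLeaders) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (Map.Entry<String, Integer> e : newLeaders.entrySet()) {
            pending.add(writeLeaderAsync(e.getKey(), e.getValue()));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    }

    /** Remote requests needed to tell one broker about the new leaders. */
    static int requestsPerBroker(Map<String, Integer> newLeaders, boolean batched) {
        if (newLeaders.isEmpty()) {
            return 0;
        }
        return batched ? 1 : newLeaders.size();
    }

    Map<String, Integer> snapshot() {
        return new LinkedHashMap<>(store);
    }

    void close() {
        zkThread.shutdown();
    }

    public static void main(String[] args) {
        Map<String, Integer> newLeaders = new LinkedHashMap<>();
        newLeaders.put("topic-0", 2);
        newLeaders.put("topic-1", 3);
        ControlledShutdownSketch controller = new ControlledShutdownSketch();
        controller.writePipelined(newLeaders);
        System.out.println("stored leaders: " + controller.snapshot());
        System.out.println("requests per broker, unbatched: " + requestsPerBroker(newLeaders, false));
        System.out.println("requests per broker, batched: " + requestsPerBroker(newLeaders, true));
        controller.close();
    }
}
```

Both write methods leave the store in the same final state. In the pipelined version the caller does not pay one full round trip of waiting per partition, and in a real deployment that waiting is where the latency goes.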

We also made significant improvements in the controller failover time. If the controller goes down, the Kafka cluster automatically elects another broker as the new controller. Before being able to elect the partition leaders, the newly-elected controller has to first reload the state of all partitions in the cluster from ZooKeeper. If the controller has a hard failure, the window in which a partition is unavailable can be as long as the ZooKeeper session expiration time plus the controller state reloading time. So reducing the state reloading time improves the availability in this rare event. Prior to Kafka 1.1.0, the reloading used the synchronous ZooKeeper API. In Kafka 1.1.0, this was changed to also use the asynchronous API for better latency.

We executed tests to evaluate the performance improvement of the controlled shutdown time and the controller reloading time. For both tests, we set up a 5 node ZooKeeper ensemble on different server racks.

In the first test, we set up a Kafka cluster with 5 brokers on different racks. In that cluster, we created 25,000 topics, each with a single partition and 2 replicas, for a total of 50,000 partition replicas. So, each broker hosts 10,000 partition replicas. We then measured the time to do a controlled shutdown of a broker. The results are shown in the table below.

                         | Kafka 1.0.0 | Kafka 1.1.0
Controlled shutdown time | 6.5 minutes | 3 seconds

A big part of the improvement comes from fixing a logging overhead, which unnecessarily logs all partitions in the cluster every time the leader of a single partition changes. By just fixing the logging overhead, the controlled shutdown time was reduced from 6.5 minutes to 30 seconds. The asynchronous ZooKeeper API change reduced this time further to 3 seconds. These improvements significantly reduce the time to restart a Kafka cluster.

In the second test, we set up another Kafka cluster with 5 brokers and created 2,000 topics, each with 50 partitions and 1 replica. This makes a total of 100,000 partitions in the whole cluster. We then measured the state reloading time of the controller and observed that it was cut in half (the reloading time dropped from 28 seconds in Kafka 1.0.0 to 14 seconds in Kafka 1.1.0).

With those improvements, how many partitions can one expect to support in Kafka? The exact number depends on factors such as the tolerable unavailability window, ZooKeeper latency, broker storage type, etc. As a rule of thumb, we recommend that each broker have up to 4,000 partitions and each cluster have up to 200,000 partitions. The main reason for the latter cluster-wide limit is to accommodate the rare event of a hard failure of the controller, as we explained earlier. Note that other considerations related to partitions still apply and one may need some additional configuration tuning with more partitions.
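
For quick capacity planning, the rule of thumb reduces to simple arithmetic. The sketch below is only a back-of-the-envelope helper with hypothetical names. It assumes partitions are spread evenly across brokers and treats the two numbers above as guidelines, not hard limits.

```java
// Back-of-the-envelope helper; names are hypothetical and the limits are the guidelines quoted above.
final class PartitionBudgetSketch {
    static final int MAX_PARTITIONS_PER_BROKER = 4_000;
    static final int MAX_PARTITIONS_PER_CLUSTER = 200_000;

    /** Smallest broker count that keeps every broker at or under the per-broker guideline. */
    static int minBrokers(int totalPartitions) {
        if (totalPartitions < 0) {
            throw new IllegalArgumentException("totalPartitions must not be negative");
        }
        // Ceiling division, assuming an even spread of partitions across brokers.
        return (totalPartitions + MAX_PARTITIONS_PER_BROKER - 1) / MAX_PARTITIONS_PER_BROKER;
    }

    /** True when the cluster-wide total stays within the cluster guideline. */
    static boolean withinClusterGuideline(int totalPartitions) {
        return totalPartitions <= MAX_PARTITIONS_PER_CLUSTER;
    }

    public static void main(String[] args) {
        int total = 100_000;
        System.out.println(total + " partitions need at least " + minBrokers(total) + " brokers");
        System.out.println("within cluster guideline: " + withinClusterGuideline(total));
    }
}
```

Under these assumptions, a cluster at the 200,000-partition guideline would need at least 50 brokers to stay within 4,000 partitions per broker.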

The improvement that we made in 1.1.0 is just one step towards our ultimate goal of making Kafka infinitely scalable. We have also made improvements on latency with more partitions in 1.1.0 and will discuss that in a separate blog. In the near future, we plan to make further improvements to support millions of partitions in a Kafka cluster.

The controller improvement work in Kafka 1.1.0 is a true community effort. Over a period of 9 months, people from 6 different organizations helped out and made this happen. Onur Karaman led the original design, did the core implementation and conducted the performance evaluation. Manikumar Reddy, Prasanna Gautam, Ismael Juma, Mickael Maison, Sandor Murakozi, Rajini Sivaram and Ted Yu each contributed some additional implementation or bug fixes. More details can be found in KAFKA-5642 and KAFKA-5027. A big thank you to all the contributors!


