Apache Kafka Streams is a client-side library for building applications and microservices that process and analyze data stored in Kafka topics. It combines the simplicity of writing and deploying standard Java/Scala applications with the power of Kafka’s immutable log, enabling sophisticated stream processing with native fault tolerance and scalability.
KStream and KTable – The core abstractions
Kafka Streams models data using two primary stream processing abstractions: KStream and KTable. Understanding their fundamental difference is key to designing correct streaming applications.
A KStream represents an unbounded, append-only sequence of immutable records. Every record (key-value pair) represents a discrete event, like a page view or a financial transaction. When processing a KStream, each record is treated as an independent fact; a later record with the same key does not replace an earlier one. Operations like filter, map, and peek are typically applied to KStreams.
// Example: Filtering high-value transactions from a KStream
// (explicit serdes shown so the snippet does not depend on default serde config)
KStream<String, Double> transactions = builder.stream(
        "input-transactions", Consumed.with(Serdes.String(), Serdes.Double()));
KStream<String, Double> highValue = transactions
        .filter((key, value) -> value >= 1000.0);
highValue.to("high-value-transactions",
        Produced.with(Serdes.String(), Serdes.Double()));
A KTable represents a changelog stream: each record is interpreted as an update (upsert) of the value for its key, and a record with a null value as a deletion of that key. It is essentially a continuously updated, materialized view of a KStream. When processing a KTable, a new record with the same key overwrites the previous value. This abstraction is essential for maintaining dynamic state.
// Example: Counting user clicks (maintaining a running count per user)
// ("user-clicks" is an assumed source topic name)
KStream<String, String> clicksStream = builder.stream(
        "user-clicks", Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, Long> userCounts = clicksStream
        .groupByKey()
        .count();
userCounts.toStream().to("user-click-counts",
        Produced.with(Serdes.String(), Serdes.Long()));
Kafka Streams in Code – Practical operations
The power of Kafka Streams lies in its extensive set of operators. We typically divide operations into Stateless and Stateful categories.
Stateless Operations transform one message at a time without referencing any prior messages. Examples include mapValues, flatMapValues, and filter.
// Example: Converting a comma-separated string value to a List of items
KStream<String, String> rawData = builder.stream("raw-log-data");
KStream<String, List<String>> processedData = rawData
        .mapValues(value -> Arrays.asList(value.split(",")));
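Where mapValues produces exactly one output record per input, flatMapValues can fan out into zero or more. A short sketch, reusing rawData from above to emit one record per comma-separated item:
// Example (sketch): flatMapValues emits one output record per list item,
// instead of a single record holding the whole list.
KStream<String, String> items = rawData
        .flatMapValues(value -> Arrays.asList(value.split(",")));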
Stateful Operations require remembering previous records to compute the result. This includes aggregations (count, reduce), joins, and windowing.
// Example: Stateful reduction to track the largest transaction per user
KTable<String, Double> largestTxn = transactions
        .groupByKey()
        .reduce((currentMax, newValue) -> Math.max(currentMax, newValue));
Windowing and Joins for temporal correlation
Working with real-time data often requires correlating events that happen within a specific time boundary. We achieve this through Windowing. Tumbling windows (fixed size, non-overlapping) and Hopping windows (fixed size, overlapping) are the most common types.
// Example: Counting clicks every 5 minutes (tumbling window)
// (ofSizeWithNoGrace supersedes the deprecated TimeWindows.of)
KTable<Windowed<String>, Long> fiveMinCounts = clicksStream
        .groupByKey(Grouped.as("click-group"))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .count();
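Hopping windows are defined the same way, plus an advance interval. A sketch: a 5-minute window advancing every minute, so each click is counted in up to five overlapping windows.
// Example (sketch): 5-minute windows that advance every minute; each
// record falls into up to five overlapping windows.
KTable<Windowed<String>, Long> hoppingCounts = clicksStream
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
                .advanceBy(Duration.ofMinutes(1)))
        .count();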
We use Joins to combine records from two streams or tables based on a common key and, optionally, a time window. Joining a KStream to a KTable is common for enriching event data with current state information.
// Example: Joining a KStream of user activity with a KTable of user profile data
KStream<String, Activity> activityStream = builder.stream("user-activity");
KTable<String, Profile> profileTable = builder.table("user-profiles");
KStream<String, EnrichedActivity> enrichedStream = activityStream.join(
        profileTable,
        (activity, profile) -> new EnrichedActivity(activity, profile));
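For the windowed case, a KStream-KStream join correlates two event streams whose records arrive close together in time. A sketch, where the user-payments topic and the Payment type are hypothetical:
// Example (sketch): match each activity with a payment for the same key
// that occurred within 10 minutes of it ("user-payments" and Payment
// are hypothetical).
KStream<String, Payment> paymentStream = builder.stream("user-payments");
KStream<String, String> paidActivity = activityStream.join(
        paymentStream,
        (activity, payment) -> activity + " paid " + payment, // hypothetical joiner
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10)));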
Parallelism and State Stores in Kafka Streams
Kafka Streams automatically manages Parallelism by mapping stream tasks to the input topic’s partitions. If a topic has 10 partitions, you can run up to 10 instances of your Kafka Streams application, with each instance handling a subset of the partitions, ensuring high throughput and horizontal scalability.
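A minimal configuration sketch (the application.id, broker address, and thread count below are placeholder values): every instance started with the same application.id joins the same group, and Kafka Streams rebalances one task per input partition across all instances and their threads.
// Example (sketch): all instances sharing this application.id form one
// group; tasks (one per input partition) are balanced across them.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-counter-app"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4); // threads per instance
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();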
State Stores are the mechanism Kafka Streams uses to persist the state required for stateful operations (such as counts, joins, and aggregations). They are typically backed by RocksDB on the local disk of the application instance, offering fast access. Kafka Streams also backs up each state store's contents to internal, compacted changelog topics, ensuring fault tolerance: if an instance fails, another can quickly restore its state from these topics and resume processing.
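To make such a store directly queryable, name it when materializing the aggregation. A sketch, assuming the streams instance from the snippet above and a hypothetical store name:
// Example (sketch): name the store backing an aggregation, then query it
// interactively once the application is running ("click-counts-store" is
// a hypothetical name).
KTable<String, Long> counts = clicksStream
        .groupByKey()
        .count(Materialized.as("click-counts-store"));

ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType(
                "click-counts-store", QueryableStoreTypes.keyValueStore()));
Long clicksForAlice = store.get("alice"); // current count for key "alice"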
Stream application error handling
While the library handles many operational errors (such as network partitions or broker failures), application developers must still manage data-specific issues:
- Deserialization Errors: Occur when Kafka Streams cannot parse an incoming message (e.g., corrupted JSON). A custom DeserializationExceptionHandler should be configured to log the error and skip the corrupt record, so the application does not crash and stop processing (see the sketch after this list).
- Processing Errors: Runtime exceptions (e.g., a NullPointerException) thrown inside a stream operator. These can be caught in the processing logic itself (e.g., with a try-catch inside a map or flatMap) or handled globally with a StreamsUncaughtExceptionHandler; the separate ProductionExceptionHandler covers errors raised while writing results back to Kafka.
- Handling Skew: If one partition is significantly larger than the others (data skew), the instance that owns it processes far more data than its peers. This is a topic-level issue and usually requires re-partitioning the upstream data source to distribute keys more evenly.
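A minimal sketch of the first two cases, reusing the props and builder objects from the earlier snippets; LogAndContinueExceptionHandler ships with the library, and the REPLACE_THREAD response restarts only the failed processing thread:
// Example (sketch): skip records that fail deserialization instead of
// crashing, and replace a processing thread that throws an unhandled
// exception rather than shutting the whole application down.
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueExceptionHandler.class);

KafkaStreams app = new KafkaStreams(builder.build(), props);
app.setUncaughtExceptionHandler(exception ->
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
app.start();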
By mastering the core abstractions, leveraging stateful operations with reliable state stores, and implementing robust error handling, developers can build powerful, resilient, and highly scalable real-time microservices using Apache Kafka Streams.
That’s all.
Try it at home!
