Kafka Streams runs inside your Spring Boot app — no Flink, no Spark, no cluster to manage. Learn the DSL for filtering, aggregating, and joining streams with Spring Boot integration.
JOptimize Team
Kafka Streams is a client library that runs stream processing logic inside your application. No separate cluster, no Flink jobs, no Spark infrastructure: just add a dependency and write Java code. For real-time aggregations, enrichment, and routing, it's the lowest-complexity option, and it scales horizontally by adding instances (parallelism is capped by the number of input-topic partitions).
```xml
<!-- Versions are managed by Spring Boot's dependency management -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
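```properties
# application.properties
spring.kafka.streams.application-id=order-processor
spring.kafka.streams.bootstrap-servers=kafka:9092
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
# JsonSerde only deserializes classes from trusted packages; com.example.orders is a placeholder
spring.kafka.streams.properties.spring.json.trusted.packages=com.example.orders
```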
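```java
import java.math.BigDecimal;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class OrderStreamConfig {

    private static final BigDecimal HIGH_VALUE_THRESHOLD = new BigDecimal("1000");

    // OrderEvent, EnrichedOrder, CustomerTier and CustomerService are application classes
    private final CustomerService customerService;

    public OrderStreamConfig(CustomerService customerService) {
        this.customerService = customerService;
    }

    @Bean
    public KStream<String, OrderEvent> orderStream(StreamsBuilder streamsBuilder) {
        KStream<String, OrderEvent> orders = streamsBuilder.stream("orders");

        // Filter: only high-value orders
        KStream<String, OrderEvent> highValueOrders = orders
                .filter((key, order) -> order.getTotal().compareTo(HIGH_VALUE_THRESHOLD) > 0);

        // Transform: enrich with customer tier.
        // This call runs synchronously on the stream thread, so keep it fast (e.g. cached)
        KStream<String, EnrichedOrder> enriched = highValueOrders
                .mapValues(order -> new EnrichedOrder(
                        order, customerService.getTier(order.getCustomerId())));

        // Route to different topics by tier; the first matching predicate wins
        enriched.split()
                .branch((key, order) -> order.getTier() == CustomerTier.GOLD,
                        Branched.withConsumer(s -> s.to("orders-gold")))
                .branch((key, order) -> order.getTier() == CustomerTier.SILVER,
                        Branched.withConsumer(s -> s.to("orders-silver")))
                .defaultBranch(Branched.withConsumer(s -> s.to("orders-standard")));

        return orders;
    }
}
```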
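`split()` checks the branch predicates in the order they are declared and sends each record to the first branch that matches, falling back to `defaultBranch` otherwise, so the order of `.branch()` calls matters whenever predicates overlap. The plain-Java model below mimics that first-match rule without a Kafka dependency; `TierRouter` is a hypothetical name for illustration, not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java model of split().branch(...).defaultBranch(...):
// predicates are checked in declaration order and the first match wins.
final class TierRouter<V> {
    private record Branch<T>(String topic, Predicate<T> predicate) {}

    private final List<Branch<V>> branches = new ArrayList<>();
    private final String defaultTopic;

    TierRouter(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    // Adds a branch and returns this for chaining
    TierRouter<V> branch(String topic, Predicate<V> predicate) {
        branches.add(new Branch<>(topic, predicate));
        return this;
    }

    // Returns the topic a value would be written to
    String route(V value) {
        for (Branch<V> branch : branches) {
            if (branch.predicate().test(value)) {
                return branch.topic();
            }
        }
        return defaultTopic; // no predicate matched
    }
}
```

With overlapping predicates, such as a SILVER rule that also accepts GOLD, a GOLD record still lands only in the first matching branch, just as each record in `split()` reaches at most one branch.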
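```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;
import org.springframework.context.annotation.Bean;

// orderSerde (Serde<OrderEvent>) and orderSummarySerde (Serde<OrderSummary>) are assumed
// to be fields of the same configuration class, e.g. Spring Kafka JsonSerde instances
@Bean
public KTable<Windowed<String>, OrderSummary> hourlyOrderSummary(StreamsBuilder streamsBuilder) {
    return streamsBuilder
            .stream("orders", Consumed.with(Serdes.String(), orderSerde))
            // Re-key by region; Kafka Streams repartitions through an internal topic
            .groupBy((key, order) -> order.getRegion(),
                    Grouped.with(Serdes.String(), orderSerde))
            // Tumbling 1-hour windows with no grace period for late records
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .aggregate(
                    OrderSummary::empty,                            // Initializer
                    (region, order, summary) -> summary.add(order), // Aggregator
                    Materialized.<String, OrderSummary, WindowStore<Bytes, byte[]>>as("hourly-order-summary")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(orderSummarySerde));
}
```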
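This produces a tumbling (non-overlapping) 1-hour summary of orders per region, updated continuously as new orders arrive; what the summary tracks, such as a count and a sum, is up to `OrderSummary.add()`. Because the store is materialized under the name `hourly-order-summary`, it can be queried directly via interactive queries. With `ofSizeWithNoGrace`, a record that arrives after its window has closed is dropped; `TimeWindows.ofSizeAndGrace(...)` lets you tolerate late data.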
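The article does not show `OrderSummary`. A minimal sketch, assuming it tracks an order count and a running total, is an immutable value whose `add()` returns a new summary, which keeps the aggregator free of side effects. The `OrderEvent` record here is a hypothetical stand-in with only the fields the summary needs.

```java
import java.math.BigDecimal;

// Hypothetical stand-in for the article's OrderEvent
record OrderEvent(String region, BigDecimal total) {}

// Hypothetical aggregate: add() returns a new summary rather than mutating this one
record OrderSummary(long count, BigDecimal totalAmount) {

    // Initializer: the value a window starts from
    static OrderSummary empty() {
        return new OrderSummary(0, BigDecimal.ZERO);
    }

    // Aggregator step: fold one order into the running summary
    OrderSummary add(OrderEvent order) {
        return new OrderSummary(count + 1, totalAmount.add(order.total()));
    }
}
```

`BigDecimal` avoids the rounding drift a `double` total would accumulate over many orders.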
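```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.context.annotation.Bean;

// orderSerde and productSerde are assumed Serde<OrderEvent> / Serde<Product> fields
@Bean
public KStream<String, EnrichedOrder> enrichedOrders(StreamsBuilder streamsBuilder) {
    // Stream: incoming orders (fast-moving)
    KStream<String, OrderEvent> orders = streamsBuilder.stream("orders");

    // Table: product catalog (slow-moving, updated via CDC), keyed by product ID
    KTable<String, Product> products =
            streamsBuilder.table("products", Consumed.with(Serdes.String(), productSerde));

    // Join: re-key each order by product ID (this forces a repartition), then look up the product
    return orders
            .selectKey((key, order) -> order.getProductId().toString())
            .join(products,
                    (order, product) -> new EnrichedOrder(order, product),
                    Joined.with(Serdes.String(), orderSerde, productSerde));
}
```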
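The stream-table join enriches each order with the product data the table holds at the moment the order is processed, without any external DB call: the table lives in a local state store fed from the `products` topic, so everything flows through Kafka. Because `selectKey()` changes the key, Kafka Streams repartitions the orders through an internal topic before joining. `join()` is an inner join, so an order whose product is not in the table is dropped; `leftJoin()` keeps it and passes a `null` product to the joiner.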
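To make the inner-join and tombstone behaviour concrete, here is a plain-Java model of a stream-table join (not Kafka Streams code): the table is a map updated by changelog records, a `null` value is a tombstone that deletes the key, and each order is joined against whatever the map holds when the order arrives. `ProductTable` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical in-memory model of a KTable fed by a changelog topic
final class ProductTable<V> {
    private final Map<String, V> latest = new HashMap<>();

    // Apply one changelog record: a null value is a tombstone that deletes the key
    void upsert(String key, V value) {
        if (value == null) {
            latest.remove(key);
        } else {
            latest.put(key, value);
        }
    }

    // Inner join: empty result when the key is missing (the order is dropped)
    <O, R> Optional<R> join(String key, O order, BiFunction<O, V, R> joiner) {
        V current = latest.get(key);
        return current == null ? Optional.empty() : Optional.ofNullable(joiner.apply(order, current));
    }

    // Left join: always calls the joiner; the table side may be null
    <O, R> R leftJoin(String key, O order, BiFunction<O, V, R> joiner) {
        return joiner.apply(order, latest.get(key));
    }
}
```

Each order sees only the table state at its own processing time; updating a product later does not re-emit earlier joined orders.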
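```java
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.web.bind.annotation.*;
import lombok.RequiredArgsConstructor;

@RestController
@RequiredArgsConstructor
public class StreamQueryController {

    // @EnableKafkaStreams registers a StreamsBuilderFactoryBean, which owns the KafkaStreams instance
    private final StreamsBuilderFactoryBean factoryBean;

    @GetMapping("/regions/{region}/summary")
    public OrderSummary getHourlySummary(@PathVariable String region) {
        KafkaStreams streams = factoryBean.getKafkaStreams();
        ReadOnlyWindowStore<String, OrderSummary> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        "hourly-order-summary", QueryableStoreTypes.windowStore()));

        Instant now = Instant.now();
        // Windows whose start time is in [now - 1h, now], oldest first; the iterator must be closed
        try (WindowStoreIterator<OrderSummary> iterator =
                     store.fetch(region, now.minus(Duration.ofHours(1)), now)) {
            OrderSummary latest = OrderSummary.empty();
            while (iterator.hasNext()) {
                latest = iterator.next().value; // keep the most recent window
            }
            return latest;
        }
    }
}
```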
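Interactive queries let you expose the stream's materialized state as an HTTP endpoint, so real-time aggregations become a REST API. Each instance holds only the store partitions assigned to it; with several instances, set `application.server` on each one and use `KafkaStreams#queryMetadataForKey` to find the host that owns a region before querying it. While a rebalance is in progress the store can throw `InvalidStateStoreException`, so callers should be prepared to retry.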
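Why does fetching `[now - 1h, now]` find the current window? Tumbling windows are aligned to the Unix epoch, so a record with timestamp `t` lands in the window starting at `t - (t % size)`, and the current window's start always falls in `(now - 1h, now]`, inside the fetch range. The helper below is a plain-Java sketch of that arithmetic for non-negative timestamps; `WindowMath` is a hypothetical name, not a Kafka class.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper mirroring epoch-aligned tumbling window assignment
final class WindowMath {
    private WindowMath() {}

    // Start of the tumbling window containing timestampMs (assumes timestampMs >= 0)
    static long windowStart(long timestampMs, Duration size) {
        long sizeMs = size.toMillis();
        return timestampMs - (timestampMs % sizeMs);
    }

    // True if the current window's start lies in the inclusive fetch range [now - size, now]
    static boolean currentWindowInFetchRange(Instant now, Duration size) {
        long nowMs = now.toEpochMilli();
        long start = windowStart(nowMs, size);
        return start >= nowMs - size.toMillis() && start <= nowMs;
    }
}
```

Exactly on an hour boundary the inclusive range also covers the previous window's start, which is why the controller keeps the last window the iterator returns rather than the first.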
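```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;

// Assumes a Lombok @Slf4j logger and an enrichOrder(OrderEvent) helper that returns OrderEvent
@Bean
public KStream<String, OrderEvent> orderStreamWithErrorHandling(StreamsBuilder streamsBuilder) {
    return streamsBuilder
            .<String, OrderEvent>stream("orders") // type witness: a chained call cannot infer K and V
            .mapValues((key, order) -> {
                try {
                    return enrichOrder(order);
                } catch (Exception e) {
                    log.error("Failed to process order {}", key, e); // passing e keeps the stack trace
                    return null; // Will be filtered out
                }
            })
            .filter((key, value) -> value != null); // Remove failed records
}
```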
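Deserialization failures (poison-pill records that cannot be read at all) never reach your `mapValues()` code. By default Kafka Streams uses `LogAndFailExceptionHandler`, which stops the stream thread; `LogAndContinueExceptionHandler` logs the bad record and skips it instead. To route such records to a dead-letter topic (DLT), Spring Kafka offers `RecoveringDeserializationExceptionHandler` together with a `DeadLetterPublishingRecoverer`. The simplest option logs and skips: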
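```properties
# application.properties
# The key must match the Kafka config name exactly (dots, not hyphens) and stay on one line
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
```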
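The deserialization handler does not cover exceptions thrown by your own processing logic. One common alternative to returning `null` is to wrap each outcome in a result type, then `split()` successes to the output topic and failures to a dead-letter topic. The plain-Java sketch below models that wrapper and the routing decision; `ProcessingResult`, `Success`, `Failure`, `DeadLetterRouter`, and the topic names `orders-enriched` and `orders.DLT` are all hypothetical.

```java
import java.util.function.Function;

// Hypothetical result wrapper: either the processed value or the failure details,
// so failed records can be routed to a dead-letter topic instead of being dropped
sealed interface ProcessingResult<V> permits Success, Failure {

    // Runs one processing step and captures any runtime exception as a Failure
    static <I, O> ProcessingResult<O> attempt(I input, Function<I, O> step) {
        try {
            return new Success<>(step.apply(input));
        } catch (RuntimeException e) {
            return new Failure<>(String.valueOf(input), e.getMessage());
        }
    }
}

record Success<V>(V value) implements ProcessingResult<V> {}

record Failure<V>(String originalPayload, String error) implements ProcessingResult<V> {}

final class DeadLetterRouter {
    private DeadLetterRouter() {}

    // Models a split() with one branch for successes and a default branch for the DLT
    static String topicFor(ProcessingResult<?> result) {
        return (result instanceof Success<?>) ? "orders-enriched" : "orders.DLT";
    }
}
```

In a topology this would roughly correspond to a `mapValues()` that returns a `ProcessingResult`, followed by `split().branch((key, r) -> r instanceof Success<?>, ...)` and a `defaultBranch` writing to the DLT; unlike the `null` approach, the failure reason and original payload survive for later inspection or replay.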
Common pitfalls:

- … `retention.ms` and `window.size.ms`
- Blocking calls inside `mapValues()` slow the entire pipeline; pre-load reference data as a `KTable` instead
- … a `SerializationException` that silently drops messages in some configurations
- Not handling `null` values: Kafka uses null-value tombstone records for deletions in compacted topics; filter nulls before processing

Kafka Streams runs real-time data processing inside your Spring Boot app with no infrastructure beyond the Kafka cluster itself. The DSL covers filtering, mapping, aggregating in time windows, and joining streams with reference tables. Interactive queries expose materialized state as REST endpoints. For aggregations, routing, and enrichment over Kafka topics, it's the lowest-complexity path to real-time processing.
JOptimize analyzes your Spring Boot event architecture for blocking operations in stream handlers, missing error handling, and serialization configuration issues.