Tags: kafka, kafka-streams, spring-boot, event-driven, real-time, java

Kafka Streams with Spring Boot: Real-Time Data Processing Without a Framework (2026)

Kafka Streams runs inside your Spring Boot app — no Flink, no Spark, no cluster to manage. Learn the DSL for filtering, aggregating, and joining streams with Spring Boot integration.

JOptimize Team

May 25, 2026 · 10 min read

Kafka Streams is a client library that runs stream processing logic inside your application. No separate cluster, no Flink jobs, no Spark infrastructure — just add a dependency and write Java code. For real-time aggregations, enrichment, and routing, it's the lowest-complexity option that scales horizontally by adding instances.


Setup

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

# application.properties
spring.kafka.streams.application-id=order-processor
spring.kafka.streams.bootstrap-servers=kafka:9092
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde

The Basics: Filter and Transform

@Configuration
@EnableKafkaStreams
public class OrderStreamConfig {

    // Assumed collaborator: the snippet uses customerService, so it is injected here.
    // The CustomerService type name is an assumption.
    private final CustomerService customerService;

    public OrderStreamConfig(CustomerService customerService) {
        this.customerService = customerService;
    }

    @Bean
    public KStream<String, OrderEvent> orderStream(StreamsBuilder streamsBuilder) {
        KStream<String, OrderEvent> orders = streamsBuilder.stream("orders");

        // Filter: only high-value orders (total strictly greater than 1000)
        KStream<String, OrderEvent> highValueOrders = orders
            .filter((key, order) -> order.getTotal().compareTo(new BigDecimal("1000")) > 0);

        // Transform: enrich with customer tier.
        // getTier() should be a local, non-blocking lookup (for example an in-memory cache);
        // a remote call here would stall the stream thread.
        KStream<String, EnrichedOrder> enriched = highValueOrders
            .mapValues(order -> new EnrichedOrder(
                order,
                customerService.getTier(order.getCustomerId())
            ));

        // Route to different topics by tier
        enriched.split()
            .branch((key, order) -> order.getTier() == CustomerTier.GOLD,
                Branched.withConsumer(s -> s.to("orders-gold")))
            .branch((key, order) -> order.getTier() == CustomerTier.SILVER,
                Branched.withConsumer(s -> s.to("orders-silver")))
            .defaultBranch(Branched.withConsumer(s -> s.to("orders-standard")));

        return orders;
    }
}
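The tier-to-topic routing above can also be expressed as a single pure function, which is easy to unit test without a running topology. This is a minimal sketch: the `CustomerTier` enum here is a stand-in for the article's type (its `STANDARD` constant is an assumption), and `TierRouting` is a hypothetical helper name.

```java
// Hypothetical stand-in for the article's CustomerTier enum; STANDARD is an assumption.
enum CustomerTier { GOLD, SILVER, STANDARD }

final class TierRouting {
    private TierRouting() {}

    // Maps a tier to its output topic. Anything that is not GOLD or SILVER
    // (including null) goes to the standard topic, mirroring defaultBranch().
    static String topicFor(CustomerTier tier) {
        if (tier == CustomerTier.GOLD) {
            return "orders-gold";
        }
        if (tier == CustomerTier.SILVER) {
            return "orders-silver";
        }
        return "orders-standard";
    }
}
```

In a topology, a function like this could replace the split() chain, because KStream.to() also accepts a TopicNameExtractor, for example `enriched.to((key, order, ctx) -> TierRouting.topicFor(order.getTier()))`. With that form the target topics need to exist before records are routed to them.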

Aggregation: Count and Sum by Time Window

// orderSerde and orderSummarySerde are Serde instances assumed to be defined
// as fields or beans in the enclosing configuration class.
@Bean
public KTable<Windowed<String>, OrderSummary> hourlyOrderSummary(StreamsBuilder streamsBuilder) {
    return streamsBuilder
        .stream("orders", Consumed.with(Serdes.String(), orderSerde))
        .groupBy(
            (key, order) -> order.getRegion(),               // Group by region
            Grouped.with(Serdes.String(), orderSerde)
        )
        .windowedBy(
            TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)) // Tumbling 1-hour windows
        )
        .aggregate(
            OrderSummary::empty,                             // Initializer
            (region, order, summary) -> summary.add(order),  // Aggregator
            Materialized.<String, OrderSummary, WindowStore<Bytes, byte[]>>as("hourly-order-summary")
                .withValueSerde(orderSummarySerde)
        );
}

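This produces a count and sum of orders per region for each tumbling (non-overlapping) 1-hour window, updated in real time as new orders arrive. Because the windows are declared with no grace period, records that arrive after their window has closed are dropped. The state store that backs the KTable can be queried directly via interactive queries.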
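To make the windowing concrete, here is a plain-Java sketch of the two pieces the aggregation relies on: an immutable summary value and the epoch-aligned start of a fixed-size window. The field names `count` and `total` are assumptions about what `OrderSummary` holds, and `add` takes the order total directly rather than an `OrderEvent` so the example stays self-contained.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;

// Hypothetical shape for the OrderSummary aggregate; field names are assumptions.
final class OrderSummary {
    private final long count;
    private final BigDecimal total;

    private OrderSummary(long count, BigDecimal total) {
        this.count = count;
        this.total = total;
    }

    // Initializer step: an empty summary for a window with no orders yet.
    static OrderSummary empty() {
        return new OrderSummary(0L, BigDecimal.ZERO);
    }

    // Aggregator step: returns a new summary instead of mutating this one.
    OrderSummary add(BigDecimal orderTotal) {
        return new OrderSummary(count + 1, total.add(orderTotal));
    }

    long count() {
        return count;
    }

    BigDecimal total() {
        return total;
    }
}

final class TumblingWindows {
    private TumblingWindows() {}

    // Start of the epoch-aligned, fixed-size window that contains the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }
}
```

An order stamped 10:59:59 and one stamped 11:00:00 land in different windows, which is why the result is a series of per-hour totals rather than a sliding "last 60 minutes" figure.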


Stream-Table Join: Enrich with Reference Data

// orderSerde and productSerde are Serde instances assumed to be defined
// as fields or beans in the enclosing configuration class.
@Bean
public KStream<String, EnrichedOrder> enrichedOrders(StreamsBuilder streamsBuilder) {
    // Stream: incoming orders (fast-moving)
    KStream<String, OrderEvent> orders = streamsBuilder.stream("orders");

    // Table: product catalog (slow-moving, updated via CDC)
    KTable<String, Product> products = streamsBuilder.table("products",
        Consumed.with(Serdes.String(), productSerde));

    // Join: re-key each order by product ID, then enrich it with product details
    return orders
        .selectKey((key, order) -> order.getProductId().toString())
        .join(
            products,
            (order, product) -> new EnrichedOrder(order, product),
            Joined.with(Serdes.String(), orderSerde, productSerde)
        );
}

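The stream-table join enriches each order with the product data that is current when the order is processed, without any external DB call; everything flows through Kafka. Because this is an inner join, orders whose product ID has no entry in the table are dropped; use leftJoin() to keep them. Re-keying with selectKey() also means Kafka Streams repartitions the orders by product ID before the join.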
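The join semantics can be modelled in a few lines of plain Java. This is an illustrative model only, not the Kafka Streams API: the table side keeps the latest value per key, a null value acts as a tombstone, and only stream-side records produce output. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a stream-table inner join; names are hypothetical.
final class StreamTableJoinModel {
    // Latest product name per product ID (the "table" side).
    private final Map<String, String> table = new HashMap<>();
    // Joined results emitted so far, as "orderId:productName".
    private final List<String> output = new ArrayList<>();

    // Table side: an update replaces the current value; a null value deletes the key.
    // Table updates never emit join output on their own.
    void onTableRecord(String productId, String productName) {
        if (productName == null) {
            table.remove(productId);
        } else {
            table.put(productId, productName);
        }
    }

    // Stream side: look up the table as it is right now.
    // An inner join emits nothing when the key is missing.
    void onStreamRecord(String productId, String orderId) {
        String productName = table.get(productId);
        if (productName != null) {
            output.add(orderId + ":" + productName);
        }
    }

    List<String> output() {
        return output;
    }
}
```

An order that arrives before its product is loaded, or after the product is deleted, produces no output in this model, which mirrors why the ordering of the two topics matters for an inner join.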


Interactive Queries: Query the Stream State

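@RestController
@RequiredArgsConstructor
public class StreamQueryController {

    // Spring Kafka does not register KafkaStreams as a bean;
    // obtain it from the StreamsBuilderFactoryBean instead.
    private final StreamsBuilderFactoryBean factoryBean;

    @GetMapping("/regions/{region}/summary")
    public OrderSummary getHourlySummary(@PathVariable String region) {
        // getKafkaStreams() returns null until the factory bean has started
        KafkaStreams streams = factoryBean.getKafkaStreams();

        ReadOnlyWindowStore<String, OrderSummary> store = streams.store(
            StoreQueryParameters.fromNameAndType(
                "hourly-order-summary",
                QueryableStoreTypes.windowStore()
            ));

        Instant now = Instant.now();
        // Close the iterator when done; it holds state store resources
        try (WindowStoreIterator<OrderSummary> iterator =
                 store.fetch(region, now.minus(Duration.ofHours(1)), now)) {
            if (iterator.hasNext()) {
                return iterator.next().value;
            }
        }
        return OrderSummary.empty();
    }
}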

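Interactive queries let you expose the stream's materialized state as an HTTP endpoint, so real-time aggregations become accessible as a REST API. Each instance only holds state for the partitions assigned to it, so with more than one instance a request may need to be forwarded to the instance that owns the key.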


Error Handling and Dead Letter

@Bean
public KStream<String, OrderEvent> orderStreamWithErrorHandling(StreamsBuilder streamsBuilder) {
    return streamsBuilder
        // Explicit type witness so the stream is typed as KStream<String, OrderEvent>
        .<String, OrderEvent>stream("orders")
        .mapValues((key, order) -> {
            try {
                return enrichOrder(order);
            } catch (Exception e) {
                // log is assumed to be an SLF4J logger, for example via Lombok's @Slf4j
                log.error("Failed to process order {}: {}", key, e.getMessage());
                return null; // Will be filtered out
            }
        })
        .filter((key, value) -> value != null); // Remove failed records
}

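The try/catch above only covers exceptions thrown by your own processing code. Records that cannot be deserialized fail before they reach it and, with the default handler, stop the stream thread. To log and skip such records instead, configure a deserialization exception handler. Note that LogAndContinueExceptionHandler discards the bad record rather than routing it to a DLT; to publish failed records to a dead-letter topic, Spring Kafka provides RecoveringDeserializationExceptionHandler, which delegates to a recoverer such as DeadLetterPublishingRecoverer.

# application.properties
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler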
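Returning null and filtering discards both the failed record and the reason it failed. One way to keep them is to wrap each processing result so that failures stay in the stream and can be branched to a dead-letter topic. The sketch below shows such a wrapper in plain Java; `Processed` and its method names are hypothetical and not part of Kafka Streams or Spring Kafka.

```java
import java.util.function.Function;

// Hypothetical wrapper: holds the original input plus either a result or an error description.
final class Processed<I, O> {
    private final I input;
    private final O value;
    private final String error;

    private Processed(I input, O value, String error) {
        this.input = input;
        this.value = value;
        this.error = error;
    }

    // Runs the step and captures a runtime failure instead of letting it propagate.
    static <I, O> Processed<I, O> attempt(I input, Function<I, O> step) {
        try {
            return new Processed<>(input, step.apply(input), null);
        } catch (RuntimeException e) {
            // toString() is never null, unlike getMessage()
            return new Processed<>(input, null, e.toString());
        }
    }

    boolean failed() {
        return error != null;
    }

    I input() {
        return input;
    }

    O value() {
        return value;
    }

    String error() {
        return error;
    }
}
```

In the topology, the mapValues() step could return `Processed.attempt(order, this::enrichOrder)`, followed by a split() whose first branch tests `failed()` and writes `input()` to a dead-letter topic (the topic name, for example "orders-dlt", would be your choice), while the default branch unwraps `value()` and continues as before.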

Common Mistakes to Avoid

  • Not sizing window store retention: windowed stores keep each window for a retention period (one day by default); set it with Materialized.withRetention() and make it at least the window size plus the grace period
  • Calling external services in stream operations: blocking HTTP calls inside mapValues() slow the entire pipeline; pre-load reference data as a KTable instead
  • Using the wrong serdes: a serde mismatch between the producer and the Streams app raises a deserialization error; with the default handler this stops the stream thread, and with LogAndContinueExceptionHandler the record is logged and skipped, which is easy to miss
  • Not handling null values: Kafka uses null-value tombstone records for deletions in compacted topics; filter nulls out of a KStream before processing

Summary

Kafka Streams runs real-time data processing inside your Spring Boot app with no processing infrastructure beyond the Kafka cluster itself. The DSL covers filtering, mapping, aggregating in time windows, and joining streams with reference tables. Interactive queries expose materialized state as REST endpoints. For aggregations, routing, and enrichment over Kafka topics, it's the lowest-complexity path to real-time processing.


