Apache Kafka Streams in Production: Lessons Learned

Introduction

Kafka Streams is deceptively simple to get started with. The DSL reads well, the documentation covers the basics thoroughly, and you can have a working stream processing application in under an hour. The complexity reveals itself later: when your state stores grow beyond available memory, when a poison pill message brings processing to a halt, or when rebalancing takes minutes instead of seconds.

This article covers the lessons I learned running Kafka Streams applications that process event-driven translation workflows. These services handle message ordering, exactly-once semantics, and stateful aggregations across multiple topics. The patterns here apply to any domain, but the examples draw from real scenarios I have encountered.

Topology Design: Keep It Simple, Keep It Debuggable

The biggest mistake I see in Kafka Streams applications is overly complex topologies. When you can chain filter, map, flatMap, groupBy, aggregate, join, and merge together in a single fluid API, it is tempting to build the entire processing pipeline in one class. Resist this temptation.

I structure topologies as small, composable units with clear input and output topics. Each processing stage reads from a topic, performs a bounded transformation, and writes to another topic. This makes each stage independently testable, independently scalable, and independently recoverable:

@Component
public class TranslationEventTopology {

    @Autowired
    public void buildPipeline(StreamsBuilder builder) {
        KStream<String, TranslationEvent> events = builder.stream(
            "translation.events",
            Consumed.with(Serdes.String(), translationEventSerde())
        );

        // Branch into different processing paths
        Map<String, KStream<String, TranslationEvent>> branches = events
            .split(Named.as("event-"))
            .branch((key, event) -> event instanceof TranslationRequested,
                     Branched.as("requested"))
            .branch((key, event) -> event instanceof TranslationCompleted,
                     Branched.as("completed"))
            .branch((key, event) -> event instanceof TranslationFailed,
                     Branched.as("failed"))
            .defaultBranch(Branched.as("unknown"));

        // Process each branch independently
        processRequestedEvents(branches.get("event-requested"));
        processCompletedEvents(branches.get("event-completed"));
        processFailedEvents(branches.get("event-failed"));
        handleUnknownEvents(branches.get("event-unknown"));
    }

    private void processRequestedEvents(KStream<String, TranslationEvent> stream) {
        stream
            .mapValues(event -> (TranslationRequested) event)
            .mapValues(this::enrichWithProjectMetadata)
            .to("translation.requests.enriched",
                Produced.with(Serdes.String(), enrichedRequestSerde()));
    }

    private void processCompletedEvents(KStream<String, TranslationEvent> stream) {
        stream
            .mapValues(event -> (TranslationCompleted) event)
            .mapValues(this::computeQualityScore)
            .to("translation.completed.scored",
                Produced.with(Serdes.String(), scoredCompletionSerde()));
    }
}

Name everything. Every processor, every state store, every repartition topic should have an explicit name. When Kafka Streams auto-generates names, they look like KSTREAM-AGGREGATE-0000000003-repartition, which makes debugging impossible when you have 20 processing nodes:

events
    .groupByKey(Grouped.as("group-by-job-id"))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
        .named("five-minute-window"))
    .aggregate(
        TranslationJobStats::new,
        (key, event, stats) -> stats.update(event),
        Named.as("aggregate-job-stats"),
        Materialized.<String, TranslationJobStats, WindowStore<Bytes, byte[]>>as(
            "job-stats-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(jobStatsSerde())
    );

State Store Management

State stores are where Kafka Streams keeps the data for aggregations, joins, and any stateful transformation. By default, they use RocksDB backed by a changelog topic. This works well until it does not.

Size your state stores deliberately. Every state store maintains an in-memory index plus on-disk data in RocksDB. If your aggregation key has high cardinality (millions of unique keys), the RocksDB block cache and index become significant memory consumers. I configure RocksDB explicitly rather than relying on defaults:

@Bean
public StreamsConfig kafkaStreamsConfig() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "translation-processor");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
              CustomRocksDBConfig.class.getName());
    props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams");
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
    return new StreamsConfig(props);
}

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options,
                          Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
            options.tableFormatConfig();

        // 64MB block cache per store (default is much higher)
        tableConfig.setBlockCache(new LRUCache(64 * 1024 * 1024));
        tableConfig.setBlockSize(16 * 1024); // 16KB blocks
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setPinL0FilterAndIndexBlocksInCache(true);

        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(3);
        options.setWriteBufferSize(16 * 1024 * 1024); // 16MB write buffer
    }

    @Override
    public void close(String storeName, Options options) {
        // cleanup if needed
    }
}

Windowed stores need cleanup. If you use time-windowed aggregations, old windows accumulate on disk indefinitely unless you configure retention. I have seen state stores grow to hundreds of gigabytes because nobody set a retention policy:

.aggregate(
    TranslationJobStats::new,
    (key, event, stats) -> stats.update(event),
    Materialized.<String, TranslationJobStats, WindowStore<Bytes, byte[]>>as(
        "job-stats-store")
        .withRetention(Duration.ofDays(7))
        .withKeySerde(Serdes.String())
        .withValueSerde(jobStatsSerde())
);

Error Handling: The Poison Pill Problem

The most common production issue in Kafka Streams applications is the poison pill: a message that cannot be deserialized or processed. By default, a deserialization error crashes the entire application. A processing error does the same. In production, you need explicit strategies for both.

Deserialization errors should use a LogAndContinueExceptionHandler or a custom handler that routes bad messages to a dead letter topic:

public class DeadLetterDeserializationHandler implements DeserializationExceptionHandler {

    private KafkaProducer<byte[], byte[]> dlqProducer;

    @Override
    public void configure(Map<String, ?> configs) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  ByteArraySerializer.class);
        this.dlqProducer = new KafkaProducer<>(props);
    }

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                  ConsumerRecord<byte[], byte[]> record,
                                                  Exception exception) {
        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
            record.topic() + ".dlq",
            record.key(),
            record.value()
        );
        dlqRecord.headers()
            .add("error.message", exception.getMessage().getBytes())
            .add("error.source.topic", record.topic().getBytes())
            .add("error.source.partition",
                 String.valueOf(record.partition()).getBytes())
            .add("error.source.offset",
                 String.valueOf(record.offset()).getBytes())
            .add("error.timestamp",
                 String.valueOf(System.currentTimeMillis()).getBytes());

        dlqProducer.send(dlqRecord);
        return DeserializationHandlerResponse.CONTINUE;
    }
}

Processing errors require a different approach because they happen within your business logic. I wrap processing steps with a utility that catches exceptions and routes failures to a dead letter topic while allowing the stream to continue:

public class SafeValueMapper<V, VR> implements ValueMapper<V, Iterable<VR>> {

    private final ValueMapper<V, VR> delegate;
    private final String dlqTopic;
    private final BiConsumer<V, Exception> errorReporter;

    @Override
    public Iterable<VR> apply(V value) {
        try {
            VR result = delegate.apply(value);
            return Collections.singletonList(result);
        } catch (Exception e) {
            errorReporter.accept(value, e);
            return Collections.emptyList();
        }
    }
}

// Usage in topology
stream
    .flatMapValues(new SafeValueMapper<>(
        this::enrichWithProjectMetadata,
        "translation.enrichment.dlq",
        (event, ex) -> log.error("Failed to enrich event: {}", event.jobId(), ex)
    ))
    .to("translation.requests.enriched");

Operational Patterns: Rebalancing, Scaling, and Monitoring

Rebalancing is the most operationally painful aspect of Kafka Streams. When you deploy a new version, add instances, or an instance dies, Kafka triggers a rebalance that redistributes partitions. During rebalance, processing stops. For stateful applications, state stores need to be restored from changelog topics, which can take minutes.

Use static membership to reduce unnecessary rebalances during rolling deployments:

spring:
  kafka:
    streams:
      properties:
        group.instance.id: ${HOSTNAME}
        session.timeout.ms: 60000

With static membership, Kafka recognizes returning instances by their group.instance.id and reassigns the same partitions without a full rebalance. This is critical for stateful applications where state store restoration is expensive.

Standby replicas reduce restoration time when a rebalance does occur:

num.standby.replicas=1

This maintains a hot copy of each state store on another instance. If the primary owner fails, the standby can take over almost instantly instead of restoring from the changelog topic.

For monitoring, expose these metrics at minimum:

@Component
public class KafkaStreamsHealthIndicator implements HealthIndicator {

    private final StreamsBuilderFactoryBean streamsFactory;

    @Override
    public Health health() {
        KafkaStreams.State state = streamsFactory.getKafkaStreams().state();

        if (state == KafkaStreams.State.RUNNING) {
            return Health.up()
                .withDetail("state", state.name())
                .withDetail("threadCount", getActiveThreadCount())
                .build();
        } else if (state == KafkaStreams.State.REBALANCING) {
            return Health.status("REBALANCING")
                .withDetail("state", state.name())
                .build();
        } else {
            return Health.down()
                .withDetail("state", state.name())
                .build();
        }
    }
}

Key Takeaways

Kafka Streams is a powerful library, but production readiness requires attention to details that the getting-started guides do not cover. Name everything in your topology for debuggability. Configure RocksDB and state store retention explicitly. Implement dead letter queues for both deserialization and processing errors. Use static membership and standby replicas to minimize rebalance impact.

The most important lesson is to keep topologies simple and composable. When something goes wrong at 2 AM, you want to be able to look at a topology and understand exactly where a message is in the pipeline. Complex, monolithic topologies make that impossible. Simple stages connected by topics make it straightforward.