Spring Boot 4.1 · Spring Kafka 4.1 · At-least-once processing

Reliable Event Listener How-To Guide

A practical implementation guide for Kafka consumers that call downstream REST services, write to a database, avoid silent message loss, recover from transient failures, and replay dead-lettered events safely.

Spring Boot: 4.1.0
Offset rule: Ack after DB commit
Spring Kafka: 4.1.0
Replay mode: Controlled

Executive summary

The safest baseline is a manually acknowledged Kafka listener that throws on failure, retries transient errors, sends exhausted records to a dead-letter topic, and uses idempotent database writes so duplicate delivery is safe.

Do this

  • Disable Kafka auto-commit.
  • Use MANUAL_IMMEDIATE ack mode.
  • Call ack.acknowledge() only after durable success.
  • Let exceptions bubble up to Spring Kafka.

Avoid this

  • Catching, logging, and suppressing listener exceptions.
  • Committing offsets before the DB transaction succeeds.
  • Blindly replaying DLT messages forever.
  • Assuming REST + DB + Kafka can be made truly exactly-once.

Operational target

  • Transient failures retry automatically.
  • Poison messages go to DLT.
  • DLT replay is gated and rate-limited.
  • Outages pause consumption instead of amplifying failures.

Key principle: A message is not complete until downstream enrichment, the database transaction, and the offset acknowledgement have all succeeded, in that order.

Architecture

The recommended flow separates normal processing, failure recovery, and replay operations.

Kafka topic → Main listener → REST enrichment → DB transaction → Ack offset
Failure → Retry → DLT → Controlled replay → Original topic

Component | Responsibility | Reliability behavior
Main listener | Consumes original topic | Manual ack after successful processing
Error handler | Retries and recovers failed records | Publishes exhausted records to DLT
Processor | Calls enrichment services and DB writer | Throws retryable/non-retryable exceptions
DB idempotency table | Tracks processed event IDs | Prevents duplicate side effects
DLT replayer | Republishes eligible DLT records | Stopped by default, gated, rate-limited
Circuit breaker pauser | Pauses/resumes consumer during outages | Prevents hot-loop failure storms

Gradle dependencies

This guide is updated for Spring Boot 4.1.0 and shows the Gradle setup only. Use Boot’s dependency management for Spring-managed artifacts; it manages spring-kafka 4.1.0, Apache Kafka clients 4.2.1, and the PostgreSQL JDBC driver 42.7.11. Add an explicit version for Resilience4j because it is not part of the Spring Boot BOM.

Verified dependency baseline: June 18, 2026
Spring Boot 4 starter note: Prefer spring-boot-starter-webmvc for MVC REST services. The older spring-boot-starter-web artifact still exists, but Spring Boot 4.x deprecates it in favor of spring-boot-starter-webmvc.

Library | Version / source | Why it is used
Spring Boot | 4.1.0 | Application framework and dependency management.
Spring Kafka | 4.1.0, managed by Boot | Kafka listeners, error handling, DLT publishing, pause/resume.
Apache Kafka clients | 4.2.1, managed by Boot | Kafka consumer/producer protocol implementation.
Resilience4j | resilience4j-spring-boot4:2.4.0 | Circuit breaker, retry, rate limiter, and health events around REST calls.
PostgreSQL JDBC | 42.7.11, managed by Boot | Runtime database driver.
build.gradle
plugins {
    id "java"
    id "org.springframework.boot" version "4.1.0"
    id "io.spring.dependency-management" version "1.1.7"
}

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(21)
    }
}

dependencies {
    implementation "org.springframework.boot:spring-boot-starter-webmvc"
    implementation "org.springframework.boot:spring-boot-starter-data-jpa"
    implementation "org.springframework.boot:spring-boot-starter-kafka"

    // Resilience4j Spring Boot 4 integration for @Retry, @CircuitBreaker, RateLimiter, etc.
    implementation "io.github.resilience4j:resilience4j-spring-boot4:2.4.0"
    implementation "org.springframework.boot:spring-boot-starter-aop"
    implementation "org.springframework.boot:spring-boot-starter-actuator"

    runtimeOnly "org.postgresql:postgresql"

    testImplementation "org.springframework.boot:spring-boot-starter-test"
    testImplementation "org.springframework.boot:spring-boot-starter-kafka-test"
}

Kafka configuration

Disable auto-commit, use manual ack mode, and configure serializers/deserializers defensively.

application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092

    consumer:
      group-id: order-event-worker
      enable-auto-commit: false
      auto-offset-reset: earliest
      max-poll-records: 10

      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: "com.example.events"
        spring.json.value.default.type: "com.example.events.OrderCreatedEvent"

    listener:
      ack-mode: manual_immediate
      type: single

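    producer:
      # The DLT recoverer and the DLT replayer publish OrderCreatedEvent values, so Boot's
      # default StringSerializer would fail. Records that failed deserialization are
      # republished with their raw byte[] value; if those must reach the DLT byte-for-byte,
      # give the recoverer a byte[]-capable serializer (for example DelegatingByTypeSerializer).
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 2147483647
      properties:
        enable.idempotence: true
        delivery.timeout.ms: 120000
        request.timeout.ms: 30000
        max.in.flight.requests.per.connection: 5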

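resilience4j:
  retry:
    instances:
      enrichmentService:
        max-attempts: 3
        wait-duration: 300ms
        # Skip in-process retries when they cannot help: rejected requests (4xx/validation)
        # and calls refused by an open circuit breaker. Kafka's error handler takes over.
        ignore-exceptions:
          - com.example.service.NonRetryableEventException
          - io.github.resilience4j.circuitbreaker.CallNotPermittedException

  # Circuit breakers are configured per dependency (customerService, riskService) to match
  # the names used in @CircuitBreaker; see "Circuit breakers and pausing the consumer".
  #
  # No timelimiter instance: the clients have no @TimeLimiter, and Resilience4j's TimeLimiter
  # only wraps asynchronous (CompletionStage or reactive) calls. Set connect and read
  # timeouts on the HTTP client behind RestClient instead.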
ReliableKafkaConsumerConfig.java
package com.example.kafka;

import com.example.events.OrderCreatedEvent;
import com.example.service.NonRetryableEventException;
import com.example.service.RetryableEventProcessingException;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.dao.RecoverableDataAccessException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.util.backoff.ExponentialBackOff;

import java.util.Map;

@Configuration
public class ReliableKafkaConsumerConfig {

    public static final String ORDERS_TOPIC = "orders.events";
    public static final String ORDERS_DLT = "orders.events.DLT";

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderCreatedEvent>
    reliableKafkaListenerContainerFactory(
            ConsumerFactory<String, OrderCreatedEvent> consumerFactory,
            DefaultErrorHandler defaultErrorHandler
    ) {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderCreatedEvent>();

        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(defaultErrorHandler);
        factory.setConcurrency(3);

        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        factory.getContainerProperties().setSyncCommits(true);

        return factory;
    }

    @Bean
    public DefaultErrorHandler defaultErrorHandler(
            KafkaTemplate<Object, Object> kafkaTemplate
    ) {
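        // Main-topic failures go to <topic>.DLT on the same partition (both topics have 6 partitions).
        // Failures while consuming the DLT itself (for example, a replay send that times out) go to
        // the parking lot instead of a non-existent <topic>.DLT.DLT; a negative partition lets Kafka choose.
        var recoverer = new DeadLetterPublishingRecoverer(
                kafkaTemplate,
                (record, exception) -> record.topic().endsWith(".DLT")
                        ? new TopicPartition(ORDERS_TOPIC + ".parking-lot", -1)
                        : new TopicPartition(record.topic() + ".DLT", record.partition())
        );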

        var backOff = new ExponentialBackOff();
        backOff.setInitialInterval(1_000L);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(30_000L);
        backOff.setMaxElapsedTime(90_000L);

        var errorHandler = new DefaultErrorHandler(recoverer, backOff);

        errorHandler.setCommitRecovered(true);

        errorHandler.addRetryableExceptions(
                RetryableEventProcessingException.class,
                RecoverableDataAccessException.class,
                CannotAcquireLockException.class,
                QueryTimeoutException.class
        );

        errorHandler.addNotRetryableExceptions(
                NonRetryableEventException.class,
                IllegalArgumentException.class
        );

        return errorHandler;
    }

    @Bean
    public NewTopic ordersTopic() {
        return TopicBuilder.name(ORDERS_TOPIC)
                .partitions(6)
                .replicas(3)
                .configs(Map.of(
                        TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"
                ))
                .build();
    }

    @Bean
    public NewTopic ordersDltTopic() {
        return TopicBuilder.name(ORDERS_DLT)
                .partitions(6)
                .replicas(3)
                .configs(Map.of(
                        TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2",
                        TopicConfig.RETENTION_MS_CONFIG,
                        String.valueOf(14L * 24 * 60 * 60 * 1000)
                ))
                .build();
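    }

    // Parking lot for non-replayable and replay-exhausted records (used by OrderDltReplayListener).
    @Bean
    public NewTopic ordersParkingLotTopic() {
        return TopicBuilder.name(ORDERS_TOPIC + ".parking-lot")
                .partitions(6)
                .replicas(3)
                .configs(Map.of(
                        TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"
                ))
                .build();
    }
}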
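To see what these ExponentialBackOff settings mean in practice, the sketch below models the interval arithmetic in plain Java. BackOffSchedule is a hypothetical helper, not Spring's class. It assumes that Spring's maxElapsedTime check sums the intervals handed out so far rather than measuring wall-clock time, and it ignores jitter. Verify this against the Spring version you run.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of ExponentialBackOff's interval arithmetic (no jitter).
 * "Elapsed" is the sum of intervals handed out so far, not wall-clock time.
 */
final class BackOffSchedule {

    /** Returns the delays (ms) applied before each redelivery, in order. */
    static List<Long> intervals(long initialMs, double multiplier, long maxIntervalMs, long maxElapsedMs) {
        if (initialMs <= 0 || maxIntervalMs <= 0) {
            throw new IllegalArgumentException("intervals must be positive");
        }
        List<Long> delays = new ArrayList<>();
        long elapsed = 0;
        long current = -1; // no interval computed yet
        while (elapsed < maxElapsedMs) {
            if (current < 0) {
                current = Math.min(initialMs, maxIntervalMs);                     // first retry
            } else if (current < maxIntervalMs) {
                current = Math.min((long) (current * multiplier), maxIntervalMs); // grow, then cap
            }
            elapsed += current;
            delays.add(current);
        }
        return delays;
    }
}
```

With the guide's values (1 s initial interval, multiplier 2, 30 s cap, 90 s budget), the model produces seven retries: 1, 2, 4, 8, 16, 30, and 30 seconds, about 91 s in total. A record that keeps failing with a retryable exception is therefore delivered eight times before it is published to the DLT. As a rule of thumb, keep each interval well below max.poll.interval.ms (five minutes by default) so that a long back-off does not trigger a consumer-group rebalance.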

Reliable listener code

The listener should not catch and suppress exceptions. If processing fails, the exception must reach Spring Kafka’s error handler.

OrderEventListener.java
package com.example.kafka;

import com.example.events.OrderCreatedEvent;
import com.example.service.OrderEventProcessor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderEventListener {

    private final OrderEventProcessor processor;

    @KafkaListener(
            id = "orders-main-listener",
            topics = ReliableKafkaConsumerConfig.ORDERS_TOPIC,
            groupId = "order-event-worker",
            containerFactory = "reliableKafkaListenerContainerFactory"
    )
    public void onMessage(
            ConsumerRecord<String, OrderCreatedEvent> record,
            Acknowledgment ack
    ) {
        log.info(
                "Received order event. topic={}, partition={}, offset={}, key={}",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key()
        );

        processor.process(record.key(), record.value());

        ack.acknowledge();

        log.info(
                "Acknowledged order event. topic={}, partition={}, offset={}, key={}",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key()
        );
    }
}
Danger pattern: Do not wrap the processor in try/catch and then return normally. A normal return lets the listener reach ack.acknowledge(), so the offset is committed and the failed message is silently lost instead of being retried or dead-lettered.
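The difference can be shown without Kafka. The sketch below models a manually acknowledged container that commits an offset only when the listener returns normally and leaves it uncommitted when the listener throws. OffsetModel and its methods are hypothetical stand-ins for Spring Kafka's container and error handler.

```java
import java.util.function.LongConsumer;

/**
 * Hypothetical stand-in for a listener container with manual acks: the offset is committed
 * only when the listener returns normally; an exception leaves it uncommitted for redelivery.
 */
final class OffsetModel {
    long committedOffset = -1; // -1 means nothing committed yet

    /** Delivers the record at the given offset; returns true if the offset was committed. */
    boolean deliver(long offset, LongConsumer listener) {
        try {
            listener.accept(offset);
            committedOffset = offset; // normal return: ack.acknowledge() is reached
            return true;
        } catch (RuntimeException e) {
            return false;             // exception reaches the error handler: retry or DLT later
        }
    }

    static void failingProcessor(long offset) {
        throw new IllegalStateException("Downstream call failed for offset " + offset);
    }

    /** Danger pattern: catch, log, and return normally. */
    static void swallowingListener(long offset) {
        try {
            failingProcessor(offset);
        } catch (RuntimeException e) {
            System.err.println("Logged and ignored: " + e.getMessage());
        }
    }

    /** Recommended pattern: let the exception propagate to the container. */
    static void propagatingListener(long offset) {
        failingProcessor(offset);
    }
}
```

With the swallowing listener, offset 42 is committed even though processing failed, so the record is never redelivered. With the propagating listener, the offset stays uncommitted and the error handler can retry the record or dead-letter it.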

Processing flow

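The processor validates the event, calls both enrichment services, and performs one idempotent database write. Both REST calls run before the database transaction starts, so no connection or row lock is held while waiting on HTTP.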

OrderEventProcessor.java
package com.example.service;

import com.example.client.CustomerClient;
import com.example.client.RiskClient;
import com.example.events.OrderCreatedEvent;
import com.example.persistence.OrderProjectionWriter;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderEventProcessor {

    private final CustomerClient customerClient;
    private final RiskClient riskClient;
    private final OrderProjectionWriter projectionWriter;

    public void process(String kafkaKey, OrderCreatedEvent event) {
        validate(event);

        var customer = customerClient.getCustomer(event.customerId());
        var risk = riskClient.getRiskScore(event.orderId());

        projectionWriter.writeIfNotAlreadyProcessed(kafkaKey, event, customer, risk);
    }

    private void validate(OrderCreatedEvent event) {
        if (event == null) {
            throw new NonRetryableEventException("Event payload is null");
        }

        if (event.eventId() == null || event.eventId().isBlank()) {
            throw new NonRetryableEventException("Missing eventId");
        }

        if (event.orderId() == null || event.customerId() == null) {
            throw new NonRetryableEventException(
                    "Missing required orderId or customerId for eventId=" + event.eventId()
            );
        }
    }
}
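The guide never shows the event or DTO types. The following minimal sketch is consistent with the accessors used above (eventId(), orderId(), customerId(), tier(), score()). Using Java records and these field types are assumptions. Long IDs match CustomerClient.getCustomer(Long) and RiskClient.getRiskScore(Long), and BigDecimal for the score is a guess chosen to fit the numeric risk_score column. In a real project, each record would be public and live in its own file in com.example.events or com.example.client.

```java
import java.math.BigDecimal;

/** Assumed shape of the Kafka payload; eventId is the idempotency key. */
record OrderCreatedEvent(String eventId, Long orderId, Long customerId) { }

/** Assumed customer-service response; only tier() is used by the writer. */
record CustomerDto(Long id, String tier) { }

/** Assumed risk-service response; only score() is used by the writer. */
record RiskDto(Long orderId, BigDecimal score) { }
```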
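REST clients with Resilience4j
// CustomerClient.java
package com.example.client;

import com.example.service.NonRetryableEventException;
import com.example.service.RetryableEventProcessingException;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatusCode;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClient;

@Component
@RequiredArgsConstructor
public class CustomerClient {

    // Needs a RestClient bean; Spring Boot auto-configures a RestClient.Builder, not a RestClient.
    private final RestClient restClient;

    // By default, Resilience4j applies Retry outside CircuitBreaker.
    @Retry(name = "enrichmentService")
    @CircuitBreaker(name = "customerService")
    public CustomerDto getCustomer(Long customerId) {
        try {
            return restClient.get()
                    .uri("http://customer-service/customers/{id}", customerId)
                    .retrieve()
                    // 5xx: the dependency may recover, so let the retry layers try again.
                    .onStatus(HttpStatusCode::is5xxServerError, (request, response) -> {
                        throw new RetryableEventProcessingException(
                                "Customer service 5xx for customerId=" + customerId
                        );
                    })
                    // 4xx: the request itself is wrong; retrying will not help.
                    .onStatus(HttpStatusCode::is4xxClientError, (request, response) -> {
                        throw new NonRetryableEventException(
                                "Customer request rejected for customerId=" + customerId
                        );
                    })
                    .body(CustomerDto.class);
        } catch (RetryableEventProcessingException | NonRetryableEventException e) {
            throw e;
        } catch (Exception e) {
            // Timeouts, connection resets, and any other unexpected failure are treated as transient.
            throw new RetryableEventProcessingException(
                    "Failed calling customer service for customerId=" + customerId,
                    e
            );
        }
    }
}

// RiskClient.java
package com.example.client;

import com.example.service.NonRetryableEventException;
import com.example.service.RetryableEventProcessingException;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatusCode;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClient;

@Component
@RequiredArgsConstructor
public class RiskClient {

    private final RestClient restClient;

    @Retry(name = "enrichmentService")
    @CircuitBreaker(name = "riskService")
    public RiskDto getRiskScore(Long orderId) {
        try {
            return restClient.get()
                    .uri("http://risk-service/risks/orders/{orderId}", orderId)
                    .retrieve()
                    // Same classification as CustomerClient: 5xx is retryable, 4xx is not.
                    .onStatus(HttpStatusCode::is5xxServerError, (request, response) -> {
                        throw new RetryableEventProcessingException(
                                "Risk service 5xx for orderId=" + orderId
                        );
                    })
                    .onStatus(HttpStatusCode::is4xxClientError, (request, response) -> {
                        throw new NonRetryableEventException(
                                "Risk request rejected for orderId=" + orderId
                        );
                    })
                    .body(RiskDto.class);
        } catch (RetryableEventProcessingException | NonRetryableEventException e) {
            throw e;
        } catch (Exception e) {
            throw new RetryableEventProcessingException(
                    "Failed calling risk service for orderId=" + orderId,
                    e
            );
        }
    }
}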
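Resilience4j retries run inside each Kafka delivery attempt, so the two retry layers multiply. The worked sketch below uses this guide's numbers. Resilience4j's max-attempts: 3 counts the first call. By our count, the DefaultErrorHandler back-off configured earlier (1 s doubling to a 30 s cap within a 90 s budget) allows seven redeliveries. RetryBudget is a hypothetical helper.

```java
/** Hypothetical helper: worst-case downstream calls for one record that keeps failing with a retryable error. */
final class RetryBudget {

    /**
     * @param resilience4jMaxAttempts total attempts per REST call, including the first (Resilience4j semantics)
     * @param kafkaRedeliveries       redeliveries granted by the Kafka error handler's back-off
     */
    static int worstCaseCalls(int resilience4jMaxAttempts, int kafkaRedeliveries) {
        int deliveries = 1 + kafkaRedeliveries;      // first delivery plus each redelivery
        return deliveries * resilience4jMaxAttempts; // every delivery runs the in-process retries again
    }
}
```

With these numbers, one failing customer lookup can reach customer-service 24 times before the record is dead-lettered. In practice, the circuit breaker caps this: once it opens, calls fail fast with CallNotPermittedException instead of reaching the service.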

Idempotent database writes

You cannot assume Kafka will deliver each event only once. If the application crashes after committing the database transaction but before committing the Kafka offset, Kafka may redeliver the event. The database write must therefore be idempotent.
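The crash window can be simulated in memory. In the sketch below, a set stands in for processed_kafka_message and a map stands in for order_projection. The class and its methods are hypothetical, and the crash is modeled by not acknowledging after the write. In PostgreSQL, the guarantee comes from the primary key on (consumer_group, event_id) and from both statements sharing one transaction; this model only mirrors the control flow.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of writeIfNotAlreadyProcessed. */
final class IdempotentProjection {
    private final Set<String> processedEventIds = new HashSet<>(); // stands in for processed_kafka_message
    private final Map<Long, String> projection = new HashMap<>();  // stands in for order_projection
    private int projectionWrites = 0;

    /** Returns true if the event changed state, false if it was a duplicate (treated as success). */
    boolean writeIfNotAlreadyProcessed(String eventId, long orderId, String customerTier) {
        if (!processedEventIds.add(eventId)) {
            return false; // like "on conflict do nothing" inserting 0 rows
        }
        projection.put(orderId, customerTier); // like the upsert into order_projection
        projectionWrites++;
        return true;
    }

    int projectionWrites() { return projectionWrites; }

    int projectionRows() { return projection.size(); }
}
```

Redelivering the same event leaves exactly one projection row. The duplicate returns normally, so the listener can acknowledge it.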

OrderProjectionWriter.java
package com.example.persistence;

import com.example.client.CustomerDto;
import com.example.client.RiskDto;
import com.example.events.OrderCreatedEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

import java.time.OffsetDateTime;

@Repository
@RequiredArgsConstructor
public class OrderProjectionWriter {

    private final JdbcTemplate jdbcTemplate;

    @Transactional
    public void writeIfNotAlreadyProcessed(
            String kafkaKey,
            OrderCreatedEvent event,
            CustomerDto customer,
            RiskDto risk
    ) {
        int inserted = jdbcTemplate.update("""
                insert into processed_kafka_message (
                    consumer_group,
                    event_id,
                    kafka_key,
                    processed_at
                )
                values (?, ?, ?, ?)
                on conflict (consumer_group, event_id) do nothing
                """,
                "order-event-worker",
                event.eventId(),
                kafkaKey,
                OffsetDateTime.now()
        );

        if (inserted == 0) {
            return;
        }

        jdbcTemplate.update("""
                insert into order_projection (
                    order_id,
                    customer_id,
                    customer_tier,
                    risk_score,
                    created_at
                )
                values (?, ?, ?, ?, ?)
                on conflict (order_id) do update set
                    customer_id = excluded.customer_id,
                    customer_tier = excluded.customer_tier,
                    risk_score = excluded.risk_score
                """,
                event.orderId(),
                event.customerId(),
                customer.tier(),
                risk.score(),
                OffsetDateTime.now()
        );
    }
}
PostgreSQL DDL
create table processed_kafka_message (
    consumer_group varchar(200) not null,
    event_id varchar(200) not null,
    kafka_key varchar(500),
    processed_at timestamptz not null,
    primary key (consumer_group, event_id)
);

create table order_projection (
    order_id bigint primary key,
    customer_id bigint not null,
    customer_tier varchar(100),
    risk_score numeric,
    created_at timestamptz not null
);

Error classification

Classify failures so transient problems are retried and bad data is not retried forever.

Exception types
package com.example.service;

public class RetryableEventProcessingException extends RuntimeException {

    public RetryableEventProcessingException(String message) {
        super(message);
    }

    public RetryableEventProcessingException(String message, Throwable cause) {
        super(message, cause);
    }
}

package com.example.service;

public class NonRetryableEventException extends RuntimeException {

    public NonRetryableEventException(String message) {
        super(message);
    }

    public NonRetryableEventException(String message, Throwable cause) {
        super(message, cause);
    }
}
Failure type | Recommended classification | Reason
HTTP 500/503, timeout, connection reset | Retryable | Dependency may recover.
DB connection pool exhausted, lock timeout | Retryable | Infrastructure or contention may clear.
Missing required event fields | Non-retryable | Retry will not fix malformed data.
Deserialization/schema mismatch | Non-retryable | Usually needs a data, code, or schema fix.
Duplicate event | Success | Idempotency table treats it as already processed.
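Spring Kafka typically wraps the listener's exception (for example, in ListenerExecutionFailedException). Any custom classification, such as DLT triage, therefore has to examine the cause chain and not just the top-level type. The sketch below shows the idea in plain Java. FailureClassifier is hypothetical, and the two local exception classes stand in for the guide's com.example.service types.

```java
import java.util.List;

/** Local stand-ins for the guide's com.example.service exception types. */
class RetryableEventProcessingException extends RuntimeException {
    RetryableEventProcessingException(String message) {
        super(message);
    }
}

class NonRetryableEventException extends RuntimeException {
    NonRetryableEventException(String message) {
        super(message);
    }
}

/** Hypothetical classifier: a failure is non-retryable if any exception in its cause chain is. */
final class FailureClassifier {

    private static final List<Class<? extends Throwable>> NOT_RETRYABLE = List.of(
            NonRetryableEventException.class,
            IllegalArgumentException.class
    );

    private static final int MAX_CAUSE_DEPTH = 32; // guards against pathological cause cycles

    static boolean isRetryable(Throwable failure) {
        Throwable current = failure;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            for (Class<? extends Throwable> type : NOT_RETRYABLE) {
                if (type.isInstance(current)) {
                    return false;
                }
            }
            current = current.getCause();
        }
        return true; // unclassified failures are retried, mirroring the error handler's retry-by-default stance
    }
}
```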

Dead-letter topic handling

A DLT is not where messages go to disappear. Treat it as an operational queue with metrics, alerts, incident tracking, and a replay path.

OrderEventDltListener.java
package com.example.kafka;

import com.example.events.OrderCreatedEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class OrderEventDltListener {

    @KafkaListener(
            topics = ReliableKafkaConsumerConfig.ORDERS_DLT,
            groupId = "order-event-dlt-monitor"
    )
    public void onDeadLetter(
            ConsumerRecord<String, OrderCreatedEvent> record,
            Acknowledgment ack
    ) {
        log.error(
                "Order event landed in DLT. topic={}, partition={}, offset={}, key={}, value={}",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value()
        );

        // Production behavior:
        // - write DLT metadata to an incident table
        // - emit metrics and alerts
        // - expose replay tooling after fixing root cause

        // This listener uses Boot's default container factory, which inherits
        // spring.kafka.listener.ack-mode=manual_immediate. Without this call, the monitor's
        // offsets are never committed and DLT records are re-read after a restart or rebalance.
        ack.acknowledge();
    }
}

DLT replay

Prefer republishing DLT records to the original topic rather than directly invoking the processor. This keeps one processing path, one retry policy, one idempotency strategy, and one set of metrics.

Replay rule: Acknowledge the DLT offset only after the replay record has been successfully published back to Kafka or safely moved to a parking-lot topic.
OrderDltReplayListener.java
package com.example.kafka;

import com.example.events.OrderCreatedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderDltReplayListener {

    private static final String REPLAY_ATTEMPT_HEADER = "x-replay-attempt";
    private static final String REPLAYED_FROM_DLT_HEADER = "x-replayed-from-dlt";

    private static final Set<String> DLT_HEADERS_TO_STRIP = Set.of(
            KafkaHeaders.DLT_EXCEPTION_FQCN,
            KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN,
            KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
            KafkaHeaders.DLT_EXCEPTION_MESSAGE,
            KafkaHeaders.DLT_KEY_EXCEPTION_FQCN,
            KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE,
            KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
    );

    private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

    @Value("${app.kafka.dlt-replay.max-attempts:3}")
    private int maxReplayAttempts;

    @Value("${app.kafka.dlt-replay.send-timeout-seconds:10}")
    private int sendTimeoutSeconds;

    @KafkaListener(
            id = "orders-dlt-replayer",
            topics = ReliableKafkaConsumerConfig.ORDERS_DLT,
            groupId = "order-event-dlt-replayer",
            containerFactory = "reliableKafkaListenerContainerFactory",
            autoStartup = "false"
    )
    public void replay(
            ConsumerRecord<String, OrderCreatedEvent> dltRecord,
            Acknowledgment ack
    ) throws Exception {
        String originalTopic = originalTopic(dltRecord);
        int replayAttempt = replayAttempt(dltRecord);

        if (replayAttempt >= maxReplayAttempts || !isReplayable(dltRecord)) {
            publishToParkingLot(dltRecord, replayAttempt);
            ack.acknowledge();
            return;
        }

        ProducerRecord<String, OrderCreatedEvent> replayRecord =
                new ProducerRecord<>(
                        originalTopic,
                        null,
                        dltRecord.key(),
                        dltRecord.value(),
                        replayHeaders(dltRecord, replayAttempt + 1)
                );

        kafkaTemplate.send(replayRecord)
                .get(sendTimeoutSeconds, TimeUnit.SECONDS);

        ack.acknowledge();

        log.info(
                "Replayed DLT record. dltTopic={}, dltOffset={}, originalTopic={}, key={}, replayAttempt={}",
                dltRecord.topic(),
                dltRecord.offset(),
                originalTopic,
                dltRecord.key(),
                replayAttempt + 1
        );
    }

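    private boolean isReplayable(ConsumerRecord<String, OrderCreatedEvent> record) {
        // The top-level header usually names a wrapper (for example ListenerExecutionFailedException);
        // the underlying failure is recorded in the cause header, so check both.
        String exceptionClass = headerAsString(record, KafkaHeaders.DLT_EXCEPTION_FQCN);
        String causeClass = headerAsString(record, KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN);

        return !isNonReplayable(exceptionClass) && !isNonReplayable(causeClass);
    }

    private static boolean isNonReplayable(String exceptionClass) {
        return exceptionClass != null
                && (exceptionClass.contains("NonRetryableEventException")
                || exceptionClass.contains("DeserializationException")
                || exceptionClass.contains("MessageConversionException")
                || exceptionClass.contains("ClassCastException"));
    }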

    private void publishToParkingLot(
            ConsumerRecord<String, OrderCreatedEvent> dltRecord,
            int replayAttempt
    ) throws Exception {
        String parkingLotTopic = ReliableKafkaConsumerConfig.ORDERS_TOPIC + ".parking-lot";

        ProducerRecord<String, OrderCreatedEvent> parkingLotRecord =
                new ProducerRecord<>(
                        parkingLotTopic,
                        dltRecord.key(),
                        dltRecord.value()
                );

        dltRecord.headers().forEach(header -> parkingLotRecord.headers().add(header));
        parkingLotRecord.headers().add(
                REPLAY_ATTEMPT_HEADER,
                String.valueOf(replayAttempt).getBytes(StandardCharsets.UTF_8)
        );

        kafkaTemplate.send(parkingLotRecord)
                .get(sendTimeoutSeconds, TimeUnit.SECONDS);
    }

    private RecordHeaders replayHeaders(
            ConsumerRecord<String, OrderCreatedEvent> dltRecord,
            int replayAttempt
    ) {
        RecordHeaders headers = new RecordHeaders();

        for (Header header : dltRecord.headers()) {
            if (!DLT_HEADERS_TO_STRIP.contains(header.key())
                    && !REPLAY_ATTEMPT_HEADER.equals(header.key())) {
                headers.add(header);
            }
        }

        headers.add(REPLAY_ATTEMPT_HEADER,
                String.valueOf(replayAttempt).getBytes(StandardCharsets.UTF_8));
        headers.add(REPLAYED_FROM_DLT_HEADER,
                "true".getBytes(StandardCharsets.UTF_8));
        headers.add("x-original-dlt-topic",
                dltRecord.topic().getBytes(StandardCharsets.UTF_8));
        headers.add("x-original-dlt-offset",
                String.valueOf(dltRecord.offset()).getBytes(StandardCharsets.UTF_8));

        return headers;
    }

    private String originalTopic(ConsumerRecord<String, OrderCreatedEvent> record) {
        String originalTopic = headerAsString(record, KafkaHeaders.DLT_ORIGINAL_TOPIC);

        if (originalTopic != null && !originalTopic.isBlank()) {
            return originalTopic;
        }

        if (record.topic().endsWith(".DLT")) {
            return record.topic().substring(0, record.topic().length() - ".DLT".length());
        }

        throw new IllegalStateException(
                "Cannot determine original topic for DLT record from topic=" + record.topic()
        );
    }

    private int replayAttempt(ConsumerRecord<String, OrderCreatedEvent> record) {
        String attempt = headerAsString(record, REPLAY_ATTEMPT_HEADER);
        return attempt == null ? 0 : Integer.parseInt(attempt);
    }

    private String headerAsString(
            ConsumerRecord<String, OrderCreatedEvent> record,
            String headerName
    ) {
        Header header = record.headers().lastHeader(headerName);
        if (header == null || header.value() == null) {
            return null;
        }
        return new String(header.value(), StandardCharsets.UTF_8);
    }
}
DltReplayController.java
package com.example.ops;

import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/ops/kafka/dlt-replay")
@RequiredArgsConstructor
public class DltReplayController {

    private static final String DLT_REPLAYER_ID = "orders-dlt-replayer";

    private final KafkaListenerEndpointRegistry registry;

    @PostMapping("/start")
    public ResponseEntity<String> startReplay() {
        MessageListenerContainer container = registry.getListenerContainer(DLT_REPLAYER_ID);
        if (container == null) {
            return ResponseEntity.notFound().build();
        }
        if (!container.isRunning()) {
            container.start();
        }
        container.resume();
        return ResponseEntity.accepted().body("DLT replay started for " + DLT_REPLAYER_ID);
    }

    @PostMapping("/pause")
    public ResponseEntity<String> pauseReplay() {
        MessageListenerContainer container = registry.getListenerContainer(DLT_REPLAYER_ID);
        if (container == null) {
            return ResponseEntity.notFound().build();
        }
        container.pause();
        return ResponseEntity.accepted().body("DLT replay paused for " + DLT_REPLAYER_ID);
    }

    @PostMapping("/stop")
    public ResponseEntity<String> stopReplay() {
        MessageListenerContainer container = registry.getListenerContainer(DLT_REPLAYER_ID);
        if (container == null) {
            return ResponseEntity.notFound().build();
        }
        container.stop();
        return ResponseEntity.accepted().body("DLT replay stopped for " + DLT_REPLAYER_ID);
    }
}

Automation guardrails

DLT replay can be automated, but automate the decision gate rather than blindly replaying every failed message.

Replay only when

  • Customer service circuit breaker is closed.
  • Risk service circuit breaker is closed.
  • Database health is up.
  • Main consumer error rate is below threshold.
  • Replay attempt count is below the maximum.
  • The exception type is replayable.
  • Replay is rate-limited.

Do not auto-replay

  • Deserialization errors.
  • Validation errors.
  • Unknown schema errors.
  • Messages that exceeded max replay attempts.
  • Failures whose root cause is still unknown.

Use a parking lot

  • For non-replayable DLT records.
  • For replay-exhausted records.
  • For records that need manual correction.
  • For auditability and incident review.
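The checklist above can be expressed as one pure decision function, which keeps the gate easy to unit test. This is a minimal sketch. The record and enum names are hypothetical, and obtaining each input (circuit breaker state from Resilience4j's CircuitBreakerRegistry, a database health check, a main-consumer error-rate metric) is left to your own wiring. Rate limiting is enforced separately, per record.

```java
/** Hypothetical snapshot of everything the replay gate looks at. */
record ReplayInputs(
        boolean customerBreakerClosed,
        boolean riskBreakerClosed,
        boolean databaseUp,
        double mainConsumerErrorRate,
        double maxErrorRate,
        int replayAttempt,
        int maxReplayAttempts,
        boolean exceptionReplayable) { }

enum ReplayDecision { REPLAY, WAIT, PARK }

final class ReplayGate {

    static ReplayDecision decide(ReplayInputs in) {
        // Record-level problems: replaying will not help, so move the record to the parking lot.
        if (!in.exceptionReplayable() || in.replayAttempt() >= in.maxReplayAttempts()) {
            return ReplayDecision.PARK;
        }
        // System-level problems: the record may be fine, but the environment is not ready yet.
        boolean healthy = in.customerBreakerClosed()
                && in.riskBreakerClosed()
                && in.databaseUp()
                && in.mainConsumerErrorRate() < in.maxErrorRate();
        return healthy ? ReplayDecision.REPLAY : ReplayDecision.WAIT;
    }
}
```

Separating PARK (the record is the problem) from WAIT (the environment is the problem) keeps a temporary outage from pushing healthy records into the parking lot.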
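Simple local replay rate limiter
// DltReplayRateLimitConfig.java
package com.example.kafka;

import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
public class DltReplayRateLimitConfig {

    // At most 25 replays per second in this JVM; callers wait up to 5 s for a permit.
    @Bean
    public RateLimiter dltReplayRateLimiter() {
        return RateLimiterRegistry.ofDefaults()
                .rateLimiter(
                        "dltReplay",
                        RateLimiterConfig.custom()
                                .limitForPeriod(25)
                                .limitRefreshPeriod(Duration.ofSeconds(1))
                                .timeoutDuration(Duration.ofSeconds(5))
                                .build()
                );
    }
}

// Fragment for OrderDltReplayListener: inject the limiter and call
// waitForReplayPermit() just before kafkaTemplate.send(replayRecord).
private final RateLimiter dltReplayRateLimiter;

private void waitForReplayPermit() {
    // Blocks for up to timeoutDuration; returns false if no permit became available.
    boolean permitted = dltReplayRateLimiter.acquirePermission();
    if (!permitted) {
        throw new IllegalStateException("Could not acquire DLT replay permit");
    }
}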

Circuit breakers and pausing the consumer

Pause the main consumer for systemic downstream failures. Do not pause for a single poison message; send that message to the DLT instead.

Situation | Action | Why
--- | --- | ---
Customer service outage | Pause consumer | Prevents retry storms and DLT floods.
Risk service outage | Pause consumer | Dependency is systemically unavailable.
Database unavailable | Pause consumer | No durable write can succeed.
One malformed event | Send to DLT | Pausing would block healthy events behind a bad one.
One duplicate event | Acknowledge | Idempotency table converts it to success.
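The table reduces to a small classification step on the failure path. This sketch uses hypothetical exception types (`DependencyUnavailableException`, `DatabaseUnavailableException`, `InvalidEventException`) to stand in for whatever the processor actually throws. In a real listener, pausing would be driven by circuit-breaker state and DLT routing by the error handler's non-retryable classification rather than by one method; duplicates normally never reach this path because the idempotent processor returns normally.

```java
// Hypothetical exception types standing in for the processor's real ones.
class DependencyUnavailableException extends RuntimeException {}
class DatabaseUnavailableException extends RuntimeException {}
class InvalidEventException extends RuntimeException {}

final class FailureActions {

    enum Action { PAUSE_CONSUMER, SEND_TO_DLT, RETRY }

    private FailureActions() {}

    static Action actionFor(Throwable failure) {
        // Systemic outage: stop consuming instead of burning retries on every record.
        if (failure instanceof DependencyUnavailableException
                || failure instanceof DatabaseUnavailableException) {
            return Action.PAUSE_CONSUMER;
        }
        // Poison message: isolate it so healthy records behind it keep flowing.
        if (failure instanceof InvalidEventException) {
            return Action.SEND_TO_DLT;
        }
        // Anything else: let the error handler retry, then DLT on exhaustion.
        return Action.RETRY;
    }
}
```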
Resilience4j circuit breaker config
resilience4j:
  circuitbreaker:
    instances:
      customerService:
        sliding-window-type: COUNT_BASED
        sliding-window-size: 50
        minimum-number-of-calls: 20
        failure-rate-threshold: 50
        slow-call-rate-threshold: 50
        slow-call-duration-threshold: 2s
        wait-duration-in-open-state: 30s
        permitted-number-of-calls-in-half-open-state: 3
        automatic-transition-from-open-to-half-open-enabled: true

      riskService:
        sliding-window-type: COUNT_BASED
        sliding-window-size: 50
        minimum-number-of-calls: 20
        failure-rate-threshold: 50
        slow-call-rate-threshold: 50
        slow-call-duration-threshold: 2s
        wait-duration-in-open-state: 30s
        permitted-number-of-calls-in-half-open-state: 3
        automatic-transition-from-open-to-half-open-enabled: true
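To make these thresholds concrete: with `sliding-window-size: 50`, `minimum-number-of-calls: 20` and `failure-rate-threshold: 50`, the failure rate is not evaluated until 20 calls are recorded, so a breaker can open as early as the 20th call if at least 10 of those calls failed; once the window is full, it opens when 25 or more of the last 50 calls failed. The sketch below is a simplified model of that count-based evaluation, not Resilience4j's code; it ignores the slow-call rules (calls slower than 2 s counted against `slow-call-rate-threshold`), and `CountBasedWindow` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of a COUNT_BASED sliding window (failures only; slow calls ignored).
final class CountBasedWindow {
    private final int windowSize;
    private final int minimumNumberOfCalls;
    private final float failureRateThresholdPercent;
    private final Deque<Boolean> outcomes = new ArrayDeque<>(); // true = failed call
    private int failures;

    CountBasedWindow(int windowSize, int minimumNumberOfCalls, float failureRateThresholdPercent) {
        this.windowSize = windowSize;
        this.minimumNumberOfCalls = minimumNumberOfCalls;
        this.failureRateThresholdPercent = failureRateThresholdPercent;
    }

    /** Records one call outcome and returns true if the breaker would now open. */
    boolean record(boolean failed) {
        outcomes.addLast(failed);
        if (failed) {
            failures++;
        }
        // Evict the oldest outcome once the window is over capacity.
        if (outcomes.size() > windowSize && outcomes.removeFirst()) {
            failures--;
        }
        if (outcomes.size() < minimumNumberOfCalls) {
            return false; // not enough calls recorded to evaluate yet
        }
        float failureRate = failures * 100.0f / outcomes.size();
        return failureRate >= failureRateThresholdPercent; // "equal or greater" opens
    }
}
```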
KafkaCircuitBreakerPauser.java
package com.example.kafka;

import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaCircuitBreakerPauser {

    private static final String MAIN_LISTENER_ID = "orders-main-listener";

    private final KafkaListenerEndpointRegistry registry;
    private final CircuitBreakerRegistry circuitBreakerRegistry;

    private final AtomicBoolean paused = new AtomicBoolean(false);

    private final List<String> dependencyBreakers = List.of(
            "customerService",
            "riskService"
    );

    @PostConstruct
    public void registerCircuitBreakerListeners() {
        for (String breakerName : dependencyBreakers) {
            CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(breakerName);

            circuitBreaker.getEventPublisher()
                    .onStateTransition(event -> {
                        log.warn(
                                "Circuit breaker transition. breaker={}, transition={}",
                                breakerName,
                                event.getStateTransition()
                        );

                        reconcileKafkaConsumerState();
                    });
        }
    }

    public void reconcileKafkaConsumerState() {
        if (anyDependencyOpen()) {
            pauseMainConsumer();
        } else {
            // No breaker is OPEN/FORCED_OPEN (all CLOSED or HALF_OPEN). Resume even when a
            // breaker is HALF_OPEN: a paused consumer makes no downstream calls, so a breaker
            // exercised only by this listener would never get the trial calls it needs to
            // close, and the consumer would stay paused indefinitely. Calls beyond the
            // half-open quota fail with CallNotPermittedException and go through the
            // error handler like any other transient failure.
            resumeMainConsumer();
        }
    }

    private boolean anyDependencyOpen() {
        return dependencyBreakers.stream()
                .map(name -> circuitBreakerRegistry.circuitBreaker(name))
                .map(CircuitBreaker::getState)
                .anyMatch(state ->
                        state == CircuitBreaker.State.OPEN
                                || state == CircuitBreaker.State.FORCED_OPEN
                );
    }

    private void pauseMainConsumer() {
        MessageListenerContainer container = registry.getListenerContainer(MAIN_LISTENER_ID);
        if (container == null) {
            log.error("Could not find Kafka listener container {}", MAIN_LISTENER_ID);
            return;
        }

        if (paused.compareAndSet(false, true)) {
            log.warn("Pausing Kafka listener {}", MAIN_LISTENER_ID);
            container.pause();
        }
    }

    private void resumeMainConsumer() {
        MessageListenerContainer container = registry.getListenerContainer(MAIN_LISTENER_ID);
        if (container == null) {
            log.error("Could not find Kafka listener container {}", MAIN_LISTENER_ID);
            return;
        }

        if (paused.compareAndSet(true, false)) {
            log.warn("Resuming Kafka listener {}", MAIN_LISTENER_ID);
            container.resume();
        }
    }
}
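Pulling the pause/resume decision out of `KafkaCircuitBreakerPauser` into a pure function makes the policy unit-testable without a running container. This sketch (hypothetical `ConsumerStateDecider`) pauses while any breaker is OPEN or FORCED_OPEN and otherwise resumes. It deliberately treats HALF_OPEN as resumable, because a paused consumer makes no downstream calls, so a breaker exercised only by this consumer could never record the trial calls it needs to close. The local enum mirrors Resilience4j's `CircuitBreaker.State` values; DISABLED and METRICS_ONLY are treated as not blocking.

```java
import java.util.Collection;

// Hypothetical pure decision function for pausing/resuming the main listener.
final class ConsumerStateDecider {

    enum BreakerState { CLOSED, OPEN, HALF_OPEN, DISABLED, FORCED_OPEN, METRICS_ONLY }

    enum ConsumerAction { PAUSE, RESUME }

    private ConsumerStateDecider() {}

    static ConsumerAction decide(Collection<BreakerState> dependencyStates) {
        boolean anyOpen = dependencyStates.stream()
                .anyMatch(s -> s == BreakerState.OPEN || s == BreakerState.FORCED_OPEN);
        // HALF_OPEN resumes so the breaker's permitted trial calls can actually happen.
        return anyOpen ? ConsumerAction.PAUSE : ConsumerAction.RESUME;
    }
}
```

One related container detail from the Spring Kafka reference: `pause()` takes effect before the next poll, so by default records already returned by the current poll are still delivered; the `pauseImmediate` container property makes the pause apply after the current record instead.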

Production checklist

Kafka

  • enable-auto-commit=false.
  • MANUAL_IMMEDIATE ack mode.
  • DLT has at least as many partitions as the source topic.
  • Producer uses acks=all and enable.idempotence=true.
  • Monitor consumer lag and DLT growth.

Application

  • Listener throws on failure.
  • REST calls have connect and read timeouts.
  • Exceptions are classified as retryable or non-retryable.
  • DB writes are idempotent.
  • Duplicate events are treated as success.

Operations

  • DLT alerts exist.
  • Replay is stopped by default.
  • Replay is gated and rate-limited.
  • Parking-lot topic exists.
  • Runbook documents when to pause/resume.
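Some of the Kafka checklist items can be verified at startup by inspecting the effective client properties. A minimal sketch, assuming the consumer and producer properties are available as maps (for example from the consumer and producer factory configuration) and that partition counts come from an admin lookup; the property keys are the standard Kafka client names, while `KafkaChecklist` and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical startup check for the Kafka items in the production checklist.
final class KafkaChecklist {

    private KafkaChecklist() {}

    static List<String> violations(Map<String, Object> consumerProps,
                                   Map<String, Object> producerProps,
                                   int sourcePartitions,
                                   int dltPartitions) {
        List<String> problems = new ArrayList<>();
        // Require explicit values so the intent is visible; a missing key is reported.
        if (!"false".equalsIgnoreCase(String.valueOf(consumerProps.get("enable.auto.commit")))) {
            problems.add("consumer enable.auto.commit must be false");
        }
        String acks = String.valueOf(producerProps.get("acks"));
        if (!"all".equalsIgnoreCase(acks) && !"-1".equals(acks)) { // -1 is equivalent to all
            problems.add("producer acks must be all");
        }
        if (!"true".equalsIgnoreCase(String.valueOf(producerProps.get("enable.idempotence")))) {
            problems.add("producer enable.idempotence must be true");
        }
        // DeadLetterPublishingRecoverer targets the same partition number by default.
        if (dltPartitions < sourcePartitions) {
            problems.add("DLT needs at least " + sourcePartitions + " partitions");
        }
        return problems;
    }
}
```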

Failure behavior matrix

Failure | Expected behavior
--- | ---
Customer REST call fails | Listener throws; Spring Kafka retries; no offset ack.
Risk REST call fails | Listener throws; Spring Kafka retries; no offset ack.
DB write fails | Transaction rolls back; listener throws; no offset ack.
Retries exhausted | Record is published to DLT; recovered offset is committed.
DLT publishing fails | Error handler retries instead of silently skipping.
App crashes after DB commit, before Kafka ack | Kafka redelivers; idempotency table detects duplicate; listener acknowledges safely.
Bad payload or validation failure | Sent to DLT or parking lot without wasting transient retries.
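The "crash after DB commit, before Kafka ack" row is the case the idempotency table exists for. The sketch below simulates it with an in-memory set standing in for the processed-event table; in a database this would typically be a unique key on the event ID, inserted in the same transaction as the business write. `IdempotentProcessor` and `Outcome` are hypothetical names. Both outcomes lead to acknowledgement, which is what makes redelivery safe.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical idempotent processor; the set stands in for the processed-event table.
final class IdempotentProcessor {

    enum Outcome { PROCESSED, DUPLICATE }

    private final Set<String> processedEventIds = new HashSet<>();
    private int sideEffects;

    Outcome process(String eventId) {
        // Set.add returns false if the ID is already present, like a unique-key violation.
        if (!processedEventIds.add(eventId)) {
            return Outcome.DUPLICATE; // already committed earlier: safe to acknowledge
        }
        sideEffects++; // the business write, committed together with the ID
        return Outcome.PROCESSED;
    }

    int sideEffects() {
        return sideEffects;
    }
}
```

Usage: the first delivery of `evt-1` returns PROCESSED; if the app crashes before acknowledging, the redelivered `evt-1` returns DUPLICATE, the side effect count stays at one, and the listener acknowledges in both cases.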