Reliable Event Listener How-To Guide
A practical implementation guide for Kafka consumers that call downstream REST services, write to a database, avoid silent message loss, recover from transient failures, and replay dead-lettered events safely.
Executive summary
The safest baseline is a manually acknowledged Kafka listener that throws on failure, retries transient errors, sends exhausted records to a dead-letter topic, and uses idempotent database writes so duplicate delivery is safe.
Do this
- Disable Kafka auto-commit.
- Use MANUAL_IMMEDIATE ack mode.
- Call ack.acknowledge() only after durable success.
- Let exceptions bubble up to Spring Kafka.
Avoid this
- Catching, logging, and suppressing listener exceptions.
- Committing offsets before the DB transaction succeeds.
- Blindly replaying DLT messages forever.
- Assuming REST + DB + Kafka can be made truly exactly-once.
Operational target
- Transient failures retry automatically.
- Poison messages go to DLT.
- DLT replay is gated and rate-limited.
- Outages pause consumption instead of amplifying failures.
Architecture
The recommended flow separates normal processing, failure recovery, and replay operations.
| Component | Responsibility | Reliability behavior |
|---|---|---|
| Main listener | Consumes original topic | Manual ack after successful processing |
| Error handler | Retries and recovers failed records | Publishes exhausted records to DLT |
| Processor | Calls enrichment services and DB writer | Throws retryable/non-retryable exceptions |
| DB idempotency table | Tracks processed event IDs | Prevents duplicate side effects |
| DLT replayer | Republishes eligible DLT records | Stopped by default, gated, rate-limited |
| Circuit breaker pauser | Pauses/resumes consumer during outages | Prevents hot-loop failure storms |
Gradle dependencies
This guide is updated for Spring Boot 4.1.0 and shows the Gradle setup only. Use Boot’s dependency management for Spring-managed artifacts;
it manages spring-kafka 4.1.0, Apache Kafka clients 4.2.1, and the PostgreSQL JDBC driver
42.7.11. Add an explicit version for Resilience4j because it is not part of the Spring Boot BOM.
Use spring-boot-starter-webmvc for MVC REST services. The older spring-boot-starter-web artifact still exists,
but it is deprecated in favor of spring-boot-starter-webmvc in Spring Boot 4.x.
| Library | Version / source | Why it is used |
|---|---|---|
| Spring Boot | 4.1.0 | Application framework and dependency management. |
| Spring Kafka | 4.1.0, managed by Boot | Kafka listeners, error handling, DLT publishing, pause/resume. |
| Apache Kafka clients | 4.2.1, managed by Boot | Kafka consumer/producer protocol implementation. |
| Resilience4j | resilience4j-spring-boot4:2.4.0 | Circuit breaker, retry, rate limiter, and health events around REST calls. |
| PostgreSQL JDBC | 42.7.11, managed by Boot | Runtime database driver. |
plugins {
id "java"
id "org.springframework.boot" version "4.1.0"
id "io.spring.dependency-management" version "1.1.7"
}
java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}
dependencies {
implementation "org.springframework.boot:spring-boot-starter-webmvc"
implementation "org.springframework.boot:spring-boot-starter-data-jpa"
implementation "org.springframework.boot:spring-boot-starter-kafka"
// Resilience4j Spring Boot 4 integration for @Retry, @CircuitBreaker, RateLimiter, etc.
implementation "io.github.resilience4j:resilience4j-spring-boot4:2.4.0"
implementation "org.springframework.boot:spring-boot-starter-aop"
implementation "org.springframework.boot:spring-boot-starter-actuator"
runtimeOnly "org.postgresql:postgresql"
testImplementation "org.springframework.boot:spring-boot-starter-test"
testImplementation "org.springframework.boot:spring-boot-starter-kafka-test"
}
Kafka configuration
Disable auto-commit, use manual ack mode, and configure serializers/deserializers defensively.
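spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-event-worker
      enable-auto-commit: false
      auto-offset-reset: earliest
      max-poll-records: 10
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: "com.example.events"
        spring.json.value.default.type: "com.example.events.OrderCreatedEvent"
    listener:
      ack-mode: manual_immediate
      type: single
    producer:
      # Without these, Boot defaults to StringSerializer and KafkaTemplate cannot publish
      # OrderCreatedEvent values to the DLT, the original topic, or the parking lot.
      # Records that failed deserialization reach the DLT as raw byte[] values, which
      # JsonSerializer would re-encode; use a type-delegating setup if you need them byte-for-byte.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 2147483647
      properties:
        enable.idempotence: true
        delivery.timeout.ms: 120000
        request.timeout.ms: 30000
        max.in.flight.requests.per.connection: 5
resilience4j:
  retry:
    instances:
      enrichmentService:
        max-attempts: 3
        wait-duration: 300ms
        # Retry only transient failures; by default @Retry retries every exception,
        # including rejected (4xx) requests that can never succeed.
        retry-exceptions:
          - com.example.service.RetryableEventProcessingException
        ignore-exceptions:
          - com.example.service.NonRetryableEventException
  # Circuit breakers are configured per dependency (customerService, riskService),
  # matching the names used in @CircuitBreaker on the clients; see the circuit breaker section.
  timelimiter:
    instances:
      enrichmentService:
        # Only applies to @TimeLimiter methods that return a CompletionStage.
        # The synchronous REST clients in this guide need HTTP client timeouts instead.
        timeout-duration: 2s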
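A Resilience4j TimeLimiter only bounds methods annotated with @TimeLimiter that return a CompletionStage, so it does not cap the synchronous RestClient calls in this guide; those need timeouts on the HTTP client itself. The pure-JDK sketch below shows connect and per-request timeouts with java.net.http.HttpClient. The class name and the 1 s / 2 s budgets are assumptions for illustration, chosen to line up with the 2s figure in the configuration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

/** Hypothetical helper showing explicit HTTP timeouts with the JDK client. */
final class TimeoutHttp {

    // Assumed budgets: fail fast on connect, cap the wait for a response at 2 seconds.
    static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(1);
    static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(2);

    private TimeoutHttp() {
    }

    static HttpClient client() {
        return HttpClient.newBuilder()
                // HttpConnectTimeoutException if the connection is not established in time
                .connectTimeout(CONNECT_TIMEOUT)
                .build();
    }

    static HttpRequest customerRequest(long customerId) {
        return HttpRequest.newBuilder(URI.create("http://customer-service/customers/" + customerId))
                // send() throws HttpTimeoutException if no response arrives in time
                .timeout(REQUEST_TIMEOUT)
                .GET()
                .build();
    }
}
```

In Spring Framework 6.1 and later, JdkClientHttpRequestFactory can wrap such an HttpClient (and also offers setReadTimeout(Duration)); passing it to RestClient.builder().requestFactory(...) and exposing the built client as a bean is one way to bound the calls made by CustomerClient and RiskClient. Spring Boot auto-configures a RestClient.Builder rather than a RestClient, so the RestClient those clients inject has to come from a bean like this anyway.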
package com.example.kafka;
import com.example.events.OrderCreatedEvent;
import com.example.service.NonRetryableEventException;
import com.example.service.RetryableEventProcessingException;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.dao.RecoverableDataAccessException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.util.backoff.ExponentialBackOff;
import java.util.Map;
@Configuration
public class ReliableKafkaConsumerConfig {
public static final String ORDERS_TOPIC = "orders.events";
public static final String ORDERS_DLT = "orders.events.DLT";
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderCreatedEvent>
reliableKafkaListenerContainerFactory(
ConsumerFactory<String, OrderCreatedEvent> consumerFactory,
DefaultErrorHandler defaultErrorHandler
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderCreatedEvent>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(defaultErrorHandler);
factory.setConcurrency(3);
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@Bean
public DefaultErrorHandler defaultErrorHandler(
KafkaTemplate<Object, Object> kafkaTemplate
) {
var recoverer = new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, exception) ->
new TopicPartition(record.topic() + ".DLT", record.partition())
);
var backOff = new ExponentialBackOff();
backOff.setInitialInterval(1_000L);
backOff.setMultiplier(2.0);
backOff.setMaxInterval(30_000L);
backOff.setMaxElapsedTime(90_000L);
var errorHandler = new DefaultErrorHandler(recoverer, backOff);
errorHandler.setCommitRecovered(true);
errorHandler.addRetryableExceptions(
RetryableEventProcessingException.class,
RecoverableDataAccessException.class,
CannotAcquireLockException.class,
QueryTimeoutException.class
);
errorHandler.addNotRetryableExceptions(
NonRetryableEventException.class,
IllegalArgumentException.class
);
return errorHandler;
}
@Bean
public NewTopic ordersTopic() {
return TopicBuilder.name(ORDERS_TOPIC)
.partitions(6)
.replicas(3)
.configs(Map.of(
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"
))
.build();
}
@Bean
public NewTopic ordersDltTopic() {
return TopicBuilder.name(ORDERS_DLT)
.partitions(6)
.replicas(3)
.configs(Map.of(
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2",
TopicConfig.RETENTION_MS_CONFIG,
String.valueOf(14L * 24 * 60 * 60 * 1000)
))
.build();
}
}
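To see how long a failing record stays on the retry path, the pure-JDK sketch below models the backoff configured above (1 s initial interval, multiplier 2.0, 30 s cap, 90 s max elapsed time). It is a simplified re-implementation, not Spring's ExponentialBackOff, and it assumes the elapsed-time budget is checked before each interval is added (so the last interval may overshoot the budget), as in Spring Framework 6.x; verify against the version you run.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of an exponential backoff with an interval cap and a max elapsed time. */
final class BackOffSchedule {

    private BackOffSchedule() {
    }

    /** Returns the sleep (ms) before each redelivery, in order. */
    static List<Long> intervals(long initialMs, double multiplier, long maxIntervalMs, long maxElapsedMs) {
        List<Long> result = new ArrayList<>();
        long elapsed = 0;
        long current = -1;
        // Budget is checked before adding the next interval, so the total can exceed maxElapsedMs.
        while (elapsed < maxElapsedMs) {
            current = current < 0
                    ? Math.min(initialMs, maxIntervalMs)
                    : Math.min((long) (current * multiplier), maxIntervalMs);
            result.add(current);
            elapsed += current;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> schedule = intervals(1_000L, 2.0, 30_000L, 90_000L);
        long total = schedule.stream().mapToLong(Long::longValue).sum();
        // One initial delivery plus one redelivery per interval.
        System.out.println("retries=" + schedule.size() + ", attempts=" + (schedule.size() + 1)
                + ", totalMs=" + total + ", intervals=" + schedule);
        // prints retries=7, attempts=8, totalMs=91000, intervals=[1000, 2000, 4000, 8000, 16000, 30000, 30000]
    }
}
```

Under these assumptions a record gets 8 delivery attempts over roughly 91 seconds before it is published to the DLT. Each attempt also runs the Resilience4j retry (max-attempts: 3) around the REST call, so one record can trigger up to 8 × 3 = 24 calls to a failing enrichment service; size the two retry layers together. By default the backoff sleep runs on the consumer thread, so keep the 30 s cap well below max.poll.interval.ms (300 s by default).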
Reliable listener code
The listener should not catch and suppress exceptions. If processing fails, the exception must reach Spring Kafka’s error handler.
package com.example.kafka;
import com.example.events.OrderCreatedEvent;
import com.example.service.OrderEventProcessor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderEventListener {
private final OrderEventProcessor processor;
@KafkaListener(
id = "orders-main-listener",
topics = ReliableKafkaConsumerConfig.ORDERS_TOPIC,
groupId = "order-event-worker",
containerFactory = "reliableKafkaListenerContainerFactory"
)
public void onMessage(
ConsumerRecord<String, OrderCreatedEvent> record,
Acknowledgment ack
) {
log.info(
"Received order event. topic={}, partition={}, offset={}, key={}",
record.topic(),
record.partition(),
record.offset(),
record.key()
);
processor.process(record.key(), record.value());
ack.acknowledge();
log.info(
"Acknowledged order event. topic={}, partition={}, offset={}, key={}",
record.topic(),
record.partition(),
record.offset(),
record.key()
);
}
}
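Do not wrap the processor call in a try/catch and then return normally. Returning normally tells the container that processing succeeded, and even if ack.acknowledge() is skipped, the next record's acknowledgment commits an offset past the failed one, so the message is silently dropped.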
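The pure-JDK toy model below (not Spring Kafka code) shows how a swallowed failure loses a record under manual acknowledgment: acknowledging offset N commits N + 1, so once any later record is acknowledged, the committed offset has moved past the record whose failure was caught. The class and its parameters are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Toy model of one partition with manual acks: acknowledging offset N commits N + 1. */
final class SwallowedFailureDemo {

    /** Offsets that were actually processed, and the offset the group would resume from. */
    record Outcome(List<Long> processed, long committedOffset) {}

    /**
     * Consumes offsets [0, count). Offsets in failing throw during processing.
     * If swallow is true, the listener catches the exception and returns normally
     * without acking; otherwise the failure stops consumption, like a container
     * that retries the failed record instead of moving past it.
     */
    static Outcome consume(int count, Set<Long> failing, boolean swallow) {
        List<Long> processed = new ArrayList<>();
        long committed = 0;
        for (long offset = 0; offset < count; offset++) {
            try {
                if (failing.contains(offset)) {
                    throw new IllegalStateException("downstream failure at offset " + offset);
                }
                processed.add(offset);
                committed = offset + 1; // ack.acknowledge() commits the next offset to read
            } catch (IllegalStateException e) {
                if (!swallow) {
                    break; // exception reaches the error handler; the offset stays uncommitted
                }
                // Swallowed: no ack here, but the next successful ack commits past this offset.
            }
        }
        return new Outcome(processed, committed);
    }
}
```

With offset 1 failing out of four records, swallowing yields processed offsets [0, 2, 3] and a committed offset of 4, so offset 1 is never seen again; letting the exception escape leaves the committed offset at 1, and the record is redelivered.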
Processing flow
The processor validates the event, calls both enrichment services, and performs one idempotent database write.
package com.example.service;
import com.example.client.CustomerClient;
import com.example.client.RiskClient;
import com.example.events.OrderCreatedEvent;
import com.example.persistence.OrderProjectionWriter;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class OrderEventProcessor {
private final CustomerClient customerClient;
private final RiskClient riskClient;
private final OrderProjectionWriter projectionWriter;
public void process(String kafkaKey, OrderCreatedEvent event) {
validate(event);
var customer = customerClient.getCustomer(event.customerId());
var risk = riskClient.getRiskScore(event.orderId());
projectionWriter.writeIfNotAlreadyProcessed(kafkaKey, event, customer, risk);
}
private void validate(OrderCreatedEvent event) {
if (event == null) {
throw new NonRetryableEventException("Event payload is null");
}
if (event.eventId() == null || event.eventId().isBlank()) {
throw new NonRetryableEventException("Missing eventId");
}
if (event.orderId() == null || event.customerId() == null) {
throw new NonRetryableEventException(
"Missing required orderId or customerId for eventId=" + event.eventId()
);
}
}
}
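package com.example.client;
import com.example.service.NonRetryableEventException;
import com.example.service.RetryableEventProcessingException;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatusCode;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClient;
@Component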
@RequiredArgsConstructor
public class CustomerClient {
private final RestClient restClient;
@Retry(name = "enrichmentService")
@CircuitBreaker(name = "customerService")
public CustomerDto getCustomer(Long customerId) {
try {
return restClient.get()
.uri("http://customer-service/customers/{id}", customerId)
.retrieve()
.onStatus(HttpStatusCode::is5xxServerError, (request, response) -> {
throw new RetryableEventProcessingException(
"Customer service 5xx for customerId=" + customerId
);
})
.onStatus(HttpStatusCode::is4xxClientError, (request, response) -> {
throw new NonRetryableEventException(
"Customer request rejected for customerId=" + customerId
);
})
.body(CustomerDto.class);
} catch (RetryableEventProcessingException | NonRetryableEventException e) {
throw e;
} catch (Exception e) {
throw new RetryableEventProcessingException(
"Failed calling customer service for customerId=" + customerId,
e
);
}
}
}
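package com.example.client;
import com.example.service.RetryableEventProcessingException;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatusCode;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClient;
@Component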
@RequiredArgsConstructor
public class RiskClient {
private final RestClient restClient;
@Retry(name = "enrichmentService")
@CircuitBreaker(name = "riskService")
public RiskDto getRiskScore(Long orderId) {
try {
return restClient.get()
.uri("http://risk-service/risks/orders/{orderId}", orderId)
.retrieve()
.onStatus(HttpStatusCode::isError, (request, response) -> {
throw new RetryableEventProcessingException(
"Risk service error for orderId=" + orderId
);
})
.body(RiskDto.class);
} catch (RetryableEventProcessingException e) {
throw e;
} catch (Exception e) {
throw new RetryableEventProcessingException(
"Failed calling risk service for orderId=" + orderId,
e
);
}
}
}
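The two clients classify HTTP errors differently: CustomerClient treats 4xx responses as non-retryable, while RiskClient retries every error status, including 4xx. If that difference is not intentional, one option is a single shared policy. The sketch below is hypothetical; treating 408 (Request Timeout) and 429 (Too Many Requests) as retryable is an assumption based on their usual meaning, not something this guide's services are known to return.

```java
/** Hypothetical shared policy mapping HTTP status codes to retry decisions. */
final class HttpFailurePolicy {

    enum Decision { SUCCESS, RETRYABLE, NON_RETRYABLE }

    private HttpFailurePolicy() {
    }

    static Decision classify(int status) {
        if (status >= 200 && status < 300) {
            return Decision.SUCCESS;
        }
        if (status == 408 || status == 429) {
            return Decision.RETRYABLE; // timeout or throttling: the same request may succeed later
        }
        if (status >= 500 && status < 600) {
            return Decision.RETRYABLE; // server-side failure: the dependency may recover
        }
        // Other 4xx and anything unexpected: the request itself is wrong, so retrying will not fix it.
        return Decision.NON_RETRYABLE;
    }
}
```

Inside either client, a single .onStatus(HttpStatusCode::isError, (request, response) -> { ... }) handler could switch on HttpFailurePolicy.classify(response.getStatusCode().value()) and throw RetryableEventProcessingException or NonRetryableEventException accordingly.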
Idempotent database writes
You cannot assume Kafka will deliver each event only once. If the application crashes after committing the database transaction but before committing the Kafka offset, Kafka may redeliver the event. The database write must therefore be idempotent.
package com.example.persistence;
import com.example.client.CustomerDto;
import com.example.client.RiskDto;
import com.example.events.OrderCreatedEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.time.OffsetDateTime;
@Repository
@RequiredArgsConstructor
public class OrderProjectionWriter {
private final JdbcTemplate jdbcTemplate;
@Transactional
public void writeIfNotAlreadyProcessed(
String kafkaKey,
OrderCreatedEvent event,
CustomerDto customer,
RiskDto risk
) {
int inserted = jdbcTemplate.update("""
insert into processed_kafka_message (
consumer_group,
event_id,
kafka_key,
processed_at
)
values (?, ?, ?, ?)
on conflict (consumer_group, event_id) do nothing
""",
"order-event-worker",
event.eventId(),
kafkaKey,
OffsetDateTime.now()
);
if (inserted == 0) {
return;
}
jdbcTemplate.update("""
insert into order_projection (
order_id,
customer_id,
customer_tier,
risk_score,
created_at
)
values (?, ?, ?, ?, ?)
on conflict (order_id) do update set
customer_id = excluded.customer_id,
customer_tier = excluded.customer_tier,
risk_score = excluded.risk_score
""",
event.orderId(),
event.customerId(),
customer.tier(),
risk.score(),
OffsetDateTime.now()
);
}
}
create table processed_kafka_message (
consumer_group varchar(200) not null,
event_id varchar(200) not null,
kafka_key varchar(500),
processed_at timestamptz not null,
primary key (consumer_group, event_id)
);
create table order_projection (
order_id bigint primary key,
customer_id bigint not null,
customer_tier varchar(100),
risk_score numeric,
created_at timestamptz not null
);
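The effect of the two statements in OrderProjectionWriter can be modeled in a few lines of plain Java. This is an in-memory stand-in, not a replacement for the database constraint: it only shows why a redelivered event becomes a no-op success for the same consumer group, while a different group (the "audit-worker" name in the test is hypothetical) still processes it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * In-memory model of processed_kafka_message plus order_projection.
 * The (consumerGroup, eventId) key stands in for the primary key, and
 * Set.add returning false stands in for "on conflict do nothing".
 */
final class IdempotentProjectionModel {

    private record ProcessedKey(String consumerGroup, String eventId) {}

    private final Set<ProcessedKey> processed = new HashSet<>();
    private final List<String> projectionWrites = new ArrayList<>();

    /**
     * Returns true if the event was applied, false for a duplicate.
     * Both outcomes count as success: the caller acknowledges either way.
     * synchronized stands in for the single database transaction.
     */
    synchronized boolean writeIfNotAlreadyProcessed(String consumerGroup, String eventId, long orderId) {
        if (!processed.add(new ProcessedKey(consumerGroup, eventId))) {
            return false; // duplicate delivery: skip the side effect
        }
        projectionWrites.add("upsert order_projection order_id=" + orderId);
        return true;
    }

    synchronized List<String> projectionWrites() {
        return List.copyOf(projectionWrites);
    }
}
```

Note that OrderEventProcessor calls both enrichment services before the writer runs, so a redelivered duplicate still makes the REST calls; only the database side effect is deduplicated.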
Error classification
Classify failures so transient problems are retried and bad data is not retried forever.
package com.example.service;
public class RetryableEventProcessingException extends RuntimeException {
public RetryableEventProcessingException(String message) {
super(message);
}
public RetryableEventProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
package com.example.service;
public class NonRetryableEventException extends RuntimeException {
public NonRetryableEventException(String message) {
super(message);
}
public NonRetryableEventException(String message, Throwable cause) {
super(message, cause);
}
}
| Failure type | Recommended classification | Reason |
|---|---|---|
| HTTP 500/503, timeout, connection reset | Retryable | Dependency may recover. |
| DB connection pool exhausted, lock timeout | Retryable | Infrastructure or contention may clear. |
| Missing required event fields | Non-retryable | Retry will not fix malformed data. |
| Deserialization/schema mismatch | Non-retryable | Usually needs data/code/schema fix. |
| Duplicate event | Success | Idempotency table should treat it as already processed. |
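The table maps onto the error handler's classification. As a rough pure-JDK model (not DefaultErrorHandler's exact algorithm, and leaving out its built-in list of fatal types such as DeserializationException), classification walks the exception and its causes and lets the closest registered type decide, with unclassified failures retried by default. The cause walk matters because the listener container wraps exceptions thrown by the listener, so the type you registered is usually a cause rather than the top-level exception.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of cause-chain exception classification (not Spring's implementation). */
final class FailureClassifier {

    private final Map<Class<? extends Throwable>, Boolean> retryable = new LinkedHashMap<>();

    FailureClassifier retryable(Class<? extends Throwable> type) {
        retryable.put(type, true);
        return this;
    }

    FailureClassifier notRetryable(Class<? extends Throwable> type) {
        retryable.put(type, false);
        return this;
    }

    /** Walks the throwable and its causes; the first explicitly classified type wins. */
    boolean isRetryable(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            Boolean decision = lookup(t.getClass());
            if (decision != null) {
                return decision;
            }
        }
        return true; // unclassified failures are retried by default
    }

    /** Finds the closest registered superclass, so subclasses inherit a classification. */
    private Boolean lookup(Class<?> type) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            Boolean decision = retryable.get(c);
            if (decision != null) {
                return decision;
            }
        }
        return null;
    }
}
```

Subclass inheritance is worth remembering: because IllegalArgumentException is registered as non-retryable in the error handler configuration, subclasses such as NumberFormatException are also sent straight to the DLT.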
Dead-letter topic handling
A DLT is not where messages go to disappear. Treat it as an operational queue with metrics, alerts, incident tracking, and a replay path.
package com.example.kafka;
import com.example.events.OrderCreatedEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class OrderEventDltListener {
@KafkaListener(
topics = ReliableKafkaConsumerConfig.ORDERS_DLT,
groupId = "order-event-dlt-monitor"
)
public void onDeadLetter(
ConsumerRecord<String, OrderCreatedEvent> record,
Acknowledgment ack
) {
log.error(
"Order event landed in DLT. topic={}, partition={}, offset={}, key={}, value={}",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value()
);
// Production behavior:
// - write DLT metadata to an incident table
// - emit metrics and alerts
// - expose replay tooling after fixing root cause
// The default container factory inherits ack-mode: manual_immediate from application.yml,
// so acknowledge explicitly (after the incident row is durable), or the monitor
// re-reads the whole DLT after every restart.
ack.acknowledge();
}
}
}
DLT replay
Prefer republishing DLT records to the original topic rather than directly invoking the processor. This keeps one processing path, one retry policy, one idempotency strategy, and one set of metrics.
package com.example.kafka;
import com.example.events.OrderCreatedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderDltReplayListener {
private static final String REPLAY_ATTEMPT_HEADER = "x-replay-attempt";
private static final String REPLAYED_FROM_DLT_HEADER = "x-replayed-from-dlt";
private static final Set<String> DLT_HEADERS_TO_STRIP = Set.of(
KafkaHeaders.DLT_EXCEPTION_FQCN,
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN,
KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
KafkaHeaders.DLT_EXCEPTION_MESSAGE,
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN,
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE,
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
);
private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;
@Value("${app.kafka.dlt-replay.max-attempts:3}")
private int maxReplayAttempts;
@Value("${app.kafka.dlt-replay.send-timeout-seconds:10}")
private int sendTimeoutSeconds;
@KafkaListener(
id = "orders-dlt-replayer",
topics = ReliableKafkaConsumerConfig.ORDERS_DLT,
groupId = "order-event-dlt-replayer",
containerFactory = "reliableKafkaListenerContainerFactory",
autoStartup = "false"
)
public void replay(
ConsumerRecord<String, OrderCreatedEvent> dltRecord,
Acknowledgment ack
) throws Exception {
String originalTopic = originalTopic(dltRecord);
int replayAttempt = replayAttempt(dltRecord);
if (replayAttempt >= maxReplayAttempts || !isReplayable(dltRecord)) {
publishToParkingLot(dltRecord, replayAttempt);
ack.acknowledge();
return;
}
ProducerRecord<String, OrderCreatedEvent> replayRecord =
new ProducerRecord<>(
originalTopic,
null,
dltRecord.key(),
dltRecord.value(),
replayHeaders(dltRecord, replayAttempt + 1)
);
kafkaTemplate.send(replayRecord)
.get(sendTimeoutSeconds, TimeUnit.SECONDS);
ack.acknowledge();
log.info(
"Replayed DLT record. dltTopic={}, dltOffset={}, originalTopic={}, key={}, replayAttempt={}",
dltRecord.topic(),
dltRecord.offset(),
originalTopic,
dltRecord.key(),
replayAttempt + 1
);
}
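private boolean isReplayable(ConsumerRecord<String, OrderCreatedEvent> record) {
// The container wraps listener exceptions (for example in ListenerExecutionFailedException),
// so the real failure is often recorded in the cause header rather than the top-level one.
for (String headerName : new String[] {
KafkaHeaders.DLT_EXCEPTION_FQCN,
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
}) {
String exceptionClass = headerAsString(record, headerName);
if (exceptionClass != null
&& (exceptionClass.contains("NonRetryableEventException")
|| exceptionClass.contains("DeserializationException")
|| exceptionClass.contains("MessageConversionException")
|| exceptionClass.contains("ClassCastException"))) {
return false;
}
}
return true;
}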
private void publishToParkingLot(
ConsumerRecord<String, OrderCreatedEvent> dltRecord,
int replayAttempt
) throws Exception {
String parkingLotTopic = ReliableKafkaConsumerConfig.ORDERS_TOPIC + ".parking-lot";
ProducerRecord<String, OrderCreatedEvent> parkingLotRecord =
new ProducerRecord<>(
parkingLotTopic,
dltRecord.key(),
dltRecord.value()
);
dltRecord.headers().forEach(header -> parkingLotRecord.headers().add(header));
parkingLotRecord.headers().add(
REPLAY_ATTEMPT_HEADER,
String.valueOf(replayAttempt).getBytes(StandardCharsets.UTF_8)
);
kafkaTemplate.send(parkingLotRecord)
.get(sendTimeoutSeconds, TimeUnit.SECONDS);
}
private RecordHeaders replayHeaders(
ConsumerRecord<String, OrderCreatedEvent> dltRecord,
int replayAttempt
) {
RecordHeaders headers = new RecordHeaders();
for (Header header : dltRecord.headers()) {
if (!DLT_HEADERS_TO_STRIP.contains(header.key())
&& !REPLAY_ATTEMPT_HEADER.equals(header.key())) {
headers.add(header);
}
}
headers.add(REPLAY_ATTEMPT_HEADER,
String.valueOf(replayAttempt).getBytes(StandardCharsets.UTF_8));
headers.add(REPLAYED_FROM_DLT_HEADER,
"true".getBytes(StandardCharsets.UTF_8));
headers.add("x-original-dlt-topic",
dltRecord.topic().getBytes(StandardCharsets.UTF_8));
headers.add("x-original-dlt-offset",
String.valueOf(dltRecord.offset()).getBytes(StandardCharsets.UTF_8));
return headers;
}
private String originalTopic(ConsumerRecord<String, OrderCreatedEvent> record) {
String originalTopic = headerAsString(record, KafkaHeaders.DLT_ORIGINAL_TOPIC);
if (originalTopic != null && !originalTopic.isBlank()) {
return originalTopic;
}
if (record.topic().endsWith(".DLT")) {
return record.topic().substring(0, record.topic().length() - ".DLT".length());
}
throw new IllegalStateException(
"Cannot determine original topic for DLT record from topic=" + record.topic()
);
}
private int replayAttempt(ConsumerRecord<String, OrderCreatedEvent> record) {
String attempt = headerAsString(record, REPLAY_ATTEMPT_HEADER);
return attempt == null ? 0 : Integer.parseInt(attempt);
}
private String headerAsString(
ConsumerRecord<String, OrderCreatedEvent> record,
String headerName
) {
Header header = record.headers().lastHeader(headerName);
if (header == null || header.value() == null) {
return null;
}
return new String(header.value(), StandardCharsets.UTF_8);
}
}
package com.example.ops;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/ops/kafka/dlt-replay")
@RequiredArgsConstructor
public class DltReplayController {
private static final String DLT_REPLAYER_ID = "orders-dlt-replayer";
private final KafkaListenerEndpointRegistry registry;
@PostMapping("/start")
public ResponseEntity<String> startReplay() {
MessageListenerContainer container = registry.getListenerContainer(DLT_REPLAYER_ID);
if (container == null) {
return ResponseEntity.notFound().build();
}
if (!container.isRunning()) {
container.start();
}
container.resume();
return ResponseEntity.accepted().body("DLT replay started for " + DLT_REPLAYER_ID);
}
@PostMapping("/pause")
public ResponseEntity<String> pauseReplay() {
MessageListenerContainer container = registry.getListenerContainer(DLT_REPLAYER_ID);
if (container == null) {
return ResponseEntity.notFound().build();
}
container.pause();
return ResponseEntity.accepted().body("DLT replay paused for " + DLT_REPLAYER_ID);
}
@PostMapping("/stop")
public ResponseEntity<String> stopReplay() {
MessageListenerContainer container = registry.getListenerContainer(DLT_REPLAYER_ID);
if (container == null) {
return ResponseEntity.notFound().build();
}
container.stop();
return ResponseEntity.accepted().body("DLT replay stopped for " + DLT_REPLAYER_ID);
}
}
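Stripped of Kafka types, the per-record part of the replay decision can be modeled as below. The sketch checks every exception-class header it is given, because the top-level class recorded in the DLT headers can be the container's wrapper exception rather than your own, with your exception recorded as the cause. The header names in the test are placeholders, not Spring Kafka's constants.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical per-record replay decision based on string-valued DLT headers. */
final class ReplayDecision {

    enum Action { REPLAY, PARK }

    // These failures need a data, schema, or code fix, not another attempt.
    private static final List<String> NON_REPLAYABLE = List.of(
            "NonRetryableEventException",
            "DeserializationException",
            "MessageConversionException",
            "ClassCastException");

    private ReplayDecision() {
    }

    static Action decide(Map<String, String> headers, List<String> exceptionHeaderNames,
                         String attemptHeader, int maxAttempts) {
        int attempts = Integer.parseInt(headers.getOrDefault(attemptHeader, "0"));
        if (attempts >= maxAttempts) {
            return Action.PARK; // replay budget exhausted
        }
        for (String name : exceptionHeaderNames) {
            String fqcn = headers.get(name);
            if (fqcn != null && NON_REPLAYABLE.stream().anyMatch(fqcn::contains)) {
                return Action.PARK;
            }
        }
        return Action.REPLAY;
    }
}
```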
Automation guardrails
DLT replay can be automated, but automate the decision gate rather than blindly replaying every failed message.
Replay only when
- Customer service circuit breaker is closed.
- Risk service circuit breaker is closed.
- Database health is up.
- Main consumer error rate is below threshold.
- Replay attempt count is below the maximum.
- The exception type is replayable.
- Replay is rate-limited.
Do not auto-replay
- Deserialization errors.
- Validation errors.
- Unknown schema errors.
- Messages that exceeded max replay attempts.
- Failures whose root cause is still unknown.
Use a parking lot
- For non-replayable DLT records.
- For replay-exhausted records.
- For records that need manual correction.
- For auditability and incident review.
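The system-level conditions in the "Replay only when" list can be folded into one gate that an operator endpoint or a scheduler checks before starting the replayer. The sketch is hypothetical: the Health record, where its values come from (breaker states, a database health check, an error-rate metric), and the threshold are assumptions. Per-record conditions (attempt count, exception type) and rate limiting are handled separately.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical system-level gate for automated DLT replay. */
final class ReplayGate {

    enum BreakerState { CLOSED, HALF_OPEN, OPEN, FORCED_OPEN }

    record Health(BreakerState customerService, BreakerState riskService,
                  boolean databaseUp, double mainConsumerErrorRate) {}

    private ReplayGate() {
    }

    /** Returns the reasons replay must stay stopped; an empty list means the gate is open. */
    static List<String> blockingReasons(Health health, double maxErrorRate) {
        List<String> reasons = new ArrayList<>();
        if (health.customerService() != BreakerState.CLOSED) {
            reasons.add("customerService breaker is " + health.customerService());
        }
        if (health.riskService() != BreakerState.CLOSED) {
            reasons.add("riskService breaker is " + health.riskService());
        }
        if (!health.databaseUp()) {
            reasons.add("database health is down");
        }
        if (health.mainConsumerErrorRate() >= maxErrorRate) {
            reasons.add("main consumer error rate " + health.mainConsumerErrorRate()
                    + " is not below " + maxErrorRate);
        }
        return reasons;
    }
}
```

Returning reasons rather than a bare boolean makes it easy to log or display why an automated replay did not start.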
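package com.example.kafka;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
@Configuration
public class DltReplayRateLimitConfig {
@Bean
public RateLimiter dltReplayRateLimiter(RateLimiterRegistry rateLimiterRegistry) {
// Register through the auto-configured registry so the Resilience4j Spring Boot
// integration can expose the limiter alongside the other instances.
return rateLimiterRegistry.rateLimiter(
"dltReplay",
RateLimiterConfig.custom()
.limitForPeriod(25)
.limitRefreshPeriod(Duration.ofSeconds(1))
.timeoutDuration(Duration.ofSeconds(5))
.build()
);
}
}
// In OrderDltReplayListener: inject the limiter and call waitForReplayPermit()
// at the start of replay(), before any record is sent.
private final RateLimiter dltReplayRateLimiter;
private void waitForReplayPermit() {
// Blocks for up to timeoutDuration (5s) waiting for a permit.
boolean permitted = dltReplayRateLimiter.acquirePermission();
if (!permitted) {
throw new IllegalStateException("Could not acquire DLT replay permit");
}
}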
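With limitForPeriod(25) and limitRefreshPeriod(1s), the replayer can send at most about 25 records per second, roughly 90,000 per hour. The pure-JDK model below shows that behavior with an explicit clock so it is deterministic; it is a simplified fixed-window model of the configuration's meaning, not Resilience4j's implementation.

```java
/**
 * Simplified fixed-window limiter: at most limitForPeriod permits per refresh period.
 * Unused permits do not carry over to the next window.
 */
final class FixedWindowLimiter {

    private final int limitForPeriod;
    private final long refreshPeriodNanos;
    private long windowStartNanos;
    private int usedInWindow;

    FixedWindowLimiter(int limitForPeriod, long refreshPeriodNanos, long startNanos) {
        this.limitForPeriod = limitForPeriod;
        this.refreshPeriodNanos = refreshPeriodNanos;
        this.windowStartNanos = startNanos;
    }

    /** Non-blocking: returns false once the current window's permits are used up. */
    synchronized boolean tryAcquire(long nowNanos) {
        long elapsedWindows = (nowNanos - windowStartNanos) / refreshPeriodNanos;
        if (elapsedWindows > 0) {
            windowStartNanos += elapsedWindows * refreshPeriodNanos; // jump to the current window
            usedInWindow = 0;
        }
        if (usedInWindow < limitForPeriod) {
            usedInWindow++;
            return true;
        }
        return false;
    }
}
```

Unlike tryAcquire in this model, RateLimiter.acquirePermission() waits up to timeoutDuration (5 seconds in this configuration) for a permit before returning false.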
Circuit breakers and pausing the consumer
Pause the main consumer for systemic downstream failures. Do not pause for a single poison message; send that message to the DLT instead.
| Situation | Action | Why |
|---|---|---|
| Customer service outage | Pause consumer | Prevents retry storms and DLT floods. |
| Risk service outage | Pause consumer | Dependency is systemically unavailable. |
| Database unavailable | Pause consumer | No durable write can succeed. |
| One malformed event | DLT | Pausing would block healthy events behind a bad one. |
| One duplicate event | Acknowledge | Idempotency table converts it to success. |
resilience4j:
  circuitbreaker:
    instances:
      customerService:
        sliding-window-type: COUNT_BASED
        sliding-window-size: 50
        minimum-number-of-calls: 20
        failure-rate-threshold: 50
        slow-call-rate-threshold: 50
        slow-call-duration-threshold: 2s
        wait-duration-in-open-state: 30s
        permitted-number-of-calls-in-half-open-state: 3
        automatic-transition-from-open-to-half-open-enabled: true
        # A rejected request (4xx) is bad data, not an outage; don't let it open the breaker.
        ignore-exceptions:
          - com.example.service.NonRetryableEventException
      riskService:
        sliding-window-type: COUNT_BASED
        sliding-window-size: 50
        minimum-number-of-calls: 20
        failure-rate-threshold: 50
        slow-call-rate-threshold: 50
        slow-call-duration-threshold: 2s
        wait-duration-in-open-state: 30s
        permitted-number-of-calls-in-half-open-state: 3
        automatic-transition-from-open-to-half-open-enabled: true
        ignore-exceptions:
          - com.example.service.NonRetryableEventException
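These settings mean the failure rate is evaluated over the last 50 calls, but only once at least 20 calls have been recorded, and a breaker opens when the rate is equal to or greater than 50%. The pure-JDK model below captures just that rule; it ignores the slow-call rate, HALF_OPEN handling, and ignored exceptions, and it is not Resilience4j's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified COUNT_BASED failure-rate check over the most recent calls. */
final class CountBasedWindow {

    private final int windowSize;
    private final int minimumNumberOfCalls;
    private final float failureRateThreshold;
    private final Deque<Boolean> outcomes = new ArrayDeque<>();
    private int failures;

    CountBasedWindow(int windowSize, int minimumNumberOfCalls, float failureRateThreshold) {
        this.windowSize = windowSize;
        this.minimumNumberOfCalls = minimumNumberOfCalls;
        this.failureRateThreshold = failureRateThreshold;
    }

    /** Records one call outcome and returns true if the breaker would trip to OPEN. */
    boolean record(boolean failed) {
        outcomes.addLast(failed);
        if (failed) {
            failures++;
        }
        if (outcomes.size() > windowSize && outcomes.removeFirst()) {
            failures--; // the evicted oldest call was a failure
        }
        if (outcomes.size() < minimumNumberOfCalls) {
            return false; // not enough calls recorded to compute a rate
        }
        float failureRate = failures * 100.0f / outcomes.size();
        return failureRate >= failureRateThreshold;
    }
}
```

In practice this means the breaker cannot open during the first 19 calls, however many fail, and a steady 50% failure rate over the window is enough to open it.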
package com.example.kafka;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaCircuitBreakerPauser {
private static final String MAIN_LISTENER_ID = "orders-main-listener";
private final KafkaListenerEndpointRegistry registry;
private final CircuitBreakerRegistry circuitBreakerRegistry;
private final AtomicBoolean paused = new AtomicBoolean(false);
private final List<String> dependencyBreakers = List.of(
"customerService",
"riskService"
);
@PostConstruct
public void registerCircuitBreakerListeners() {
for (String breakerName : dependencyBreakers) {
CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(breakerName);
circuitBreaker.getEventPublisher()
.onStateTransition(event -> {
log.warn(
"Circuit breaker transition. breaker={}, transition={}",
breakerName,
event.getStateTransition()
);
reconcileKafkaConsumerState();
});
}
}
public void reconcileKafkaConsumerState() {
if (anyDependencyOpen()) {
pauseMainConsumer();
} else {
// CLOSED or HALF_OPEN: resume. A HALF_OPEN breaker only returns to CLOSED after its
// permitted trial calls succeed, and a paused consumer makes no calls, so waiting for
// CLOSED here would leave the consumer paused indefinitely. If a trial call fails,
// the breaker reopens and the consumer is paused again.
resumeMainConsumer();
}
}
private boolean anyDependencyOpen() {
return dependencyBreakers.stream()
.map(circuitBreakerRegistry::circuitBreaker)
.map(CircuitBreaker::getState)
.anyMatch(state ->
state == CircuitBreaker.State.OPEN
|| state == CircuitBreaker.State.FORCED_OPEN
);
}
private void pauseMainConsumer() {
MessageListenerContainer container = registry.getListenerContainer(MAIN_LISTENER_ID);
if (container == null) {
log.error("Could not find Kafka listener container {}", MAIN_LISTENER_ID);
return;
}
if (paused.compareAndSet(false, true)) {
log.warn("Pausing Kafka listener {}", MAIN_LISTENER_ID);
container.pause();
}
}
private void resumeMainConsumer() {
MessageListenerContainer container = registry.getListenerContainer(MAIN_LISTENER_ID);
if (container == null) {
log.error("Could not find Kafka listener container {}", MAIN_LISTENER_ID);
return;
}
if (paused.compareAndSet(true, false)) {
log.warn("Resuming Kafka listener {}", MAIN_LISTENER_ID);
container.resume();
}
}
}
Production checklist
Kafka
- enable-auto-commit=false.
- MANUAL_IMMEDIATE ack mode.
- DLT has at least as many partitions as the source topic.
- Producer uses acks=all and idempotence.
- Monitor consumer lag and DLT growth.
Application
- Listener throws on failure.
- REST calls have timeouts.
- Exceptions are classified.
- DB writes are idempotent.
- Duplicate events are treated as success.
Operations
- DLT alerts exist.
- Replay is stopped by default.
- Replay is gated and rate-limited.
- Parking-lot topic exists.
- Runbook documents when to pause/resume.
Failure behavior matrix
| Failure | Expected behavior |
|---|---|
| Customer REST call fails | Listener throws; Spring Kafka retries; no offset ack. |
| Risk REST call fails | Listener throws; Spring Kafka retries; no offset ack. |
| DB write fails | Transaction rolls back; listener throws; no offset ack. |
| Retries exhausted | Record is published to DLT; recovered offset is committed. |
| DLT publishing fails | Error handler retries instead of silently skipping. |
| App crashes after DB commit before Kafka ack | Kafka redelivers; idempotency table detects duplicate; listener acknowledges safely. |
| Bad payload or validation failure | Sent to DLT or parking lot without wasting transient retries. |