🧩 Problem Statement
We encountered a critical issue in our Kafka-based system responsible for SOA generation. Over time, we noticed that messages were piling up in Kafka topics, and the consumer lag was increasing dramatically. This backlog began impacting downstream systems, triggering performance degradations and escalations from multiple stakeholders.
After deep investigation, we discovered that the Kafka consumer was getting detached from its consumer group intermittently. This caused the broker to assume the consumer was dead, leading to rebalancing events and unprocessed messages sitting idle in the queue.
The root cause?
A long-running consumer thread that was performing heavy computation (SOA document generation) inside the consumer poll loop — delaying the next poll() beyond max.poll.interval.ms, so the broker considered the consumer dead and evicted it from the group.
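The failure mode can be sketched in pseudocode (illustrative only: `consumer`, `running`, and `generateSoaDocument` are stand-ins, not our actual code):

```java
// Anti-pattern: heavy work inside the poll loop
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        generateSoaDocument(record.value()); // can take minutes per record
    }
    // If the loop body exceeds max.poll.interval.ms, the broker assumes the
    // consumer has died, evicts it from the group, and triggers a rebalance.
    consumer.commitSync();
}
```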
🎯 Task
Our task was clear:
- Prevent the Kafka consumer from detaching due to long-running processing.
- Ensure message throughput remained consistent even under heavy load.
- Avoid rebalancing storms that caused message duplication and increased lag.
In essence, we needed to decouple message consumption from message processing — ensuring that consuming from Kafka remained lightweight and consistent, while heavy business logic could still execute safely in parallel.
⚡ Action
We approached the issue in two main steps:
1️⃣ Tune Kafka Consumer Configuration
Our first step was to revisit Kafka consumer settings.
The defaults were optimized for short-lived tasks, not for long SOA processing.
Here’s what we changed:
| Configuration | Previous Value | Updated Value | Purpose |
|---|---|---|---|
| `max.poll.interval.ms` | 300000 (5 mins) | 900000 (15 mins) | Allowed longer processing time before triggering a rebalance. |
| `max.poll.records` | 500 | 50 | Reduced batch size to avoid overloading a single poll with too many messages. |
| `enable.auto.commit` | true | false | Switched to manual commits after successful processing to ensure message reliability. |
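Under Spring Boot these settings can be externalized. A minimal sketch of the matching properties (the `soa.consumer.*` keys mirror our configuration class; the broker address and group id shown are placeholders):

```yaml
soa:
  consumer:
    bootstrap-servers: localhost:9092    # placeholder
    group-id: soa-consumer-group         # placeholder
    max-poll-interval-ms: 900000         # 15 minutes
    max-poll-records: 50
```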
These tweaks gave our consumer breathing space — but the real fix came from decoupling consumption and processing.
2️⃣ Offload Processing to a Separate Thread Queue
Instead of blocking the Kafka consumer thread with heavy SOA generation, we introduced a non-blocking architecture using a ThreadPoolExecutor and an internal queue.
Here’s the flow:

```
Kafka Consumer Thread
        │
        ▼
Internal BlockingQueue ←─ bounded capacity
        │
        ▼
Worker Threads (SOA Processors)
        │
        ▼
Manual Offset Commit
```

This way:
- The consumer thread focuses only on polling and enqueuing messages.
- Worker threads handle the heavy lifting asynchronously.
- Offsets are committed only after successful processing.
💻 Code Snippets (Spring Boot)
Kafka Consumer Configuration
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${soa.consumer.bootstrap-servers}")
    private String bootstrapServer;

    @Value("${soa.consumer.group-id}")
    private String soaConsumerGroup;

    @Value("${soa.consumer.max-poll-interval-ms:900000}")
    private int soaIntervalPollMs; // max.poll.interval.ms expects an int

    @Value("${soa.consumer.max-poll-records:50}")
    private int soaPollRecords;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.soaConsumerGroup);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, this.soaIntervalPollMs);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.soaPollRecords);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Commit offsets manually, as soon as acknowledge() is called
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```

SOA Consumer Service

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class SOAConsumerService {

    // Bounded queue: provides backpressure between the consumer and the workers
    private final BlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>(1000);
    private final ExecutorService executor = Executors.newFixedThreadPool(5);

    public SOAConsumerService() {
        // Start the worker threads that drain the queue
        for (int i = 0; i < 5; i++) {
            executor.submit(new SOAWorker(queue));
        }
    }

    @KafkaListener(topics = "soa-topic", containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            // Blocking hand-off: when the queue is full, the listener thread
            // waits, which throttles polling instead of dropping records
            queue.put(record);
            acknowledgment.acknowledge(); // commit once the record is safely enqueued
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Worker Thread for Heavy Processing
```java
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SOAWorker implements Runnable {

    private final BlockingQueue<ConsumerRecord<String, String>> queue;

    public SOAWorker(BlockingQueue<ConsumerRecord<String, String>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Blocks until a record is available
                ConsumerRecord<String, String> record = queue.take();
                log.info("Processing record from partition {} offset {}",
                        record.partition(), record.offset());
                // TODO: process the message (SOA document generation)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag; loop exits
            }
        }
    }
}
```

🧠 Key Benefits
- Consumer remains stable — no detachment or rebalances.
- ⚡ Continuous polling — minimal consumer lag.
- 🔄 Parallel processing — better utilisation of CPU cores.
- 💡 Scalable — thread pool size and queue capacity tunable as load increases.
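As a concrete example of that tunability, the pool and queue sizes can be wired together so that a full queue slows the submitting side down rather than rejecting work. This is a sketch; `TunableWorkerPool` and the sizes shown are illustrative, not our production values:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TunableWorkerPool {
    // Fixed-size pool backed by a bounded queue. CallerRunsPolicy makes the
    // submitting (consumer) thread execute the task itself when the queue is
    // full, which throttles polling instead of throwing RejectedExecutionException.
    public static ThreadPoolExecutor create(int workers, int queueCapacity) {
        return new ThreadPoolExecutor(
                workers, workers,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create(5, 1000);
        System.out.println(pool.getCorePoolSize());              // prints 5
        System.out.println(pool.getQueue().remainingCapacity()); // prints 1000
        pool.shutdown();
    }
}
```

Raising `workers` and `queueCapacity` under load is then a configuration change rather than a code change.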
📊 Result
This architectural refinement delivered immediate and measurable improvements:
- ✅ Zero consumer detachment incidents — stable consumer group membership.
- ⚡ Significant reduction in Kafka lag, as messages were consumed continuously.
- 💡 Improved throughput, thanks to parallelized message processing.
- 🧘 Operational stability, with no further escalations from downstream teams.
🏁 Key Takeaway
If your Kafka consumer performs heavy processing within its poll loop, you’re setting yourself up for consumer group instability and message backlog.
The best practice is to keep the consumer lean, offload heavy processing to worker threads, and manually commit offsets once done.