Integration Testing of Non-Blocking Retries with Spring Kafka

How to write integration tests for your Spring Kafka implementation of consumers having retries and Dead Letter Publishing enabled.

mukut bhattacharjee
SelectFrom

Kafka Non-Blocking Retries

Non-blocking retries in Kafka are implemented by configuring retry topics alongside the main topic. A failed event is forwarded to a retry topic and reprocessed from there, so the consumer of the main topic is not held up. A Dead Letter Topic (DLT) can also be configured if required; events are forwarded to the DLT once all retries are exhausted. Many public resources cover the technical details:

https://medium.com/lydtech-consulting/kafka-consumer-non-blocking-retry-spring-retry-topics-4ca09675e8b5

https://www.baeldung.com/spring-retry-kafka-consumer
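
The flow described above can be sketched without a broker. This is a minimal model and not Spring Kafka's implementation: the class name RetryRoutingSketch and the hop labels are made up for illustration, and maxAttempts is assumed to count the first delivery as well.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, broker-free model of non-blocking retry routing (illustrative only). */
final class RetryRoutingSketch {

    /**
     * Returns the hops a record makes.
     *
     * @param maxAttempts           total attempts, including the first delivery (assumption)
     * @param failuresBeforeSuccess number of consecutive attempts that fail
     */
    static List<String> route(int maxAttempts, int failuresBeforeSuccess) {
        List<String> hops = new ArrayList<>();
        hops.add("main");
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attempt > failuresBeforeSuccess) {
                return hops; // this attempt succeeded, nothing is forwarded
            }
            if (attempt < maxAttempts) {
                hops.add("retry-" + (attempt - 1)); // forward to the next retry topic
            } else {
                hops.add("dlt"); // attempts exhausted, forward to the dead letter topic
            }
        }
        return hops;
    }
}
```

With three attempts, a record that always fails visits main, retry-0, retry-1 and dlt, while a record that fails once and then succeeds stops at retry-0.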

What to test?

Writing integration tests for the retry mechanism in your code can be challenging. The tests need to answer questions such as:

  • How do you test that the event has been retried the required number of times?
  • How do you test that retries are only performed when certain exceptions occur and not for others?
  • How do you test that no further retry is done once a retry has succeeded?
  • How do you test that the nth attempt succeeds after (n-1) attempts have failed?
  • How do you test that the event has been sent to the Dead Letter Topic when all the retry attempts have been exhausted?

Let’s look at some code. Many good articles show how to set up non-blocking retries using Spring Kafka; one such implementation is given below. It uses the @RetryableTopic and @DltHandler annotations from Spring Kafka.

Setting up the Retryable Consumer

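import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {

    private final CustomEventHandler handler;

    // Retry only on CustomRetryableException; topics are created by the test setup, not by Spring Kafka.
    @RetryableTopic(attempts = "${retry.attempts}",
            backoff = @Backoff(
                    delayExpression = "${retry.delay}",
                    multiplierExpression = "${retry.delay.multiplier}"
            ),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltStrategy = DltStrategy.FAIL_ON_ERROR,
            autoStartDltHandler = "true",
            autoCreateTopics = "false",
            include = {CustomRetryableException.class})
    @KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
    public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            log.info("Received event on topic {}", topic);
            handler.handleEvent(event);
        } catch (Exception e) {
            // Log and rethrow so that the retry mechanism sees the failure.
            log.error("Error occurred while processing event", e);
            throw e;
        }
    }

    // Invoked for events that end up on the dead letter topic.
    @DltHandler
    public void listenOnDlt(@Payload CustomEvent event) {
        log.error("Received event on dlt.");
        handler.handleEventFromDlt(event);
    }

}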

Notice that the include parameter in the snippet above contains CustomRetryableException.class. This tells the consumer to retry only when the CustomEventHandler#handleEvent method throws a CustomRetryableException. You can list as many exceptions as you like. There is an exclude parameter as well, but only one of the two can be used at a time. Processing is attempted at most ${retry.attempts} times in total, counting the first delivery, before the event is published to the DLT.
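
The consumer above relies on a few types that are not shown. The following is a minimal sketch of what they might look like, together with a simplified model of the include rule. Everything here is an assumption made for illustration: the message field of CustomEvent, the shape of CustomEventHandler as an interface, and the helper IncludeRuleSketch, which only approximates how Spring Kafka classifies exceptions. The exceptions are assumed to be unchecked because consume(...) rethrows them without a throws clause.

```java
import java.util.List;

// Hypothetical payload; the only usage shown in the tests is new CustomEvent("Hello").
final class CustomEvent {
    private final String message;

    CustomEvent(String message) {
        this.message = message;
    }

    String getMessage() {
        return message;
    }
}

// Assumed to be unchecked so that the listener can rethrow it freely.
class CustomRetryableException extends RuntimeException {
    CustomRetryableException(String message) {
        super(message);
    }
}

class CustomNonRetryableException extends RuntimeException {
    CustomNonRetryableException(String message) {
        super(message);
    }
}

// Hypothetical handler contract, inferred from the two calls made by the consumer.
interface CustomEventHandler {
    void handleEvent(CustomEvent event);

    void handleEventFromDlt(CustomEvent event);
}

// Simplified model of "include": retry only if the error is an instance of a listed type.
final class IncludeRuleSketch {
    static boolean isRetryable(Throwable error, List<? extends Class<? extends Throwable>> include) {
        return include.stream().anyMatch(type -> type.isInstance(error));
    }
}
```

Under this model a CustomRetryableException qualifies for a retry while a CustomNonRetryableException does not, which is the distinction the tests later exercise.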

Setting up test infra

To write integration tests, you need a functioning Kafka broker (an embedded one is preferable) and a working publisher. Let’s set up our infrastructure.

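// Static imports assumed by the tests: org.mockito.Mockito.* and org.mockito.ArgumentMatchers.any
@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
        brokerProperties = {"listeners=${kafka.broker.listeners}",
                "port=${kafka.broker.port}"},
        controlledShutdown = true,
        topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {

    @Autowired
    private KafkaTemplate<String, CustomEvent> testKafkaTemplate;

    // Assumption: the handler is replaced by a Mockito mock so that it can be stubbed and verified.
    @MockBean
    private CustomEventHandler customEventHandler;

    // Assumption: maxRetries is read from the same property the consumer uses.
    @Value("${retry.attempts}")
    private int maxRetries;

    // tests

}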

When using an embedded Kafka broker, it is important to list the topics to be created. Because the consumer sets autoCreateTopics = "false", Spring Kafka will not create the retry and DLT topics for you. In this case, we are creating four topics, namely:

"test", "test-retry-0", "test-retry-1", "test-dlt"

We have set the maximum number of attempts to 3. The main topic and the two retry topics each correspond to one attempt, so an event is forwarded to the DLT once all 3 attempts have failed.
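
The relationship between the number of attempts and the topic names can be written down as a small helper. This is a sketch under stated assumptions: the class RetryTopicNames is hypothetical, it assumes the "-retry" and "-dlt" suffixes seen in the topic list above, index-based suffixing, and one retry topic per retry.

```java
import java.util.ArrayList;
import java.util.List;

/** Derives the topic names a test would need to create (illustrative helper). */
final class RetryTopicNames {

    /**
     * @param mainTopic name of the main topic
     * @param attempts  total attempts, including the first delivery on the main topic
     */
    static List<String> forAttempts(String mainTopic, int attempts) {
        List<String> topics = new ArrayList<>();
        topics.add(mainTopic);
        // One retry topic for every attempt after the first.
        for (int index = 0; index < attempts - 1; index++) {
            topics.add(mainTopic + "-retry-" + index);
        }
        topics.add(mainTopic + "-dlt");
        return topics;
    }
}
```

For the main topic "test" and 3 attempts this yields the same four names that are passed to @EmbeddedKafka.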

Test Cases

Retry should not be done if consumption is successful on the first attempt.

This can be verified by checking that the CustomEventHandler#handleEvent method is called only once. Further assertions on log statements can also be added.

@Test
void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
    CustomEvent event = new CustomEvent("Hello");
    // GIVEN
    doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

    // WHEN
    testKafkaTemplate.send("test", event).get();

    // THEN
    verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
    // Note: timeout(...).times(0) is satisfied on the first check and does not wait;
    // Mockito's after(2000).never() waits the full period before asserting absence.
    verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}

Retry should not be done if a non-retryable exception is raised.

In this case, the CustomEventHandler#handleEvent method should be invoked only once.

@Test
void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
    CustomEvent event = new CustomEvent("Hello");
    // GIVEN
    doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

    // WHEN
    testKafkaTemplate.send("test", event).get();

    // THEN
    verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
    // Caveat: timeout(...).times(0) is satisfied on the first check and does not wait,
    // so this line cannot detect a DLT delivery that happens later.
    verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}

The event should be retried the maximum configured number of times if a RetryableException is thrown, and should then be published to the Dead Letter Topic once the retries are exhausted.

In this case, the CustomEventHandler#handleEvent method should be invoked 3 (maxRetries) times and the CustomEventHandler#handleEventFromDlt method should be invoked once.

@Test
void test_should_retry_maximum_times_and_publish_to_dlt_if_retryable_exception_raised() throws ExecutionException, InterruptedException {
    CustomEvent event = new CustomEvent("Hello");
    // GIVEN
    doThrow(CustomRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

    // WHEN
    testKafkaTemplate.send("test", event).get();

    // THEN
    verify(customEventHandler, timeout(10000).times(maxRetries)).handleEvent(any(CustomEvent.class));
    verify(customEventHandler, timeout(2000).times(1)).handleEventFromDlt(any(CustomEvent.class));
}

A generous timeout is used in the verification stage so that the exponential back-off delay is covered before the test completes. This is important: a timeout that is too short results in an assertion failure.
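
One way to choose that timeout is to add up the back-off delays the event has to sit through. The helper below is a rough sketch, assuming one delay before each retry, no delay before the DLT publish, and no maximum-delay cap. BackoffBudget is a hypothetical name, and the 1000 ms delay and 2.0 multiplier used in the note after the code are example values, not the article's configuration.

```java
/** Estimates the total back-off wait for a given retry configuration (illustrative helper). */
final class BackoffBudget {

    /**
     * @param initialDelayMillis delay before the first retry
     * @param multiplier         factor applied to the delay after each retry
     * @param attempts           total attempts, including the first delivery
     * @return sum of the delays before each retry, in milliseconds
     */
    static long totalDelayMillis(long initialDelayMillis, double multiplier, int attempts) {
        long total = 0;
        double current = initialDelayMillis;
        // There is one delay per retry, and there are (attempts - 1) retries.
        for (int retry = 0; retry < attempts - 1; retry++) {
            total += Math.round(current);
            current *= multiplier;
        }
        return total;
    }
}
```

With a delay of 1000 ms, a multiplier of 2.0 and 3 attempts, the delays add up to 1000 + 2000 = 3000 ms. The verification timeout should sit comfortably above that figure to leave room for consumer start-up and polling.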

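The event should be retried until the RetryableException is resolved, and should not be retried further if a non-retryable exception is raised or consumption eventually succeeds.

The first test below is set up to throw a RetryableException and then a NonRetryableException, so that exactly one retry is done; the second throws a RetryableException and then succeeds. Both expect that the event is not forwarded to the DLT. Note that the Spring Kafka reference documentation describes records failing with a non-retryable exception as being sent straight to the DLT when one is configured, so the DLT expectation in the first test deserves a check that actually waits.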

@Test
void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,
        InterruptedException {
    CustomEvent event = new CustomEvent("Hello");
    // GIVEN
    doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

    // WHEN
    testKafkaTemplate.send("test", event).get();

    // THEN
    verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
    // Caveat: timeout(...).times(0) is satisfied on the first check and does not wait.
    verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}

@Test
void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,
        InterruptedException {
    CustomEvent event = new CustomEvent("Hello");
    // GIVEN
    doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

    // WHEN
    testKafkaTemplate.send("test", event).get();

    // THEN
    verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
    // Caveat: timeout(...).times(0) is satisfied on the first check and does not wait.
    verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}

Conclusion

As you can see, the integration test combines stubbing strategies, timeouts, delays and verifications to make the retry mechanism of your Kafka event-driven architecture robust.

Kudos. Feel free to suggest improvements and reach out to me on LinkedIn: https://www.linkedin.com/in/mukutbhattacharjee/
