
Testing a Kafka consumer with Avro schema messages

The short answer is, as usual: it depends. Let’s assume this situation:

You have a Spring Boot microservice to manage user data. That microservice listens to incoming events from Kafka (such as user creations, updates or deletions), transforms them into your own business objects, writes them into a PostgreSQL database and provides them to your frontend via a REST interface. The overall infrastructure delivers Avro messages and uses the Confluent Schema Registry.

The sample project uses Apache Maven with the avro-maven-plugin to download the schema files and generate the sources, but of course there are plugins for Gradle too.

Now you want to test that your Kafka consumer reads the events, transforms them into your database entities, and saves them.
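
To make the rest of the post concrete, here is a rough sketch of what such a listener could look like. This is not code from the sample project: UserEntity, UserRepository, the topic name and the fields on the Avro-generated UserEvent are all assumptions for illustration.

java
@Component
public class UserEventListener {

  private final UserRepository userRepository;

  public UserEventListener(UserRepository userRepository) {
    this.userRepository = userRepository;
  }

  // Consumes user events from the (hypothetical) "user-events" topic
  // and upserts the corresponding database entity.
  @KafkaListener(topics = "user-events")
  public void onUserEvent(UserEvent event) {
    // toString() because Avro string fields may come back as CharSequence/Utf8.
    String id = event.getId().toString();
    UserEntity entity = userRepository.findById(id).orElseGet(UserEntity::new);
    entity.setId(id);
    entity.setName(event.getName().toString());
    userRepository.save(entity);
  }
}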

When you search the internet for how to test a Spring Boot Kafka consumer that reads Avro messages, you find quite a few variants: using the MockSchemaRegistryClient, writing your own custom Avro de-/serializers, setting up a Testcontainers ecosystem with Kafka, Zookeeper and a Confluent Schema Registry, or using the EmbeddedKafka provided by Spring in the spring-kafka-test dependency.

All these solutions have valid pros and cons, which I will come back to later on.

In this blog post, I will present a solution that uses a minimal set of Testcontainers and offers, in my view, the best compromise between control, speed and efficiency.

Let me walk you through the smaller setup, simulating the situation above. We have a Spring Boot application, a PostgreSQL database and our Kafka consumer. The application needs to listen to Kafka messages for users that were added or modified, and has to update the database entries accordingly. You can find the full code repository here. I added the UserEvent class for compilation purposes only; normally it would end up in your generated classes via the Avro plugin.

Let’s address the first problem: how do we interact with the Schema Registry from our testing environment? We need a mock. Hidden in Confluent’s schema registry package, in the AbstractKafkaAvroSerDeConfig class, you can find this comment for the schema registry URL:

Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas. If you wish to get a connection to a mocked schema registry for testing, you can specify a scope using the ‘mock://’ pseudo-protocol. For example, ‘mock://my-scope-name’ corresponds to ‘MockSchemaRegistry.getClientForScope(“my-scope-name”)’.

That means we can configure the Kafka producer and consumer with an imaginary schema registry URL: it only needs to start with “mock://” and we automatically work with the MockSchemaRegistryClient. This way, you don’t need to explicitly instantiate the MockSchemaRegistryClient and configure everything around it, and it also eliminates the need for a Confluent Schema Registry container. Running the Kafka Testcontainer with the embedded Zookeeper, we don’t need an extra Zookeeper container either, and we are down to one Testcontainer for the messaging. So I ended up with only two Testcontainers: Kafka and the database.
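
As a side note: should you ever need direct access to the mocked client, for example to reset registered schemas between tests, the scope name from the URL is all you need. A small sketch, assuming the MockSchemaRegistry helper from recent Confluent client versions and the scope name “testUrl” used further below:

java
// The scope name from "mock://testUrl" resolves to the mocked client ...
SchemaRegistryClient mockClient = MockSchemaRegistry.getClientForScope("testUrl");
// ... and dropping the scope resets all schemas registered during the test.
MockSchemaRegistry.dropScope("testUrl");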

Set up and configure your containers

java
  // Shared network so the containers can talk to each other if needed.
  protected static final Network network = Network.newNetwork();

  protected static final PostgreSQLContainer<?> postgreSQLContainer =
      new PostgreSQLContainer<>("postgres:10.9")
          .withPassword("postgres")
          .withUsername("postgres")
          .withExposedPorts(5432)
          .withReuse(true);

  protected static final KafkaContainer kafkaContainer =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.3"))
          .withEmbeddedZookeeper()
          .withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092")
          .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
          .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
          .withEnv("KAFKA_BROKER_ID", "1")
          .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
          .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
          .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
          .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
          .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "")
          .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
          .withNetwork(network);

  // Start both containers once for the whole test class.
  static {
    Startables.deepStart(Stream.of(postgreSQLContainer, kafkaContainer)).join();
  }
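
The Spring context under test also needs to know where these containers ended up. One way to wire that up, assuming Spring Boot 2.2.6 or newer and standard Spring Boot property names (the exact keys depend on your application configuration), is a @DynamicPropertySource:

java
  @DynamicPropertySource
  static void containerProperties(DynamicPropertyRegistry registry) {
    // Point the datasource at the Postgres container ...
    registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl);
    registry.add("spring.datasource.username", postgreSQLContainer::getUsername);
    registry.add("spring.datasource.password", postgreSQLContainer::getPassword);
    // ... the Kafka clients at the Kafka container ...
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    // ... and the schema registry at the mock:// pseudo-protocol.
    registry.add("spring.kafka.properties.schema.registry.url", () -> "mock://testUrl");
  }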

And create a Kafka producer and consumer with their configurations for the tests.

java
  public static KafkaProducer<Object, Object> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    // The "mock://" pseudo-protocol transparently wires in the MockSchemaRegistryClient.
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://testUrl");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "kafkatest");
    return new KafkaProducer<>(props);
  }
java
  public static KafkaConsumer<String, Object> createEventConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://testUrl");
    // Without this, records arrive as GenericRecord and casting to the generated class fails.
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkatest");
    return new KafkaConsumer<>(props);
  }

Here we only add the mock-prefixed schema registry URL, no special serializers or deserializers. Don’t forget to set the specific.avro.reader property to “true” in your Kafka consumer configuration; otherwise the consumer fails with a ClassCastException, because the records arrive as generic Avro records instead of your generated classes.

Now in your tests you create the events you expect the Kafka producer to send, have your Kafka consumer subscribe to the topic, and fetch the ConsumerRecord via KafkaTestUtils so it can be processed by your own listener. Then you assert on the result of whatever your listener did. That’s all.
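
A test along these lines could look roughly like this. It is a sketch, not code from the repository: the topic name, the UserEvent fields, the injected listener and repository, and the AssertJ assertion are assumptions for illustration.

java
  @Test
  void persistsUserFromIncomingEvent() {
    // Produce the Avro event we expect the real producer to send (fields are assumed).
    UserEvent event = new UserEvent("42", "Jane Doe");
    try (KafkaProducer<Object, Object> producer = createProducer()) {
      producer.send(new ProducerRecord<>("user-events", "42", event));
      producer.flush();
    }

    // Read it back and hand it to the listener under test.
    try (KafkaConsumer<String, Object> consumer = createEventConsumer()) {
      consumer.subscribe(Collections.singletonList("user-events"));
      ConsumerRecord<String, Object> record = KafkaTestUtils.getSingleRecord(consumer, "user-events");
      userEventListener.onUserEvent((UserEvent) record.value());
    }

    // Assert on the outcome in the database.
    assertThat(userRepository.findById("42")).isPresent();
  }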

Pros and Cons? 🍿

Of course, runtime is an issue. Testcontainers need to start up and set up their networking before they are fully available. A much faster alternative to my setup is to move all the infrastructure into memory, using EmbeddedKafka as the broker and replacing PostgreSQL with an H2 database. You can find a working version with EmbeddedKafka in the branch “embeddedKafka” (my naming skills are highly imaginative).
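
For comparison, the EmbeddedKafka variant on that branch essentially boils down to annotating the test class along these lines; the topic name is again an assumption, and the H2 switch happens purely in the test datasource configuration:

java
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "user-events")
class UserEventListenerEmbeddedKafkaTest {
  // spring-kafka-test exposes the broker address via the
  // "spring.embedded.kafka.brokers" property, which the test
  // configuration can map onto spring.kafka.bootstrap-servers.
}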

Admittedly, in terms of timing, the minimalistic setup with EmbeddedKafka and H2 is pretty hard to beat. The test contains the same methods as the main branch, and takes on average 5 seconds to run on my machine.

My version with one Kafka and one Postgres container takes on average 15 seconds to run.

Now imagine how long it would take with four Testcontainers (Schema Registry, Kafka and Zookeeper, plus the database container), and with more code and more complex business logic to test. 😕

But to use EmbeddedKafka, you need to pay attention to your dependency versions. The Confluent Schema Registry client brings along its own Zookeeper version, and depending on the client version you might end up with a different Zookeeper than the one expected by the spring-kafka-test dependency, which results in a ClassNotFoundException when running the test. For more details, check the version matrix by Confluent and the Kafka client compatibility matrix by Spring.

So I stuck to the container setup because of these version constraints. For this demo application it would not have mattered at all, but to avoid unexpected behaviour, I recommend staying as close to the production versions as possible. And of course H2 might not always be an option either, depending on which features of your production database you rely on (such as constraints, the JSON datatype or certain join statements). So it is a compromise between control and test runtime.

Also, did I already mention ‘it depends’?

You can find the code on GitHub, with the main branch containing the demonstration with two containers and the embeddedKafka branch containing the container-free version.

