Quarkus - Using Apache Kafka with Reactive Messaging

This guide demonstrates how your Quarkus application can use SmallRye Reactive Messaging to interact with Apache Kafka.

Prerequisites

To complete this guide, you need:

  • less than 15 minutes

  • an IDE

  • JDK 1.8+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.6.2+

  • A running Kafka cluster, or Docker Compose to start a development cluster

  • GraalVM installed if you want to run in native mode.

Architecture

In this guide, we are going to generate (random) prices in one component. These prices are written to a Kafka topic (prices). A second component reads from the prices Kafka topic and applies some magic conversion to the price. The result is sent to an in-memory stream consumed by a JAX-RS resource. The data is sent to a browser using server-sent events.

[Figure: Architecture]
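
The data flow described above can be approximated without Kafka. The following is a minimal, broker-free sketch in plain Java: an in-memory queue stands in for the prices topic and a list stands in for the in-memory stream. The class name PricePipelineSketch, the run method, and the fixed seed are illustrative assumptions and are not part of the quickstart; the 0.88 rate is the one used by the price converter in this guide.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;

// Hypothetical, broker-free illustration of the guide's data flow.
final class PricePipelineSketch {

    static final double CONVERSION_RATE = 0.88;

    // Generates `count` random prices, "publishes" them to a queue standing in
    // for the prices topic, then converts every price read back from that queue.
    static List<Double> run(int count, long seed) {
        Random random = new Random(seed);
        Queue<Integer> pricesTopic = new ArrayDeque<>();   // stand-in for the Kafka topic
        for (int i = 0; i < count; i++) {
            pricesTopic.add(random.nextInt(100));          // generator: price in [0, 100)
        }

        List<Double> myDataStream = new ArrayList<>();     // stand-in for the in-memory stream
        Integer price;
        while ((price = pricesTopic.poll()) != null) {
            myDataStream.add(price * CONVERSION_RATE);     // converter
        }
        return myDataStream;
    }

    public static void main(String[] args) {
        // Stand-in for the browser: print each converted price.
        run(5, 42L).forEach(System.out::println);
    }
}
```

In the real application, each of the three stages is a separate bean, and the queue is replaced by the Kafka topic, so the generator and the converter never call each other directly.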

Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

Clone the Git repository: git clone https://github.com/quarkusio/quarkus-quickstarts.git, or download an archive.

The solution is located in the kafka-quickstart directory.

Creating the Maven Project

First, we need a new project. Create a new project with the following command:

mvn io.quarkus:quarkus-maven-plugin:1.13.4.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart \
    -DclassName="org.acme.kafka.PriceResource" \
    -Dpath="/prices" \
    -Dextensions="resteasy,smallrye-reactive-messaging-kafka"
cd kafka-quickstart

This command generates a Maven project, importing the Reactive Messaging and Kafka connector extensions.

If you already have your Quarkus project configured, you can add the smallrye-reactive-messaging-kafka extension to your project by running the following command in your project base directory:

./mvnw quarkus:add-extension -Dextensions="smallrye-reactive-messaging-kafka"

This will add the following to your pom.xml:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Starting Kafka

Then, we need a Kafka cluster. You can follow the instructions from the Apache Kafka web site or create a docker-compose.yaml file with the following content:

version: '2'

services:

  zookeeper:
    image: strimzi/kafka:0.19.0-kafka-2.5.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.19.0-kafka-2.5.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Once created, run docker-compose up.

This is a development cluster. Do not use it in production.

The price generator

Create the src/main/java/org/acme/kafka/PriceGenerator.java file, with the following content:

package org.acme.kafka;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

/**
 * A bean producing random prices every 5 seconds.
 * The prices are written to a Kafka topic (prices). The Kafka configuration is specified in the application configuration.
 */
@ApplicationScoped
public class PriceGenerator {

    private Random random = new Random();

    @Outgoing("generated-price")                        // (1)
    public Multi<Integer> generate() {                  // (2)
        return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
                .onOverflow().drop()
                .map(tick -> random.nextInt(100));
    }

}
1 Instruct Reactive Messaging to dispatch the items from the returned stream to generated-price.
2 The method returns a Mutiny stream (Multi) emitting a random price every 5 seconds.

The method returns a Reactive Stream. The generated items are sent to the stream named generated-price. This stream is mapped to Kafka using the application.properties file that we will create soon.

The price converter

The price converter reads the prices from Kafka, and transforms them. Create the src/main/java/org/acme/kafka/PriceConverter.java file with the following content:

package org.acme.kafka;

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

/**
 * A bean consuming data from the "prices" Kafka topic and applying some conversion.
 * The result is pushed to the "my-data-stream" stream which is an in-memory stream.
 */
@ApplicationScoped
public class PriceConverter {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("prices")                                     // (1)
    @Outgoing("my-data-stream")                             // (2)
    @Broadcast                                              // (3)
    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) // (4)
    public double process(int priceInUsd) {
        return priceInUsd * CONVERSION_RATE;
    }

}
1 Indicates that the method consumes the items from the prices topic
2 Indicates that the objects returned by the method are sent to the my-data-stream stream
3 Indicates that the items are dispatched to all subscribers
4 Acknowledges the incoming message before the method is called (PRE_PROCESSING strategy)

The process method is called for every Kafka record from the prices topic (configured in the application configuration). Every result is sent to the my-data-stream in-memory stream.

The price resource

Finally, let’s bind our stream to a JAX-RS resource. Create the src/main/java/org/acme/kafka/PriceResource.java file with the following content:

package org.acme.kafka;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.reactivestreams.Publisher;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.jboss.resteasy.annotations.SseElementType;

/**
 * A simple resource retrieving the in-memory "my-data-stream" and sending the items as server-sent events.
 */
@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("my-data-stream") Publisher<Double> prices; // (1)

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) // (2)
    @SseElementType("text/plain") // (3)
    public Publisher<Double> stream() { // (4)
        return prices;
    }
}
1 Injects the my-data-stream channel using the @Channel qualifier
2 Indicates that the content is sent using Server Sent Events
3 Indicates that the data contained within the server sent events is of type text/plain
4 Returns the stream (Reactive Stream)
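
With text/plain elements, each item reaches the browser as one server-sent event whose data field carries the value. The sketch below, in plain Java, shows that framing as defined by the Server-Sent Events format: a data: line followed by a blank line. It is an illustration only. The class and method names are hypothetical, and RESTEasy performs this framing for you and may add other fields.

```java
// Hypothetical illustration of how one price is framed as a server-sent event.
final class SseFrameSketch {

    // Frames one price as an event: "data: <value>" followed by a blank line.
    static String frame(double price) {
        return "data: " + price + "\n\n";
    }

    // Extracts what a client such as EventSource would expose as event.data
    // for a single-line event produced by frame(...).
    static String dataOf(String event) {
        String line = event.trim();
        if (!line.startsWith("data:")) {
            throw new IllegalArgumentException("not a data event: " + event);
        }
        String value = line.substring("data:".length());
        // The format strips one leading space after the colon, if present.
        return value.startsWith(" ") ? value.substring(1) : value;
    }

    public static void main(String[] args) {
        String event = frame(44.0);
        System.out.print(event);
        System.out.println(dataOf(event));
    }
}
```

This is why the HTML page shown later in this guide can read event.data directly: the payload arrives as plain text, with no JSON parsing needed.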

Configuring the Kafka connector

We need to configure the Kafka connector. This is done in the application.properties file. The keys are structured as follows:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

The channel-name segment must match the value set in the @Incoming and @Outgoing annotations:

  • generated-price → sink in which we write the prices

  • prices → source in which we read the prices

# Configure the SmallRye Kafka connector
kafka.bootstrap.servers=localhost:9092

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.health-readiness-enabled=false
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

More details about this configuration are available in the Producer configuration and Consumer configuration sections of the Kafka documentation. These properties are configured with the prefix kafka.
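
To make the key structure concrete, here is a minimal sketch in plain Java that reads properties text and extracts the channel-name segment for each direction. It only illustrates how the keys are laid out; it is not how Quarkus or SmallRye read the configuration. The class ChannelKeySketch and its methods are hypothetical, and the sketch assumes channel names contain no dots.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper illustrating the mp.messaging.<direction>.<channel>.<attribute> layout.
final class ChannelKeySketch {

    // Parses properties text the same way a .properties file is read.
    static Properties parse(String text) {
        Properties config = new Properties();
        try {
            config.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return config;
    }

    // Collects the channel-name segment of every mp.messaging.<direction>.* key.
    // Assumption: channel names contain no dots.
    static Set<String> channels(Properties config, String direction) {
        String prefix = "mp.messaging." + direction + ".";
        Set<String> names = new TreeSet<>();
        for (String key : config.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                String rest = key.substring(prefix.length());
                int dot = rest.indexOf('.');
                if (dot > 0) {
                    names.add(rest.substring(0, dot));
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Properties config = parse(
                "mp.messaging.outgoing.generated-price.connector=smallrye-kafka\n"
              + "mp.messaging.outgoing.generated-price.topic=prices\n"
              + "mp.messaging.incoming.prices.connector=smallrye-kafka\n");
        System.out.println("outgoing: " + channels(config, "outgoing"));
        System.out.println("incoming: " + channels(config, "incoming"));
    }
}
```

For the configuration in this guide, the outgoing channel is generated-price and the incoming channel is prices, matching the @Outgoing and @Incoming values in the Java code. Both channels point at the same Kafka topic, prices.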

What about my-data-stream? This is an in-memory stream, not connected to a message broker.
What is "mp.messaging.incoming.prices.health-readiness-enabled=false"?

Setting health-readiness-enabled to false disables the readiness health check. By default, this check verifies that there is an active connection with the broker. In our case, the connection is only established when the first consumer subscribes, because the stream is consumed as an SSE and waits lazily for the first connection to trigger the whole stream. So, if you are running in an environment that only routes traffic to containers that are ready (such as Kubernetes), no traffic would reach your application, which, as a consequence, would never connect to Kafka and never pass the readiness check.

More details about health reporting are given in Kafka Health Checks.

The HTML page

As a final touch, create the HTML page that reads the converted prices using SSE.

Create the src/main/resources/META-INF/resources/prices.html file, with the following content:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Prices</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">

    <h2>Last price</h2>
    <div class="row">
    <p class="col-md-12">The last price is <strong><span id="content">N/A</span>&nbsp;&euro;</strong>.</p>
    </div>
</div>
<script>
    var source = new EventSource("/prices/stream");
    source.onmessage = function (event) {
        document.getElementById("content").textContent = event.data;
    };
</script>
</body>
</html>

Nothing spectacular here. On each received price, it updates the page.

Get it running

If you followed the instructions, you should have Kafka running. Then, you just need to run the application using:

./mvnw quarkus:dev

Open http://localhost:8080/prices.html in your browser.

If you started the Kafka broker with docker compose, stop it using CTRL+C followed by docker-compose down.

Running Native

You can build the native executable with:

./mvnw package -Pnative

Imperative usage

Sometimes, you need to have an imperative way of sending messages.

For example, you may need to send a message to a stream from inside a REST endpoint when receiving a POST request. In this case, you cannot use @Outgoing because your method has parameters.

For this, you can use an Emitter.

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(price);
    }
}

The Emitter configuration is done the same way as the other stream configuration used by @Incoming and @Outgoing. In addition, you can use @OnOverflow to configure the back-pressure strategy.

Deprecation

The io.smallrye.reactive.messaging.annotations.Emitter, io.smallrye.reactive.messaging.annotations.Channel and io.smallrye.reactive.messaging.annotations.OnOverflow classes are now deprecated and replaced by:

  • org.eclipse.microprofile.reactive.messaging.Emitter

  • org.eclipse.microprofile.reactive.messaging.Channel

  • org.eclipse.microprofile.reactive.messaging.OnOverflow

The new Emitter.send method returns a CompletionStage completed when the produced message is acknowledged.
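
Because send returns a CompletionStage, the caller can react to the acknowledgment or to a failure. The sketch below uses only the JDK to show that pattern. The fakeSend method is a hypothetical stand-in for Emitter.send (it completes the stage immediately instead of talking to a broker), and the class and method names are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical illustration of reacting to the CompletionStage returned by a send.
final class AckHandlingSketch {

    // Stand-in for Emitter<Double>.send(price): completes normally when the
    // message is acknowledged, exceptionally when it is not.
    static CompletionStage<Void> fakeSend(double price, boolean brokerAccepts) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        if (brokerAccepts) {
            ack.complete(null);
        } else {
            ack.completeExceptionally(new IllegalStateException("nack for " + price));
        }
        return ack;
    }

    // Maps the outcome to a message; handle(...) sees either a result or a failure.
    static String describe(CompletionStage<Void> ack) {
        return ack
                .handle((ignored, failure) ->
                        failure == null ? "acknowledged" : "failed: " + failure.getMessage())
                .toCompletableFuture()
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(fakeSend(42.0, true)));
        System.out.println(describe(fakeSend(42.0, false)));
    }
}
```

In a REST endpoint you would typically return the stage, or a response derived from it, rather than block with join(); blocking is used here only to keep the sketch short.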

Kafka Health Checks

Quarkus provides several health checks for Kafka. These checks are used in combination with the quarkus-smallrye-health extension.

When using the quarkus-kafka extension, you can enable the readiness health check by setting the quarkus.kafka.health.enabled property to true in your application.properties. This check reports the status of the interaction with the default Kafka broker (configured using kafka.bootstrap.servers) and requires an admin connection to the broker. It is disabled by default. If enabled, the /q/health/ready endpoint of your application reports the connection validation status.

When using Reactive Messaging and the Kafka connector, each configured channel (incoming or outgoing) provides a liveness and readiness check. The liveness check captures any unrecoverable failure happening during the communication with Kafka. The readiness check verifies that communication with Kafka is established. For each channel, you can disable the checks using:

# Disable both liveness and readiness checks with `health-enabled=false`:

# Incoming channel (receiving records from Kafka)
mp.messaging.incoming.your-channel.health-enabled=false
# Outgoing channel (writing records to Kafka)
mp.messaging.outgoing.your-channel.health-enabled=false

# Disable only the readiness check with `health-readiness-enabled=false`:

mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false

You can configure bootstrap.servers for each channel. The default is kafka.bootstrap.servers.

Reactive Messaging readiness check offers two strategies. The default strategy verifies that an active connection is established with the broker. This approach is not intrusive as it’s based on built-in metrics.

Using the health-readiness-topic-verification=true attribute, you can also check that the topics used by the application exist in the broker. Note that, to achieve this, an admin connection is required.

JSON serialization

Quarkus has built-in capabilities to deal with JSON Kafka messages.

Imagine we have a Fruit pojo as follows:

public class Fruit {

    public String name;
    public int price;

    public Fruit() {
    }

    public Fruit(String name, int price) {
        this.name = name;
        this.price = price;
    }
}

And we want to use it to receive messages from Kafka, make some price transformation, and send messages back to Kafka.

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

/**
* A bean consuming data from the "fruit-in" Kafka topic and applying some price conversion.
* The result is pushed to the "fruit-out" stream.
*/
@ApplicationScoped
public class FruitProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("fruit-in")
    @Outgoing("fruit-out")
    @Broadcast
    public Fruit process(Fruit fruit) {
        // price is an int, so the converted value is truncated by the cast
        fruit.price = (int) (fruit.price * CONVERSION_RATE);
        return fruit;
    }

}

To do this, we will need to set up JSON serialization with Jackson or JSON-B.

With JSON serialization correctly configured, you can also use Publisher<Fruit> and Emitter<Fruit>.

Serializing via Jackson

First, you need to include the quarkus-jackson extension (if you already use the quarkus-resteasy-jackson extension, this is not needed).

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jackson</artifactId>
</dependency>

There is an existing ObjectMapperSerializer that can be used to serialize all pojos via Jackson, but the corresponding deserializer is generic, so it needs to be subclassed.

So, let’s create a FruitDeserializer that extends the ObjectMapperDeserializer.

package com.acme.fruit.jackson;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer(){
        // pass the class to the parent.
        super(Fruit.class);
    }
}

Finally, configure your streams to use the Jackson serializer and deserializer.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Now, your Kafka messages will contain a Jackson serialized representation of your Fruit pojo.

Serializing via JSON-B

First, you need to include the quarkus-jsonb extension (if you already use the quarkus-resteasy-jsonb extension, this is not needed).

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jsonb</artifactId>
</dependency>

There is an existing JsonbSerializer that can be used to serialize all pojos via JSON-B, but the corresponding deserializer is generic, so it needs to be subclassed.

So, let’s create a FruitDeserializer that extends the generic JsonbDeserializer.

package com.acme.fruit.jsonb;

import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class FruitDeserializer extends JsonbDeserializer<Fruit> {
    public FruitDeserializer(){
        // pass the class to the parent.
        super(Fruit.class);
    }
}

If you don’t want to create a deserializer for each of your POJOs, you can use the generic io.vertx.kafka.client.serialization.JsonObjectDeserializer, which deserializes to an io.vertx.core.json.JsonObject. The corresponding serializer can also be used: io.vertx.kafka.client.serialization.JsonObjectSerializer.

Finally, configure your streams to use the JSON-B serializer and deserializer.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

Now, your Kafka messages will contain a JSON-B serialized representation of your Fruit pojo.

Sending JSON Server-Sent Events (SSE)

If you want RESTEasy to send JSON Server-Sent Events, you need to use the @SseElementType annotation to define the content type of the events, as the method will be annotated with @Produces(MediaType.SERVER_SENT_EVENTS).

The following example shows how to use SSE from a Kafka topic source.

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.reactivestreams.Publisher;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.jboss.resteasy.annotations.SseElementType;

@Path("/fruits")
public class FruitResource {

    @Inject
    @Channel("fruit-out") Publisher<Fruit> fruits;

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.APPLICATION_JSON)
    public Publisher<Fruit> stream() {
        return fruits;
    }
}

Blocking processing

You often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation indicating that the processing is blocking and cannot be run on the caller thread.

For example, the following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:

package org.acme.panache;

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Blocking
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

The complete example is available in the kafka-panache-quickstart directory.

There are 2 @Blocking annotations:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

They have the same effect, so you can use either. The first one provides finer-grained tuning, such as the worker pool to use and whether it preserves the order. The second one, also used with other reactive features of Quarkus, uses the default worker pool and preserves the order.

Testing a Kafka application

Testing without a broker

It can be useful to test the application without having to start a Kafka broker. To achieve this, you can switch the channels managed by the Kafka connector to in-memory.

This approach only works for JVM tests. It cannot be used for native tests (because they do not support injection).

First, add the following dependency to your application:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>

Then, create a Quarkus Test Resource as follows:

public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders");  // (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("queue");   // (2)
        env.putAll(props1);
        env.putAll(props2);
        return env;  // (3)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  // (4)
    }
}
  1. Switch the incoming channel "orders" (expecting messages from Kafka) to in-memory.

  2. Switch the outgoing channel "queue" (writing messages to Kafka) to in-memory.

  3. Builds and returns a Map containing all the properties required to configure the application to use in-memory channels.

  4. When the test stops, clear the InMemoryConnector (discard all the received and sent messages)

Create a Quarkus Test using the test resource created above:

@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject @Any
    InMemoryConnector connector; // (1)

    @Test
    void testProcessOrder() {
        InMemorySource<Order> orders = connector.source("orders"); // (2)
        InMemorySink<Beverage> queue = connector.sink("queue");    // (3)

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");

        orders.send(order);  // (4)

        await().<List<? extends Message<Beverage>>>until(queue::received, t -> t.size() == 1); // (5)

        Beverage queuedBeverage = queue.received().get(0).getPayload();
        Assertions.assertEquals(Beverage.State.READY, queuedBeverage.getPreparationState());
        Assertions.assertEquals("coffee", queuedBeverage.getBeverage());
        Assertions.assertEquals("Coffee lover", queuedBeverage.getCustomer());
        Assertions.assertEquals("1234", queuedBeverage.getOrderId());
    }

}
  1. Inject the in-memory connector in your test class.

  2. Retrieve the incoming channel (orders) - the channel must have been switched to in-memory in the test resource.

  3. Retrieve the outgoing channel (queue) - the channel must have been switched to in-memory in the test resource.

  4. Use the send method to send a message to the orders channel, so that the application processes it.

  5. Use the received method to check the messages produced by the application.

Starting Kafka in a test resource

Alternatively, you can start a Kafka broker in a test resource. The following snippet shows a test resource starting a Kafka broker using Testcontainers:

public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private final KafkaContainer kafka = new KafkaContainer();

    @Override
    public Map<String, String> start() {
        kafka.start();
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());  // (1)
    }

    @Override
    public void stop() {
        kafka.close();
    }
}
  1. Configure the Kafka bootstrap location, so the application connects to this broker.

Authenticating with OAuth

If your Kafka broker uses OAuth as its authentication mechanism, you need to configure the Kafka consumer to enable this authentication process. First, add the following dependency to your application:

<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
</dependency>

This dependency provides the callback handler required to handle the OAuth workflow. Then, in the application.properties, add:

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=OAUTHBEARER
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-a-client" \
  oauth.client.secret="team-a-client-secret" \
  oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
mp.messaging.connector.smallrye-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

Update the oauth.client.id, oauth.client.secret and oauth.token.endpoint.uri values.

OAuth authentication works for both JVM and native modes.

Using Snappy

On outgoing channels, you can enable Snappy compression by setting the compression.type attribute to snappy:

mp.messaging.outgoing.fruit-out.compression.type=snappy

In JVM mode, it will work out of the box. However, to compile your application to a native executable, you need to:

  1. Use GraalVM 21.+

  2. Add quarkus.kafka.snappy.enabled=true to your application.properties

In native mode, Snappy is disabled by default as the use of Snappy requires embedding a native library and unpacking it when the application starts.

Configuration

More details about the SmallRye Reactive Messaging configuration can be found in the SmallRye Reactive Messaging - Kafka Connector Documentation. The most important attributes are listed in the tables below:

Incoming channel configuration (polling from Kafka)

The following attributes are configured using:

mp.messaging.incoming.your-channel-name.attribute=value

Some properties have aliases which can be configured globally:

kafka.bootstrap.servers=...
Table 1. Incoming Attributes of the 'smallrye-kafka' connector
Attribute (alias) Description Mandatory Default

bootstrap.servers

(kafka.bootstrap.servers)

A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster.

Type: string

false

localhost:9092

topic

The consumed / populated Kafka topic. If neither this property nor the topics properties are set, the channel name is used

Type: string

false

health-enabled

Whether health reporting is enabled (default) or disabled

Type: boolean

false

true

health-readiness-enabled

Whether readiness health reporting is enabled (default) or disabled

Type: boolean

false

true

health-readiness-topic-verification

Whether the readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin connection.

Type: boolean

false

false

health-readiness-timeout

During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready.

Type: long

false

2000

tracing-enabled

Whether tracing is enabled (default) or disabled

Type: boolean

false

true

cloud-events

Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.

Type: boolean

false

true

topics

A comma-separating list of topics to be consumed. Cannot be used with the topic or pattern properties

Type: string

false

pattern

Indicate that the topic property is a regular expression. Must be used with the topic property. Cannot be used with the topics property

Type: boolean

false

false

key.deserializer

The deserializer classname used to deserialize the record’s key

Type: string

false

org.apache.kafka.common.serialization.StringDeserializer

value.deserializer

The deserializer classname used to deserialize the record’s value

Type: string

true

fetch.min.bytes

The minimum amount of data the server should return for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.

Type: int

false

1

group.id

A unique string that identifies the consumer group the application belongs to.

If not set, defaults to the application name as set by the quarkus.application.name configuration property.

If that is not set either, a unique, generated id is used. It is recommended to always define a group.id, the automatic generation is only a convenient feature for development.

Type: string

false

enable.auto.commit

If enabled, consumer’s offset will be periodically committed in the background by the underlying Kafka client, ignoring the actual processing outcome of the records. It is recommended to NOT enable this setting and let Reactive Messaging handles the commit.

Type: boolean

false

false

retry

Whether or not the connection to the broker is re-attempted in case of failure

Type: boolean

false

true

retry-attempts

The maximum number of reconnection before failing. -1 means infinite retry

Type: int

false

-1

retry-max-wait

The max delay (in seconds) between 2 reconnects

Type: int

false

30

broadcast

Whether the Kafka records should be dispatched to multiple consumer

Type: boolean

false

false

auto.offset.reset

What to do when there is no initial offset in Kafka.Accepted values are earliest, latest and none

Type: string

false

latest

failure-strategy

Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be fail (default), ignore, or dead-letter-queue

Type: string

false

fail

commit-strategy

Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be latest, ignore or throttled. If enable.auto.commit is true then the default is ignore otherwise it is throttled

Type: string

false

throttled.unprocessed-record-max-age.ms

While using the throttled commit-strategy, specify the max age in milliseconds that an unprocessed message can be before the connector is marked as unhealthy.

Type: int

false

60000

dead-letter-queue.topic

When the failure-strategy is set to dead-letter-queue indicates on which topic the record is sent. Defaults is dead-letter-topic-$channel

Type: string

false

dead-letter-queue.key.serializer

When the failure-strategy is set to dead-letter-queue, indicates the key serializer to use. If not set, the serializer associated with the key deserializer is used

Type: string

false

dead-letter-queue.value.serializer

When the failure-strategy is set to dead-letter-queue, indicates the value serializer to use. If not set, the serializer associated with the value deserializer is used

Type: string

false

partitions

The number of partitions to be consumed concurrently. The connector creates the specified number of Kafka consumers. It should match the number of partitions of the targeted topic

Type: int

false

1

consumer-rebalance-listener.name

The name set in javax.inject.Named of a bean that implements io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener. If set, this rebalance listener is applied to the consumer.

Type: string

false

key-deserialization-failure-handler

The name set in javax.inject.Named of a bean that implements io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler. If set, deserialization failures that occur when deserializing keys are delegated to this handler, which may provide a fallback value.

Type: string

false

value-deserialization-failure-handler

The name set in javax.inject.Named of a bean that implements io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler. If set, deserialization failures that occur when deserializing values are delegated to this handler, which may provide a fallback value.

Type: string

false

graceful-shutdown

Whether or not a graceful shutdown should be attempted when the application terminates.

Type: boolean

false

true
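
As an illustration of how the incoming attributes above combine, here is a minimal configuration sketch. The channel name prices-in, the group id price-converter and the dead-letter topic name prices-dead-letter are hypothetical and chosen only for this example; the integer deserializer assumes the records carry integer payloads. Adapt every value to your own application.

```properties
# Hypothetical incoming channel "prices-in" reading from the "prices" topic
mp.messaging.incoming.prices-in.connector=smallrye-kafka
mp.messaging.incoming.prices-in.topic=prices

# Assumption: the record values are integers
mp.messaging.incoming.prices-in.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

# Explicit consumer group (recommended rather than relying on the generated id)
mp.messaging.incoming.prices-in.group.id=price-converter

# Start from the beginning of the topic when no offset has been committed yet
mp.messaging.incoming.prices-in.auto.offset.reset=earliest

# Send negatively acknowledged records to a dead-letter topic instead of failing
mp.messaging.incoming.prices-in.failure-strategy=dead-letter-queue
mp.messaging.incoming.prices-in.dead-letter-queue.topic=prices-dead-letter

# Give up after 5 reconnection attempts, waiting at most 10 seconds between them
mp.messaging.incoming.prices-in.retry-attempts=5
mp.messaging.incoming.prices-in.retry-max-wait=10
```

Attributes that are left out keep the defaults listed in the table, so a real configuration usually only needs the lines that differ from them.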

Outgoing channel configuration (writing to Kafka)

The following attributes are configured using:

mp.messaging.outgoing.your-channel-name.attribute=value

Some properties have aliases which can be configured globally:

kafka.bootstrap.servers=...
Table 2. Outgoing Attributes of the 'smallrye-kafka' connector
Attribute (alias) Description Mandatory Default

acks

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Accepted values are: 0, 1, all

Type: string

false

1

bootstrap.servers

(kafka.bootstrap.servers)

A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster.

Type: string

false

localhost:9092

buffer.memory

The total bytes of memory the producer can use to buffer records waiting to be sent to the server.

Type: long

false

33554432

close-timeout

The number of milliseconds to wait for a graceful shutdown of the Kafka producer

Type: int

false

10000

cloud-events

Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and tries to create Cloud Event metadata. If enabled on an outgoing channel, the connector sends the outgoing messages as Cloud Events if the message includes Cloud Event metadata.

Type: boolean

false

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

Configure the default datacontenttype attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the datacontenttype attribute itself

Type: string

false

cloud-events-data-schema

(cloud-events-default-data-schema)

Configure the default dataschema attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the dataschema attribute itself

Type: string

false

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

Whether or not the connector should automatically insert the time attribute into the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the time attribute itself

Type: boolean

false

true

cloud-events-mode

The Cloud Event mode (structured or binary (default)). Indicates how the cloud events are written in the outgoing record

Type: string

false

binary

cloud-events-source

(cloud-events-default-source)

Configure the default source attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the source attribute itself

Type: string

false

cloud-events-subject

(cloud-events-default-subject)

Configure the default subject attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the subject attribute itself

Type: string

false

cloud-events-type

(cloud-events-default-type)

Configure the default type attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the type attribute itself

Type: string

false

health-enabled

Whether health reporting is enabled (default) or disabled

Type: boolean

false

true

health-readiness-enabled

Whether readiness health reporting is enabled (default) or disabled

Type: boolean

false

true

health-readiness-timeout

During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready.

Type: long

false

2000

health-readiness-topic-verification

Whether the readiness check should verify that topics exist on the broker. Defaults to false. Enabling it requires an admin connection.

Type: boolean

false

false

key

A key to use when writing the record

Type: string

false

key.serializer

The serializer classname used to serialize the record’s key

Type: string

false

org.apache.kafka.common.serialization.StringSerializer

max-inflight-messages

The maximum number of messages to be written to Kafka concurrently. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to 0 to remove the limit

Type: long

false

1024

merge

Whether the connector should allow multiple upstreams

Type: boolean

false

false

partition

The target partition id. -1 to let the client determine the partition

Type: int

false

-1

retries

Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error.

Type: long

false

2147483647

topic

The consumed / populated Kafka topic. If neither this property nor the topics properties are set, the channel name is used

Type: string

false

tracing-enabled

Whether tracing is enabled (default) or disabled

Type: boolean

false

true

value.serializer

The serializer classname used to serialize the payload

Type: string

true

waitForWriteCompletion

Whether the client waits for Kafka to acknowledge the written record before acknowledging the message

Type: boolean

false

true
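
The outgoing attributes can be combined in the same way. The sketch below is only an example: the channel name generated-price is used for illustration, the integer serializer assumes integer payloads, and the chosen values are not recommendations.

```properties
# Global alias, applies to every channel that does not override bootstrap.servers
kafka.bootstrap.servers=localhost:9092

# Illustrative outgoing channel "generated-price" writing to the "prices" topic
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices

# value.serializer is the only mandatory attribute in the table above
# Assumption: the payloads are integers
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

# Require the strongest acknowledgment setting instead of the default (1)
mp.messaging.outgoing.generated-price.acks=all

# Lower the number of in-flight messages from the default of 1024
mp.messaging.outgoing.generated-price.max-inflight-messages=256

# Acknowledge the message only once Kafka has acknowledged the write (default)
mp.messaging.outgoing.generated-price.waitForWriteCompletion=true

# Disable Cloud Event support when plain records are written
mp.messaging.outgoing.generated-price.cloud-events=false
```

Setting acks to all together with waitForWriteCompletion favors durability over throughput; relaxing either one trades some delivery guarantees for lower latency.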

Going further

This guide has shown how you can interact with Kafka using Quarkus. It utilizes SmallRye Reactive Messaging to build data streaming applications.

If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.