Apache Kafka Reference Guide

This reference guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to interact with Apache Kafka.

1. Introduction

Apache Kafka is a popular open-source distributed event streaming platform. It is used commonly for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Similar to a message queue, or an enterprise messaging platform, it lets you:

  • publish (write) and subscribe to (read) streams of events, called records.

  • store streams of records durably and reliably inside topics.

  • process streams of records as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner.

2. Quarkus Extension for Apache Kafka

Quarkus provides support for Apache Kafka through SmallRye Reactive Messaging framework. Based on Eclipse MicroProfile Reactive Messaging specification 2.0, it proposes a flexible programming model bridging CDI and event-driven.

This guide provides an in-depth look on Apache Kafka and SmallRye Reactive Messaging framework. For a quick start take a look at Getting Started to SmallRye Reactive Messaging with Apache Kafka.

You can add the smallrye-reactive-messaging-kafka extensions to your project by running the following command in your project base directory:

./mvnw quarkus:add-extension -Dextensions="smallrye-reactive-messaging-kafka"

This will add the following to your pom.xml:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

3. Configuring Smallrye Kafka Connector

Because Smallrye Reactive Messaging framework supports different messaging backends like Apache Kafka, AMQP, Apache Camel, JMS, MQTT, etc., it employs a generic vocabulary:

  • Applications send and receive messages. A message wraps a payload and can be extended with some metadata. With the Kafka connector, a message corresponds to a Kafka record.

  • Messages transit on channels. Application components connect to channels to publish and consume messages. The Kafka connector maps channels to Kafka topics.

  • Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Kafka is named smallrye-kafka.

A minimal configuration for the Kafka connector with an incoming channel looks like the following:

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.incoming.prices.connector=smallrye-kafka (2)
1 Configure the broker location for the production profile. You can configure it globally or per channel using mp.messaging.incoming.$channel.bootstrap.servers property. In dev mode and when running tests, Dev Services for Kafka automatically starts a Kafka broker. When not provided this property defaults to localhost:9092.
2 Configure the connector to manage the prices channel. By default the topic name is same as the channel name. You can configure the topic attribute to override it.
The %prod prefix indicates that the property is only used when the application runs in prod mode (so not in dev or test). Refer to the Profile documentation for further details.

4. Receiving messages from Kafka

Continuing from the previous minimal configuration, your Quarkus application can receive message payload directly:

import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

There are several other ways your application can consume incoming messages:

Message
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(IncomingKafkaRecordMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (commit the offset)
    return msg.ack();
}

The Message type lets the consuming method access the incoming message metadata and handle the acknowledgment manually. We’ll explore different acknowledgment strategies in Commit Strategies.

If you want to access the Kafka record objects directly, use:

ConsumerRecord
@Incoming("prices")
public void consume(ConsumerRecord<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
    String topic = record.topic();
    int partition = record.partition();
    // ...
}

ConsumerRecord is provided by the underlying Kafka client and can be injected directly to the consumer method. Another simpler approach consists in using Record:

Record
@Incoming("prices")
public void consume(Record<String, Double> record) {
    String key = record.key(); // Can be `null` if the incoming record has no key
    String value = record.value(); // Can be `null` if the incoming record has no value
}

Record is a simple wrapper around key and payload of the incoming Kafka record.

@Channel

Alternatively, your application can inject a Multi in your bean and subscribe to its events as the following example:

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.Channel;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.jboss.resteasy.annotations.SseElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType("text/plain")
    public Multi<Double> stream() {
        return prices;
    }
}

This is a good example of how to integrate a Kafka consumer with another downstream, in this example exposing it as a Server-Sent Events endpoint.

When consuming messages with @Channel, the application code is responsible for the subscription. In the example above, RESTEasy endpoint handles that for you.

Following types can be injected as channels:

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

As with the previous Message example, if your injected channel receives payloads (Multi<T>), it acknowledges the message automatically, and support multiple subscribers. If you injected channel receives Message (Multi<Message<T>>), you will be responsible for the acknowledgment and broadcasting. We will explore sending broadcast messages in Broadcasting messages on multiple consumers.

Injecting @Channel("prices") or having @Incoming("prices") does not automatically configure the application to consume messages from Kafka. You need to configure an inbound connector with mp.messaging.incoming.prices... or have an @Outgoing("prices") method somewhere in your application (in which case, prices will be an in-memory channel).

4.1. Blocking processing

Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. But, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation indicating that the processing is blocking and should not be run on the caller thread.

For example, The following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Blocking
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

The complete example is available in the kafka-panache-quickstart directory.

There are 2 @Blocking annotations:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

They have the same effect. Thus, you can use both. The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order. The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.

Detailed information on the usage of @Blocking annotation can be found in SmallRye Reactive Messaging – Handling blocking execution.

4.2. Acknowledgment Strategies

All messages received by a consumer must be acknowledged. In the absence of acknowledgment, the processing is considered in error. If the consumer method receives a Record or a payload, the message will be acked on method return, also known as Strategy.POST_PROCESSING. If the consumer method returns another reactive stream or CompletionStage, the message will be acked when the downstream message is acked. You can override the default behavior to ack the message on arrival (Strategy.PRE_PROCESSING), or do not ack the message at all (Strategy.NONE) on the consumer method as in the following example:

@Incoming("prices")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void process(double price) {
    // process price
}

If the consumer method receives a Message, the acknowledgment strategy is Strategy.MANUAL and the consumer method is in charge of ack/nack the message.

@Incoming("prices")
public CompletionStage<Void> process(Message<Double> msg) {
    // process price
    return msg.ack();
}

As mentioned above, the method can also override the acknowledgment strategy to PRE_PROCESSING or NONE.

4.3. Commit Strategies

When a message produced from a Kafka record is acknowledged, the connector invokes a commit strategy. These strategies decide when the consumer offset for a specific topic/partition is committed. Committing an offset indicates that all previous records have been processed. It is also the position where the application would restart the processing after a crash recovery or a restart.

Committing every offset has performance penalties as Kafka offset management can be slow. However, not committing the offset often enough may lead to message duplication if the application crashes between two commits.

The Kafka connector supports three strategies:

  • throttled keeps track of received messages and commits an offset of the latest acked message in sequence (meaning, all previous messages were also acked). This strategy guarantees at-least-once delivery even if the channel performs asynchronous processing. The connector tracks the received records and periodically (period specified by auto.commit.interval.ms, default: 5000 ms) commits the highest consecutive offset. The connector will be marked as unhealthy if a message associated with a record is not acknowledged in throttled.unprocessed-record-max-age.ms (default: 60000 ms). Indeed, this strategy cannot commit the offset as soon as a single record processing fails (see Error Handling Strategies to configure what happens on failing processing). If throttled.unprocessed-record-max-age.ms is set to less than or equal to 0, it does not perform any health check verification. Such a setting might lead to running out of memory if there are "poison pill" messages (that are never acked). This strategy is the default if enable.auto.commit is not explicitly set to true.

  • latest commits the record offset received by the Kafka consumer as soon as the associated message is acknowledged (if the offset is higher than the previously committed offset). This strategy provides at-least-once delivery if the channel processes the message without performing any asynchronous processing. This strategy should not be used in high load environment, as offset commit is expensive. However, it reduces the risk of duplicates.

  • ignore performs no commit. This strategy is the default strategy when the consumer is explicitly configured with enable.auto.commit to true. It delegates the offset commit to the underlying Kafka client. When enable.auto.commit is true this strategy DOES NOT guarantee at-least-once delivery. SmallRye Reactive Messaging processes records asynchronously, so offsets may be committed for records that have been polled but not yet processed. In case of a failure, only records that were not committed yet will be re-processed.

The Kafka connector disables the Kafka auto commit when it is not explicitly enabled. This behavior differs from the traditional Kafka consumer. If high throughput is important for you, and you are not limited by the downstream, we recommend to either:

  • use the throttled policy,

  • or set enable.auto.commit to true and annotate the consuming method with @Acknowledgment(Acknowledgment.Strategy.NONE).

4.4. Error Handling Strategies

If a message produced from a Kafka record is nacked, a failure strategy is applied. The Kafka connector supports three strategies:

  • fail: fail the application, no more records will be processed (default strategy). The offset of the record that has not been processed correctly is not committed.

  • ignore: the failure is logged, but the processing continue. The offset of the record that has not been processed correctly is committed.

  • dead-letter-queue: the offset of the record that has not been processed correctly is committed, but the record is written to a Kafka dead letter topic.

The strategy is selected using the failure-strategy attribute.

In the case of dead-letter-queue, you can configure the following attributes:

  • dead-letter-queue.topic: the topic to use to write the records not processed correctly, default is dead-letter-topic-$channel, with $channel being the name of the channel.

  • dead-letter-queue.key.serializer: the serializer used to write the record key on the dead letter queue. By default, it deduces the serializer from the key deserializer.

  • dead-letter-queue.value.serializer: the serializer used to write the record value on the dead letter queue. By default, it deduces the serializer from the value deserializer.

The record written on the dead letter queue contains a set of additional headers about the original record:

  • dead-letter-reason: the reason of the failure

  • dead-letter-cause: the cause of the failure if any

  • dead-letter-topic: the original topic of the record

  • dead-letter-partition: the original partition of the record (integer mapped to String)

  • dead-letter-offset: the original offset of the record (long mapped to String)

4.4.1. Retrying processing

You can combine Reactive Messaging with SmallRye Fault Tolerance, and retry processing if it failed:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
public void consume(String v) {
   // ... retry if this method throws an exception
}

You can configure the delay, the number of retries, the jitter, etc.

If your method returns a Uni or CompletionStage, you need to add the @NonBlocking annotation:

@Incoming("kafka")
@Retry(delay = 10, maxRetries = 5)
@NonBlocking
public Uni<String> consume(String v) {
   // ... retry if this method throws an exception or the returned Uni produce a failure
}
The @NonBlocking annotation is only required with SmallRye Fault Tolerance 5.1.0 and earlier. Starting with SmallRye Fault Tolerance 5.2.0 (available since Quarkus 2.1.0.Final), it is not necessary. See SmallRye Fault Tolerance documentation for more information.

The incoming messages are acknowledged only once the processing completes successfully. So, it commits the offset after the successful processing. If the processing still fails, even after all retries, the message is nacked and the failure strategy is applied.

4.5. Consumer Groups

In Kafka, a consumer group is a set of consumers which cooperate to consume data from a topic. A topic is divided into a set of partitions. The partitions of a topic are assigned among the consumers in the group, effectively allowing to scale consumption throughput. Note that each partition is assigned to a single consumer from a group. However, a consumer can be assigned multiple partitions if the number of partitions is greater than the number of consumer in the group.

Let’s explore briefly different producer/consumer patterns and how to implement them using Quarkus:

  1. Single consumer thread inside a consumer group

    This is the default behavior of an application subscribing to a Kafka topic: Each Kafka connector will create a single consumer thread and place it inside a single consumer group. Consumer group id defaults to the application name as set by the quarkus.application.name configuration property. It can also be set using the kafka.group.id property.

    Architecture
  2. Multiple consumer threads inside a consumer group

    For a given application instance, the number of consumers inside the consumer group can be configured using mp.messaging.incoming.$channel.partitions property. The partitions of the subscribed topic will be divided among the consumer threads. Note that if the partitions value exceed the number of partitions of the topic, some consumer threads won’t be assigned any partitions.

    Architecture
  3. Multiple consumer applications inside a consumer group

    Similar to the previous example, multiple instances of an application can subscribe to a single consumer group, configured via mp.messaging.incoming.$channel.group.id property, or left default to the application name. This in turn will divide partitions of the topic among application instances.

    Architecture
  4. Pub/Sub: Multiple consumer groups subscribed to a topic

    Lastly different applications can subscribe independently to same topics using different consumer group ids. For example, messages published to a topic called orders can be consumed independently on two consumer applications, one with mp.messaging.incoming.orders.group.id=invoicing and second with mp.messaging.incoming.orders.group.id=shipping. Different consumer groups can thus scale independently according to the message consumption requirements.

    Architecture

4.5.1. Consumer Rebalance Listener

Inside a consumer group, as new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group. To handle offset commit and assigned partitions yourself, you can provide a consumer rebalance listener. To achieve this, implement the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener interface and expose it as a CDI bean with the @Idenfier qualifier. A common use case is to store offset in a separate data store to implement exactly-once semantic, or starting the processing at a specific offset.

The listener is invoked every time the consumer topic/partition assignment changes. For example, when the application starts, it invokes the partitionsAssigned callback with the initial set of topics/partitions associated with the consumer. If, later, this set changes, it calls the partitionsRevoked and partitionsAssigned callbacks again, so you can implement custom logic.

Note that the rebalance listener methods are called from the Kafka polling thread and will block the caller thread until completion. That’s because the rebalance protocol has synchronization barriers, and using asynchronous code in a rebalance listener may be executed after the synchronization barrier.

When topics/partitions are assigned or revoked from a consumer, it pauses the message delivery and resumes once the rebalance completes.

If the rebalance listener handles offset commit on behalf of the user (using the NONE commit strategy), the rebalance listener must commit the offset synchronously in the partitionsRevoked callback. We also recommend applying the same logic when the application stops.

Unlike the ConsumerRebalanceListener from Apache Kafka, the io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener methods pass the Kafka Consumer and the set of topics/partitions.

In the following example we set-up a consumer that always starts on messages from at most 10 minutes ago (or offset 0). First we need to provide a bean that implements io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener and is annotated with io.smallrye.common.annotation.Identifier. We then must configure our inbound connector to use this bean.

package inbound;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.TopicPartition;

import javax.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

@ApplicationScoped
@Identifier("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

    /**
     * When receiving a list of partitions, will search for the earliest offset within 10 minutes
     * and seek the consumer to it.
     *
     * @param consumer   underlying consumer
     * @param partitions set of assigned topic partitions
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long now = System.currentTimeMillis();
        long shouldStartAt = now - 600_000L; //10 minute ago

        Map<TopicPartition, Long> request = new HashMap<>();
        for (TopicPartition partition : partitions) {
            LOGGER.info("Assigned " + partition);
            request.put(partition, shouldStartAt);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(request);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> position : offsets.entrySet()) {
            long target = position.getValue() == null ? 0L : position.getValue().offset();
            LOGGER.info("Seeking position " + target + " for " + position.getKey());
            consumer.seek(position.getKey(), target);
        }
    }

}
package inbound;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaRebalancedConsumer {

    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
        // We don't need to ACK messages because in this example,
        // we set offset during consumer rebalance
        return CompletableFuture.completedFuture(null);
    }

}

To configure the inbound connector to use the provided listener, we either set the consumer rebalance listener’s identifier: mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer

Or have the listener’s name be the same as the group id:

mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer

Setting the consumer rebalance listener’s name takes precedence over using the group id.

4.5.2. Using unique consumer groups

If you want to process all the records from a topic (from its beginning), you need:

  1. to set auto.offset.reset = earliest

  2. assign your consumer to a consumer group not used by any other application.

Quarkus generates a UUID that changes between two executions (including in dev mode). So, you are sure no other consumer uses it, and you receive a new unique group id every time your application starts.

You can use that generated UUID as the consumer group as follows:

mp.messaging.incoming.your-channel.auto.offset.reset=earliest
mp.messaging.incoming.your-channel.group.id=${quarkus.uuid}
If the group.id attribute is not set, it defaults the quarkus.application.name configuration property.

4.6. Receiving Kafka Records in Batches

By default, incoming methods receive each Kafka record individually. Under the hood, Kafka consumer clients poll the broker constantly and receive records in batches, presented inside the ConsumerRecords container.

In batch mode, your application can receive all the records returned by the consumer poll in one go.

To achieve this you need to specify a compatible container type to receive all the data:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

The incoming method can also receive Message<List<Payload>>, KafkaRecordBatch<Key, Payload> ConsumerRecords<Key, Payload> types. They give access to record details such as offset or timestamp:

@Incoming("prices")
public CompletionStage<Void> consumeMessage(KafkaRecordBatch<String, Double> records) {
    for (KafkaRecord<String, Double> record : records) {
        String payload = record.getPayload();
        String topic = record.getTopic();
        // process messages
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return records.ack();
}

Note that the successful processing of the incoming record batch will commit the latest offsets for each partition received inside the batch. The configured commit strategy will be applied for these records only.

Conversely, if the processing throws an exception, all messages are nacked, applying the failure strategy for all the records inside the batch.

Quarkus autodetects batch types for incoming channels and sets batch configuration automatically. You can configure batch mode explicitly with mp.messaging.incoming.$channel.batch property.

5. Sending messages to Kafka

Configuration for the Kafka connector outgoing channels is similar to that of incoming:

%prod.kafka.bootstrap.servers=kafka:9092 (1)
mp.messaging.outgoing.prices-out.connector=smallrye-kafka (2)
mp.messaging.outgoing.prices-out.topic=prices (3)
1 Configure the broker location for the production profile. You can configure it globally or per channel using mp.messaging.outgoing.$channel.bootstrap.servers property. In dev mode and when running tests, Dev Services for Kafka automatically starts a Kafka broker. When not provided, this property defaults to localhost:9092.
2 Configure the connector to manage the prices-out channel.
3 By default, the topic name is same as the channel name. You can configure the topic attribute to override it.

Inside application configuration, channel names are unique. Therefore, if you’d like to configure an incoming and outgoing channel on the same topic, you will need to name channels differently (like in the examples of this guide, mp.messaging.incoming.prices and mp.messaging.outgoing.prices-out).

Then, your application can generate messages and publish them to the prices-out channel. It can use double payloads as in the following snippet:

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

You should not call methods annotated with @Incoming and/or @Outgoing directly from your code. They are invoked by the framework. Having user code invoking them would not have the expected outcome.

Note that the generate method returns a Multi<Double>, which implements the Reactive Streams Publisher interface. This publisher will be used by the framework to generate messages and send them to the configured Kafka topic.

Instead of returning a payload, you can return a io.smallrye.reactive.messaging.kafka.Record to send key/value pairs:

@Outgoing("out")
public Multi<Record<String, Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> Record.of("my-key", random.nextDouble()));
}

Payload can be wrapped inside org.eclipse.microprofile.reactive.messaging.Message to have more control on the written records:

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                            .withKey("my-key")
                            .withTopic("my-key-prices")
                            .withHeaders(new RecordHeaders().add("my-header", "value".getBytes()))
                            .build()));
}

OutgoingKafkaRecordMetadata allows to set metadata attributes of the Kafka record, such as key, topic, partition or timestamp. One use case is to dynamically select the destination topic of a message. In this case, instead of configuring the topic inside your application configuration file, you need to use the outgoing metadata to set the name of the topic.

Other than method signatures returning a Reactive Stream Publisher (Multi being an implementation of Publisher), outgoing method can also return single message. In this case the producer will use this method as generator to create an infinite stream.

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

5.1. Sending messages with @Emitter

Sometimes, you need to have an imperative way of sending messages.

For example, if you need to send a message to a stream when receiving a POST request inside a REST endpoint. In this case, you cannot use @Outgoing because your method has parameters.

For this, you can use an Emitter.

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        CompletionStage<Void> ack = priceEmitter.send(price);
    }
}

Sending a payload returns a CompletionStage, completed when the message is acked. If the message transmission fails, the CompletionStage is completed exceptionally with the reason of the nack.

The Emitter configuration is done the same way as the other stream configuration used by @Incoming and @Outgoing.

Using the Emitter you are sending messages from your imperative code to reactive messaging. These messages are stored in a queue until they are sent. If the Kafka producer client can’t keep up with messages trying to be sent over to Kafka, this queue can become a memory hog and you may even run out of memory. You can use @OnOverflow to configure back-pressure strategy. It lets you configure the size of the queue (default is 256) and the strategy to apply when the buffer size is reached. Available strategies are DROP, LATEST, FAIL, BUFFER, UNBOUNDED_BUFFER and NONE.

With the Emitter API, you can also encapsulate the outgoing payload inside Message<T>. As with the previous examples, Message lets you handle the ack/nack cases differently.

import java.util.concurrent.CompletableFuture;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
    }
}

If you prefer using Reactive Stream APIs, you can use MutinyEmitter that will return Uni<Void> from the send method. You can therefore use Mutiny APIs for handling downstream messages and errors.

import org.eclipse.microprofile.reactive.messaging.Channel;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("price-create")
    MutinyEmitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Uni<String> addPrice(Double price) {
        return quoteRequestEmitter.send(price)
                .map(x -> "ok")
                .onFailure().recoverWithItem("ko");
    }
}

It is also possible to block on sending the event to the emitter with the sendAndAwait method. It will only return from the method when the event is acked or nacked by the receiver.

Deprecation

The io.smallrye.reactive.messaging.annotations.Emitter, io.smallrye.reactive.messaging.annotations.Channel and io.smallrye.reactive.messaging.annotations.OnOverflow classes are now deprecated and replaced by:

  • org.eclipse.microprofile.reactive.messaging.Emitter

  • org.eclipse.microprofile.reactive.messaging.Channel

  • org.eclipse.microprofile.reactive.messaging.OnOverflow

The new Emitter.send method returns a CompletionStage completed when the produced message is acknowledged.

More information on how to use Emitter can be found in SmallRye Reactive Messaging – Emitters and Channels

5.2. Write Acknowledgement

When Kafka broker receives a record, its acknowledgement can take time depending on the configuration. Also, it stores in-memory the records that cannot be written.

By default, the connector does wait for Kafka to acknowledge the record to continue the processing (acknowledging the received Message). You can disable this by setting the waitForWriteCompletion attribute to false.

Note that the acks attribute has a huge impact on the record acknowledgement.

If a record cannot be written, the message is nacked.

5.3. Backpressure

The Kafka outbound connector handles back-pressure, monitoring the number of in-flight messages waiting to be written to the Kafka broker. The number of in-flight messages is configured using the max-inflight-messages attribute and defaults to 1024.

The connector only sends that amount of messages concurrently. No other messages will be sent until at least one in-flight message gets acknowledged by the broker. Then, the connector writes a new message to Kafka when one of the broker’s in-flight messages get acknowledged. Be sure to configure Kafka’s batch.size and linger.ms accordingly.

You can also remove the limit of in-flight messages by setting max-inflight-messages to 0. However, note that the Kafka producer may block if the number of requests reaches max.in.flight.requests.per.connection.

5.4. Retrying message dispatch

When the Kafka producer receives an error from the server, if it is a transient, recoverable error, the client will retry sending the batch of messages. This behavior is controlled by retries and retry.backoff.ms parameters. In addition to this, SmallRye Reactive Messaging will retry individual messages on recoverable errors, depending on the retries and delivery.timeout.ms parameters.

Note that while having retries in a reliable system is a best practice, the max.in.flight.requests.per.connection parameter defaults to 5, meaning that the order of the messages is not guaranteed. If the message order is a must for your use case, setting max.in.flight.requests.per.connection to 1 will make sure a single batch of messages is sent at a time, in the expense of limiting the throughput of the producer.

For applying retry mechanism on processing errors, see the section on Retrying processing.

5.5. In-memory channels

In some use cases, it is convenient to use the messaging patterns to transfer messages inside the same application. When you don’t connect a channel to a messaging backend like Kafka, everything happens in-memory, and the streams are created by chaining methods together. Each chain is still a reactive stream and enforces the back-pressure protocol.

The framework verifies that the producer/consumer chain is complete, meaning that if the application writes messages into an in-memory channel (using a method with only @Outgoing, or an Emitter), it must also consume the messages from within the application (using a method with only @Incoming or using an unmanaged stream).

5.6. Broadcasting messages on multiple consumers

By default, a channel can be linked to a single consumer, using @Incoming method or @Channel reactive stream. At application startup, channels are verified to form a chain of consumers and producers with single consumer and producer. You can override this behavior by setting mp.messaging.$channel.broadcast=true on a channel.

In case of in-memory channels, @Broadcast annotation can be used on the @Outgoing method. For example,

import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class MultipleConsumer {

    private final Random random = new Random();

    @Outgoing("in-memory-channel")
    @Broadcast
    double generate() {
        return random.nextDouble();
    }

    @Incoming("in-memory-channel")
    void consumeAndLog(double price) {
        System.out.println(price);
    }

    @Incoming("in-memory-channel")
    @Outgoing("prices2")
    double consumeAndSend(double price) {
        return price;
    }
}

Reciprocally, multiple producers on the same channel can be merged by setting mp.messaging.incoming.$channel.merge=true. On the @Incoming methods, you can control how multiple channels are merged using the @Merge annotation.

6. Processing Messages

Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic. A processor method can be simply implemented using both the @Incoming and @Outgoing annotations:

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public double process(double price) {
        return price * CONVERSION_RATE;
    }

}

The parameter of the process method is the incoming message payload, whereas the return value will be used as the outgoing message payload. Previously mentioned signatures for parameter and return types are also supported, such as Message<T>, Record<K, V>, etc.

You can apply asynchronous stream processing by consuming and returning reactive stream Multi<T> type:

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class PriceProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("price-in")
    @Outgoing("price-out")
    public Multi<Double> process(Multi<Integer> prices) {
        return prices.filter(p -> p > 100).map(p -> p * CONVERSION_RATE);
    }

}

6.1. Propagating Record Key

When processing messages, you can propagate incoming record key to the outgoing record.

Enabled with mp.messaging.outgoing.$channel.propagate-record-key=true configuration, record key propagation produces the outgoing record with the same key as the incoming record.

If the outgoing record already contains a key, it won’t be overridden by the incoming record key. If the incoming record does have a null key, the mp.messaging.outgoing.$channel.key property is used.

7. Accessing Kafka clients directly

In rare cases, you may need to access the underlying Kafka clients. KafkaClientService provides thread-safe access to Producer and Consumer.

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;

import org.apache.kafka.clients.producer.ProducerRecord;

import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.KafkaClientService;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;

@ApplicationScoped
public class PriceSender {

    @Inject
    KafkaClientService clientService;

    void onStartup(@Observes StartupEvent startupEvent) {
        KafkaProducer<String, Double> producer = clientService.getProducer("generated-price");
        producer.runOnSendingThread(client -> client.send(new ProducerRecord<>("prices", 2.4)))
            .await().indefinitely();
    }
}

The KafkaClientService is an experimental API and can change in the future.

You can also get the Kafka configuration injected to your application and create Kafka producer, consumer and admin clients directly:

import io.smallrye.common.annotation.Identifier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;

@ApplicationScoped
public class KafkaClients {

    @Inject
    @Identifier("default-kafka-broker")
    Map<String, Object> config;

    @Produces
    AdminClient getAdmin() {
        Map<String, Object> copy = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.entrySet()) {
            if (AdminClientConfig.configNames().contains(entry.getKey())) {
                copy.put(entry.getKey(), entry.getValue());
            }
        }
        return KafkaAdminClient.create(copy);
    }

}

The default-kafka-broker configuration map contains all application properties prefixed with kafka. or KAFKA_. For more configuration options check out Kafka Configuration Resolution.

8. JSON serialization

Quarkus has built-in capabilities to deal with JSON Kafka messages.

Imagine we have a Fruit data class as follows:

public class Fruit {

    public String name;
    public int price;

    public Fruit() {
    }

    public Fruit(String name, int price) {
        this.name = name;
        this.price = price;
    }
}

And we want to use it to receive messages from Kafka, make some price transformation, and send messages back to Kafka.

import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

/**
* A bean consuming data from the "fruit-in" channel and applying some price conversion.
* The result is pushed to the "fruit-out" channel.
*/
@ApplicationScoped
public class FruitProcessor {

    private static final double CONVERSION_RATE = 0.88;

    @Incoming("fruit-in")
    @Outgoing("fruit-out")
    @Broadcast
    public Fruit process(Fruit fruit) {
        fruit.price = fruit.price * CONVERSION_RATE;
        return fruit;
    }

}

To do this, we will need to setup JSON serialization with Jackson or JSON-B.

With JSON serialization correctly configured, you can also use Publisher<Fruit> and Emitter<Fruit>.

8.1. Serializing via Jackson

First, you need to include the quarkus-jackson extension.

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jackson</artifactId>
</dependency>

There is an existing ObjectMapperSerializer that can be used to serialize all data objects via Jackson. You may create an empty subclass if you want to use Serializer/deserializer autodetection.

By default, the ObjectMapperSerializer serializes null as the "null" String, this can be customized by setting the Kafka configuration property json.serialize.null-as-null=true which will serialize null as null. This is handy when using a compacted topic, as null is used as a tombstone to know which messages delete during compaction phase.

The corresponding deserializer class needs to be subclassed. So, let’s create a FruitDeserializer that extends the ObjectMapperDeserializer.

package com.acme.fruit.jackson;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

Finally, configure your channels to use the Jackson serializer and deserializer.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jackson.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Now, your Kafka messages will contain a Jackson serialized representation of your Fruit data object. In this case, the deserializer configuration is not necessary as the Serializer/deserializer autodetection is enabled by default.

If you want to deserialize a list of fruits, you need to create a deserializer with a Jackson TypeReference denoted the generic collection used.

package com.acme.fruit.jackson;

import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class ListOfFruitDeserializer extends ObjectMapperDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        TypeReference<List<Fruit>> listType = new TypeReference<>() {
        };
        super(listType);
    }
}

8.2. Serializing via JSON-B

First, you need to include the quarkus-jsonb extension.

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jsonb</artifactId>
</dependency>

There is an existing JsonbSerializer that can be used to serialize all data objects via JSON-B. You may create an empty subclass if you want to use Serializer/deserializer autodetection.

By default, the JsonbSerializer serializes null as the "null" String, this can be customized by setting the Kafka configuration property json.serialize.null-as-null=true which will serialize null as null. This is handy when using a compacted topic, as null is used as a tombstone to know which messages delete during compaction phase.

The corresponding deserializer class needs to be subclassed. So, let’s create a FruitDeserializer that extends the generic JsonbDeserializer.

package com.acme.fruit.jsonb;

import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class FruitDeserializer extends JsonbDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

Finally, configure your channels to use the JSON-B serializer and deserializer.

# Configure the Kafka source (we read from it)
mp.messaging.incoming.fruit-in.connector=smallrye-kafka
mp.messaging.incoming.fruit-in.topic=fruit-in
mp.messaging.incoming.fruit-in.value.deserializer=com.acme.fruit.jsonb.FruitDeserializer

# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.fruit-out.connector=smallrye-kafka
mp.messaging.outgoing.fruit-out.topic=fruit-out
mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

Now, your Kafka messages will contain a JSON-B serialized representation of your Fruit data object.

If you want to deserialize a list of fruits, you need to create a deserializer with a Type denoted the generic collection used.

package com.acme.fruit.jsonb;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class ListOfFruitDeserializer extends JsonbDeserializer<List<Fruit>> {
    public ListOfFruitDeserializer() {
        Type listType = new ArrayList<MyEntity>() {}.getClass().getGenericSuperclass();
        super(listType);
    }
}
If you don’t want to create a deserializer for each data object, you can use the generic io.vertx.kafka.client.serialization.JsonObjectDeserializer that will deserialize to a io.vertx.core.json.JsonObject. The corresponding serializer can also be used: io.vertx.kafka.client.serialization.JsonObjectSerializer.

9. Avro Serialization

This is described in a dedicated guide: Using Apache Kafka with Schema Registry and Avro.

10. Serializer/deserializer autodetection

When using SmallRye Reactive Messaging with Kafka (io.quarkus:quarkus-smallrye-reactive-messaging-kafka), Quarkus can often automatically detect the correct serializer and deserializer class. This autodetection is based on declarations of @Incoming and @Outgoing methods, as well as injected @Channels.

For example, if you declare

@Outgoing("generated-price")
public Multi<Integer> generate() {
    ...
}

and your configuration indicates that the generated-price channel uses the smallrye-kafka connector, then Quarkus will automatically set the value.serializer to Kafka’s built-in IntegerSerializer.

Similarly, if you declare

@Incoming("my-kafka-records")
public void consume(KafkaRecord<Long, byte[]> record) {
    ...
}

and your configuration indicates that the my-kafka-records channel uses the smallrye-kafka connector, then Quarkus will automatically set the key.deserializer to Kafka’s built-in LongDeserializer, as well as the value.deserializer to ByteArrayDeserializer.

Finally, if you declare

@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;

and your configuration indicates that the price-create channel uses the smallrye-kafka connector, then Quarkus will automatically set the value.serializer to Kafka’s built-in DoubleSerializer.

The full set of types supported by the serializer/deserializer autodetection is:

  • short and java.lang.Short

  • int and java.lang.Integer

  • long and java.lang.Long

  • float and java.lang.Float

  • double and java.lang.Double

  • byte[]

  • java.lang.String

  • java.util.UUID

  • java.nio.ByteBuffer

  • org.apache.kafka.common.utils.Bytes

  • io.vertx.core.buffer.Buffer

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • classes generated from Avro schemas, as well as Avro GenericRecord, if Confluent or Apicurio Registry serde is present

  • classes for which a subclass of ObjectMapperSerializer / ObjectMapperDeserializer is present, as described in Serializing via Jackson

    • it is technically not needed to subclass ObjectMapperSerializer, but in such case, autodetection isn’t possible

  • classes for which a subclass of JsonbSerializer / JsonbDeserializer is present, as described in Serializing via JSON-B

    • it is technically not needed to subclass JsonbSerializer, but in such case, autodetection isn’t possible

If a serializer/deserializer is set by configuration, it won’t be replaced by the autodetection.

In case you have any issues with serializer autodetection, you can switch it off completely by setting quarkus.reactive-messaging.kafka.serializer-autodetection.enabled=false. If you find you need to do this, please file a bug in the Quarkus issue tracker so we can fix whatever problem you have.

11. Using Schema Registry

This is described in a dedicated guide: Using Apache Kafka with Schema Registry and Avro.

12. Health Checks

Quarkus provides several health checks for Kafka. These checks are used in combination with the quarkus-smallrye-health extension.

12.1. Kafka Broker Readiness Check

When using the quarkus-kafka-client extension, you can enable readiness health check by setting the quarkus.kafka.health.enabled property to true in your application.properties. This check reports the status of the interaction with a default Kafka broker (configured using kafka.bootstrap.servers). It requires an admin connection with the Kafka broker, and it is disabled by default. If enabled, when you access the /q/health/ready endpoint of your application, you will have information about the connection validation status.

12.2. Kafka Reactive Messaging Health Checks

When using Reactive Messaging and the Kafka connector, each configured channel (incoming or outgoing) provides startup, liveness and readiness checks.

  • The startup check verifies that the communication with Kafka cluster is established.

  • The liveness check captures any unrecoverable failure happening during the communication with Kafka.

  • The readiness check verifies that the Kafka connector is ready to consume/produce messages to the configured Kafka topics.

For each channel, you can disable the checks using:

# Disable both liveness and readiness checks with `health-enabled=false`:

# Incoming channel (receiving records form Kafka)
mp.messaging.incoming.your-channel.health-enabled=false
# Outgoing channel (writing records to Kafka)
mp.messaging.outgoing.your-channel.health-enabled=false

# Disable only the readiness check with `health-readiness-enabled=false`:

mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false
You can configure the bootstrap.servers for each channel using mp.messaging.incoming|outgoing.$channel.bootstrap.servers property. Default is kafka.bootstrap.servers.

Reactive Messaging startup and readiness checks offer two strategies. The default strategy verifies that an active connection is established with the broker. This approach is not intrusive as it’s based on built-in Kafka client metrics.

Using the health-topic-verification-enabled=true attribute, startup probe uses an admin client to check for the list of topics. Whereas the readiness probe for an incoming channel checks that at least one partition is assigned for consumption, and for an outgoing channel checks that the topic used by the producer exist in the broker.

Note that to achieve this, an admin connection is required. You can adjust the timeout for topic verification calls to the broker using the health-topic-verification-timeout configuration.

13. Kafka Streams

This is described in a dedicated guide: Using Apache Kafka Streams.

14. Using Snappy for message compression

On outgoing channels, you can enable Snappy compression by setting the compression.type attribute to snappy:

mp.messaging.outgoing.fruit-out.compression.type=snappy

In JVM mode, it will work out of the box. However, to compile your application to a native executable, you need to:

  1. Uses GraalVM 21.+

  2. Add quarkus.kafka.snappy.enabled=true to your application.properties

In native mode, Snappy is disabled by default as the use of Snappy requires embedding a native library and unpacking it when the application starts.

15. Authentication with OAuth

If your Kafka broker uses OAuth as authentication mechanism, you need to configure the Kafka consumer to enable this authentication process. First, add the following dependency to your application:

<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
</dependency>

This dependency provides the callback handler required to handle the OAuth workflow. Then, in the application.properties, add:

mp.messaging.connector.smallrye-kafka.security.protocol=SASL_PLAINTEXT
mp.messaging.connector.smallrye-kafka.sasl.mechanism=OAUTHBEARER
mp.messaging.connector.smallrye-kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="team-a-client" \
  oauth.client.secret="team-a-client-secret" \
  oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
mp.messaging.connector.smallrye-kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

quarkus.ssl.native=true

Update the oauth.client.id, oauth.client.secret and oauth.token.endpoint.uri values.

OAuth authentication works for both JVM and native modes. Since SSL in not enabled by default in native mode, quarkus.ssl.native=true must be added to support JaasClientOauthLoginCallbackHandler, which uses SSL. (See the Using SSL with Native Executables guide for more details.)

16. Testing a Kafka application

16.1. Testing without a broker

It can be useful to test the application without having to start a Kafka broker. To achieve this, you can switch the channels managed by the Kafka connector to in-memory.

This approach only works for JVM tests. It cannot be used for native tests (because they do not support injection).

Let’s say we want to test the following processor application:

@ApplicationScoped
public class BeverageProcessor {

    @Incoming("orders")
    @Outgoing("beverages")
    Beverage process(Order order) {
        System.out.println("Order received " + order.getProduct());
        Beverage beverage = new Beverage();
        beverage.setBeverage(order.getProduct());
        beverage.setCustomer(order.getCustomer());
        beverage.setOrderId(order.getOrderId());
        beverage.setPreparationState("RECEIVED");
        return beverage;
    }

}

First, add the following test dependency to your application:

<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
    <scope>test</scope>
</dependency>

Then, create a Quarkus Test Resource as follows:

public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {

    @Override
    public Map<String, String> start() {
        Map<String, String> env = new HashMap<>();
        Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("orders");     (1)
        Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("beverages");  (2)
        env.putAll(props1);
        env.putAll(props2);
        return env;  (3)
    }

    @Override
    public void stop() {
        InMemoryConnector.clear();  (4)
    }
}
1 Switch the incoming channel orders (expecting messages from Kafka) to in-memory.
2 Switch the outgoing channel beverages (writing messages to Kafka) to in-memory.
3 Builds and returns a Map containing all the properties required to configure the application to use in-memory channels.
4 When the test stops, clear the InMemoryConnector (discard all the received and sent messages)

Create a Quarkus Test using the test resource created above:

@QuarkusTest
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
class BaristaTest {

    @Inject
    InMemoryConnector connector; (1)

    @Test
    void testProcessOrder() {
        InMemorySource<Order> ordersIn = connector.source("orders");     (2)
        InMemorySink<Beverage> beveragesOut = connector.sink("beverages");  (3)

        Order order = new Order();
        order.setProduct("coffee");
        order.setName("Coffee lover");
        order.setOrderId("1234");

        ordersIn.send(order);  (4)

        await().<List<? extends Message<Beverage>>>until(beveragesOut::received, t -> t.size() == 1); (5)

        Beverage queuedBeverage = beveragesOut.received().get(0).getPayload();
        Assertions.assertEquals(Beverage.State.READY, queuedBeverage.getPreparationState());
        Assertions.assertEquals("coffee", queuedBeverage.getBeverage());
        Assertions.assertEquals("Coffee lover", queuedBeverage.getCustomer());
        Assertions.assertEquals("1234", queuedBeverage.getOrderId());
    }

}
1 Inject the in-memory connector in your test class.
2 Retrieve the incoming channel (orders) - the channel must have been switched to in-memory in the test resource.
3 Retrieve the outgoing channel (beverages) - the channel must have been switched to in-memory in the test resource.
4 Use the send method to send a message to the orders channel. The application will process this message and send a message to beverages channel.
5 Use the received method on beverages channel to check the messages produced by the application.

With in-memory channels we were able to test application code processing messages without starting a Kafka broker. Note that different in-memory channels are independent, and switching channel connector to in-memory does not simulate message delivery between channels configured to the same Kafka topic.

16.2. Starting Kafka in a test resource

Alternatively, you can start a Kafka broker in a test resource. The following snippet shows a test resource starting a Kafka broker using Testcontainers:

public class KafkaResource implements QuarkusTestResourceLifecycleManager {

    private final KafkaContainer kafka = new KafkaContainer();

    @Override
    public Map<String, String> start() {
        kafka.start();
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());  (1)
    }

    @Override
    public void stop() {
        kafka.close();
    }
}
1 Configure the Kafka bootstrap location, so the application connects to this broker.

17. Dev Services for Kafka

If any Kafka-related extension is present (e.g. quarkus-smallrye-reactive-messaging-kafka), Dev Services for Kafka automatically starts a Kafka broker in dev mode and when running tests. So, you don’t have to start a broker manually. The application is configured automatically.

Because starting a Kafka broker can be long, Dev Services for Kafka uses Redpanda, a Kafka compatible broker which starts in ~1 second.

17.1. Enabling / Disabling Dev Services for Kafka

Dev Services for Kafka is automatically enabled unless:

  • quarkus.kafka.devservices.enabled is set to false

  • the kafka.bootstrap.servers is configured

  • all the Reactive Messaging Kafka channels have the bootstrap.servers attribute set

Dev Services for Kafka relies on Docker to start the broker. If your environment does not support Docker, you will need to start the broker manually, or connect to an already running broker. You can configure the broker address using kafka.bootstrap.servers.

17.2. Shared broker

Most of the time you need to share the broker between applications. Dev Services for Kafka implements a service discovery mechanism for your multiple Quarkus applications running in dev mode to share a single broker.

Dev Services for Kafka starts the container with the quarkus-dev-service-kafka label which is used to identify the container.

If you need multiple (shared) brokers, you can configure the quarkus.kafka.devservices.service-name attribute and indicate the broker name. It looks for a container with the same value, or starts a new one if none can be found. The default service name is kafka.

Sharing is enabled by default in dev mode, but disabled in test mode. You can disable the sharing with quarkus.kafka.devservices.shared=false.

17.3. Setting the port

By default, Dev Services for Kafka picks a random port and configures the application. You can set the port by configuring the quarkus.kafka.devservices.port property.

Note that the Kafka advertised address is automatically configured with the chosen port.

17.4. Configuring the image

Dev Services for Kafka uses: vectorized/redpanda images. You can select any version from https://hub.docker.com/r/vectorized/redpanda:

quarkus.kafka.devservices.image-name=vectorized/redpanda:latest
Dev Services for Kafka only support Redpanda.

17.5. Configuring Kafka topics

You can configure the Dev Services for Kafka to create topics once the broker is started. Topics are created with given number of partitions and 1 replica.

The following example creates a topic named test with 3 partitions, and a second topic named messages with 2 partitions.

quarkus.kafka.devservices.topic-partitions.test=3
quarkus.kafka.devservices.topic-partitions.messages=2

If a topic already exists with the given name, the creation is skipped, without trying to re-partition the existing topic to a different number of partitions.

You can configure timeout for Kafka admin client calls used in topic creation using quarkus.kafka.devservices.topic-partitions-timeout, it defaults to 2 seconds.

18. Kubernetes Service Bindings

Quarkus Kafka extension supports Service Binding Specification for Kubernetes. You can enable this by adding the quarkus-kubernetes-service-binding extension to your application.

When running in appropriately configured Kubernetes clusters, Kafka extension will pull its Kafka broker connection configuration from the service binding available inside the cluster, without the need for user configuration.

19. Execution model

Reactive Messaging invokes user’s methods on an I/O thread. Thus, by default, the methods must not block. As described in Blocking processing, you need to add the @Blocking annotation on the method if this method will block the caller thread.

See the Quarkus Reactive Architecture documentation for further details on this topic.

20. Configuration Reference

More details about the SmallRye Reactive Messaging configuration can be found in the SmallRye Reactive Messaging - Kafka Connector Documentation.

The most important attributes are listed in the tables below:

20.1. Incoming channel configuration (polling from Kafka)

The following attributes are configured using:

mp.messaging.incoming.your-channel-name.attribute=value

Some properties have aliases which can be configured globally:

kafka.bootstrap.servers=...

You can also pass any property supported by the underlying Kafka consumer.

For example, to configure the max.poll.records property, use:

mp.messaging.incoming.[channel].max.poll.records=1000

Some consumer client properties are configured to sensible default values:

If not set, reconnect.backoff.max.ms is set to 10000 to avoid high load on disconnection.

If not set, key.deserializer is set to org.apache.kafka.common.serialization.StringDeserializer.

The consumer client.id is configured according to the number of clients to create using mp.messaging.incoming.[channel].partitions property.

  • If a client.id is provided, it is used as-is or suffixed with client index if partitions property is set.

  • If a client.id is not provided, it is generated as kafka-consumer-[channel][-index].

Table 1. Incoming Attributes of the 'smallrye-kafka' connector
Attribute (alias) Description Mandatory Default

bootstrap.servers

(kafka.bootstrap.servers)

A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster.

Type: string

false

localhost:9092

topic

The consumed / populated Kafka topic. If neither this property nor the topics properties are set, the channel name is used

Type: string

false

health-enabled

Whether health reporting is enabled (default) or disabled

Type: boolean

false

true

health-readiness-enabled

Whether readiness health reporting is enabled (default) or disabled

Type: boolean

false

true

health-readiness-topic-verification

deprecated - Whether the readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead.

Type: boolean

false

health-readiness-timeout

deprecated - During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead.

Type: long

false

health-topic-verification-enabled

Whether the startup and readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin client connection.

Type: boolean

false

false

health-topic-verification-timeout

During the startup and readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready.

Type: long

false

2000

tracing-enabled

Whether tracing is enabled (default) or disabled

Type: boolean

false

true

cloud-events

Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.

Type: boolean

false

true

kafka-configuration

Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.

Type: string

false

topics

A comma-separating list of topics to be consumed. Cannot be used with the topic or pattern properties

Type: string

false

pattern

Indicate that the topic property is a regular expression. Must be used with the topic property. Cannot be used with the topics property

Type: boolean

false

false

key.deserializer

The deserializer classname used to deserialize the record’s key

Type: string

false

org.apache.kafka.common.serialization.StringDeserializer

value.deserializer

The deserializer classname used to deserialize the record’s value

Type: string

true

fetch.min.bytes

The minimum amount of data the server should return for a fetch request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive.

Type: int

false

1

[.no-hyphens]#group.id

A unique string that identifies the consumer group the application belongs to.

If not set, defaults to the application name as set by the quarkus.application.name configuration property.

If that is not set either, a unique, generated id is used.

It is recommended to always define a group.id, the automatic generation is only a convenient feature for development. You can explicitly ask for automatically generated unique id by setting this property to ${quarkus.uuid}.

Type: string

false

enable.auto.commit

If enabled, consumer’s offset will be periodically committed in the background by the underlying Kafka client, ignoring the actual processing outcome of the records. It is recommended to NOT enable this setting and let Reactive Messaging handles the commit.

Type: boolean

false

false

retry

Whether or not the connection to the broker is re-attempted in case of failure

Type: boolean

false

true

retry-attempts

The maximum number of reconnection before failing. -1 means infinite retry

Type: int

false

-1

retry-max-wait

The max delay (in seconds) between 2 reconnects

Type: int

false

30

broadcast

Whether the Kafka records should be dispatched to multiple consumer

Type: boolean

false

false

auto.offset.reset

What to do when there is no initial offset in Kafka.Accepted values are earliest, latest and none

Type: string

false

latest

failure-strategy

Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be fail (default), ignore, or dead-letter-queue

Type: string

false

fail

commit-strategy

Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be latest, ignore or throttled. If enable.auto.commit is true then the default is ignore otherwise it is throttled

Type: string

false

throttled.unprocessed-record-max-age.ms

While using the throttled commit-strategy, specify the max age in milliseconds that an unprocessed message can be before the connector is marked as unhealthy. Setting this attribute to 0 disables this monitoring.

Type: int

false

60000

dead-letter-queue.topic

When the failure-strategy is set to dead-letter-queue indicates on which topic the record is sent. Defaults is dead-letter-topic-$channel

Type: string

false

dead-letter-queue.key.serializer

When the failure-strategy is set to dead-letter-queue indicates the key serializer to use. If not set the serializer associated to the key deserializer is used

Type: string

false

dead-letter-queue.value.serializer

When the failure-strategy is set to dead-letter-queue indicates the value serializer to use. If not set the serializer associated to the value deserializer is used

Type: string

false

partitions

The number of partitions to be consumed concurrently. The connector creates the specified amount of Kafka consumers. It should match the number of partition of the targeted topic

Type: int

false

1

consumer-rebalance-listener.name

The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener. If set, this rebalance listener is applied to the consumer.

Type: string

false

key-deserialization-failure-handler

The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler. If set, deserialization failure happening when deserializing keys are delegated to this handler which may provide a fallback value.

Type: string

false

value-deserialization-failure-handler

The name set in @Identifier of a bean that implements io.smallrye.reactive.messaging.kafka.DeserializationFailureHandler. If set, deserialization failure happening when deserializing values are delegated to this handler which may provide a fallback value.

Type: string

false

fail-on-deserialization-failure

When no deserialization failure handler is set and a deserialization failure happens, report the failure and mark the application as unhealthy. If set to false and a deserialization failure happens, a null value is forwarded.

Type: boolean

false

true

graceful-shutdown

Whether or not a graceful shutdown should be attempted when the application terminates.

Type: boolean

false

true

poll-timeout

The polling timeout in milliseconds. When polling records, the poll will wait at most that duration before returning records. Default is 1000ms

Type: int

false

1000

pause-if-no-requests

Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused.

Type: boolean

false

true

batch

Whether the Kafka records are consumed in batch. The channel injection point must consume a compatible type, such as List<Payload> or KafkaRecordBatch<Payload>.

Type: boolean

false

false

max-queue-size-factor

Multiplier factor to determine maximum number of records queued for processing, using max.poll.records * max-queue-size-factor. Defaults to 2. In batch mode max.poll.records is considered 1.

Type: int

false

2

20.2. Outgoing channel configuration (writing to Kafka)

The following attributes are configured using:

mp.messaging.outgoing.your-channel-name.attribute=value

Some properties have aliases which can be configured globally:

kafka.bootstrap.servers=...

Some producer client properties are configured to sensible default values:

If not set, reconnect.backoff.max.ms is set to 10000 to avoid high load on disconnection.

If not set, key.serializer is set to org.apache.kafka.common.serialization.StringSerializer.

If not set, producer client.id is generated as kafka-producer-[channel].

Table 2. Outgoing Attributes of the 'smallrye-kafka' connector
Attribute (alias) Description Mandatory Default

acks

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. Accepted values are: 0, 1, all

Type: string

false

1

bootstrap.servers

(kafka.bootstrap.servers)

A comma-separated list of host:port to use for establishing the initial connection to the Kafka cluster.

Type: string

false

localhost:9092

buffer.memory

The total bytes of memory the producer can use to buffer records waiting to be sent to the server.

Type: long

false

33554432

close-timeout

The amount of milliseconds waiting for a graceful shutdown of the Kafka producer

Type: int

false

10000

cloud-events

Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.

Type: boolean

false

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

Configure the default datacontenttype attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the datacontenttype attribute itself

Type: string

false

cloud-events-data-schema

(cloud-events-default-data-schema)

Configure the default dataschema attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the dataschema attribute itself

Type: string

false

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

Whether or not the connector should insert automatically the time attribute` into the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the time attribute itself

Type: boolean

false

true

cloud-events-mode

The Cloud Event mode (structured or binary (default)). Indicates how are written the cloud events in the outgoing record

Type: string

false

binary

cloud-events-source

(cloud-events-default-source)

Configure the default source attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the source attribute itself

Type: string

false

cloud-events-subject

(cloud-events-default-subject)

Configure the default subject attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the subject attribute itself

Type: string

false

cloud-events-type

(cloud-events-default-type)

Configure the default type attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the type attribute itself

Type: string

false

health-enabled

Whether health reporting is enabled (default) or disabled

Type: boolean

false

true

health-readiness-enabled

Whether readiness health reporting is enabled (default) or disabled

Type: boolean

false

true

health-readiness-timeout

deprecated - During the readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready. Deprecated: Use 'health-topic-verification-timeout' instead.

Type: long

false

health-readiness-topic-verification

deprecated - Whether the readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin connection. Deprecated: Use 'health-topic-verification-enabled' instead.

Type: boolean

false

health-topic-verification-enabled

Whether the startup and readiness check should verify that topics exist on the broker. Default to false. Enabling it requires an admin client connection.

Type: boolean

false

false

health-topic-verification-timeout

During the startup and readiness health check, the connector connects to the broker and retrieves the list of topics. This attribute specifies the maximum duration (in ms) for the retrieval. If exceeded, the channel is considered not-ready.

Type: long

false

2000

kafka-configuration

Identifier of a CDI bean that provides the default Kafka consumer/producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier.

Type: string

false

key

A key to used when writing the record

Type: string

false

key.serializer

The serializer classname used to serialize the record’s key

Type: string

false

org.apache.kafka.common.serialization.StringSerializer

max-inflight-messages

The maximum number of messages to be written to Kafka concurrently. It limits the number of messages waiting to be written and acknowledged by the broker. You can set this attribute to 0 remove the limit

Type: long

false

1024

merge

Whether the connector should allow multiple upstreams

Type: boolean

false

false

partition

The target partition id. -1 to let the client determine the partition

Type: int

false

-1

propagate-record-key

Propagate incoming record key to the outgoing record

Type: boolean

false

false

retries

If set to a positive number, the connector will try to resend any record that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled. If not set, the connector tries to resend any record that failed to be delivered (because of a potentially transient error) during an amount of time configured by delivery.timeout.ms.

Type: long

false

2147483647

topic

The consumed / populated Kafka topic. If neither this property nor the topics properties are set, the channel name is used

Type: string

false

tracing-enabled

Whether tracing is enabled (default) or disabled

Type: boolean

false

true

value.serializer

The serializer classname used to serialize the payload

Type: string

true

waitForWriteCompletion

Whether the client waits for Kafka to acknowledge the written record before acknowledging the message

Type: boolean

false

true

20.3. Kafka Configuration Resolution

Quarkus exposes all Kafka related application properties, prefixed with kafka. or KAFKA_ inside a configuration map with default-kafka-broker name. This configuration is used to establish the connection with the Kafka broker.

In addition to this default configuration, you can configure the name of the Map producer using the kafka-configuration attribute:

mp.messaging.incoming.my-channel.connector=smallrye-kafka
mp.messaging.incoming.my-channel.kafka-configuration=my-configuration

In this case, the connector looks for the Map associated with the my-configuration name. If kafka-configuration is not set, an optional lookup for a Map exposed with the channel name (my-channel in the previous example) is done.

@Produces
@ApplicationScoped
@Identifier("my-configuration")
Map<String, Object> outgoing() {
    return Map.ofEntries(
            Map.entry("value.serializer", ObjectMapperSerializer.class.getName())
    );
}
If kafka-configuration is set and no Map can be found, the deployment fails.

Attribute values are resolved as follows:

  1. the attribute is set directly on the channel configuration (mp.messaging.incoming.my-channel.attribute=value),

  2. if not set, the connector looks for a Map with the channel name or the configured kafka-configuration (if set) and the value is retrieved from that Map

  3. If the resolved Map does not contain the value the default Map is used (exposed with the default-kafka-broker name)

21. Integrating with Kafka - Common patterns

21.1. Writing to Kafka from an HTTP endpoint

To send messages to Kafka from an HTTP endpoint, inject an Emitter (or a MutinyEmitter) in your endpoint:

package org.acme;

import java.util.concurrent.CompletionStage;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<String> emitter;          (1)

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) { (2)
        return emitter.send(payload);                   (3)
    }
}
1 Inject an Emitter<String>
2 The HTTP method receives the payload and returns a CompletionStage completed when the message is written to Kafka
3 Send the message to Kafka, the send method returns a CompletionStage

The endpoint sends the passed payload (from a POST HTTP request) to the emitter. The emitter’s channel is mapped to a Kafka topic in the application.properties file:

mp.messaging.outgoing.kafka.connector=smallrye-kafka
mp.messaging.outgoing.kafka.topic=my-topic

The endpoint returns a CompletionStage indicating the asynchronous nature of the method. The emitter.send method returns a CompletionStage<Void> . The returned future is completed when the message has been written to Kafka. If the writing fails, the returned CompletionStage is completed exceptionally.

If the endpoint does not return a CompletionStage, the HTTP response may be written before the message is sent to Kafka, and so failures won’t be reported to the user.

If you need to send a Kafka record, use:

package org.acme;

import java.util.concurrent.CompletionStage;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.reactive.messaging.kafka.Record;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Record<String,String>> emitter;  (1)


    @POST
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<Void> send(String payload) {
        return emitter.send(Record.of("my-key", payload));    (2)
    }
}
1 Note the usage of an Emitter<Record<K, V>>
2 Create the record using Record.of(k, v)

21.2. Persisting Kafka messages with Hibernate with Panache

To persist objects received from Kafka into a database, you can use Hibernate with Panache.

If you use Hibernate Reactive, look at Persisting Kafka messages with Hibernate Reactive.

Let’s imagine you receive Fruit objects. For simplicity purposes, our Fruit class is pretty simple:

package org.acme;

import javax.persistence.Entity;

import io.quarkus.hibernate.orm.panache.PanacheEntity;

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}

To consume Fruit instances stored on a Kafka topic, and persist them into a database, you can use the following approach:

package org.acme;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.common.annotation.Blocking;

@ApplicationScoped
public class FruitConsumer {

    @Incoming("fruits")                                     (1)
    @Transactional                                          (2)
    @Blocking                                               (3)
    public void persistFruits(Fruit fruit) {                (4)
        fruit.persist();                                    (5)
    }
}
1 Configuring the incoming channel. This channel reads from Kafka.
2 As we are writing in a database, we must be in a transaction. This annotation starts a new transaction and commits it when the method returns.
3 Writing to a database using classic Hibernate is blocking. So, you must tell to Quarkus that the method must be called on a worker thread you can block (and not an I/O thread).
4 The method receives each Fruit. Note that you would need a deserializer to reconstruct the Fruit instances from the Kafka records.
5 Persist the received fruit object.

As mentioned in <4>, you need a deserializer that can create a Fruit from the record. This can be done using a Jackson deserializer:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

The associated configuration would be:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

Check Serializing via Jackson for more detail about the usage of Jackson with Kafka. You can also use Avro.

21.3. Persisting Kafka messages with Hibernate Reactive

To persist objects received from Kafka into a database, you can use Hibernate Reactive with Panache.

Let’s imagine you receive Fruit objects. For simplicity purposes, our Fruit class is pretty simple:

package org.acme;

import javax.persistence.Entity;

import io.quarkus.hibernate.reactive.panache.PanacheEntity;  (1)

@Entity
public class Fruit extends PanacheEntity {

    public String name;

}
1 Make sure to use the reactive variant

To consume Fruit instances stored on a Kafka topic, and persist them into a database, you can use the following approach:

package org.acme;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class FruitStore {

    @Incoming("fruits")
    public Uni<Void> persist(Fruit fruit) {
        return Panache.withTransaction(() ->  (1)
            fruit.persist()                   (2)
                .map(persisted -> null)       (3)
        );
    }

}
1 Instruct Panache to run the given (asynchronous) action in a transaction. The transaction completes when the action completes.
2 Persist the entity. It returns a Uni<Fruit>.
3 Switch back to a Uni<Void>.

Unlike with classic Hibernate, you can’t use @Transactional. Instead, we use Panache.withTransaction and persist our entity. The map is used to return a Uni<Void> and not a Uni<Fruit>.

You need a deserializer that can create a Fruit from the record. This can be done using a Jackson deserializer:

package org.acme;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
    public FruitDeserializer() {
        super(Fruit.class);
    }
}

The associated configuration would be:

mp.messaging.incoming.fruits.connector=smallrye-kafka
mp.messaging.incoming.fruits.value.deserializer=org.acme.FruitDeserializer

Check Serializing via Jackson for more detail about the usage of Jackson with Kafka. You can also use Avro.

21.4. Writing entities managed by Hibernate to Kafka

Let’s imagine the following process:

  1. You receive an HTTP request with a payload,

  2. You create an Hibernate entity instance from this payload,

  3. You persist that entity into a database,

  4. You send the entity to a Kafka topic

If you use Hibernate Reactive, look at Writing entities managed by Hibernate Reactive to Kafka.

Because we write to a database, we must run this method in a transaction. Yet, sending the entity to Kafka happens asynchronously. The operation returns a CompletionStage (or a Uni if you use a MutinyEmitter) reporting when the operation completes. We must be sure that the transaction is still running until the object is written. Otherwise, you may access the object outside the transaction, which is not allowed.

To implement this process, you need the following approach:

package org.acme;

import java.util.concurrent.CompletionStage;

import javax.transaction.Transactional;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@Path("/")
public class ResourceSendingToKafka {

    @Channel("kafka") Emitter<Fruit> emitter;

    @POST
    @Path("/fruits")
    @Transactional                                                      (1)
    public CompletionStage<Void> storeAndSendToKafka(Fruit fruit) {     (2)
        fruit.persist();
        return emitter.send(fruit);                                     (3)
    }
}
1 As we are writing to the database, make sure we run inside a transaction
2 The method receives the fruit instance to persist. It returns a CompletionStage which is used for the transaction demarcation. The transaction is committed when the return CompletionStage completes. In our case, it’s when the message is written to Kafka.
3 Send the managed instance to Kafka. Make sure we wait for the message to complete before closing the transaction.

21.5. Writing entities managed by Hibernate Reactive to Kafka

To send to Kafka entities managed by Hibernate Reactive, we recommend using:

  • RESTEasy Reactive to serve HTTP requests

  • A MutinyEmitter to send message to a channel, so it can be easily integrated with the Mutiny API exposed by Hibernate Reactive or Hibernate Reactive with Panache.

The following example demonstrates how to receive a payload, store it in the database using Hibernate Reactive with Panache, and send the persisted entity to Kafka:

package org.acme;

import javax.ws.rs.POST;
import javax.ws.rs.Path;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;

@Path("/")
public class ReactiveGreetingResource {

    @Channel("kafka") MutinyEmitter<Fruit> emitter;     (1)

    @POST
    @Path("/fruits")
    public Uni<Void> sendToKafka(Fruit fruit) {         (2)
        return Panache.withTransaction(() ->            (3)
            fruit.<Fruit>persist()
        )
            .chain(f -> emitter.send(f));               (4)
    }
}
1 Inject a MutinyEmitter which exposes a Mutiny API. It simplifies the integration with the Mutiny API exposed by Hibernate Reactive with Panache.
2 The HTTP method receiving the payload returns a Uni<Void>. The HTTP response is written when the operation completes (the entity is persisted and written to Kafka).
3 We need to write the entity into the database in a transaction.
4 Once the persist operation completes, we send the entity to Kafka. The send method returns a Uni<Void>.

21.6. Streaming Kafka topics as server-sent events

Streaming a Kafka topic as server-sent events (SSE) is straightforward:

  1. You inject the channel representing the Kafka topic in your HTTP endpoint

  2. You return that channel as a Publisher or a Multi from the HTTP method

The following code provides an example:

@Channel("fruits")
Multi<Fruit> fruits;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Fruit> stream() {
    return fruits;
}

Some environment cuts the SSE connection when there is not enough activity. The workaround consists of sending ping messages (or empty objects) periodically.

@Channel("fruits")
Multi<Fruit> fruits;

@Inject
ObjectMapper mapper;

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream() {
    return Multi.createBy().merging()
            .streams(
                    fruits.map(this::toJson),
                    emitAPeriodicPing()
            );
}

Multi<String> emitAPeriodicPing() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(10))
            .onItem().transform(x -> "{}");
}

private String toJson(Fruit f) {
    try {
        return mapper.writeValueAsString(f);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

The workaround is a bit more complex as besides sending the fruits coming from Kafka, we need to send pings periodically. To achieve this we merge the stream coming from Kafka and a periodic stream emitting {} every 10 seconds.

22. Logging

To reduce the amount of log written by the Kafka client, Quarkus sets the level of the following log categories to WARNING:

  • org.apache.kafka.clients

  • org.apache.kafka.common.utils

  • org.apache.kafka.common.metrics

You can override the configuration by adding the following lines to the application.properties:

quarkus.log.category."org.apache.kafka.clients".level=INFO
quarkus.log.category."org.apache.kafka.common.utils".level=INFO
quarkus.log.category."org.apache.kafka.common.metrics".level=INFO

23. Going further

This guide has shown how you can interact with Kafka using Quarkus. It utilizes SmallRye Reactive Messaging to build data streaming applications.

If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.