Emitter - Bridging the imperative and the reactive worlds

In a previous blog post about Kafka and Avro, we used an emitter to send Kafka messages.

architecture

In this post, we are going look at this emitter construct a little bit more closely.

Injecting an Emitter

Injecting an emitter is straightforward. You indicate the targeted channel, i.e., where do you send your messages:

@Inject @Channel("movies") Emitter<Movie> emitter;

Remember that reactive messaging uses channels as a primary abstraction. They can be in-memory channels or mapped to a remote broker.

In the previous code snippet, we inject an Emitter<Movie>. It means you will send messages containing movies as payload. So, the specified type is the payload type. That lets you send: payloads directly (wrapped automatically in a message) or more detailed messages containing a movie as payload:

Movie movie = ...

// Send payloads directly
emitter.send(movie);

// Send messages
emitter.send(Message.of(movie));

Sending payloads

Sending payload is the simplest way to send data. You just pass the payload to the send method like an instance of Movie. Under the hood, it just creates a simple Message wrapping the payload.

When used with payload, the send method returns a CompletionStage indicating if the message processing succeeded or failed:

emitter.send(movie)
    .whenComplete((success, failure) -> {
        if (failure != null) {
            System.out.println("D'oh! " + failure.getMessage());
        } else {
            System.out.println("Message processed successfully");
        }
    });

Processing, and will see later event the emission, happens asynchronously. So, the returned CompletionStage lets you know when the message is processed. The CompletionStage is completed successfully when the message is acknowledged. Most of the time, it means that the processing has been completed smoothly, or the message has been sent to a broker successfully. If something wrong happens, the CompletionStage is completed exceptionally. The passed exception gives you an idea of the reason.

Sending messages

While sending payloads is more straightforward, sometimes you want to attach metadata to the message, like configuring how it should be written in Kafka, tracing information, etc. The emitter also allows sending messages, and so attach the metadata you want. In the following example, we configure the outbound Kafka record. We set the key, the topic, and so on. That way, you can dispatch messages to different topics and even decide dynamically:

OutgoingKafkaRecordMetadata<?> metadata = OutgoingKafkaRecordMetadata.builder()
        .withTopic("movies")
        .withKey(movie.getYear())
        .build();
emitter.send(Message.of(movie).addMetadata(metadata));

Emissions are asynchronous

Emitters form a bridge between the imperative and the reactive worlds. When you emit a message, this message is not processed immediately. The downstream component consuming the message are part of a Reactive Streams. Passing the message immediately would violate the Reactive Streams protocol. We must be sure that the downstream components are ready to accept this message. As a result, the emitter is not pushing the message directly, but enqueue it in a buffer used to handle the downstream capacity (requests in Reactive Streams lingo).

buffer

The downstream component receives the messages according to the requests it makes, ensuring its capacity is never exceeded.

Overflow management

But with buffer comes…​ overflow. If you emit too many messages and the downstream cannot keep up, the messages are stored in the buffer until it reaches its maximum capacity. Then, you cannot emit anymore, and attempting to emit will throw exceptions. But what can we do in this case? When injecting the emitter, you can configure an Overflow strategy. For example, you can set the buffer size, use an unbounded buffer, drop the messages, fail, or just ignore the back pressure and let the downstream handle it. By default, it uses a buffer, but depending on your use case, you may want to configure it differently:

@Inject
@Channel("movies")
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 1000)
Emitter<Movie> emitter1;

@Inject
@Channel("movies")
@OnOverflow(value = OnOverflow.Strategy.NONE)
Emitter<Movie> emitter2;

@Inject
@Channel("movies")
@OnOverflow(value = OnOverflow.Strategy.UNBOUNDED_BUFFER)
Emitter<Movie> emitter3;

Conclusion

This post is a brief introduction to the Emitter construct from Reactive Messaging. More information is available on the SmallRye Reactive Messaging documentation.

In the next Quarkus version (1.9), this feature will be improved with two very nice enhancements. First, it will offer a Mutiny variant, easing the integration with Mutiny APIs. Then, it would be possible for the Kafka case to directly emit key/value pairs without needing to use metadata.

Stay tuned! Will will cover these in a follow-up post!