Vert.x Reference Guide

Vert.x is a toolkit for building reactive applications. As described in the Quarkus Reactive Architecture, Quarkus uses Vert.x underneath.

This guide is the companion to the Using Eclipse Vert.x API from a Quarkus Application guide. It provides more advanced details about the usage and the configuration of the Vert.x instance used by Quarkus.

Access the Vert.x instance

To access the managed Vert.x instance, add the quarkus-vertx extension to your project. This dependency might already be available in your project (as a transitive dependency).

With this extension, you can retrieve the managed instance of Vert.x using either field or constructor injection:

@ApplicationScoped
public class MyBean {

    // Field injection
    @Inject Vertx vertx;

    // Constructor injection
    MyBean(Vertx vertx) {
        // ...
    }

}

You can inject either the:

  • io.vertx.core.Vertx instance exposing the bare Vert.x API

  • io.vertx.mutiny.core.Vertx instance exposing the Mutiny API

We recommend using the Mutiny variant as it integrates with the other reactive APIs provided by Quarkus.

Mutiny

If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.

Documentation about the Vert.x Mutiny variant is available on https://smallrye.io/smallrye-mutiny-vertx-bindings.

Configure the Vert.x instance

You can configure the Vert.x instance from the application.properties file. The following table lists the supported properties:

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

Type

Default

Enables or disables the Vert.x cache.

Environment variable: QUARKUS_VERTX_CACHING

Show more

boolean

true

Enables or disabled the Vert.x classpath resource resolver.

Environment variable: QUARKUS_VERTX_CLASSPATH_RESOLVING

Show more

boolean

true

The number of event loops. By default, it matches the number of CPUs detected on the system.

Environment variable: QUARKUS_VERTX_EVENT_LOOPS_POOL_SIZE

Show more

int

The maximum amount of time the event loop can be blocked.

Environment variable: QUARKUS_VERTX_MAX_EVENT_LOOP_EXECUTE_TIME

Show more

Duration

2S

The amount of time before a warning is displayed if the event loop is blocked.

Environment variable: QUARKUS_VERTX_WARNING_EXCEPTION_TIME

Show more

Duration

2S

The maximum amount of time the worker thread can be blocked.

Environment variable: QUARKUS_VERTX_MAX_WORKER_EXECUTE_TIME

Show more

Duration

60S

The size of the internal thread pool (used for the file system).

Environment variable: QUARKUS_VERTX_INTERNAL_BLOCKING_POOL_SIZE

Show more

int

20

The queue size. For most applications this should be unbounded

Environment variable: QUARKUS_VERTX_QUEUE_SIZE

Show more

int

The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of 0.0f implies that threads beyond the core size should be created as aggressively as threads within it; a value of 1.0f implies that threads beyond the core size should never be created.

Environment variable: QUARKUS_VERTX_GROWTH_RESISTANCE

Show more

float

0

The amount of time a thread will stay alive with no work.

Environment variable: QUARKUS_VERTX_KEEP_ALIVE_TIME

Show more

Duration

30S

Prefill thread pool when creating a new Executor. When io.vertx.core.spi.ExecutorServiceFactory#createExecutor is called, initialise with the number of defined threads at startup

Environment variable: QUARKUS_VERTX_PREFILL

Show more

boolean

false

Enables the async DNS resolver.

Environment variable: QUARKUS_VERTX_USE_ASYNC_DNS

Show more

boolean

false

PEM Key/cert config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM

Show more

boolean

false

Comma-separated list of the path to the key files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_KEYS

Show more

list of string

Comma-separated list of the path to the certificate files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_CERTS

Show more

list of string

JKS config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS

Show more

boolean

false

Path of the key file (JKS format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PATH

Show more

string

Password of the key file.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PASSWORD

Show more

string

PFX config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX

Show more

boolean

false

Path to the key file (PFX format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PATH

Show more

string

Password of the key.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PASSWORD

Show more

string

PEM Trust config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM

Show more

boolean

false

Comma-separated list of the trust certificate files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM_CERTS

Show more

list of string

JKS config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS

Show more

boolean

false

Path of the key file (JKS format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PATH

Show more

string

Password of the key file.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PASSWORD

Show more

string

PFX config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX

Show more

boolean

false

Path to the key file (PFX format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PATH

Show more

string

Password of the key.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PASSWORD

Show more

string

The accept backlog.

Environment variable: QUARKUS_VERTX_EVENTBUS_ACCEPT_BACKLOG

Show more

int

The client authentication.

Environment variable: QUARKUS_VERTX_EVENTBUS_CLIENT_AUTH

Show more

string

NONE

The connect timeout.

Environment variable: QUARKUS_VERTX_EVENTBUS_CONNECT_TIMEOUT

Show more

Duration

60S

The idle timeout in milliseconds.

Environment variable: QUARKUS_VERTX_EVENTBUS_IDLE_TIMEOUT

Show more

Duration

The receive buffer size.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECEIVE_BUFFER_SIZE

Show more

int

The number of reconnection attempts.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECONNECT_ATTEMPTS

Show more

int

0

The reconnection interval in milliseconds.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECONNECT_INTERVAL

Show more

Duration

1S

Whether to reuse the address.

Environment variable: QUARKUS_VERTX_EVENTBUS_REUSE_ADDRESS

Show more

boolean

true

Whether to reuse the port.

Environment variable: QUARKUS_VERTX_EVENTBUS_REUSE_PORT

Show more

boolean

false

The send buffer size.

Environment variable: QUARKUS_VERTX_EVENTBUS_SEND_BUFFER_SIZE

Show more

int

The so linger.

Environment variable: QUARKUS_VERTX_EVENTBUS_SO_LINGER

Show more

int

Enables or Disabled SSL.

Environment variable: QUARKUS_VERTX_EVENTBUS_SSL

Show more

boolean

false

Whether to keep the TCP connection opened (keep-alive).

Environment variable: QUARKUS_VERTX_EVENTBUS_TCP_KEEP_ALIVE

Show more

boolean

false

Configure the TCP no delay.

Environment variable: QUARKUS_VERTX_EVENTBUS_TCP_NO_DELAY

Show more

boolean

true

Configure the traffic class.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRAFFIC_CLASS

Show more

int

Enables or disables the trust all parameter.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_ALL

Show more

boolean

false

The host name.

Environment variable: QUARKUS_VERTX_CLUSTER_HOST

Show more

string

localhost

The port.

Environment variable: QUARKUS_VERTX_CLUSTER_PORT

Show more

int

The public host name.

Environment variable: QUARKUS_VERTX_CLUSTER_PUBLIC_HOST

Show more

string

The public port.

Environment variable: QUARKUS_VERTX_CLUSTER_PUBLIC_PORT

Show more

int

Enables or disables the clustering.

Environment variable: QUARKUS_VERTX_CLUSTER_CLUSTERED

Show more

boolean

false

The ping interval.

Environment variable: QUARKUS_VERTX_CLUSTER_PING_INTERVAL

Show more

Duration

20S

The ping reply interval.

Environment variable: QUARKUS_VERTX_CLUSTER_PING_REPLY_INTERVAL

Show more

Duration

20S

The maximum amount of time in seconds that a successfully resolved address will be cached.

If not set explicitly, resolved addresses may be cached forever.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_MAX_TIME_TO_LIVE

Show more

int

2147483647

The minimum amount of time in seconds that a successfully resolved address will be cached.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_MIN_TIME_TO_LIVE

Show more

int

0

The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_NEGATIVE_TIME_TO_LIVE

Show more

int

0

The maximum number of queries to be sent during a resolution.

Environment variable: QUARKUS_VERTX_RESOLVER_MAX_QUERIES

Show more

int

4

The duration after which a DNS query is considered to be failed.

Environment variable: QUARKUS_VERTX_RESOLVER_QUERY_TIMEOUT

Show more

Duration

5S

Enable or disable native transport

Environment variable: QUARKUS_VERTX_PREFER_NATIVE_TRANSPORT

Show more

boolean

false

About the Duration format

To write duration values, use the standard java.time.Duration format. See the Duration#parse() Java API documentation for more information.

You can also use a simplified format, starting with a number:

  • If the value is only a number, it represents time in seconds.

  • If the value is a number followed by ms, it represents time in milliseconds.

In other cases, the simplified format is translated to the java.time.Duration format for parsing:

  • If the value is a number followed by h, m, or s, it is prefixed with PT.

  • If the value is a number followed by d, it is prefixed with P.

See Customize the Vert.x configuration to configure the Vert.x instance using a programmatic approach.

Use Vert.x clients

In addition to Vert.x core, you can use most Vert.x ecosystem libraries. Some Quarkus extension already wraps Vert.x libraries.

Available APIs

The following table lists the most used libraries from the Vert.x ecosystem. To access these APIs, add the indicated extension or dependency to your project. Check the associated documentation to learn how to use them.

API

Extension or Dependency

Documentation

AMQP Client

io.quarkus:quarkus-messaging-amqp (extension)

Getting Started to Quarkus Messaging with AMQP

Circuit Breaker

io.smallrye.reactive:smallrye-mutiny-vertx-circuit-breaker (external dependency)

https://vertx.io/docs/vertx-circuit-breaker/java/

Consul Client

io.smallrye.reactive:smallrye-mutiny-vertx-consul-client (external dependency)

https://vertx.io/docs/vertx-consul-client/java/

DB2 Client

io.quarkus:quarkus-reactive-db2-client (extension)

Reactive SQL Clients

Kafka Client

io.quarkus:quarkus-messaging-kafka (extension)

Apache Kafka Reference Guide

Mail Client

io.quarkus:quarkus-mailer (extension)

Sending emails using SMTP

MQTT Client

io.quarkus:quarkus-messaging-mqtt (extension)

No guide yet

MS SQL Client

io.quarkus:quarkus-reactive-mssql-client (extension)

Reactive SQL Clients

MySQL Client

io.quarkus:quarkus-reactive-mysql-client (extension)

Reactive SQL Clients

Oracle Client

io.quarkus:quarkus-reactive-oracle-client (extension)

Reactive SQL Clients

PostgreSQL Client

io.quarkus:quarkus-reactive-pg-client (extension)

Reactive SQL Clients

RabbitMQ Client

io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client (external dependency)

https://vertx.io/docs/vertx-rabbitmq-client/java

Redis Client

io.quarkus:quarkus-redis-client (extension)

Using the Redis Client

Web Client

io.smallrye.reactive:smallrye-mutiny-vertx-web-client (external dependency)

https://vertx.io/docs/vertx-web-client/java/

To learn more about the usage of the Vert.x Mutiny API, refer to https://smallrye.io/smallrye-mutiny-vertx-bindings.

Use the Vert.x Web Client

This section gives an example using the Vert.x WebClient in the context of a Quarkus REST (formerly RESTEasy Reactive) application. As indicated in the table above, add the following dependency to your project:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
build.gradle
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")

Now, in your code, you can create an instance of WebClient:

package org.acme.vertx;


import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    private final WebClient client;

    @Inject
    VertxResource(Vertx vertx) {
        this.client = WebClient.create(vertx);
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(String name) {
        return client.getAbs("https://.../api/fruit/" + name)
                .send()
                .onItem().transform(resp -> {
                    if (resp.statusCode() == 200) {
                        return resp.bodyAsJsonObject();
                    } else {
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }

}

This resource creates a WebClient and, upon request, uses this client to invoke a remote HTTP API. Depending on the result, the response is forwarded as received, or it creates a JSON object wrapping the error. The WebClient is asynchronous (and non-blocking), to the endpoint returns a Uni.

The application can also run as a native executable. But, first, we need to instruct Quarkus to enable ssl (if the remote API uses HTTPS). Open the src/main/resources/application.properties and add:

quarkus.ssl.native=true

Then, create the native executable with:

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.package.type=native

Use Vert.x JSON

Vert.x APIs often rely on JSON. Vert.x provides two convenient classes to manipulate JSON document: io.vertx.core.json.JsonObject and io.vertx.core.json.JsonArray.

JsonObject can be used to map an object into its JSON representation and build an object from a JSON document:

// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);

// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);

Note that these features use the mapper managed by the quarkus-jackson extension. Refer to Jackson configuration to customize the mapping.

JSON Object and JSON Array are both supported as Quarkus HTTP endpoint requests and response bodies (using classic RESTEasy and Quarkus REST). Consider these endpoints:

package org.acme.vertx;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {

    @GET
    @Path("{name}/object")
    public JsonObject jsonObject(String name) {
        return new JsonObject().put("Hello", name);
    }

    @GET
    @Path("{name}/array")
    public JsonArray jsonArray(String name) {
        return new JsonArray().add("Hello").add(name);
    }
}
{"Hello":"Quarkus"}
["Hello","Quarkus"]

This works equally well when the JSON content is a request body or is wrapped in a Uni, Multi, CompletionStage or Publisher.

Use Verticles

Verticles is "a simple, scalable, actor-like deployment and concurrency model" provided by _Vert.x. This model does not claim to be a strict actor-model implementation, but it shares similarities, especially concerning concurrency, scaling, and deployment. To use this model, you write and deploy verticles, communicating by sending messages on the event bus.

You can deploy verticles in Quarkus. It supports:

  • bare verticle - Java classes extending io.vertx.core.AbstractVerticle

  • Mutiny verticle - Java classes extending io.smallrye.mutiny.vertx.core.AbstractVerticle

Deploy Verticles

To deploy verticles, use the deployVerticle method:

@Inject Vertx vertx;

// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });

If you use the Mutiny-variant of Vert.x, be aware that the deployVerticle method returns a Uni, and you would need to trigger a subscription to make the actual deployment.

An example explaining how to deploy verticles during the initialization of the application will follow.

Use @ApplicationScoped beans as Verticle

In general, Vert.x verticles are not CDI beans. And so cannot use injection. However, in Quarkus, you can deploy verticles as beans. Note that in this case, CDI (Arc in Quarkus) is responsible for creating the instance.

The following snippet provides an example:

package io.quarkus.vertx.verticles;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {

    @ConfigProperty(name = "address") String address;

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer(address)
                .handler(m -> m.replyAndForget("hello"))
                .completionHandler();
    }
}

You don’t have to inject the vertx instance; instead, leverage the protected field from AbstractVerticle.

Then, deploy the verticle instances with:

package io.quarkus.vertx.verticles;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class VerticleDeployer {

    public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
         vertx.deployVerticle(verticle).await().indefinitely();
    }
}

If you want to deploy every exposed AbstractVerticle, you can use:

public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
    for (AbstractVerticle verticle : verticles) {
        vertx.deployVerticle(verticle).await().indefinitely();
    }
}

Create multiple verticles instances

When using @ApplicationScoped, you will get a single instance for your verticle. Having multiple instances of verticles can be helpful to share the load among them. Each of them will be associated with a different I/O thread (Vert.x event loop).

To deploy multiple instances of your verticle, use the @Dependent scope instead of @ApplicationScoped:

package org.acme.verticle;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;

import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;

@Dependent
public class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer("address")
                .handler(m -> m.reply("Hello from " + this))
                .completionHandler();
    }
}

Then, deploy your verticle as follows:

package org.acme.verticle;

import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

@ApplicationScoped
public class MyApp {

    void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
        vertx
                .deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
                .await().indefinitely();
    }
}

The init method receives an Instance<MyVerticle>. Then, you pass a supplier to the deployVerticle method. The supplier is just calling the get() method. Thanks to the @Dependent scope, it returns a new instance on every call. Finally, you pass the desired number of instances to the DeploymentOptions, such as two in the previous example. It will call the supplier twice, which will create two instances of your verticle.

Use the Event Bus

Vert.x comes with a built-in event bus that you can use from your Quarkus application. So, your application components (CDI beans, resources…​) can interact using asynchronous events, thus promoting loose-coupling.

With the event bus, you send messages to virtual addresses. The event bus offers three types of delivery mechanisms:

  • point-to-point - send the message, one consumer receives it. If several consumers listen to the address, a round-robin is applied;

  • publish/subscribe - publish a message; all the consumers listening to the address are receiving the message;

  • request/reply - send the message and expect a response. The receiver can respond to the message in an asynchronous fashion.

All these delivery mechanisms are non-blocking and are providing one of the fundamental bricks to build reactive applications.

Consume events

While you can use the Vert.x API to register consumers, Quarkus comes with declarative support. To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 If not set, the address is the fully qualified name of the bean; for instance, in this snippet, it’s org.acme.vertx.GreetingService.
2 The method parameter is the message body. If the method returns something, it’s the message response.

Configure the address

The @ConsumeEvent annotation can be configured to set the address:

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 Receive the messages sent to the greeting address

The address value can be a property expression. In this case, the configured value is used instead: @ConsumeEvent("${my.consumer.address}"). Additionally, the property expression can specify a default value: @ConsumeEvent("${my.consumer.address:defaultAddress}").

Config Property Example
@ConsumeEvent("${my.consumer.address}")   (1)
public String consume(String name) {
    return name.toLowerCase();
}
1 Receive the messages sent to the address configured with the my.consumer.address key.
If no config property with the specified key exists and no default value is set then the application startup fails.

Process events asynchronously

The previous examples use synchronous processing. Asynchronous processing is also possible by returning either an io.smallrye.mutiny.Uni or a java.util.concurrent.CompletionStage:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.

Blocking processing of events

By default, the code consuming the event must be non-blocking, as it’s called on an I/O thread. If your processing is blocking, use the @io.smallrye.common.annotation.Blocking annotation:

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

Alternatively, you can use the blocking attribute from the @ConsumeEvent annotation:

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

When using @Blocking, it ignores the value of the blocking attribute of @ConsumeEvent.

Reply to events

The return value of a method annotated with @ConsumeEvent is used to respond to the incoming message. For instance, in the following snippet, the returned String is the response.

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

You can also return a Uni<T> or a CompletionStage<T> to handle asynchronous reply:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

You can inject an executor if you use the Context Propagation extension:

@Inject Executor executor;

Implement fire-and-forget interactions

You don’t have to reply to received messages. Typically, for a fire and forget interaction, the messages are consumed, and the sender does not need to know about it. To implement this pattern, your consumer method returns void.

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

Consume messages (instead of events)

Unlike the previous example using the payloads directly, you can also use Message directly:

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

Handle failures

If a method annotated with @ConsumeEvent throws an exception, then:

  • if a reply handler is set, then the failure is propagated back to the sender via an io.vertx.core.eventbus.ReplyException with code ConsumeEvent#FAILURE_CODE and the exception message,

  • if no reply handler is set, then the exception is rethrown (and wrapped in a RuntimeException if necessary) and can be handled by the default exception handler, i.e. io.vertx.core.Vertx#exceptionHandler().

Send messages

Sending and publishing messages use the Vert.x event bus:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                            (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 Inject the Event bus
2 Send a message to the address greeting. Message payload is name

The EventBus object provides methods to:

  1. send a message to a specific address - one single consumer receives the message.

  2. publish a message to a specific address - all consumers receive the messages.

  3. request a message and expect a reply

// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);

Process events on virtual threads

Methods annotated with @ConsumeEvent can also be annotated with @RunOnVirtualThread. In this case, the method is invoked on a virtual thread. Each event is invoked on a different virtual thread.

To use this feature, make sure:

  1. Your Java runtime supports virtual threads.

  2. Your method uses a blocking signature.

The second point means only methods returning an object or void can use @RunOnVirtualThread. Methods returning a Uni or a CompletionStage cannot run on virtual threads.

Read the virtual thread guide for more details.

Use codecs

The https://vertx.io/docs/vertx-core/java/event_bus[Vert.x Event Bus] uses https://vertx.io/docs/vertx-core/java/message_codecs[codecs] to _serialize and deserialize message objects. Quarkus provides a default codec for local delivery. This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with @ConsumeEvent whete ConsumeEvent#local() == true (which is the default).

So that you can exchange the message objects as follows:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

If you want to use a specific codec, you need to set it on both ends explicitly:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 Set the name of the codec to use to send the message
2 Set the codec to use to receive the message

Combine HTTP and the Event Bus

Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separated bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the Jakarta REST endpoint, we are sending a message. Another bean consumes this message, and the response is sent using the reply mechanism.

In your HTTP endpoint class, inject the event bus and uses the request method to send a message to the event bus and expect a response:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/bus")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 send the name to the greeting address and request a response
2 when we get the response, extract the body and send it to the user
the HTTP method returns a Uni. If you are using Quarkus REST, Uni support is built-in. If you are using classic RESTEasy, you need to add the quarkus resteasy-mutiny extension to your project.

We need a consumer listening on the greeting address. This consumer can be in the same class or another bean such as:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

This bean receives the name and returns the greeting message.

With this in place, every HTTP request on /bus/quarkus sends a message to the event bus, waits for a reply, and when this one arrives, writes the HTTP response:

Hello Quarkus

To better understand, let’s detail how the HTTP request/response has been handled:

  1. The request is received by the greeting method

  2. a message containing the name is sent to the event bus

  3. Another bean receives this message and computes the response

  4. This response is sent back using the reply mechanism

  5. Once the reply is received by the sender, the content is written to the HTTP response

Bidirectional communication with browsers by using SockJS

The SockJS bridge provided by Vert.x allows browser applications and Quarkus applications to communicate using the event bus. It connects both sides. So, both sides can send messages received on the other side. It supports the three delivery mechanisms.

SockJS negotiates the communication channel between the Quarkus application and the browser. If WebSockets are supported, it uses them; otherwise, it degrades to SSE, long polling, etc.

So use SockJS, you need to configure the bridge, especially the addresses that will be used to communicate:

package org.acme;

import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
public class SockJsExample {

    @Inject
    Vertx vertx;

    public void init(@Observes Router router) {
        SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
        Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
                .addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
        router.route("/eventbus/*").subRouter(bridge);

        AtomicInteger counter = new AtomicInteger();
        vertx.setPeriodic(1000,
                ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
    }

}

This code configures the SockJS bridge to send all the messages targeting the ticks address to the connected browsers. More detailed explanations about the configuration can be found on the Vert.x SockJS Bridge documentation.

The browser must use the vertx-eventbus JavaScript library to consume the message:

<!doctype html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>SockJS example - Quarkus</title>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"
            integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
    <script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>

<h1>SockJS Examples</h1>

<p><strong>Last Tick:</strong> <span id="tick"></span></p>

</body>
<script>
    var eb = new EventBus('/eventbus');

    eb.onopen = function () {

        eb.registerHandler('ticks', function (error, message) {
            $("#tick").html(message.body);
        });
    }

</script>
</html>

Use native transports

Native transports are not supported in native executables.
To use io_uring, refer to the Use io_uring section.

Vert.x is capable of using Netty’s native transports, which offers performance improvements on specific platforms. To enable them, you must include the appropriate dependency for your platform. It’s usually a good idea to have both to keep your application platform-agnostic. Netty is smart enough to use the correct one, that includes none at all on unsupported platforms:

pom.xml
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <classifier>linux-x86_64</classifier>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-kqueue</artifactId>
  <classifier>osx-x86_64</classifier>
</dependency>
build.gradle
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")

implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")

You will also have to explicitly configure Vert.x to use the native transport. In application.properties add:

quarkus.vertx.prefer-native-transport=true

Or in application.yml:

quarkus:
  vertx:
    prefer-native-transport: true

If all is well quarkus will log:

[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true

Native Linux transport

On Linux you can enable the following socket options:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true
  • TCP_QUICKACK

quarkus.http.tcp-quick-ack=true
  • TCP_CORK

quarkus.http.tcp-cork=true
  • TCP_FASTOPEN

quarkus.http.tcp-fast-open=true

Native macOS transport

On macOS Sierra and above you can enable the following socket options:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true

Use a Vert.x context-aware scheduler

Some Mutiny operators need to schedule work on an executor thread pool. A good example is .onItem().delayIt().by(Duration.ofMillis(10) as it needs such an executor to delay emissions.

The default executor is returned by io.smallrye.mutiny.infrastructure.Infrastructure and it is already configured and managed by Quarkus.

That being said, there are cases where you need to make sure that an operation is run on a Vert.x (duplicated) context and not just on any random thread.

The io.smallrye.mutiny.vertx.core.ContextAwareScheduler interface offers an API to obtain context-aware schedulers. Such a scheduler is configured with:

  1. a delegate ScheduledExecutorService of your choice (hint: you can reuse Infrastructure.getDefaultWorkerPool()), and

  2. a context fetching strategy among:

    • an explicit Context, or

    • calling Vertx::getOrCreateContext() either on the current thread or later when the scheduling request happens, or

    • calling Vertx::currentContext(), which fails if the current thread is not a Vert.x thread.

Here is a sample where ContextAwareScheduler is used:

class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        vertx.getOrCreateContext().put("foo", "bar");

        var delegate = Infrastructure.getDefaultWorkerPool();
        var scheduler = ContextAwareScheduler.delegatingTo(delegate)
            .withCurrentContext();

        return Uni.createFrom().voidItem()
                .onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
                .onItem().invoke(() -> {
                    // Prints "bar"
                    var ctx = vertx.getOrCreateContext();
                    System.out.println(ctx.get("foo"));
                });
    }
}

In this example a scheduler is created by capturing the context of the Vert.x event-loop that calls asyncStart(). The delayIt operator uses that scheduler, and we can check that the context that we get in invoke is a Vert.x duplicated context where the data for key "foo" has been propagated.

Use a Unix domain socket

Listening on a Unix domain socket allows us to dispense with the overhead of TCP if the connection to the quarkus service is established from the same host. This can happen if access to the service goes through a proxy which is often the case if you’re setting up a service mesh with a proxy like Envoy.

This will only work on platforms that support Use native transports.

Enable the appropriate Use native transports and set the following environment property:

quarkus.http.domain-socket=/var/run/io.quarkus.app.socket
quarkus.http.domain-socket-enabled=true

quarkus.vertx.prefer-native-transport=true

By itself this will not disable the tcp socket which by default will open on 0.0.0.0:8080. It can be explicitly disabled:

quarkus.http.host-enabled=false

These properties can be set through Java’s -D command line parameter or on application.properties.

Do not forget to add the native transport dependency. See Use native transports for details.
Make sure your application has the right permissions to write to the socket.

Use io_uring

io_uring is not supported in native executables.
io_uring support is experimental

io_uring is a Linux kernel interface that allows you to send and receive data asynchronously. It provides unified semantics for both file and network I/O. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets. It has the potential to provide modest performance benefits to network I/O on its own and greater benefits for mixed file and network I/O application workloads.

To learn more about io_uring, we recommend the following links:

  • Why you should use io_uring for network I/O: The main benefit of io_uring for network I/O is a modern asynchronous API that is straightforward to use and provides unified semantics for file and network I/O. A potential performance benefit of io_uring for network I/O is reducing the number of syscalls. This could provide the biggest benefit for high volumes of small operations where the overhead of system calls can be significant.

  • The Backend Revolution and Why io_uring Is So Important: The io_uring API uses two ring buffers for communication between application and kernel (hence the API name) and designed in a way that enables natural batching of requests and responses. Besides, it provides a way to submit multiple requests in one system call, which can reduce overhead.

  • What exactly is io_uring?: io_uring is a Linux kernel interface to efficiently allow you to send and receive data asynchronously. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets.

To use io_uring, you need to add two dependencies to your project and enable native transport. First add the following dependencies to your project:

pom.xml
<dependency>
    <groupId>io.netty.incubator</groupId>
    <artifactId>netty-incubator-transport-native-io_uring</artifactId>
    <version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
    <classifier>linux-x86_64</classifier>
</dependency>
<dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
build.gradle
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")

Then, in the application.properties, add:

quarkus.vertx.prefer-native-transport=true
Can I use io_uring on my Linux machine?

To check if you can use io_uring on your Linux machine, execute the following command:

> grep io_uring_setup /proc/kallsyms
0000000000000000 t __pfx_io_uring_setup
0000000000000000 t io_uring_setup
0000000000000000 T __pfx___x64_sys_io_uring_setup
0000000000000000 T __x64_sys_io_uring_setup
0000000000000000 T __pfx___ia32_sys_io_uring_setup
0000000000000000 T __ia32_sys_io_uring_setup
0000000000000000 d event_exit__io_uring_setup
0000000000000000 d event_enter__io_uring_setup
0000000000000000 d __syscall_meta__io_uring_setup
0000000000000000 d args__io_uring_setup
0000000000000000 d types__io_uring_setup
0000000000000000 d __event_exit__io_uring_setup
0000000000000000 d __event_enter__io_uring_setup
0000000000000000 d __p_syscall_meta__io_uring_setup

If it prints something like above, you can use io_uring.

Troubleshooting

io_uring support is still experimental. Check the Netty io_uring FAQ if you see some odd behavior. Also, the netty io_uring was slower than epoll issue describes a few configuration mistakes.

Domain sockets are not yet supported with io_uring.
The Vert.x asynchronous file system API does not use io_uring yet.

Deploy on read-only environments

In environments with read only file systems you may receive errors of the form:

java.lang.IllegalStateException: Failed to create cache dir

Assuming /tmp/ is writable this can be fixed by setting the vertx.cacheDirBase property to point to a directory in /tmp/ for instance in OpenShift by creating an environment variable JAVA_OPTS with the value -Dvertx.cacheDirBase=/tmp/vertx.

Customize the Vert.x configuration

The configuration of the managed Vert.x instance can be provided using the application.properties file, but also using special beans. CDI beans exposing the io.quarkus.vertx.VertxOptionsCustomizer interface can be used to customize the Vert.x configuration. For example, the following customizer change the tmp base directory:

@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {

    @Override
    public void accept(VertxOptions options) {
        options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
    }
}

The customizer beans received the VertxOptions (coming from the application configuration), and can modify them.