Vert.x Reference Guide

Vert.x is a toolkit for building reactive applications. As described in the Quarkus Reactive Architecture, Quarkus uses Vert.x underneath.

This guide is the companion to the Using Eclipse Vert.x API from a Quarkus Application guide. It provides more advanced details about the usage and the configuration of the Vert.x instance used by Quarkus.

Accessing the Vert.x instance

To access the managed Vert.x instance, add the quarkus-vertx extension to your project. Note that this dependency may already be installed (as a transitive dependency).

With this extension, you can retrieve the managed instance of Vert.x using either field or constructor injection:

@ApplicationScoped
public class MyBean {

    // Field injection
    @Inject Vertx vertx;

    // Constructor injection
    MyBean(Vertx vertx) {
        // ...
    }

}

You can inject either the:

  • io.vertx.core.Vertx instance exposing the bare Vert.x API

  • io.vertx.mutiny.core.Vertx instance exposing the Mutiny API

We recommend using the Mutiny variant as it integrates with the other reactive APIs provided by Quarkus.

Mutiny

If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.

Documentation about the Vert.x Mutiny variant is available on https://smallrye.io/smallrye-mutiny-vertx-bindings.

Configuring the Vert.x instance

You can configure the Vert.x instance from the application.properties file. The following table lists the supported properties:

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

Type

Default

Enables or disables the Vert.x cache.

Environment variable: QUARKUS_VERTX_CACHING

boolean

true

Enables or disables the Vert.x classpath resource resolver.

Environment variable: QUARKUS_VERTX_CLASSPATH_RESOLVING

boolean

true

The number of event loops. Defaults to 2 x the number of cores.

Environment variable: QUARKUS_VERTX_EVENT_LOOPS_POOL_SIZE

int

The maximum amount of time the event loop can be blocked.

Environment variable: QUARKUS_VERTX_MAX_EVENT_LOOP_EXECUTE_TIME

Duration

2S

The amount of time before a warning is displayed if the event loop is blocked.

Environment variable: QUARKUS_VERTX_WARNING_EXCEPTION_TIME

Duration

2S

The size of the worker thread pool.

Environment variable: QUARKUS_VERTX_WORKER_POOL_SIZE

int

20

The maximum amount of time the worker thread can be blocked.

Environment variable: QUARKUS_VERTX_MAX_WORKER_EXECUTE_TIME

Duration

60S

The size of the internal thread pool (used for the file system).

Environment variable: QUARKUS_VERTX_INTERNAL_BLOCKING_POOL_SIZE

int

20

The queue size. For most applications this should be unbounded.

Environment variable: QUARKUS_VERTX_QUEUE_SIZE

int

The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of 0.0f implies that threads beyond the core size should be created as aggressively as threads within it; a value of 1.0f implies that threads beyond the core size should never be created.

Environment variable: QUARKUS_VERTX_GROWTH_RESISTANCE

float

0f

The amount of time a thread will stay alive with no work.

Environment variable: QUARKUS_VERTX_KEEP_ALIVE_TIME

Duration

30S

Prefill thread pool when creating a new Executor. When io.vertx.core.spi.ExecutorServiceFactory.createExecutor is called, initialise with the number of defined threads at startup

Environment variable: QUARKUS_VERTX_PREFILL

boolean

false

Enables the async DNS resolver.

Environment variable: QUARKUS_VERTX_USE_ASYNC_DNS

boolean

false

PEM Key/cert config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM

boolean

false

Comma-separated list of the path to the key files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_KEYS

list of string

Comma-separated list of the path to the certificate files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_CERTS

list of string

JKS config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS

boolean

false

Path of the key file (JKS format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PATH

string

Password of the key file.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PASSWORD

string

PFX config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX

boolean

false

Path to the key file (PFX format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PATH

string

Password of the key.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PASSWORD

string

PEM Trust config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM

boolean

false

Comma-separated list of the trust certificate files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM_CERTS

list of string

JKS config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS

boolean

false

Path of the key file (JKS format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PATH

string

Password of the key file.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PASSWORD

string

PFX config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX

boolean

false

Path to the key file (PFX format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PATH

string

Password of the key.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PASSWORD

string

The accept backlog.

Environment variable: QUARKUS_VERTX_EVENTBUS_ACCEPT_BACKLOG

int

The client authentication.

Environment variable: QUARKUS_VERTX_EVENTBUS_CLIENT_AUTH

string

NONE

The connect timeout.

Environment variable: QUARKUS_VERTX_EVENTBUS_CONNECT_TIMEOUT

Duration

60S

The idle timeout in milliseconds.

Environment variable: QUARKUS_VERTX_EVENTBUS_IDLE_TIMEOUT

Duration

The receive buffer size.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECEIVE_BUFFER_SIZE

int

The number of reconnection attempts.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECONNECT_ATTEMPTS

int

0

The reconnection interval in milliseconds.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECONNECT_INTERVAL

Duration

1S

Whether to reuse the address.

Environment variable: QUARKUS_VERTX_EVENTBUS_REUSE_ADDRESS

boolean

true

Whether to reuse the port.

Environment variable: QUARKUS_VERTX_EVENTBUS_REUSE_PORT

boolean

false

The send buffer size.

Environment variable: QUARKUS_VERTX_EVENTBUS_SEND_BUFFER_SIZE

int

The so linger.

Environment variable: QUARKUS_VERTX_EVENTBUS_SOLINGER

int

Enables or disables SSL.

Environment variable: QUARKUS_VERTX_EVENTBUS_SSL

boolean

false

Whether to keep the TCP connection opened (keep-alive).

Environment variable: QUARKUS_VERTX_EVENTBUS_TCP_KEEP_ALIVE

boolean

false

Configure the TCP no delay.

Environment variable: QUARKUS_VERTX_EVENTBUS_TCP_NO_DELAY

boolean

true

Configure the traffic class.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRAFFIC_CLASS

int

Enables or disables the trust all parameter.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_ALL

boolean

false

The host name.

Environment variable: QUARKUS_VERTX_CLUSTER_HOST

string

localhost

The port.

Environment variable: QUARKUS_VERTX_CLUSTER_PORT

int

The public host name.

Environment variable: QUARKUS_VERTX_CLUSTER_PUBLIC_HOST

string

The public port.

Environment variable: QUARKUS_VERTX_CLUSTER_PUBLIC_PORT

int

Enables or disables the clustering.

Environment variable: QUARKUS_VERTX_CLUSTER_CLUSTERED

boolean

false

The ping interval.

Environment variable: QUARKUS_VERTX_CLUSTER_PING_INTERVAL

Duration

20S

The ping reply interval.

Environment variable: QUARKUS_VERTX_CLUSTER_PING_REPLY_INTERVAL

Duration

20S

The maximum amount of time in seconds that a successfully resolved address will be cached. If not set explicitly, resolved addresses may be cached forever.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_MAX_TIME_TO_LIVE

int

2147483647

The minimum amount of time in seconds that a successfully resolved address will be cached.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_MIN_TIME_TO_LIVE

int

0

The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_NEGATIVE_TIME_TO_LIVE

int

0

The maximum number of queries to be sent during a resolution.

Environment variable: QUARKUS_VERTX_RESOLVER_MAX_QUERIES

int

4

The duration after which a DNS query is considered to be failed.

Environment variable: QUARKUS_VERTX_RESOLVER_QUERY_TIMEOUT

Duration

5S

Enables or disables native transport.

Environment variable: QUARKUS_VERTX_PREFER_NATIVE_TRANSPORT

boolean

false
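The environment variable names in the table follow the MicroProfile Config naming rule: take the property name, replace every character that is neither alphanumeric nor an underscore with an underscore, and convert the result to upper case. The sketch below illustrates that rule with plain Java. The class name is hypothetical, and the two property names are inferred from the variable names listed above, so treat them as assumptions rather than as an authoritative list.

```java
import java.util.Locale;

final class EnvVarNames {

    // MicroProfile Config rule: replace every character that is not
    // alphanumeric or '_' with '_', then convert to upper case.
    static String toEnvVar(String propertyName) {
        return propertyName
                .replaceAll("[^A-Za-z0-9_]", "_")
                .toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Property names inferred from the table (assumption).
        System.out.println(toEnvVar("quarkus.vertx.worker-pool-size"));
        System.out.println(toEnvVar("quarkus.vertx.eventbus.connect-timeout"));
    }
}
```

The mapping only works in one direction: an underscore in a variable name may stand for either a dot or a dash in the property name.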

About the Duration format

The format for durations uses the standard java.time.Duration format. You can learn more about it in the Duration#parse() javadoc.

You can also provide duration values starting with a number. In this case, if the value consists only of a number, the converter treats the value as seconds. Otherwise, PT is implicitly prepended to the value to obtain a standard java.time.Duration format.
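The conversion rule described above can be sketched with the standard library alone. This is an illustration of the rule as stated in this section, not the converter that Quarkus ships; the class and method names are hypothetical.

```java
import java.time.Duration;

final class DurationFormatSketch {

    // Applies the rule from this section:
    //  - digits only           -> number of seconds
    //  - starts with a digit   -> "PT" is prepended, then parsed
    //  - anything else         -> parsed as a standard java.time.Duration
    static Duration convert(String value) {
        String v = value.trim();
        if (v.matches("\\d+")) {
            return Duration.ofSeconds(Long.parseLong(v));
        }
        if (!v.isEmpty() && Character.isDigit(v.charAt(0))) {
            return Duration.parse("PT" + v);
        }
        return Duration.parse(v);
    }

    public static void main(String[] args) {
        System.out.println(convert("30"));     // 30 seconds
        System.out.println(convert("2S"));     // 2 seconds
        System.out.println(convert("1h30m"));  // 1 hour 30 minutes
        System.out.println(convert("PT5M"));   // 5 minutes, already standard
    }
}
```

Duration#parse accepts the unit letters in upper or lower case, which is why both 2S and 1h30m are valid once PT is prepended.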

Using Vert.x clients

In addition to Vert.x core, you can use most Vert.x ecosystem libraries. Some Quarkus extensions already wrap Vert.x libraries.

Available APIs

The following table lists the most used libraries from the Vert.x ecosystem. To access these APIs, add the indicated extension or dependency to your project. Refer to the associated documentation to learn how to use them.

API

Extension or Dependency

Documentation

AMQP Client

io.quarkus:quarkus-smallrye-reactive-messaging-amqp (extension)

https://quarkus.io/guides/amqp

Circuit Breaker

io.smallrye.reactive:smallrye-mutiny-vertx-circuit-breaker (external dependency)

https://vertx.io/docs/vertx-circuit-breaker/java/

Consul Client

io.smallrye.reactive:smallrye-mutiny-vertx-consul-client (external dependency)

https://vertx.io/docs/vertx-consul-client/java/

DB2 Client

io.quarkus:quarkus-reactive-db2-client (extension)

https://quarkus.io/guides/reactive-sql-clients

Kafka Client

io.quarkus:quarkus-smallrye-reactive-messaging-kafka (extension)

https://quarkus.io/guides/kafka

Mail Client

io.quarkus:quarkus-mailer (extension)

https://quarkus.io/guides/mailer

MQTT Client

io.quarkus:quarkus-smallrye-reactive-messaging-mqtt (extension)

No guide yet

MS SQL Client

io.quarkus:quarkus-reactive-mssql-client (extension)

https://quarkus.io/guides/reactive-sql-clients

MySQL Client

io.quarkus:quarkus-reactive-mysql-client (extension)

https://quarkus.io/guides/reactive-sql-clients

Oracle Client

io.quarkus:quarkus-reactive-oracle-client (extension)

https://quarkus.io/guides/reactive-sql-clients

PostgreSQL Client

io.quarkus:quarkus-reactive-pg-client (extension)

https://quarkus.io/guides/reactive-sql-clients

RabbitMQ Client

io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client (external dependency)

https://vertx.io/docs/vertx-rabbitmq-client/java

Redis Client

io.quarkus:quarkus-redis-client (extension)

https://quarkus.io/guides/redis

Web Client

io.smallrye.reactive:smallrye-mutiny-vertx-web-client (external dependency)

https://vertx.io/docs/vertx-web-client/java/

To learn more about the usage of the Vert.x Mutiny API, refer to https://smallrye.io/smallrye-mutiny-vertx-bindings.

Example of usage

This section gives an example using the Vert.x WebClient in the context of a RESTEasy Reactive application. As indicated in the table above, add the following dependency to your project:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
build.gradle
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")

Now, in your code, you can create an instance of WebClient:

package org.acme.vertx;


import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    private final WebClient client;

    @Inject
    ResourceUsingWebClient(Vertx vertx) {
        this.client = WebClient.create(vertx);
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(String name) {
        return client.getAbs("https://.../api/fruit/" + name)
                .send()
                .onItem().transform(resp -> {
                    if (resp.statusCode() == 200) {
                        return resp.bodyAsJsonObject();
                    } else {
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }

}

This resource creates a WebClient and, upon request, uses this client to invoke a remote HTTP API. Depending on the result, the response is forwarded as received, or it creates a JSON object wrapping the error. The WebClient is asynchronous (and non-blocking), so the endpoint returns a Uni.

The application can also run as a native executable. But, first, we need to instruct Quarkus to enable SSL (if the remote API uses HTTPS). Open the src/main/resources/application.properties file and add:

quarkus.ssl.native=true

Then, create the native executable with:

CLI
quarkus build --native
Maven
./mvnw package -Dnative
Gradle
./gradlew build -Dquarkus.package.type=native

Using Vert.x JSON

Vert.x APIs often rely on JSON. Vert.x provides two convenient classes to manipulate JSON documents: io.vertx.core.json.JsonObject and io.vertx.core.json.JsonArray.

JsonObject can be used to map an object into its JSON representation and build an object from a JSON document:

// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);

// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);

Note that these features use the mapper managed by the quarkus-jackson extension. Refer to Jackson configuration to customize the mapping.

JsonObject and JsonArray are both supported as Quarkus HTTP endpoint request and response bodies (using classic RESTEasy and RESTEasy Reactive). Consider these endpoints:

package org.acme.vertx;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {

    @GET
    @Path("{name}/object")
    public JsonObject jsonObject(String name) {
        return new JsonObject().put("Hello", name);
    }

    @GET
    @Path("{name}/array")
    public JsonArray jsonArray(String name) {
        return new JsonArray().add("Hello").add(name);
    }
}
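
With these endpoints, a GET request on /hello/Quarkus/object returns:

{"Hello":"Quarkus"}

and a GET request on /hello/Quarkus/array returns:

["Hello","Quarkus"]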

This works equally well when the JSON content is a request body or is wrapped in a Uni, Multi, CompletionStage or Publisher.

Using verticles

Vert.x provides verticles, "a simple, scalable, actor-like deployment and concurrency model". This model does not claim to be a strict actor-model implementation, but it shares similarities, especially concerning concurrency, scaling, and deployment. To use this model, you write and deploy verticles, which communicate by sending messages on the event bus.

You can deploy verticles in Quarkus. It supports:

  • bare verticle - Java classes extending io.vertx.core.AbstractVerticle

  • Mutiny verticle - Java classes extending io.smallrye.mutiny.vertx.core.AbstractVerticle

Deploying verticles

To deploy verticles, use the deployVerticle method:

@Inject Vertx vertx;

// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });

If you use the Mutiny variant of Vert.x, be aware that the deployVerticle method returns a Uni, and you need to subscribe to it to trigger the actual deployment.

An example explaining how to deploy verticles during the initialization of the application will follow.

Using @ApplicationScoped Beans as Verticle

In general, Vert.x verticles are not CDI beans and so cannot use injection. However, in Quarkus, you can deploy verticles as beans. Note that in this case, CDI (Arc in Quarkus) is responsible for creating the instance.

The following snippet provides an example:

package io.quarkus.vertx.verticles;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {

    @ConfigProperty(name = "address") String address;

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer(address)
                .handler(m -> m.replyAndForget("hello"))
                .completionHandler();
    }
}

You don’t have to inject the vertx instance; instead, leverage the protected field from AbstractVerticle.

Then, deploy the verticle instances with:

package io.quarkus.vertx.verticles;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

@ApplicationScoped
public class VerticleDeployer {

    public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
         vertx.deployVerticle(verticle).await().indefinitely();
    }
}

If you want to deploy every exposed AbstractVerticle, you can use:

public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
    for (AbstractVerticle verticle : verticles) {
        vertx.deployVerticle(verticle).await().indefinitely();
    }
}

Using multiple verticle instances

When using @ApplicationScoped, you will get a single instance for your verticle. Having multiple instances of verticles can be helpful to share the load among them. Each of them will be associated with a different I/O thread (Vert.x event loop).

To deploy multiple instances of your verticle, use the @Dependent scope instead of @ApplicationScoped:

package org.acme.verticle;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;

import javax.enterprise.context.Dependent;
import javax.inject.Inject;

@Dependent
public class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer("address")
                .handler(m -> m.reply("Hello from " + this))
                .completionHandler();
    }
}

Then, deploy your verticle as follows:

package org.acme.verticle;

import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

@ApplicationScoped
public class MyApp {

    void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
        vertx
                .deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
                .await().indefinitely();
    }
}

The init method receives an Instance<MyVerticle>. Then, you pass a supplier to the deployVerticle method. The supplier is just calling the get() method. Thanks to the @Dependent scope, it returns a new instance on every call. Finally, you pass the desired number of instances to the DeploymentOptions, such as two in the previous example. It will call the supplier twice, which will create two instances of your verticle.
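The supplier mechanics can be illustrated without Vert.x or CDI. In the sketch below, FakeVerticle stands in for a @Dependent verticle and deploy is a hypothetical stand-in for deployVerticle combined with setInstances; neither is part of the Vert.x or Quarkus API. It only shows that asking the supplier once per requested instance yields distinct objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class SupplierDeploymentSketch {

    // Stand-in for a @Dependent verticle: each construction is a new object.
    static final class FakeVerticle { }

    // Hypothetical stand-in for deployVerticle(supplier, options):
    // calls the supplier once per requested instance.
    static <T> List<T> deploy(Supplier<T> supplier, int instances) {
        List<T> deployed = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            deployed.add(supplier.get());
        }
        return deployed;
    }

    public static void main(String[] args) {
        List<FakeVerticle> verticles = deploy(FakeVerticle::new, 2);
        System.out.println(verticles.size());                     // 2
        System.out.println(verticles.get(0) != verticles.get(1)); // true
    }
}
```

With an @ApplicationScoped bean, the supplier would return the same shared instance on every call, which is why the @Dependent scope is needed here.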

Using the event bus

Vert.x comes with a built-in event bus that you can use from your Quarkus application. So, your application components (CDI beans, resources, and so on) can interact using asynchronous events, thus promoting loose coupling.

With the event bus, you send messages to virtual addresses. The event bus offers three types of delivery mechanisms:

  • point-to-point - send the message, one consumer receives it. If several consumers listen to the address, a round-robin is applied;

  • publish/subscribe - publish a message; all the consumers listening to the address receive the message;

  • request/reply - send the message and expect a response. The receiver can respond to the message in an asynchronous fashion.

All these delivery mechanisms are non-blocking and provide one of the fundamental building blocks of reactive applications.
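To make the three delivery mechanisms concrete, here is a toy in-memory bus written with the standard library only. It is a sketch of the semantics described above (round-robin for point-to-point, fan-out for publish, an asynchronous reply for request), not the way Vert.x implements its event bus. ToyEventBus and all of its methods are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class ToyEventBus {

    private final Map<String, List<Function<String, String>>> consumers = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> cursors = new ConcurrentHashMap<>();

    // Registers a consumer on a virtual address.
    void consumer(String address, Function<String, String> handler) {
        consumers.computeIfAbsent(address, a -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Point-to-point: exactly one consumer receives the message (round-robin).
    void send(String address, String body) {
        pick(address).apply(body);
    }

    // Publish/subscribe: every consumer on the address receives the message.
    void publish(String address, String body) {
        consumers.getOrDefault(address, List.of()).forEach(h -> h.apply(body));
    }

    // Request/reply: one consumer handles the message and replies asynchronously.
    CompletableFuture<String> request(String address, String body) {
        Function<String, String> handler = pick(address);
        return CompletableFuture.supplyAsync(() -> handler.apply(body));
    }

    private Function<String, String> pick(String address) {
        List<Function<String, String>> handlers = consumers.getOrDefault(address, List.of());
        if (handlers.isEmpty()) {
            throw new IllegalStateException("No consumer on " + address);
        }
        int index = cursors.computeIfAbsent(address, a -> new AtomicInteger()).getAndIncrement();
        return handlers.get(Math.floorMod(index, handlers.size()));
    }

    public static void main(String[] args) {
        ToyEventBus bus = new ToyEventBus();
        List<String> log = new CopyOnWriteArrayList<>();
        bus.consumer("greeting", name -> { log.add("A:" + name); return "Hello " + name; });
        bus.consumer("greeting", name -> { log.add("B:" + name); return "Hi " + name; });

        bus.send("greeting", "one");     // handled by A
        bus.send("greeting", "two");     // handled by B
        bus.publish("greeting", "all");  // handled by A and B
        String reply = bus.request("greeting", "Quarkus").join();

        System.out.println(log);
        System.out.println(reply);
    }
}
```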

Consuming events

While you can use the Vert.x API to register consumers, Quarkus comes with declarative support. To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 If not set, the address is the fully qualified name of the bean; for instance, in this snippet, it’s org.acme.vertx.GreetingService.
2 The method parameter is the message body. If the method returns something, it’s the message response.

Configuring the address

The @ConsumeEvent annotation can be configured to set the address:

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 Receive the messages sent to the greeting address

Asynchronous processing

The previous examples use synchronous processing. Asynchronous processing is also possible by returning either an io.smallrye.mutiny.Uni or a java.util.concurrent.CompletionStage:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

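    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // Return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly.
        // Placeholder processing: upper-case the name asynchronously.
        return CompletableFuture.supplyAsync(() -> name.toUpperCase());
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // Return a Uni completed when the processing is finished.
        // You can also fail the Uni explicitly.
        // Placeholder processing: upper-case the name lazily.
        return Uni.createFrom().item(() -> name.toUpperCase());
    }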
}
Mutiny

The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.
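As a rough illustration of completing or failing a CompletionStage explicitly, the sketch below uses only java.util.concurrent. The class name, the method name and the blank-name validation rule are hypothetical; in a real consumer, this logic would live in a method annotated with @ConsumeEvent.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class AsyncReplySketch {

    // Completes with the reply when processing succeeds,
    // or fails explicitly when the input is rejected.
    static CompletionStage<String> process(String name) {
        if (name == null || name.isBlank()) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("name must not be blank"));
        }
        return CompletableFuture.supplyAsync(() -> name.toUpperCase());
    }

    public static void main(String[] args) {
        // Successful processing: prints QUARKUS
        System.out.println(process("quarkus").toCompletableFuture().join());

        // Failed processing: the failure is recovered into a message and printed
        process(" ")
                .exceptionally(failure -> "failed: " + failure.getMessage())
                .thenAccept(System.out::println)
                .toCompletableFuture()
                .join();
    }
}
```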

Blocking processing

By default, the code consuming the event must be non-blocking, as it’s called on an I/O thread. If your processing is blocking, use the @io.smallrye.common.annotation.Blocking annotation:

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

Alternatively, you can use the blocking attribute from the @ConsumeEvent annotation:

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

When @Blocking is used, the value of the blocking attribute of @ConsumeEvent is ignored.

Replying to messages

The return value of a method annotated with @ConsumeEvent is used to respond to the incoming message. For instance, in the following snippet, the returned String is the response.

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

You can also return a Uni<T> or a CompletionStage<T> to handle asynchronous reply:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

You can inject an executor if you use the Context Propagation extension:

@Inject Executor executor;

Implementing fire and forget interactions

You don’t have to reply to received messages. Typically, for a fire and forget interaction, the messages are consumed, and the sender does not need to know about it. To implement this pattern, your consumer method returns void.

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

Dealing with messages

Instead of receiving only the payload, as in the previous examples, your method can receive the Message itself:

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

Handling Failures

If a method annotated with @ConsumeEvent throws an exception, then:

  • if a reply handler is set, then the failure is propagated back to the sender via an io.vertx.core.eventbus.ReplyException with code ConsumeEvent#FAILURE_CODE and the exception message,

  • if no reply handler is set, then the exception is rethrown (and wrapped in a RuntimeException if necessary) and can be handled by the default exception handler, i.e. io.vertx.core.Vertx#exceptionHandler().

Sending messages

Sending and publishing messages use the Vert.x event bus:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                            (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 Inject the Event bus
2 Send a message to the address greeting. Message payload is name

The EventBus object provides methods to:

  1. send a message to a specific address - one single consumer receives the message.

  2. publish a message to a specific address - all consumers receive the messages.

  3. send a request to a specific address and expect a reply.

// Case 1
bus.sendAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);

Using codecs

The Vert.x Event Bus uses codecs to serialize and deserialize objects. Quarkus provides a default codec for local delivery. So you can exchange objects as follows:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

If you want to use a specific codec, you need to set it on both ends explicitly:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 Set the name of the codec to use to send the message
2 Set the codec to use to receive the message

Combining HTTP and the event bus

Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separate bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the JAX-RS endpoint, we send a message. Another bean consumes this message, and the response is sent using the reply mechanism.

In your HTTP endpoint class, inject the event bus and use the request method to send a message to the event bus and expect a response:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/bus")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 send the name to the greeting address and request a response
2 when we get the response, extract the body and send it to the user
The HTTP method returns a Uni. If you are using RESTEasy Reactive, Uni support is built-in. If you are using classic RESTEasy, you need to add the quarkus-resteasy-mutiny extension to your project.

We need a consumer listening on the greeting address. This consumer can be in the same class or another bean such as:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

This bean receives the name and returns the greeting message.

With this in place, every HTTP request on /bus/quarkus sends a message to the event bus, waits for a reply, and when this one arrives, writes the HTTP response:

Hello Quarkus

To better understand, let’s detail how the HTTP request/response has been handled:

  1. The request is received by the greeting method

  2. A message containing the name is sent to the event bus

  3. Another bean receives this message and computes the response

  4. This response is sent back using the reply mechanism

  5. Once the reply is received by the sender, the content is written to the HTTP response

Bidirectional communication with browsers using SockJS

The SockJS bridge provided by Vert.x allows browser applications and Quarkus applications to communicate using the event bus. It connects both sides. So, both sides can send messages received on the other side. It supports the three delivery mechanisms.

SockJS negotiates the communication channel between the Quarkus application and the browser. If WebSockets are supported, it uses them; otherwise, it degrades to SSE, long polling, etc.

To use SockJS, you need to configure the bridge, especially the addresses that will be used to communicate:

package org.acme.vertx;

import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
public class SockJsExample {

    @Inject
    Vertx vertx;

    public void init(@Observes Router router) {
        SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
        sockJSHandler.bridge(new SockJSBridgeOptions()
                .addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
        router.route("/eventbus/*").handler(sockJSHandler);
    }

}

This code configures the SockJS bridge to send all the messages targeting the ticks address to the connected browsers. More detailed explanations about the configuration can be found in the Vert.x SockJS Bridge documentation.

The browser must use the vertx-eventbus JavaScript library to consume the message:

<!doctype html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>SockJS example - Quarkus</title>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"
            integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
    <script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>

<h1>SockJS Examples</h1>

<p><strong>Last Tick:</strong> <span id="tick"></span></p>

</body>
<script>
    var eb = new EventBus('/eventbus');

    eb.onopen = function () {

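        // Update the page each time a message arrives on the 'ticks' address
        eb.registerHandler('ticks', function (error, message) {
            if (error) {
                console.error(error);
                return;
            }
            $("#tick").html(message.body);
        });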
    }

</script>
</html>

Native Transport

Native transports are not supported in GraalVM produced binaries.

Vert.x is capable of using Netty’s native transports, which offer performance improvements on specific platforms. To enable them, you must include the appropriate dependency for your platform. It’s usually a good idea to include both to keep your application platform-agnostic. Netty is smart enough to use the correct one, or none at all on unsupported platforms:

pom.xml
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <classifier>linux-x86_64</classifier>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-kqueue</artifactId>
  <classifier>osx-x86_64</classifier>
</dependency>

build.gradle
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")

implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")

You will also have to explicitly configure Vert.x to use the native transport. In application.properties add:

quarkus.vertx.prefer-native-transport=true

Or in application.yml:

quarkus:
  vertx:
    prefer-native-transport: true

If all is well, Quarkus will log:

[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true
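To check which of the two dependencies listed above applies to the machine you are running on, a small JDK-only helper can help. This is a minimal sketch: NativeTransportArtifact and forPlatform are hypothetical names, the helper only knows the two x86_64 coordinates from this guide, and it assumes the JVM reports the architecture as amd64 or x86_64 in the os.arch system property. It does not reproduce how Netty itself selects a transport.

```java
import java.util.Locale;
import java.util.Optional;

public class NativeTransportArtifact {

    // Maps os.name / os.arch values to one of the two dependencies listed in this guide.
    // Returns an empty Optional for any platform the guide does not list.
    static Optional<String> forPlatform(String osName, String osArch) {
        String os = osName.toLowerCase(Locale.ROOT);
        String arch = osArch.toLowerCase(Locale.ROOT);
        // Assumption: x86_64 JVMs report either "amd64" or "x86_64".
        boolean x86_64 = arch.equals("amd64") || arch.equals("x86_64");
        if (!x86_64) {
            return Optional.empty();
        }
        if (os.contains("linux")) {
            return Optional.of("io.netty:netty-transport-native-epoll::linux-x86_64");
        }
        if (os.contains("mac")) {
            return Optional.of("io.netty:netty-transport-native-kqueue::osx-x86_64");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String os = System.getProperty("os.name");
        String arch = System.getProperty("os.arch");
        System.out.println(forPlatform(os, arch)
                .orElse("No native transport dependency from this guide matches " + os + "/" + arch));
    }
}
```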

Native Linux Transport

On Linux you can enable the following socket options:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true

  • TCP_QUICKACK

quarkus.http.tcp-quick-ack=true

  • TCP_CORK

quarkus.http.tcp-cork=true

  • TCP_FASTOPEN

quarkus.http.tcp-fast-open=true

Native macOS Transport

On macOS Sierra and above you can enable the following socket options:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true

Listening to a Unix Domain Socket

Listening on a Unix domain socket allows us to dispense with the overhead of TCP if the connection to the Quarkus service is established from the same host. This can happen if access to the service goes through a proxy, which is often the case if you’re setting up a service mesh with a proxy like Envoy.

This will only work on platforms that support Native Transport.

Enable the appropriate Native Transport and set the following configuration properties:

quarkus.http.domain-socket=/var/run/io.quarkus.app.socket
quarkus.http.domain-socket-enabled=true

quarkus.vertx.prefer-native-transport=true

By itself, this will not disable the TCP socket, which by default opens on 0.0.0.0:8080. It can be explicitly disabled:

quarkus.http.host-enabled=false

These properties can be set through Java’s -D command line parameter or in application.properties.

Do not forget to add the native transport dependency. See Native Transport for details.
Make sure your application has the right permissions to write to the socket.
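To verify from the same host that the application answers on the domain socket, a plain JDK client is enough. The following is a minimal sketch, assuming Java 16 or later (for UnixDomainSocketAddress) and the socket path from the configuration above. DomainSocketClient and the / request path are hypothetical and used only for illustration.

```java
import java.io.IOException;
import java.net.StandardProtocolFamily;
import java.net.UnixDomainSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class DomainSocketClient {

    // Sends one HTTP/1.1 GET over the Unix domain socket and returns the raw response text.
    static String get(Path socket, String requestPath) throws IOException {
        String request = "GET " + requestPath + " HTTP/1.1\r\n"
                + "Host: localhost\r\n"
                + "Connection: close\r\n\r\n";
        try (SocketChannel channel = SocketChannel.open(StandardProtocolFamily.UNIX)) {
            channel.connect(UnixDomainSocketAddress.of(socket));
            ByteBuffer out = ByteBuffer.wrap(request.getBytes(StandardCharsets.US_ASCII));
            while (out.hasRemaining()) {
                channel.write(out);
            }
            // "Connection: close" asks the server to close the stream after the response,
            // so reading until end of stream returns the complete response.
            byte[] response = Channels.newInputStream(channel).readAllBytes();
            return new String(response, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        // Default path taken from the configuration above; "/" is a hypothetical endpoint.
        Path socket = Path.of(args.length > 0 ? args[0] : "/var/run/io.quarkus.app.socket");
        if (!Files.exists(socket)) {
            System.out.println("Socket not found: " + socket);
            return;
        }
        System.out.println(get(socket, "/"));
    }
}
```

The user running this client needs read and write permission on the socket file, just as the application needs permission to create it.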

Read-only deployment environments

In environments with read-only file systems, you may receive errors of the form:

java.lang.IllegalStateException: Failed to create cache dir

Assuming /tmp/ is writable, this can be fixed by setting the vertx.cacheDirBase property to point to a directory in /tmp/, for instance in OpenShift by creating an environment variable JAVA_OPTS with the value -Dvertx.cacheDirBase=/tmp/vertx.
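Before deploying, it can be useful to confirm that the chosen directory is really writable inside the container. The following is a minimal JDK-only sketch: CacheDirCheck and isUsable are hypothetical names, and the check only probes the file system. It does not reproduce how Vert.x creates its cache directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CacheDirCheck {

    // Returns true if the directory exists or can be created, and a file can be written into it.
    static boolean isUsable(Path base) {
        try {
            Files.createDirectories(base);
            Path probe = Files.createTempFile(base, "probe", ".tmp");
            Files.delete(probe);
            return true;
        } catch (IOException | SecurityException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Reads the same system property as above; falls back to /tmp/vertx, the example value.
        Path base = Path.of(System.getProperty("vertx.cacheDirBase", "/tmp/vertx"));
        System.out.println(base + " usable as cache directory base: " + isUsable(base));
    }
}
```

Note that the check creates the directory if it does not exist yet.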