Back to Guides

Reactive Messaging AMQP 1.0 Connector Reference Documentation

This guide is the companion from the Getting Started with AMQP 1.0. It explains in more details the configuration and usage of the AMQP connector for reactive messaging.

This documentation does not cover all the details of the connector. Refer to the SmallRye Reactive Messaging website for further details.

The AMQP connector allows Quarkus applications to send and receive messages using the AMQP 1.0 protocol. More details about the protocol can be found in the AMQP 1.0 specification. It’s important to note that AMQP 1.0 and AMQP 0.9.1 (implemented by RabbitMQ) are incompatible. Check Using RabbitMQ to get more details.

AMQP connector extension

To use the connector, you need to add the quarkus-smallrye-reactive-messaging-amqp extension.

You can add the extension to your project using:

CLI
quarkus extension add 'quarkus-smallrye-reactive-messaging-amqp'
Maven
./mvnw quarkus:add-extension -Dextensions='quarkus-smallrye-reactive-messaging-amqp'
Gradle
./gradlew addExtension --extensions='quarkus-smallrye-reactive-messaging-amqp'

Or just add the following dependency to your project:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-amqp")

Once added to your project, you can map channels to AMQP addresses by configuring the connector attribute:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
Connector auto-attachment

If you have a single connector on your classpath, you can omit the connector attribute configuration. Quarkus automatically associates orphan channels to the (unique) connector found on the classpath. Orphans channels are outgoing channels without a downstream consumer or incoming channels without an upstream producer.

This auto-attachment can be disabled using:

quarkus.reactive-messaging.auto-connector-attachment=false

Configuring the AMQP Broker access

The AMQP connector connects to AMQP 1.0 brokers such as Apache ActiveMQ or Artemis. To configure the location and credentials of the broker, add the following properties in the application.properties:

amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)

mp.messaging.incoming.prices.connector=smallrye-amqp (5)
1 Configures the broker/router host name. You can do it per channel (using the host attribute) or globally using amqp-host
2 Configures the broker/router port. You can do it per channel (using the port attribute) or globally using amqp-port. The default is 5672.
3 Configures the broker/router username if required. You can do it per channel (using the username attribute) or globally using amqp-username.
4 Configures the broker/router password if required. You can do it per channel (using the password attribute) or globally using amqp-password.
5 Instructs the prices channel to be managed by the AMQP connector

In dev mode and when running tests, Dev Services for AMQP automatically starts an AMQP broker.

Receiving AMQP messages

Let’s imagine your application receives Message<Double>. You can consume the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AmqpPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

Or, you can retrieve the Message<Double>:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class AmqpPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the AMQP message as `accepted`.
        return price.ack();
    }

}

Inbound Metadata

Messages coming from AMQP contain an instance of IncomingAmqpMetadata in the metadata.

Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
    String address = meta.getAddress();
    String subject = meta.getSubject();
    boolean durable = meta.isDurable();
    // Use io.vertx.core.json.JsonObject
    JsonObject properties = meta.getProperties();
    // ...
});

Deserialization

The connector converts incoming AMQP Messages into Reactive Messaging Message<T> instances. T depends on the body of the received AMQP Message.

The AMQP Type System defines the supported types.

AMQP Body Type <T>

AMQP Value containing a AMQP Primitive Type

the corresponding Java type

AMQP Value using the Binary type

byte[]

AMQP Sequence

List

AMQP Data (with binary content) and the content-type is set to application/json

JsonObject

AMQP Data with a different content-type

byte[]

If you send objects with this AMQP connector (outbound connector), it gets encoded as JSON and sent as binary. The content-type is set to application/json. So, you can rebuild the object as follows:

import io.vertx.core.json.JsonObject;
//
@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-amqp") (1)
    public void consume(JsonObject p) {   (2)
        Price price = p.mapTo(Price.class);  (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
1 The Price instances are automatically encoded to JSON by the connector
2 You can receive it using a JsonObject
3 Then, you can reconstruct the instance using the mapTo method
The mapTo method uses the Quarkus Jackson mapper. Check this guide to learn more about the mapper configuration.

Acknowledgement

When a Reactive Messaging Message associated with an AMQP Message is acknowledged, it informs the broker that the message has been accepted.

Failure Management

If a message produced from an AMQP message is nacked, a failure strategy is applied. The AMQP connector supports six strategies:

  • fail - fail the application; no more AMQP messages will be processed (default). The AMQP message is marked as rejected.

  • accept - this strategy marks the AMQP message as accepted. The processing continues ignoring the failure. Refer to the accepted delivery state documentation.

  • release - this strategy marks the AMQP message as released. The processing continues with the next message. The broker can redeliver the message. Refer to the released delivery state documentation.

  • reject - this strategy marks the AMQP message as rejected. The processing continues with the next message. Refer to the rejected delivery state documentation.

  • modified-failed - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). The processing continues with the next message, but the broker may attempt to redeliver the message. Refer to the modified delivery state documentation

  • modified-failed-undeliverable-here - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). It also indicates that the application cannot process the message, meaning that the broker will not attempt to redeliver the message to this node. The processing continues with the next message. Refer to the modified delivery state documentation

Sending AMQP messages

Serialization

When sending a Message<T>, the connector converts the message into an AMQP Message. The payload is converted to the AMQP Message body.

T AMQP Message Body

primitive types or String

AMQP Value with the payload

Instant or UUID

AMQP Value using the corresponding AMQP Type

JsonObject or JsonArray

AMQP Data using a binary content. The content-type is set to application/json

io.vertx.mutiny.core.buffer.Buffer

AMQP Data using a binary content. No content-type set

Any other class

The payload is converted to JSON (using a Json Mapper). The result is wrapped into AMQP Data using a binary content. The content-type is set to application/json

If the message payload cannot be serialized to JSON, the message is nacked.

Outbound Metadata

When sending Messages, you can add an instance of OutgoingAmqpMetadata to influence how the message is going to be sent to AMQP. For example, you can configure the subjects, properties:

 OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withDurable(true)
    .withSubject("my-subject")
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

Dynamic address names

Sometimes it is desirable to select the destination of a message dynamically. In this case, you should not configure the address inside your application configuration file, but instead, use the outbound metadata to set the address.

For example, you can send to a dynamic address based on the incoming message:

String addressName = selectAddressFromIncommingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withAddress(addressName)
    .withDurable(true)
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
To be able to set the address per message, the connector is using an anonymous sender.

Acknowledgement

By default, the Reactive Messaging Message is acknowledged when the broker acknowledged the message. When using routers, this acknowledgement may not be enabled. In this case, configure the auto-acknowledgement attribute to acknowledge the message as soon as it has been sent to the router.

If an AMQP message is rejected/released/modified by the broker (or cannot be sent successfully), the message is nacked.

Back Pressure and Credits

The back-pressure is handled by AMQP credits. The outbound connector only requests the amount of allowed credits. When the amount of credits reaches 0, it waits (in a non-blocking fashion) until the broker grants more credits to the AMQP sender.

Configuring the AMQP address

You can configure the AMQP address using the address attribute:

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue

If the address attribute is not set, the connector uses the channel name.

To use an existing queue, you need to configure the address, container-id and, optionally, the link-name attributes. For example, if you have an Apache Artemis broker configured with:

<queues>
    <queue name="people">
        <address>people</address>
        <durable>true</durable>
        <user>artemis</user>
    </queue>
</queues>

You need the following configuration:

mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people

You may need to configure the link-name attribute, if the queue name is not the channel name:

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people

To use a MULTICAST queue, you need to provide the FQQN (fully-qualified queue name) instead of just the name of the queue:

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo

mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people

More details about the AMQP Address model can be found in the Artemis documentation.

Execution model and Blocking processing

Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. But, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation indicating that the processing is blocking and should not be run on the caller thread.

For example, The following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

There are 2 @Blocking annotations:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

They have the same effect. Thus, you can use both. The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order. The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order.

@Transactional

If your method is annotated with @Transactional, it will be considered blocking automatically, even if the method is not annotated with @Blocking.

Customizing the underlying AMQP client

The connector uses the Vert.x AMQP client underneath. More details about this client can be found in the Vert.x website.

You can customize the underlying client configuration by producing an instance of AmqpClientOptions as follows:

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // You can use the produced options to configure the TLS connection
  PemKeyCertOptions keycert = new PemKeyCertOptions()
    .addCertPath("./tls/tls.crt")
    .addKeyPath("./tls/tls.key");
  PemTrustOptions trust = new PemTrustOptions().addCertPath("./tlc/ca.crt");
  return new AmqpClientOptions()
        .setSsl(true)
        .setPemKeyCertOptions(keycert)
        .setPemTrustOptions(trust)
        .addEnabledSaslMechanism("EXTERNAL")
        .setHostnameVerificationAlgorithm("")
        .setConnectTimeout(30000)
        .setReconnectInterval(5000)
        .setContainerId("my-container");
}

This instance is retrieved and used to configure the client used by the connector. You need to indicate the name of the client using the client-options-name attribute:

mp.messaging.incoming.prices.client-options-name=my-named-options

Health reporting

If you use the AMQP connector with the quarkus-smallrye-health extension, it contributes to the readiness and liveness probes. The AMQP connector reports the readiness and liveness of each channel managed by the connector. At the moment, the AMQP connector uses the same logic for the readiness and liveness checks.

To disable health reporting, set the health-enabled attribute for the channel to false. On the inbound side (receiving messages from AMQP), the check verifies that the receiver is attached to the broker. On the outbound side (sending records to AMQP), the check verifies that the sender is attached to the broker.

Note that a message processing failures nacks the message, which is then handled by the failure-strategy. It is the responsibility of the failure-strategy to report the failure and influence the outcome of the checks. The fail failure strategy reports the failure, and so the check will report the fault.

Using RabbitMQ

This connector is for AMQP 1.0. RabbitMQ implements AMQP 0.9.1. RabbitMQ does not provide AMQP 1.0 by default, but there is a plugin for it. To use RabbitMQ with this connector, enable and configure the AMQP 1.0 plugin.

Despite the existence of the plugin, a few AMQP 1.0 features won’t work with RabbitMQ. Thus, we recommend the following configurations.

To receive messages from RabbitMQ:

  • Set durable to false

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false

To send messages to RabbitMQ:

  • set the destination address (anonymous sender are not supported)

  • set use-anonymous-sender to false

mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false

As a consequence, it’s not possible to change the destination dynamically (using message metadata) when using RabbitMQ.

Receiving Cloud Events

The AMQP connector supports Cloud Events. When the connector detects a structured or binary Cloud Events, it adds a IncomingCloudEventMetadata<T> into the metadata of the Message. IncomingCloudEventMetadata contains accessors to the mandatory and optional Cloud Event attributes.

If the connector cannot extract the Cloud Event metadata, it sends the Message without the metadata.

For more information on receiving Cloud Events, see Receiving Cloud Events in SmallRye Reactive Messaging documentation.

Sending Cloud Events

The AMQP connector supports Cloud Events. The connector sends the outbound record as Cloud Events if:

  • the message metadata contains an io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata instance,

  • the channel configuration defines the cloud-events-type and cloud-events-source attributes.

For more information on sending Cloud Events, see Sending Cloud Events in SmallRye Reactive Messaging documentation.

AMQP Connector Configuration Reference

Quarkus specific configuration

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

Type

Default

If Dev Services for AMQP has been explicitly enabled or disabled. Dev Services are generally enabled by default, unless there is an existing configuration present. For AMQP, Dev Services starts a broker unless amqp-host or amqp-port are set or if all the Reactive Messaging AMQP channel are configured with host or port.

Environment variable: QUARKUS_AMQP_DEVSERVICES_ENABLED

boolean

Optional fixed port the dev service will listen to. If not defined, the port will be chosen randomly.

Environment variable: QUARKUS_AMQP_DEVSERVICES_PORT

int

The image to use. Note that only ActiveMQ Artemis images are supported. Specifically, the image repository must end with artemiscloud/activemq-artemis-broker. Check https://quay.io/repository/artemiscloud/activemq-artemis-broker to find the available versions.

Environment variable: QUARKUS_AMQP_DEVSERVICES_IMAGE_NAME

string

quay.io/artemiscloud/activemq-artemis-broker:0.1.2

The value of the AMQ_EXTRA_ARGS environment variable to pass to the container.

Environment variable: QUARKUS_AMQP_DEVSERVICES_EXTRA_ARGS

string

--no-autotune --mapped --no-fsync

Indicates if the AMQP broker managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for AMQP starts a new container. The discovery uses the quarkus-dev-service-amqp label. The value is configured using the service-name property. Container sharing is only used in dev mode.

Environment variable: QUARKUS_AMQP_DEVSERVICES_SHARED

boolean

true

The value of the quarkus-dev-service-aqmp label attached to the started container. This property is used when shared is set to true. In this case, before starting a container, Dev Services for AMQP looks for a container with the quarkus-dev-service-amqp label set to the configured value. If found, it will use this container instead of starting a new one. Otherwise, it starts a new container with the quarkus-dev-service-amqp label set to the specified value. This property is used when you need multiple shared AMQP brokers.

Environment variable: QUARKUS_AMQP_DEVSERVICES_SERVICE_NAME

string

amqp

Incoming channel configuration

Attribute (alias) Description Mandatory Default

address

The AMQP address. If not set, the channel name is used

Type: string

false

auto-acknowledgement

Whether the received AMQP messages must be acknowledged when received

Type: boolean

false

false

broadcast

Whether the received AMQP messages must be dispatched to multiple subscribers

Type: boolean

false

false

client-options-name

(amqp-client-options-name)

The name of the AMQP Client Option bean used to customize the AMQP client configuration

Type: string

false

cloud-events

Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.

Type: boolean

false

true

connect-timeout

(amqp-connect-timeout)

The connection timeout in milliseconds

Type: int

false

1000

container-id

The AMQP container id

Type: string

false

durable

Whether AMQP subscription is durable

Type: boolean

false

false

failure-strategy

Specify the failure strategy to apply when a message produced from an AMQP message is nacked. Accepted values are fail (default), accept, release, reject, modified-failed, modified-failed-undeliverable-here

Type: string

false

fail

health-timeout

The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed.

Type: int

false

3

host

(amqp-host)

The broker hostname

Type: string

false

localhost

link-name

The name of the link. If not set, the channel name is used.

Type: string

false

password

(amqp-password)

The password used to authenticate to the broker

Type: string

false

port

(amqp-port)

The broker port

Type: int

false

5672

reconnect-attempts

(amqp-reconnect-attempts)

The number of reconnection attempts

Type: int

false

100

reconnect-interval

(amqp-reconnect-interval)

The interval in second between two reconnection attempts

Type: int

false

10

sni-server-name

(amqp-sni-server-name)

If set, explicitly override the hostname to use for the TLS SNI server name

Type: string

false

tracing-enabled

Whether tracing is enabled (default) or disabled

Type: boolean

false

true

use-ssl

(amqp-use-ssl)

Whether the AMQP connection uses SSL/TLS

Type: boolean

false

false

username

(amqp-username)

The username used to authenticate to the broker

Type: string

false

virtual-host

(amqp-virtual-host)

If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use)

Type: string

false

Outgoing channel configuration

Attribute (alias) Description Mandatory Default

address

The AMQP address. If not set, the channel name is used

Type: string

false

client-options-name

(amqp-client-options-name)

The name of the AMQP Client Option bean used to customize the AMQP client configuration

Type: string

false

cloud-events

Enables (default) or disables the Cloud Event support. If enabled on an incoming channel, the connector analyzes the incoming records and try to create Cloud Event metadata. If enabled on an outgoing, the connector sends the outgoing messages as Cloud Event if the message includes Cloud Event Metadata.

Type: boolean

false

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

Configure the default datacontenttype attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the datacontenttype attribute itself

Type: string

false

cloud-events-data-schema

(cloud-events-default-data-schema)

Configure the default dataschema attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the dataschema attribute itself

Type: string

false

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

Whether the connector should insert automatically the time attribute into the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the time attribute itself

Type: boolean

false

true

cloud-events-mode

The Cloud Event mode (structured or binary (default)). Indicates how are written the cloud events in the outgoing record

Type: string

false

binary

cloud-events-source

(cloud-events-default-source)

Configure the default source attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the source attribute itself

Type: string

false

cloud-events-subject

(cloud-events-default-subject)

Configure the default subject attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the subject attribute itself

Type: string

false

cloud-events-type

(cloud-events-default-type)

Configure the default type attribute of the outgoing Cloud Event. Requires cloud-events to be set to true. This value is used if the message does not configure the type attribute itself

Type: string

false

connect-timeout

(amqp-connect-timeout)

The connection timeout in milliseconds

Type: int

false

1000

container-id

The AMQP container id

Type: string

false

credit-retrieval-period

The period (in milliseconds) between two attempts to retrieve the credits granted by the broker. This time is used when the sender run out of credits.

Type: int

false

2000

durable

Whether sent AMQP messages are marked durable

Type: boolean

false

false

health-timeout

The max number of seconds to wait to determine if the connection with the broker is still established for the readiness check. After that threshold, the check is considered as failed.

Type: int

false

3

host

(amqp-host)

The broker hostname

Type: string

false

localhost

link-name

The name of the link. If not set, the channel name is used.

Type: string

false

merge

Whether the connector should allow multiple upstreams

Type: boolean

false

false

password

(amqp-password)

The password used to authenticate to the broker

Type: string

false

port

(amqp-port)

The broker port

Type: int

false

5672

reconnect-attempts

(amqp-reconnect-attempts)

The number of reconnection attempts

Type: int

false

100

reconnect-interval

(amqp-reconnect-interval)

The interval in second between two reconnection attempts

Type: int

false

10

sni-server-name

(amqp-sni-server-name)

If set, explicitly override the hostname to use for the TLS SNI server name

Type: string

false

tracing-enabled

Whether tracing is enabled (default) or disabled

Type: boolean

false

true

ttl

The time-to-live of the sent AMQP messages. 0 to disable the TTL

Type: long

false

0

use-anonymous-sender

Whether the connector should use an anonymous sender. Default value is true if the broker supports it, false otherwise. If not supported, it is not possible to dynamically change the destination address.

Type: boolean

false

use-ssl

(amqp-use-ssl)

Whether the AMQP connection uses SSL/TLS

Type: boolean

false

false

username

(amqp-username)

The username used to authenticate to the broker

Type: string

false

virtual-host

(amqp-virtual-host)

If set, configure the hostname value used for the connection AMQP Open frame and TLS SNI server name (if TLS is in use)

Type: string

false