Using Apache Kafka with Schema Registry and JSON Schema
This guide shows how your Quarkus application can use Apache Kafka, JSON Schema serialized records, and connect to a schema registry (such as the Confluent Schema Registry or Apicurio Registry).
If you are not familiar with Kafka and Kafka in Quarkus in particular, consider first going through the Using Apache Kafka with Reactive Messaging guide.
Prerequisites
To complete this guide, you need:
- Roughly 30 minutes
- An IDE
- JDK 17+ installed with JAVA_HOME configured appropriately
- Apache Maven 3.9.8
- Docker and Docker Compose, or Podman and Docker Compose
- Optionally the Quarkus CLI if you want to use it
- Optionally Mandrel or GraalVM installed and configured appropriately if you want to build a native executable (or Docker if you use a native container build)
Architecture
In this guide we are going to implement a REST resource, namely MovieResource, that will consume movie DTOs and put them in a Kafka topic.
Then, we will implement a consumer that will consume and collect messages from the same topic.
The collected messages will then be exposed by another resource, ConsumedMovieResource, via Server-Sent Events.
The Movies will be serialized and deserialized using JSON Schema. The schema, describing the Movie, is stored in Apicurio Registry. The same concept applies if you are using the Confluent JSON Schema serde and Confluent Schema Registry.
Solution
We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.
Clone the Git repository: git clone https://github.com/quarkusio/quarkus-quickstarts.git, or download an archive.
The solution is located in the kafka-json-schema-quickstart directory.
Creating the Maven Project
First, we need a new project. Create a new project with the following command:
For Windows users:
- If using cmd, don’t use the backslash (\) and put everything on the same line
- If using PowerShell, wrap -D parameters in double quotes, e.g. "-DprojectArtifactId=kafka-json-schema-quickstart"
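If you use Confluent Schema Registry, you don’t need the quarkus-apicurio-registry-json-schema extension. Instead, you need the quarkus-confluent-registry-json-schema extension and a few more dependencies, as described in Using the Confluent Schema Registry.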
JSON Schema
JSON Schema is a vocabulary for describing and validating the structure of JSON data. Data structures are described using schemas.
The first thing we need to do is to create a schema describing the Movie structure.
Create a file called src/main/resources/json-schema.json with the schema for our record (Kafka message):
{
  "$id": "https://example.com/person.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Movie",
  "type": "object",
  "properties": {
    "title": {
      "type": "string",
      "description": "The movie's title."
    },
    "year": {
      "type": "integer",
      "description": "The movie's year."
    }
  }
}
Note that auto-generating the Java class from the JSON Schema definition is not possible. Therefore, you must define the Java class as follows so it can be used by the serialization process:
package org.acme.kafka;

public class Movie {

    private String title;
    private Integer year;

    // No-argument constructor, used when the object is created during deserialization
    public Movie() {
    }

    public Movie(String title, Integer year) {
        this.title = title;
        this.year = year;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public Integer getYear() {
        return year;
    }

    public void setYear(Integer year) {
        this.year = year;
    }
}
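To make the schema's constraints concrete, the following sketch checks a decoded JSON object (represented here as a Map) against the two property types declared in json-schema.json. It is a hand-written illustration only: it is not a JSON Schema validator and not what the Apicurio serde does internally, and the class and method names are hypothetical.

```java
import java.util.Map;

// Hypothetical helper mirroring the "title" (string) and "year" (integer)
// property types from json-schema.json. Not a real JSON Schema validator.
final class MovieShapeCheck {

    // Returns true when every declared property that is present has the declared type.
    // The schema lists no "required" properties, so absent keys are accepted.
    static boolean conforms(Map<String, Object> json) {
        boolean titleOk = !json.containsKey("title")
                || json.get("title") instanceof String;
        boolean yearOk = !json.containsKey("year")
                || json.get("year") instanceof Integer
                || json.get("year") instanceof Long;
        return titleOk && yearOk;
    }

    public static void main(String[] args) {
        System.out.println(conforms(Map.of("title", "The Godfather", "year", 1972)));   // true
        System.out.println(conforms(Map.of("title", "The Godfather", "year", "1972"))); // false
    }
}
```

The second call fails because the year is sent as a string, which is the kind of mismatch the schema is meant to catch.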
The Movie producer
Having defined the schema, we can now jump to implementing the MovieResource.
Let’s open the MovieResource, inject an Emitter of Movie DTO and implement a @POST method that consumes Movie and sends it through the Emitter:
package org.acme.kafka;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.logging.Logger;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

// Movie lives in the same package (org.acme.kafka), so it needs no import
@Path("/movies")
public class MovieResource {

    private static final Logger LOGGER = Logger.getLogger(MovieResource.class);

    // emits to the `movies` channel, which is mapped to a Kafka topic in application.properties
    @Channel("movies")
    Emitter<Movie> emitter;

    @POST
    public Response enqueueMovie(Movie movie) {
        LOGGER.infof("Sending movie %s to Kafka", movie.getTitle());
        emitter.send(movie);
        return Response.accepted().build();
    }
}
Now, we need to map the movies channel (the Emitter emits to this channel) to a Kafka topic and also map the schema to be used on this channel.
To achieve this, edit the application.properties file, and add the following content:
# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka
# disable automatic detection of the serializers
quarkus.messaging.kafka.serializer-autodetection.enabled=false
# Set the value serializer for the channel `movies`
mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies
# set the schema to be used for the channel `movies`. Note that this property accepts just a name or a path and the serializer will look for the resource on the classpath.
mp.messaging.outgoing.movies.apicurio.registry.artifact.schema.location=json-schema.json
# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true
Note that unlike with Avro serialization, autodetection can’t be used with JSON Schema, so we must define the value.serializer explicitly.
If you use Confluent Schema Registry, you must define the value.serializer in this case as well.
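The schema location configured above is resolved against the classpath. As a rough illustration of what such a lookup involves, here is a minimal sketch in plain Java. It is an assumption about the general technique, not the Apicurio serializer's actual code, and the class name ClasspathSchemaLookup is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper: reads a resource such as "json-schema.json" from the classpath.
final class ClasspathSchemaLookup {

    // Returns the resource content as UTF-8 text, or an empty Optional when it is not found.
    static Optional<String> read(String location) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClasspathSchemaLookup.class.getClassLoader();
        }
        // getResourceAsStream returns null for a missing resource;
        // try-with-resources tolerates a null resource.
        try (InputStream in = loader.getResourceAsStream(location)) {
            if (in == null) {
                return Optional.empty();
            }
            return Optional.of(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Files under src/main/resources end up at the root of the classpath, which is why the bare name json-schema.json is enough in the property.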
The Movie consumer
So, we can write records into Kafka containing our Movie data.
That data is serialized using JSON Schema.
Now, it’s time to implement a consumer for them.
Let’s create ConsumedMovieResource that will consume Movie messages from the movies-from-kafka channel and will expose it via Server-Sent Events:
package org.acme.kafka;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;

// Movie lives in the same package (org.acme.kafka), so it needs no import
@ApplicationScoped
@Path("/consumed-movies")
public class ConsumedMovieResource {

    // stream of movies read from the `movies-from-kafka` channel
    @Channel("movies-from-kafka")
    Multi<Movie> movies;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<String> stream() {
        return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
    }
}
The last bit of the application’s code is the configuration of the movies-from-kafka channel in application.properties:
# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
# set the topic name for the channel to `movies`
mp.messaging.incoming.movies-from-kafka.topic=movies
# set the deserializer for the incoming channel
mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer
# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false
mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest
Again, unlike with Avro, we have to define the value.deserializer.
If you use Confluent Schema Registry, you must configure the value.deserializer as well.
Running the application
Start the application in dev mode:
# Quarkus CLI
quarkus dev
# Maven
./mvnw quarkus:dev
# Gradle
./gradlew --console=plain quarkusDev
A Kafka broker and an Apicurio Registry instance are started automatically thanks to Dev Services. See Dev Services for Kafka and Dev Services for Apicurio Registry for more details.
You might have noticed that we didn’t configure the schema registry URL anywhere. This is because Dev Services for Apicurio Registry configures all Kafka channels in Quarkus Messaging to use the automatically started registry instance.
Apicurio Registry, in addition to its native API, also exposes an endpoint that is API-compatible with Confluent Schema Registry. Therefore, this automatic configuration works both for Apicurio Registry serde and Confluent Schema Registry serde. However, note that there’s no Dev Services support for running Confluent Schema Registry itself. If you want to use a running instance of Confluent Schema Registry, configure its URL, together with the URL of a Kafka broker.
In the second terminal, query the ConsumedMovieResource resource with curl:
curl -N http://localhost:8080/consumed-movies
In the third one, post a few movies:
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Shawshank Redemption","year":1994}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Godfather","year":1972}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"The Dark Knight","year":2008}' \
http://localhost:8080/movies
curl --header "Content-Type: application/json" \
--request POST \
--data '{"title":"12 Angry Men","year":1957}' \
http://localhost:8080/movies
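The same requests can also be built from Java with the JDK's java.net.http API. The sketch below only constructs the request and does not send it. The class name MoviePostRequest is hypothetical, and the base URL is assumed to be the dev-mode address used in the curl commands.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds a POST equivalent to the curl commands above.
final class MoviePostRequest {

    static HttpRequest build(String baseUrl, String title, int year) {
        // Titles containing quotes or backslashes would need JSON escaping; omitted in this sketch.
        String body = String.format("{\"title\":\"%s\",\"year\":%d}", title, year);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/movies"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

Sending the result with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding()) against the running application should return status 202, matching the Response.accepted() call in MovieResource.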
Observe what is printed in the second terminal. You should see something along the lines of:
data:'The Shawshank Redemption' from 1994
data:'The Godfather' from 1972
data:'The Dark Knight' from 2008
data:'12 Angry Men' from 1957
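Each line of that output is a Server-Sent Events data field. If you read the stream from your own client rather than curl, the payload has to be extracted from those lines. The following is a simplified sketch under that assumption: it handles only single-line data fields and ignores other SSE fields such as id or event. The class name SseDataLines is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: extracts payloads from raw Server-Sent Events lines.
final class SseDataLines {

    // Returns the payload of each "data:" line; blank separators and other fields are skipped.
    static List<String> payloads(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // The SSE format allows one optional space after the colon.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                result.add(value);
            }
        }
        return result;
    }
}
```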
Running in JVM or Native mode
When not running in dev or test mode, you will need to start your own Kafka broker and Apicurio Registry.
The easiest way to get them running is to use docker-compose to start the appropriate containers.
If you use Confluent Schema Registry, you already have a Kafka broker and Confluent Schema Registry instance running and configured.
You can ignore the docker-compose instructions here, as well as the Apicurio Registry configuration.
Create a docker-compose.yaml file at the root of the project with the following content:
version: '2'

services:

  zookeeper:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: quay.io/strimzi/kafka:0.41.0-kafka-3.7.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

  schema-registry:
    image: apicurio/apicurio-registry-mem:2.4.2.Final
    ports:
      - 8081:8080
    depends_on:
      - kafka
    environment:
      QUARKUS_PROFILE: prod
Before starting the application, let’s first start the Kafka broker and Apicurio Registry:
docker-compose up
To stop the containers, use docker-compose down. You can also clean up the containers with docker-compose rm.
You can build the application with:
# Quarkus CLI
quarkus build
# Maven
./mvnw install
# Gradle
./gradlew build
And run it in JVM mode with:
java -Dmp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2 -jar target/quarkus-app/quarkus-run.jar
By default, the application tries to connect to a Kafka broker listening at localhost:9092.
You can configure the bootstrap server using: java -Dkafka.bootstrap.servers=... -jar target/quarkus-app/quarkus-run.jar
Specifying the registry URL on the command line is not very convenient, so you can add a configuration property only for the prod profile:
%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
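The %prod. prefix means the value applies only when the prod profile is active. The sketch below models that lookup rule in plain Java to show the effect. It is a simplified illustration, not how Quarkus configuration is actually implemented, and the class name ProfileLookup is hypothetical.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical model of profile-aware property resolution.
final class ProfileLookup {

    // Looks up "%<profile>.<key>" first and falls back to the plain key.
    static Optional<String> resolve(Properties props, String profile, String key) {
        String profiled = props.getProperty("%" + profile + "." + key);
        if (profiled != null) {
            return Optional.of(profiled);
        }
        return Optional.ofNullable(props.getProperty(key));
    }
}
```

With only the %prod. entry defined, a lookup under the dev profile finds nothing, which leaves Dev Services free to supply the registry URL in dev and test mode.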
You can build a native executable with:
# Quarkus CLI
quarkus build --native
# Maven
./mvnw install -Dnative
# Gradle
./gradlew build -Dquarkus.native.enabled=true
and run it with:
./target/kafka-json-schema-quickstart-1.0.0-SNAPSHOT-runner -Dkafka.bootstrap.servers=localhost:9092
Testing the application
As mentioned above, Dev Services for Kafka and Apicurio Registry automatically start and configure a Kafka broker and Apicurio Registry instance in dev mode and for tests. Hence, we don’t have to set up Kafka and Apicurio Registry ourselves. We can just focus on writing the test.
First, let’s add test dependencies on REST Client and Awaitility to the build file:
<!-- we'll use Jakarta REST Client for talking to the SSE endpoint -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("org.awaitility:awaitility")
In the test, we will send movies in a loop and check if the ConsumedMovieResource returns what we send.
package org.acme.kafka;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.sse.SseEventSource;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.restassured.RestAssured.given;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
@QuarkusTest
public class MovieResourceTest {
@TestHTTPResource("/consumed-movies")
URI consumedMovies;
@Test
public void testHelloEndpoint() throws InterruptedException {
// create a client for `ConsumedMovieResource` and collect the consumed resources in a list
Client client = ClientBuilder.newClient();
WebTarget target = client.target(consumedMovies);
List<String> received = new CopyOnWriteArrayList<>();
SseEventSource source = SseEventSource.target(target).build();
source.register(inboundSseEvent -> received.add(inboundSseEvent.readData()));
// in a separate thread, feed the `MovieResource`
ExecutorService movieSender = startSendingMovies();
source.open();
// check if, after at most 5 seconds, we have at least 2 items collected, and they are what we expect
await().atMost(5, SECONDS).until(() -> received.size() >= 2);
assertThat(received, Matchers.hasItems("'The Shawshank Redemption' from 1994",
"'12 Angry Men' from 1957"));
source.close();
// shutdown the executor that is feeding the `MovieResource`
movieSender.shutdownNow();
movieSender.awaitTermination(5, SECONDS);
}
private ExecutorService startSendingMovies() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"The Shawshank Redemption\",\"year\":1994}")
.when()
.post("/movies")
.then()
.statusCode(202);
given()
.contentType(ContentType.JSON)
.body("{\"title\":\"12 Angry Men\",\"year\":1957}")
.when()
.post("/movies")
.then()
.statusCode(202);
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
break;
}
}
});
return executorService;
}
}
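The await().atMost(5, SECONDS).until(...) call in the test repeatedly checks a condition until it holds or the time limit passes. As a rough illustration of that polling idea, here is a minimal sketch in plain Java. It is not Awaitility's implementation, and the class name Poll is hypothetical.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: polls a condition until it holds or a timeout elapses.
final class Poll {

    // Returns true if the condition held in time, false on timeout or interruption.
    static boolean until(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

One difference from this sketch is that Awaitility signals a timeout by throwing a ConditionTimeoutException instead of returning false, which is what makes the test fail when fewer than two movies arrive.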
We modified the MovieResourceTest that was generated together with the project. This test class has a subclass, NativeMovieResourceIT, that runs the same test against the native executable.
To run it, execute:
# Quarkus CLI
quarkus build --native
# Maven
./mvnw install -Dnative
# Gradle
./gradlew build -Dquarkus.native.enabled=true
Manual setup
If we couldn’t use Dev Services and wanted to start a Kafka broker and Apicurio Registry instance manually, we would define a QuarkusTestResourceLifecycleManager.
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>strimzi-test-container</artifactId>
<version>0.105.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
testImplementation("io.strimzi:strimzi-test-container:0.105.0") {
exclude group: "org.apache.logging.log4j", module: "log4j-core"
}
package org.acme.kafka;
import java.util.HashMap;
import java.util.Map;
import org.testcontainers.containers.GenericContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.strimzi.StrimziKafkaContainer;
public class KafkaAndSchemaRegistryTestResource implements QuarkusTestResourceLifecycleManager {
private final StrimziKafkaContainer kafka = new StrimziKafkaContainer();
private GenericContainer<?> registry;
@Override
public Map<String, String> start() {
kafka.start();
registry = new GenericContainer<>("apicurio/apicurio-registry-mem:2.4.2.Final")
.withExposedPorts(8080)
.withEnv("QUARKUS_PROFILE", "prod");
registry.start();
Map<String, String> properties = new HashMap<>();
properties.put("mp.messaging.connector.smallrye-kafka.apicurio.registry.url",
"http://" + registry.getHost() + ":" + registry.getMappedPort(8080) + "/apis/registry/v2");
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
return properties;
}
@Override
public void stop() {
registry.stop();
kafka.stop();
}
}
@QuarkusTest
@QuarkusTestResource(KafkaAndSchemaRegistryTestResource.class)
public class MovieResourceTest {
...
}
Using compatible versions of the Apicurio Registry
The quarkus-apicurio-registry-json-schema extension depends on recent versions of the Apicurio Registry client, and most versions of Apicurio Registry server and client are backwards compatible.
For some versions, you need to make sure that the client used by the serdes is compatible with the server.
For example, with the Apicurio Dev Service, if you set the image name to use version 2.1.5.Final:
quarkus.apicurio-registry.devservices.image-name=quay.io/apicurio/apicurio-registry-mem:2.1.5.Final
You need to make sure that the apicurio-registry-serdes-json-schema-serde dependency and the REST client apicurio-common-rest-client-vertx dependency are set to compatible versions:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-apicurio-registry-json-schema</artifactId>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
<version>2.1.5.Final</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-json-schema-serde</artifactId>
<version>2.1.5.Final</version>
<exclusions>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-jdk</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-client</artifactId>
</exclusion>
<exclusion>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-common-rest-client-vertx</artifactId>
<version>0.1.5.Final</version>
</dependency>
dependencies {
implementation(platform("io.quarkus.platform:quarkus-bom:2.12.3.Final"))
...
implementation("io.quarkus:quarkus-apicurio-registry-json-schema")
implementation("io.apicurio:apicurio-registry-serdes-json-schema-serde") {
exclude group: "io.apicurio", module: "apicurio-common-rest-client-jdk"
exclude group: "io.apicurio", module: "apicurio-registry-client"
exclude group: "io.apicurio", module: "apicurio-registry-common"
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-client") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-registry-common") {
version {
strictly "2.1.5.Final"
}
}
implementation("io.apicurio:apicurio-common-rest-client-vertx") {
version {
strictly "0.1.5.Final"
}
}
}
Known previous compatible versions for apicurio-registry-client and apicurio-common-rest-client-vertx are the following:
- apicurio-registry-client 2.1.5.Final with apicurio-common-rest-client-vertx 0.1.5.Final
- apicurio-registry-client 2.3.1.Final with apicurio-common-rest-client-vertx 0.1.13.Final
Using the Confluent Schema Registry
If you want to use the Confluent Schema Registry, you need the quarkus-confluent-registry-json-schema extension, instead of the quarkus-apicurio-registry-json-schema extension.
Also, you need to add a few dependencies and a custom Maven repository to your pom.xml / build.gradle file:
<dependencies>
...
<!-- the extension -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-confluent-registry-json-schema</artifactId>
</dependency>
<!-- Confluent registry libraries use Jakarta REST client -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>7.2.0</version>
<exclusions>
<exclusion>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- io.confluent:kafka-json-schema-serializer is only available from this repository: -->
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
repositories {
...
maven {
url "https://packages.confluent.io/maven/"
}
}
dependencies {
...
implementation("io.quarkus:quarkus-confluent-registry-json-schema")
// Confluent registry libraries use Jakarta REST client
implementation("io.quarkus:quarkus-rest-client")
implementation("io.confluent:kafka-json-schema-serializer:7.2.0") {
exclude group: "jakarta.ws.rs", module: "jakarta.ws.rs-api"
}
}
In JVM mode, any version of io.confluent:kafka-json-schema-serializer can be used.
In native mode, Quarkus supports the following versions: 6.2.x, 7.0.x, 7.1.x, 7.2.x, 7.3.x.
For versions 7.4.x and 7.5.x, due to an issue with the Confluent Schema Serializer, you need to add another dependency:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</dependency>
dependencies {
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv")
}
For any other versions, the native configuration may need to be adjusted.
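The native-mode rules above can be summarized as a small decision function. The sketch below encodes only what this section states. The class, enum, and method names are hypothetical and are not part of Quarkus or the Confluent libraries.

```java
// Hypothetical summary of the native-mode version rules listed in this section.
final class ConfluentNativeSupport {

    enum Support { SUPPORTED, NEEDS_JACKSON_CSV, MAY_NEED_ADJUSTMENT }

    // Classifies a serializer version such as "7.2.0" by its major.minor line.
    static Support classify(String version) {
        String[] parts = version.split("\\.");
        if (parts.length < 2) {
            return Support.MAY_NEED_ADJUSTMENT;
        }
        String line = parts[0] + "." + parts[1];
        switch (line) {
            case "6.2":
            case "7.0":
            case "7.1":
            case "7.2":
            case "7.3":
                return Support.SUPPORTED;
            case "7.4":
            case "7.5":
                // these lines need the extra jackson-dataformat-csv dependency
                return Support.NEEDS_JACKSON_CSV;
            default:
                return Support.MAY_NEED_ADJUSTMENT;
        }
    }
}
```

For example, the 7.2.0 version used in the dependency snippets earlier falls into the supported group.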
Further reading
- SmallRye Reactive Messaging Kafka documentation