Testing reactive Java microservices

Duration: 20 minutes

Learn how to test reactive Java microservices in true-to-production environments using Testcontainers.

What you’ll learn

You will learn how to write integration tests for reactive Java microservices and to run the tests in true-to-production environments by using containers with Testcontainers and JUnit. Testcontainers tests your containerized application from outside the container so that you are testing the exact same image that runs in production. The reactive application in this guide sends and receives messages between services by using an external message broker, Apache Kafka. Using an external message broker enables asynchronous communications between services so that requests are non-blocking and decoupled from responses. You can learn more about reactive Java services that use an external message broker to manage communications in the Creating reactive Java microservices guide.

Figure: Reactive system inventory application

True-to-production integration testing with Testcontainers

Tests sometimes pass during the development and testing stages of an application’s lifecycle but then fail in production because of differences between your development and production environments. While you can create mock objects and custom setups to minimize differences between environments, it is difficult to mimic a production system for an application that uses an external messaging system. Testcontainers addresses this problem by enabling the testing of applications in the same Docker containers that you’ll use in production. As a result, your environment remains the same throughout the application’s lifecycle – from development, through testing, and into production. You can learn more about Testcontainers in the Building true-to-production integration tests with Testcontainers guide.

Additional prerequisites

You need to have Docker installed. For installation instructions, refer to the official Docker documentation. You will build and run the microservices in Docker containers. An installation of Apache Kafka is provided in another Docker container.

Getting started

The fastest way to work through this guide is to clone the Git repository and use the projects that are provided inside:

git clone https://github.com/openliberty/guide-reactive-service-testing.git
cd guide-reactive-service-testing

The start directory contains the starting project that you will build upon.

The finish directory contains the finished project that you will build.

Before you begin, make sure you have all the necessary prerequisites.

Try what you’ll build

The finish directory in the root of this guide contains the finished application. Give it a try before you proceed.

To try out the tests, go to the finish directory and run the following Maven goal to install the models artifact to the local Maven repository:

cd finish
mvn -pl models install

Next, navigate to the finish/system directory and run the following Maven goal to build the system microservice and run the integration tests on an Open Liberty server in a container:

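cd system
mvn verify

If your environment requires the Testcontainers Ryuk container to be disabled, set the TESTCONTAINERS_RYUK_DISABLED environment variable first and then run the same commands:

export TESTCONTAINERS_RYUK_DISABLED=true
cd system
mvn verify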

You will see the following output:

 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 52.46 s - in it.io.openliberty.guides.system.SystemServiceIT

 Results:

 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0


 --- failsafe:3.2.5:verify (verify) @ system ---
 ------------------------------------------------------------------------
 BUILD SUCCESS
 ------------------------------------------------------------------------
 Total time:  57.710 s
 Finished at: 2024-02-01T08:48:15-08:00
 ------------------------------------------------------------------------

This command might take some time the first time that you run it because the dependencies and the Docker image for Open Liberty must be downloaded. If you run the same command again, it is faster.

You can also try out the inventory integration tests by repeating the same commands in the finish/inventory directory.

Testing with the Kafka consumer client

system/pom.xml

<?xml version='1.0' encoding='utf-8'?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.openliberty.guides</groupId>
    <artifactId>system</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- Liberty configuration -->
        <liberty.var.http.port>9083</liberty.var.http.port>
        <liberty.var.https.port>9446</liberty.var.https.port>
    </properties>

    <dependencies>
        <!-- Provided dependencies -->
        <dependency>
            <groupId>jakarta.platform</groupId>
            <artifactId>jakarta.jakartaee-api</artifactId>
            <version>10.0.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.microprofile</groupId>
            <artifactId>microprofile</artifactId>
            <version>6.1</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.microprofile.reactive.messaging</groupId>
            <artifactId>microprofile-reactive-messaging-api</artifactId>
            <version>3.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- Required dependencies -->
        <dependency>
            <groupId>io.openliberty.guides</groupId>
            <artifactId>models</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.10</version>
        </dependency>

        <!-- For tests -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.16</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.16</version>
        </dependency>
        <!-- tag::dependencies[] -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <version>1.20.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.11.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>1.20.4</version>
            <scope>test</scope>
        </dependency>
        <!-- end::dependencies[] -->
    </dependencies>

    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>3.4.0</version>
                <configuration>
                    <packagingExcludes>pom.xml</packagingExcludes>
                </configuration>
            </plugin>

            <!-- Liberty plugin -->
            <plugin>
                <groupId>io.openliberty.tools</groupId>
                <artifactId>liberty-maven-plugin</artifactId>
                <version>3.11.1</version>
                <configuration>
                    <!-- tag::devc_config[] -->
                    <!-- devc config -->
                    <containerRunOpts>
                        -p 9083:9083
                        <!-- tag::reactive-app[] -->
                        --network=reactive-app
                        <!-- end::reactive-app[] -->
                    </containerRunOpts>
                    <!-- end::devc_config[] -->
                </configuration>
            </plugin>

            <!-- Plugin to run unit tests -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.5.2</version>
            </plugin>

            <!-- Plugin to run integration tests -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>3.5.2</version>
                <executions>
                    <execution>
                        <id>integration-test</id>
                        <goals>
                            <goal>integration-test</goal>
                        </goals>
                        <configuration>
                            <trimStackTrace>false</trimStackTrace>
                        </configuration>
                    </execution>
                    <execution>
                        <id>verify</id>
                        <goals>
                            <goal>verify</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

inventory/pom.xml

<?xml version='1.0' encoding='utf-8'?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.openliberty.guides</groupId>
    <artifactId>inventory</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- Liberty configuration -->
        <liberty.var.http.port>9085</liberty.var.http.port>
        <liberty.var.https.port>9448</liberty.var.https.port>
    </properties>

    <dependencies>
        <!-- Provided dependencies -->
        <dependency>
            <groupId>jakarta.platform</groupId>
            <artifactId>jakarta.jakartaee-api</artifactId>
            <version>10.0.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>jakarta.enterprise.concurrent</groupId>
            <artifactId>jakarta.enterprise.concurrent-api</artifactId>
            <version>3.0.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.microprofile</groupId>
            <artifactId>microprofile</artifactId>
            <version>6.1</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.microprofile.reactive.messaging</groupId>
            <artifactId>microprofile-reactive-messaging-api</artifactId>
            <version>3.0</version>
            <scope>provided</scope>
        </dependency>

        <!--  Required dependencies -->
        <dependency>
           <groupId>io.openliberty.guides</groupId>
           <artifactId>models</artifactId>
           <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.9.0</version>
        </dependency>

        <!-- For tests -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.16</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.16</version>
        </dependency>
        <!-- tag::dependencies[] -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <version>1.20.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>5.11.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>1.20.4</version>
            <scope>test</scope>
        </dependency>
        <!-- end::dependencies[] -->
        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-client</artifactId>
            <version>6.2.11.Final</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-json-binding-provider</artifactId>
            <version>6.2.11.Final</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>3.4.0</version>
                <configuration>
                    <packagingExcludes>pom.xml</packagingExcludes>
                </configuration>
            </plugin>

            <!-- Liberty plugin -->
            <plugin>
                <groupId>io.openliberty.tools</groupId>
                <artifactId>liberty-maven-plugin</artifactId>
                <version>3.11.1</version>
                <configuration>
                    <!-- devc config -->
                    <containerRunOpts>
                        -p 9085:9085
                        --network=reactive-app
                    </containerRunOpts>
                </configuration>
            </plugin>

            <!-- Plugin to run unit tests -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.5.2</version>
            </plugin>

            <!-- Plugin to run integration tests -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>3.5.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>integration-test</goal>
                            <goal>verify</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

system/microprofile-config.properties

# Liberty Kafka connector
# tag::kafkaConfig[]
mp.messaging.connector.liberty-kafka.bootstrap.servers=kafka:9092
# end::kafkaConfig[]

# systemLoad stream
mp.messaging.outgoing.systemLoad.connector=liberty-kafka
mp.messaging.outgoing.systemLoad.topic=system.load
mp.messaging.outgoing.systemLoad.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.systemLoad.value.serializer=io.openliberty.guides.models.SystemLoad$SystemLoadSerializer

startKafka.sh

#!/bin/bash

# tag::dockerNetworkSetup[]
NETWORK=reactive-app
docker network create $NETWORK
# end::dockerNetworkSetup[]

docker run -d \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,LOCAL://localhost:9094 \
    -e KAFKA_CFG_NODE_ID=0 \
    -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
    -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,LOCAL://:9094 \
    -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,LOCAL:PLAINTEXT \
    -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 \
    -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
    -p 9094:9094 \
    --network=$NETWORK \
    --name=kafka \
    --rm \
    bitnami/kafka:latest &

wait

Navigate to the start directory to begin.

The example reactive application consists of the system and inventory microservices. The system microservice produces messages to the Kafka message broker, and the inventory microservice consumes messages from the Kafka message broker. You will write integration tests to see how you can use the Kafka consumer and producer client APIs to test each service. Kafka test containers, Testcontainers, and JUnit are already included as required test dependencies in your Maven pom.xml files for the system and inventory microservices.
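The decoupling described above can be pictured with a small in-memory sketch. It is not Kafka and not the guide's code: a `LinkedBlockingQueue` stands in for the `system.load` topic, and the `BrokerSketch` class, the `LoadMessage` type, and the `publish` and `poll` methods are hypothetical names chosen for illustration. The point is only that the producer returns immediately after it publishes, while the consumer waits for a bounded time for the next message.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BrokerSketch {

    // Hypothetical message type for this sketch; the guide's real model is SystemLoad.
    public static final class LoadMessage {
        public final String hostname;
        public final double loadAverage;

        public LoadMessage(String hostname, double loadAverage) {
            this.hostname = hostname;
            this.loadAverage = loadAverage;
        }

        @Override
        public String toString() {
            return "LoadMessage[hostname=" + hostname
                + ", loadAverage=" + loadAverage + "]";
        }
    }

    // In-memory stand-in for a topic such as system.load (assumption: not Kafka).
    private static final BlockingQueue<LoadMessage> TOPIC =
        new LinkedBlockingQueue<>();

    // Producer side: enqueue the message and return without waiting for a reader.
    public static void publish(String hostname, double loadAverage) {
        TOPIC.offer(new LoadMessage(hostname, loadAverage));
    }

    // Consumer side: wait up to timeoutMillis for a message; null if none arrives.
    public static LoadMessage poll(long timeoutMillis) {
        try {
            return TOPIC.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and report that nothing was received.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        publish("host-a", 0.42);
        System.out.println("Consumed: " + poll(1000));
    }
}
```

A real broker adds persistence, partitions, and network hops, which is why the guide tests against an actual Kafka container rather than a stand-in like this one.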

The start directory contains three directories: the system microservice directory, the inventory microservice directory, and the models directory. The models directory contains the model class that defines the structure of the system load data that is used in the application. Run the following Maven goal to install the packaged models artifact to the local Maven repository so it can be used later by the system and inventory microservices:

mvn -pl models install

Launching the system microservice in dev mode with container support

Before you start the microservices in dev mode, run the startKafka script to launch a Kafka instance that replicates the production environment. The script launches a local Kafka container and creates a reactive-app network that allows the system and inventory microservices to connect to the Kafka message broker.

On Windows:

.\scripts\startKafka.bat

On Mac or Linux:

./scripts/startKafka.sh

Navigate to the start/system directory.

To launch the system microservice in dev mode with container support, configure the container by specifying the options within the <containerRunOpts> element to connect to the reactive-app network and expose the container port.

Run the following goal to start the system microservice in dev mode with container support:

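mvn liberty:devc

If your environment requires the Testcontainers Ryuk container to be disabled, set the TESTCONTAINERS_RYUK_DISABLED environment variable before you run the goal:

export TESTCONTAINERS_RYUK_DISABLED=true
mvn liberty:devc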

For more information about disabling Ryuk, see the Testcontainers custom configuration document.

After you see the following message, your Liberty instance is ready in dev mode:

**************************************************************
*    Liberty is running in dev mode.
*    ...
*    Liberty container port information:
*        Internal container HTTP port [ 9083 ] is mapped to container host port [ 9083 ] <
*   ...

Dev mode holds your command-line session to listen for file changes. Open another command-line session to continue, or open the project in your editor.
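From the second session, one way to confirm that the dev mode container is reachable is a plain TCP probe of the mapped port, similar in spirit to the isServiceRunning helper in the SystemServiceIT class. The following is a minimal sketch under that assumption: the `PortProbe` class and the `isPortOpen` method are hypothetical names, and an open port shows only that something is listening, not that the service is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    public static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: treat as not running.
            return false;
        }
    }

    public static void main(String[] args) {
        // 9083 is the HTTP port that the guide maps for the system container.
        boolean running = isPortOpen("localhost", 9083, 1000);
        System.out.println(running
            ? "Port 9083 is open: a dev mode container appears to be running"
            : "Port 9083 is closed: no dev mode container detected");
    }
}
```

Unlike the helper in the test class, this variant sets an explicit connect timeout so that the probe cannot hang on an unresponsive host.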

The system microservice looks for a Kafka topic to publish its messages to. After the Kafka service starts, the system microservice connects to the Kafka message broker by using the mp.messaging.connector.liberty-kafka.bootstrap.servers property. When you run your application in dev mode with container support, the running system container exposes its service on port 9083 for testing purposes.
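The SystemServiceIT class overrides this property by passing it to the system container as an environment variable. MicroProfile Config looks up a property in the environment under several names: the exact property name, the name with every character that is not alphanumeric or an underscore replaced by an underscore, and that replaced name in upper case. The following sketch lists those candidate names. The `EnvVarNames` class and the `candidates` method are hypothetical, and the code only illustrates the mapping rules from the specification; it is not Liberty's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class EnvVarNames {

    // Lists the environment variable names that are searched for a property,
    // in order: exact name, name with non-alphanumeric characters replaced
    // by '_', and that replaced name converted to upper case.
    public static List<String> candidates(String propertyName) {
        String replaced = propertyName.replaceAll("[^A-Za-z0-9_]", "_");
        List<String> names = new ArrayList<>();
        names.add(propertyName);
        names.add(replaced);
        names.add(replaced.toUpperCase(Locale.ROOT));
        return names;
    }

    public static void main(String[] args) {
        String property =
            "mp.messaging.connector.liberty-kafka.bootstrap.servers";
        // Prints one candidate environment variable name per line.
        candidates(property).forEach(System.out::println);
    }
}
```

Many shells do not accept dots or hyphens in variable names, so the upper-case form is usually the practical choice when you set the value from a command line.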

Testing the system microservice

Now you can start writing the test by using Testcontainers.

Create the SystemServiceIT class.
system/src/test/java/it/io/openliberty/guides/system/SystemServiceIT.java

SystemServiceIT.java

// tag::copyright[]
/*******************************************************************************
 * Copyright (c) 2020, 2024 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License 2.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-2.0/
 *
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/
// end::copyright[]
package it.io.openliberty.guides.system;

import java.net.Socket;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.nio.file.Paths;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.junit.jupiter.Testcontainers;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
// tag::KafkaConsumer[]
import org.apache.kafka.clients.consumer.KafkaConsumer;
// end::KafkaConsumer[]
import org.apache.kafka.common.serialization.StringDeserializer;

import io.openliberty.guides.models.SystemLoad;
import io.openliberty.guides.models.SystemLoad.SystemLoadDeserializer;

@Testcontainers
public class SystemServiceIT {

    private static Logger logger = LoggerFactory.getLogger(SystemServiceIT.class);
    // tag::network1[]
    private static Network network = Network.newNetwork();
    // end::network1[]

    // tag::KafkaConsumerUsage[]
    public static KafkaConsumer<String, SystemLoad> consumer;
    // end::KafkaConsumerUsage[]

    // tag::buildSystemImage[]
    private static ImageFromDockerfile systemImage =
        new ImageFromDockerfile("system:1.0-SNAPSHOT")
            .withDockerfile(Paths.get("./Dockerfile"));
    // end::buildSystemImage[]

    // tag::kafkaContainer[]
    private static KafkaContainer kafkaContainer = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:latest"))
            // tag::withListener[]
            .withListener(() -> "kafka:19092")
            // end::withListener[]
            // tag::network2[]
            .withNetwork(network);
            // end::network2[]
    // end::kafkaContainer[]

    // tag::systemContainer[]
    private static GenericContainer<?> systemContainer =
        new GenericContainer(systemImage)
            // tag::network3[]
            .withNetwork(network)
            // end::network3[]
            // tag::systemPortExpose[]
            .withExposedPorts(9083)
            // end::systemPortExpose[]
            .waitingFor(Wait.forHttp("/health/ready").forPort(9083))
            .withStartupTimeout(Duration.ofMinutes(3))
            .withLogConsumer(new Slf4jLogConsumer(logger))
            // tag::dependsOn[]
            .dependsOn(kafkaContainer);
            // end::dependsOn[]
    // end::systemContainer[]

    // tag::isServiceRunning[]
    private static boolean isServiceRunning(String host, int port) {
        try {
            Socket socket = new Socket(host, port);
            socket.close();
            return true;
        } catch (Exception e) {
            return false;
        }
    }
    // end::isServiceRunning[]

    @BeforeAll
    public static void startContainers() {
        if (isServiceRunning("localhost", 9083)) {
            System.out.println("Testing with mvn liberty:devc");
        } else {
            kafkaContainer.start();
            // tag::bootstrapServerSetup[]
            systemContainer.withEnv(
                "mp.messaging.connector.liberty-kafka.bootstrap.servers",
                "kafka:19092");
            // end::bootstrapServerSetup[]
            systemContainer.start();
            System.out.println("Testing with mvn verify");
        }
    }

    @BeforeEach
    public void createKafkaConsumer() {
        // tag::KafkaConsumer2[]
        // tag::KafkaConsumerProps[]
        Properties consumerProps = new Properties();
        if (isServiceRunning("localhost", 9083)) {
            // tag::BootstrapSetting1[]
            consumerProps.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9094");
            // end::BootstrapSetting1[]
        } else {
            consumerProps.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                // tag::BootstrapSetting2[]
138                kafkaContainer.getBootstrapServers());
139                // end::BootstrapSetting2[]
140        }
141        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "system-load-status");
142        consumerProps.put(
143            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
144            StringDeserializer.class.getName());
145        // tag::valueDeserializer[]
146        consumerProps.put(
147            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
148            SystemLoadDeserializer.class.getName());
149        // end::valueDeserializer[]
150        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
151        // end::KafkaConsumerProps[]
152        consumer = new KafkaConsumer<String, SystemLoad>(consumerProps);
153        // tag::systemLoadTopic[]
154        consumer.subscribe(Collections.singletonList("system.load"));
155        // end::systemLoadTopic[]
156        // end::KafkaConsumer2[]
157    }
158
159    @AfterAll
160    public static void stopContainers() {
161        systemContainer.stop();
162        kafkaContainer.stop();
163        if (network != null) {
164            network.close();
165        }
166    }
167
168    @AfterEach
169    public void closeKafkaConsumer() {
170        consumer.close();
171    }
172
173    // tag::testCpuStatus[]
174    @Test
175    public void testCpuStatus() {
176        // tag::poll[]
177        ConsumerRecords<String, SystemLoad> records =
178            consumer.poll(Duration.ofMillis(30 * 1000));
179        // end::poll[]
180        System.out.println("Polled " + records.count() + " records from Kafka:");
181
182        for (ConsumerRecord<String, SystemLoad> record : records) {
183            SystemLoad sl = record.value();
184            System.out.println(sl);
185            // tag::assert[]
186            assertNotNull(sl.hostname);
187            assertNotNull(sl.loadAverage);
188            // end::assert[]
189        }
190        consumer.commitAsync();
191    }
192    // end::testCpuStatus[]
193}

SystemLoad.java

// tag::copyright[]
/*******************************************************************************
 * Copyright (c) 2020, 2024 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License 2.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-2.0/
 *
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/
// end::copyright[]
package io.openliberty.guides.models;

import java.util.Objects;

import jakarta.json.bind.Jsonb;
import jakarta.json.bind.JsonbBuilder;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class SystemLoad {

    private static final Jsonb JSONB = JsonbBuilder.create();

    public String hostname;
    public Double loadAverage;

    public SystemLoad(String hostname, Double cpuLoadAvg) {
        this.hostname = hostname;
        this.loadAverage = cpuLoadAvg;
    }

    public SystemLoad() {
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SystemLoad)) {
            return false;
        }
        SystemLoad sl = (SystemLoad) o;
        return Objects.equals(hostname, sl.hostname)
               && Objects.equals(loadAverage, sl.loadAverage);
    }

    @Override
    public int hashCode() {
        return Objects.hash(hostname, loadAverage);
    }

    @Override
    public String toString() {
        return "CpuLoadAverage: " + JSONB.toJson(this);
    }

    // tag::SystemLoadSerializer[]
    public static class SystemLoadSerializer implements Serializer<Object> {
        @Override
        public byte[] serialize(String topic, Object data) {
            return JSONB.toJson(data).getBytes();
        }
    }
    // end::SystemLoadSerializer[]

    // tag::SystemLoadDeserializer[]
    public static class SystemLoadDeserializer implements Deserializer<SystemLoad> {
        @Override
        public SystemLoad deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            return JSONB.fromJson(new String(data), SystemLoad.class);
        }
    }
    // end::SystemLoadDeserializer[]
}

SystemService.java

// tag::copyright[]
/*******************************************************************************
 * Copyright (c) 2020, 2024 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License 2.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-2.0/
 *
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/
// end::copyright[]
package io.openliberty.guides.system;

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;

import io.openliberty.guides.models.SystemLoad;
import io.reactivex.rxjava3.core.Flowable;

@ApplicationScoped
public class SystemService {

    private static final OperatingSystemMXBean OS_MEAN =
            ManagementFactory.getOperatingSystemMXBean();
    private static String hostname = null;

    private static String getHostname() {
        if (hostname == null) {
            // Store the resolved name so that later calls reuse it
            try {
                hostname = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                hostname = System.getenv("HOSTNAME");
            }
        }
        return hostname;
    }

    // tag::systemLoad[]
    // tag::Outgoing[]
    @Outgoing("systemLoad")
    // end::Outgoing[]
    public Publisher<SystemLoad> sendSystemLoad() {
        return Flowable.interval(15, TimeUnit.SECONDS)
                       .map((interval -> new SystemLoad(getHostname(),
                             OS_MEAN.getSystemLoadAverage())));
    }
    // end::systemLoad[]

}

Dockerfile

FROM icr.io/appcafe/open-liberty:kernel-slim-java11-openj9-ubi

ARG VERSION=1.0
ARG REVISION=SNAPSHOT

LABEL \
  org.opencontainers.image.authors="My Name" \
  org.opencontainers.image.vendor="Open Liberty" \
  org.opencontainers.image.url="local" \
  org.opencontainers.image.source="https://github.com/OpenLiberty/guide-reactive-service-testing" \
  org.opencontainers.image.version="$VERSION" \
  org.opencontainers.image.revision="$REVISION" \
  vendor="Open Liberty" \
  name="system" \
  version="$VERSION-$REVISION" \
  summary="The system microservice from the 'Testing reactive Java microservices' guide" \
  description="This image contains the system microservice running with the Open Liberty runtime."

COPY --chown=1001:0 src/main/liberty/config /config/
RUN features.sh
COPY --chown=1001:0 target/system.war /config/apps

EXPOSE 9083 9446

# It is recommended to run the configure.sh script when building the image for production.
# RUN configure.sh

Construct the systemImage by using the ImageFromDockerfile class, which lets Testcontainers build the Docker image from a Dockerfile at test run time. Here, the Dockerfile at the ./Dockerfile path is used to build the system:1.0-SNAPSHOT image.

Use the KafkaContainer class to instantiate the kafkaContainer test container, which runs the confluentinc/cp-kafka:latest Docker image. Similarly, use the GenericContainer class to create the systemContainer test container, which runs the system:1.0-SNAPSHOT Docker image.

The withListener() method registers an additional listener at kafka:19092. The containerized system microservice acts as an additional producer that connects from inside the Docker network, so the Kafka container needs a listener that is reachable at that address. For more information about using an additional consumer or producer with a Kafka container, see the Testcontainers Kafka documentation.

Because containers are isolated by default, the kafkaContainer and the systemContainer must be placed on the same network to communicate. The dependsOn() method indicates that the system microservice container starts only after the Kafka container is ready.

Before you start the systemContainer, you must override the mp.messaging.connector.liberty-kafka.bootstrap.servers property with kafka:19092 by using the withEnv() method. This step points the system microservice at the listener that the withListener() method set up in the Kafka container for an additional producer.

The test uses the KafkaConsumer client API. If a local system microservice container is detected on port 9083, the consumer's BOOTSTRAP_SERVERS_CONFIG property is set to localhost:9094, the address of the local Kafka broker. Otherwise, the test calls the getBootstrapServers() method to obtain the broker address from the Kafka test container. The consumer then subscribes to the system.load topic.
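The choice between the two broker addresses rests on a plain TCP probe of the service port. The following is a minimal sketch of that decision that uses only the JDK. The BootstrapChooser class and its isListening() and chooseBootstrap() methods are hypothetical names for illustration, and the explicit connect timeout is an addition that the guide's isServiceRunning() method does not have.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of the guide's sources.
final class BootstrapChooser {

    // Returns true if something accepts TCP connections on host:port
    // within the given timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Picks the dev-mode broker address when the service port is open,
    // otherwise the address reported by the Kafka test container.
    static String chooseBootstrap(boolean devModeRunning, String containerAddress) {
        return devModeRunning ? "localhost:9094" : containerAddress;
    }
}
```

In the test, the second argument would be the value that kafkaContainer.getBootstrapServers() returns.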

To consume messages from a stream, the messages need to be deserialized from bytes. Kafka provides its own default deserializers, but a custom deserializer is provided for you. The deserializer is configured by the VALUE_DESERIALIZER_CLASS_CONFIG property and is implemented as the SystemLoadDeserializer class that is nested in the SystemLoad class. To learn more about Kafka APIs and their usage, see the official Kafka Documentation.
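The bytes-to-object contract of such a deserializer can be sketched without Kafka or JSON-B on the classpath. The LoadCodec class below is a hypothetical stand-in: it uses an ad hoc hostname|loadAverage text format rather than the JSON that the guide's SystemLoadDeserializer reads, and it exists only to show the null check and the decode step.

```java
import java.nio.charset.StandardCharsets;

// Stand-in codec for illustration. The guide's SystemLoadSerializer and
// SystemLoadDeserializer use JSON-B, not this ad hoc text format.
final class LoadCodec {

    // Simplified counterpart of the guide's SystemLoad model.
    static final class Load {
        final String hostname;
        final double loadAverage;

        Load(String hostname, double loadAverage) {
            this.hostname = hostname;
            this.loadAverage = loadAverage;
        }
    }

    static byte[] serialize(Load load) {
        String text = load.hostname + "|" + load.loadAverage;
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Mirrors the null check in the guide's deserializer:
    // a missing value is passed through as null.
    static Load deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        String text = new String(data, StandardCharsets.UTF_8);
        int sep = text.lastIndexOf('|');
        return new Load(text.substring(0, sep),
                        Double.parseDouble(text.substring(sep + 1)));
    }
}
```

Naming the charset explicitly on both sides keeps the round trip independent of the platform default encoding.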

The running system microservice container publishes a message to the systemLoad channel, as denoted by the @Outgoing annotation, once every 15 seconds. The testCpuStatus() test method calls the consumer.poll() method from the KafkaConsumer client API, which waits up to 30 seconds for records to arrive on the system.load topic. These records are produced by the system service. Then, the method uses assertions to verify that each polled record contains a hostname and a load average.
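The timing in the listings is what makes this test work: the service emits every 15 seconds and the test waits up to 30 seconds in a single poll, so a record normally arrives before the timeout. The sketch below imitates that relationship with JDK classes only and scaled-down intervals. The PollDemo class and its in-memory queue are stand-ins for illustration and do not use the Kafka client API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stand-in for the publish/poll timing; not the Kafka client API.
final class PollDemo {

    // Publishes one message every periodMillis, then waits up to
    // timeoutMillis for the first one. Returns null on timeout.
    static String pollFirst(long periodMillis, long timeoutMillis)
            throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        ScheduledExecutorService publisher =
            Executors.newSingleThreadScheduledExecutor();
        try {
            // As with Flowable.interval, the first emission comes
            // after one full period.
            publisher.scheduleAtFixedRate(
                () -> topic.offer("load"), periodMillis, periodMillis,
                TimeUnit.MILLISECONDS);
            return topic.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            publisher.shutdownNow();
        }
    }
}
```

When the timeout is shorter than the publishing period, the poll comes back empty, which is why the test's timeout is chosen to be longer than the service's interval.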

Running the tests

Because you started Open Liberty in dev mode, you can run the tests by pressing the enter/return key from the command-line session where you started dev mode.

You will see the following output:

 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 25.674 s - in it.io.openliberty.guides.system.SystemServiceIT

 Results:

 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

 Integration tests finished.

After you are finished running tests, stop the Open Liberty server by pressing CTRL+C in the command-line session where you ran the server.

If you aren’t running in dev mode, you can run the tests by running the following command:

mvn clean verify

You will see the following output:

 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 50.63 s - in it.io.openliberty.guides.system.SystemServiceIT

 Results:

 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0


 --- failsafe:3.2.5:verify (verify) @ system ---
 ------------------------------------------------------------------------
 BUILD SUCCESS
 ------------------------------------------------------------------------
 Total time:  55.636 s
 Finished at: 2024-01-31T11:33:40-08:00
 ------------------------------------------------------------------------

Testing with the Kafka producer client

The inventory microservice is tested in the same way as the system microservice. The only difference is that the inventory microservice consumes messages, which means that tests are written to use the Kafka producer client.

Launching the inventory microservice in dev mode with container

Navigate to the start/inventory directory.

Run the following goal to start the inventory microservice in dev mode with container support:

mvn liberty:devc

Building a test REST client

Create a REST client interface to access the inventory microservice.

Create the InventoryResourceClient class.
inventory/src/test/java/it/io/openliberty/guides/inventory/InventoryResourceClient.java

InventoryResourceClient.java

// tag::copyright[]
/*******************************************************************************
 * Copyright (c) 2024 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License 2.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-2.0/
 *
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/
// end::copyright[]
package it.io.openliberty.guides.inventory;

import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

@Path("/inventory")
public interface InventoryResourceClient {

    // tag::getSystems[]
    @GET
    @Path("/systems")
    @Produces(MediaType.APPLICATION_JSON)
    Response getSystems();
    // end::getSystems[]

    // tag::resetSystems[]
    @DELETE
    @Produces(MediaType.APPLICATION_JSON)
    Response resetSystems();
    // end::resetSystems[]

}

The InventoryResourceClient interface declares the getSystems() and resetSystems() methods for accessing the corresponding endpoints within the inventory microservice.

Testing the inventory microservice

Now you can start writing the test by using Testcontainers.

Create the InventoryServiceIT class.
inventory/src/test/java/it/io/openliberty/guides/inventory/InventoryServiceIT.java

InventoryServiceIT.java

// tag::copyright[]
/*******************************************************************************
 * Copyright (c) 2020, 2024 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License 2.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-2.0/
 *
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/
// end::copyright[]
package it.io.openliberty.guides.inventory;

import java.util.List;
import java.net.Socket;
import java.time.Duration;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.math.BigDecimal;
import java.nio.file.Paths;
import java.util.Properties;

import jakarta.ws.rs.core.GenericType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;
import jakarta.ws.rs.client.ClientBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Assertions;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
// tag::KafkaProducer[]
import org.apache.kafka.clients.producer.KafkaProducer;
// end::KafkaProducer[]
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jboss.resteasy.client.jaxrs.ResteasyClient;
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
import org.jboss.resteasy.client.jaxrs.ResteasyWebTarget;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.output.Slf4jLogConsumer;

import io.openliberty.guides.models.SystemLoad;
import io.openliberty.guides.models.SystemLoad.SystemLoadSerializer;


@Testcontainers
// tag::InventoryServiceIT[]
public class InventoryServiceIT {

    private static Logger logger = LoggerFactory.getLogger(InventoryServiceIT.class);

    public static InventoryResourceClient client;

    private static Network network = Network.newNetwork();
    // tag::KafkaProducer2[]
    public static KafkaProducer<String, SystemLoad> producer;
    // end::KafkaProducer2[]
    private static ImageFromDockerfile inventoryImage =
        new ImageFromDockerfile("inventory:1.0-SNAPSHOT")
            .withDockerfile(Paths.get("./Dockerfile"));

    private static KafkaContainer kafkaContainer = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:latest"))
            .withListener(() -> "kafka:19092")
            .withNetwork(network);

    private static GenericContainer<?> inventoryContainer =
        new GenericContainer<>(inventoryImage)
            .withNetwork(network)
            .withExposedPorts(9085)
            .waitingFor(Wait.forHttp("/health/ready").forPort(9085))
            .withStartupTimeout(Duration.ofMinutes(3))
            .withLogConsumer(new Slf4jLogConsumer(logger))
            .dependsOn(kafkaContainer);

    // tag::RESTClient[]
    private static InventoryResourceClient createRestClient(String urlPath) {
        ClientBuilder builder = ResteasyClientBuilder.newBuilder();
        // tag::ResteasyClient[]
        ResteasyClient client = (ResteasyClient) builder.build();
        // end::ResteasyClient[]
        ResteasyWebTarget target = client.target(UriBuilder.fromPath(urlPath));
        return target.proxy(InventoryResourceClient.class);
    }
    // end::RESTClient[]

    // tag::isServiceRunning[]
    private static boolean isServiceRunning(String host, int port) {
        try {
            Socket socket = new Socket(host, port);
            socket.close();
            return true;
        } catch (Exception e) {
            return false;
        }
    }
    // end::isServiceRunning[]

    @BeforeAll
    public static void startContainers() {

        String urlPath;
        if (isServiceRunning("localhost", 9085)) {
            System.out.println("Testing with mvn liberty:devc");
            // tag::urlPathSetup1[]
            urlPath = "http://localhost:9085";
            // end::urlPathSetup1[]
        } else {
            System.out.println("Testing with mvn verify");
            kafkaContainer.start();
            inventoryContainer.withEnv(
                "mp.messaging.connector.liberty-kafka.bootstrap.servers",
                "kafka:19092");
            inventoryContainer.start();
            // tag::urlPathSetup2[]
            urlPath = "http://"
                + inventoryContainer.getHost()
                + ":" + inventoryContainer.getFirstMappedPort();
            // end::urlPathSetup2[]
        }

        System.out.println("Creating REST client with: " + urlPath);
        client = createRestClient(urlPath);
    }

    @BeforeEach
    public void createKafkaProducer() {
        // tag::KafkaProducerProps[]
        Properties producerProps = new Properties();
        if (isServiceRunning("localhost", 9085)) {
            // tag::BootstrapServerConfig[]
            producerProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9094");
            // end::BootstrapServerConfig[]
        } else {
            // tag::BootstrapServerConfig2[]
            producerProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                kafkaContainer.getBootstrapServers());
            // end::BootstrapServerConfig2[]
        }

        producerProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        producerProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            SystemLoadSerializer.class.getName());

        producer = new KafkaProducer<String, SystemLoad>(producerProps);
        // end::KafkaProducerProps[]
    }

    @AfterAll
    public static void stopContainers() {
        client.resetSystems();
        inventoryContainer.stop();
        kafkaContainer.stop();
        if (network != null) {
            network.close();
        }
    }

    @AfterEach
    public void closeKafkaProducer() {
        producer.close();
    }

    // tag::testCpuUsage[]
    @Test
    public void testCpuUsage() throws InterruptedException {
        SystemLoad sl = new SystemLoad("localhost", 1.1);
        // tag::systemLoadMsg[]
        producer.send(new ProducerRecord<String, SystemLoad>("system.load", sl));
        // end::systemLoadMsg[]
        // Give the inventory service time to consume the message
        Thread.sleep(5000);
        Response response = client.getSystems();
        Assertions.assertEquals(200, response.getStatus(), "Response should be 200");
        List<Properties> systems =
            response.readEntity(new GenericType<List<Properties>>() { });
        // tag::assert[]
        // JUnit expects the expected value first, then the actual value
        assertEquals(1, systems.size());
        // end::assert[]
        for (Properties system : systems) {
            // tag::assert2[]
            assertEquals(sl.hostname, system.get("hostname"),
                "Hostname doesn't match!");
            // end::assert2[]
            BigDecimal systemLoad = (BigDecimal) system.get("systemLoad");
            // tag::assert3[]
            assertEquals(sl.loadAverage, systemLoad.doubleValue(),
                "CPU load doesn't match!");
            // end::assert3[]
        }
    }
    // end::testCpuUsage[]
}
// end::InventoryServiceIT[]

InventoryResource.java

// tag::copyright[]
/*******************************************************************************
 * Copyright (c) 2020, 2024 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License 2.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-2.0/
 *
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/
// end::copyright[]
package io.openliberty.guides.inventory;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.logging.Logger;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.openliberty.guides.models.SystemLoad;


@ApplicationScoped
@Path("/inventory")
// tag::InventoryResource[]
public class InventoryResource {

    private static Logger logger = Logger.getLogger(InventoryResource.class.getName());

    @Inject
    private InventoryManager manager;

    @GET
    @Path("/systems")
    @Produces(MediaType.APPLICATION_JSON)
    public Response getSystems() {
        List<Properties> systems = new ArrayList<>(manager.getSystems().values());
        return Response.status(Response.Status.OK)
                       .entity(systems)
                       .build();
    }

    @GET
    @Path("/system/{hostname}")
    @Produces(MediaType.APPLICATION_JSON)
    public Response getSystem(@PathParam("hostname") String hostname) {
        Optional<Properties> system = manager.getSystem(hostname);
        if (system.isPresent()) {
            // Unwrap the Optional so that the Properties object is the entity
            return Response.status(Response.Status.OK)
                           .entity(system.get())
                           .build();
        }
        return Response.status(Response.Status.NOT_FOUND)
                       .entity("hostname does not exist.")
                       .build();
    }

    @DELETE
    @Produces(MediaType.APPLICATION_JSON)
    public Response resetSystems() {
        manager.resetSystems();
        return Response.status(Response.Status.OK)
                       .build();
    }

    @Incoming("systemLoad")
    public void updateStatus(SystemLoad s)  {
        String hostname = s.hostname;
        if (manager.getSystem(hostname).isPresent()) {
            manager.updateCpuStatus(hostname, s.loadAverage);
            logger.info("Host " + hostname + " was updated: " + s);
        } else {
            manager.addSystem(hostname, s.loadAverage);
            logger.info("Host " + hostname + " was added: " + s);
        }
    }
}
// end::InventoryResource[]

The InventoryServiceIT class uses the KafkaProducer client API to generate messages in the test environment, which are then consumed by the inventory microservice container.

Similar to system microservice testing, the configuration of the producer BOOTSTRAP_SERVERS_CONFIG property depends on whether a local inventory microservice container is detected. In addition, the producer is configured with a custom serializer provided in the SystemLoad class.

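The testCpuUsage() test method uses the producer.send() method from the KafkaProducer client API to send a SystemLoad message to the system.load topic. After waiting five seconds for the inventory microservice to consume the message, it uses assertions to verify that the response from the inventory microservice contains exactly one system with the expected hostname and CPU load.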
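The test pauses for a fixed five seconds between sending the message and querying the REST endpoint. One possible alternative, sketched here with JDK classes only, is to re-check a condition until it holds or a deadline passes. The Await class and its until() method are hypothetical helpers for illustration; neither the guide nor Testcontainers provides them.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not part of the guide's sources.
final class Await {

    // Re-checks the condition every intervalMillis until it is true
    // or the timeout elapses. Returns whether the condition was met.
    static boolean until(BooleanSupplier condition, Duration timeout,
                         long intervalMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(intervalMillis);
        }
    }
}
```

In testCpuUsage(), the condition could be that the list read from client.getSystems() is no longer empty, which would let the test proceed as soon as the inventory microservice has processed the message.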

Running the tests

Because you started Open Liberty in dev mode, you can run the tests by pressing the enter/return key from the command-line session where you started dev mode.

You will see the following output:

 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 32.564 s - in it.io.openliberty.guides.inventory.InventoryServiceIT

 Results:

 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

 Integration tests finished.

After you are finished running tests, stop the Open Liberty server by pressing CTRL+C in the command-line session where you ran the server.

If you aren’t running in dev mode, you can run the tests by running the following command:

mvn clean verify

You will see the following output:

 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 53.22 s - in it.io.openliberty.guides.inventory.InventoryServiceIT

 Results:

 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0


 --- failsafe:3.2.5:verify (verify) @ inventory ---
 ------------------------------------------------------------------------
 BUILD SUCCESS
 ------------------------------------------------------------------------
 Total time:  58.789 s
 Finished at: 2024-01-31T11:40:43-08:00
 ------------------------------------------------------------------------

When you’re finished trying out the microservice, you can stop the local Kafka container by running the following command from the start directory:

Windows:
.\scripts\stopKafka.bat

Mac or Linux:
./scripts/stopKafka.sh

Great work! You’re done!

You just tested two reactive Java microservices using Testcontainers.

Learn more about Testcontainers.

Guide Attribution

Testing reactive Java microservices by Open Liberty is licensed under CC BY-ND 4.0
