Streaming messages between client and server services using gRPC

duration 30 minutes
New

Prerequisites:

Learn how to use gRPC unary calls, server streaming, client streaming, and bidirectional streaming to communicate between Java client and server services with Open Liberty.

What is gRPC?

The gRPC Remote Procedure Call is a technology that implements remote procedure call (RPC) style APIs with HTTP/2. Typically, gRPC uses protocol buffers to define the format of data to be transferred and the service interfaces to access it, which include service calls and expected messages. For each service defined in a .proto file, gRPC uses the definition to generate the skeleton code for users to implement and extend. Protocol buffers use a binary format to send and receive messages that is faster and more lightweight than the JSON that is typically used in RESTful APIs.

Protocol buffers allow cross-project support through the .proto file. As a result, gRPC clients and servers can run and communicate with each other from different environments. For example, a gRPC client running on a Java virtual machine can call a gRPC server developed in any other supported language. This feature of protocol buffers allows for easier integration between services.

What you’ll learn

You will learn how to create gRPC services and their clients by using protocol buffers and how to implement them with Open Liberty. You will use Maven to generate the gRPC stubs, deploy the services, and to interact with the running Liberty runtime.

The application that you will build in this guide consists of three projects: the systemproto model project, the query client service, and the system server service.

The query service implements four RESTful APIs by using four different gRPC streaming methods.

  • Unary RPC: The client sends a single request and receives a single response.

  • Server streaming RPC: The client sends a single request and the server returns a stream of messages.

  • Client streaming RPC: The client sends a stream of messages and the server responds with a single message.

  • Bidirectional RPC: Both client and server send a stream of messages. The client and server can read and write messages in any order.

Application architecture of the gRPC application covered in guide

Getting started

The fastest way to work through this guide is to clone the Git repository and use the projects that are provided inside:

git clone https://github.com/openliberty/guide-grpc-intro.git
cd guide-grpc-intro

The start directory contains the starting project that you will build upon.

The finish directory contains the finished project that you will build.

Before you begin, make sure you have all the necessary prerequisites.

Try what you’ll build

The finish directory in the root of this guide contains the finished application. Give it a try before you proceed.

To try out the application, first go to the finish directory and run the following Maven goal to generate all the gRPC abstract classes defined in the .proto file.

cd finish
mvn -pl systemproto install

Start the system service by running the following command:

mvn -pl system liberty:run

Next, open another command-line session, navigate to the finish directory, and start the query service by using the following command:

mvn -pl query liberty:run

Wait until you see the The defaultServer server is ready to run a smarter planet message in your consoles. Then, point your browser to the http://localhost:9081/query/properties/os.name URL to test out the basic unary service. You will see your operating system name.

Next, point your browser to the following URLs to try out the corresponding streaming RPC call:

Observe the output from the consoles running the system and query services.

After you are finished checking out the application, stop both the query and system services by pressing CTRL+C in the command-line sessions where you ran them. Alternatively, you can run the following goals from the finish directory in another command-line session:

mvn -pl system liberty:stop
mvn -pl query liberty:stop

Creating and defining the gRPC server service

Navigate to the start directory to begin.

First, create the .proto file and generate gRPC classes. You will implement the gRPC server service with the generated classes later. The .proto file defines all the service calls and message types. The message types are used in the service call definition for the parameters and returns.

Create the SystemService.proto file.
systemproto/src/main/proto/SystemService.proto

SystemService.proto

 1// tag::copyright[]
 2/*******************************************************************************
 3 * Copyright (c) 2022, 2023 IBM Corporation and others.
 4 * All rights reserved. This program and the accompanying materials
 5 * are made available under the terms of the Eclipse Public License 2.0
 6 * which accompanies this distribution, and is available at
 7 * http://www.eclipse.org/legal/epl-2.0/
 8 *
 9 * SPDX-License-Identifier: EPL-2.0
10 *******************************************************************************/
11// end::copyright[]
12
13// tag::basicConfig[]
14syntax = "proto3";
15package io.openliberty.guides.systemproto;
16option java_multiple_files = true;
17// end::basicConfig[]
18
19// tag::SystemService[]
20service SystemService {
21  // tag::getProperty[]
22  rpc getProperty (SystemPropertyName) returns (SystemPropertyValue) {}
23  // end::getProperty[]
24
25  // tag::getServerStreamingProperties[]
26  rpc getServerStreamingProperties (SystemPropertyPrefix) returns (stream SystemProperty) {}
27  // end::getServerStreamingProperties[]
28
29  // tag::getClientStreamingProperties[]
30  rpc getClientStreamingProperties (stream SystemPropertyName) returns (SystemProperties) {}
31  // end::getClientStreamingProperties[]
32
33  // tag::getBidirectionalProperties[]
34  rpc getBidirectionalProperties (stream SystemPropertyName) returns (stream SystemProperty) {}
35  // end::getBidirectionalProperties[]
36}
37// end::SystemService[]
38
39// tag::SystemPropertyName[]
40message SystemPropertyName {
41    string propertyName = 1;
42}
43// end::SystemPropertyName[]
44
45message SystemPropertyPrefix {
46    string propertyPrefix = 1;
47}
48
49// tag::SystemPropertyValue[]
50message SystemPropertyValue {
51    string propertyValue = 1;
52}
53// end::SystemPropertyValue[]
54
55// tag::SystemProperty[]
56message SystemProperty {
57    string propertyName = 1;
58    string propertyValue = 2;
59}
60// end::SystemProperty[]
61
62// tag::SystemProperties[]
63message SystemProperties {
64    map<string, string> properties = 1;
65}
66// end::SystemProperties[]

pom.xml

 1<?xml version="1.0" encoding="UTF-8"?>
 2<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
 3    <modelVersion>4.0.0</modelVersion>
 4    <groupId>io.openliberty.guides</groupId>
 5    <artifactId>systemproto</artifactId>
 6    <version>1.0-SNAPSHOT</version>
 7    <packaging>jar</packaging>
 8
 9    <properties>
10        <maven.compiler.source>1.8</maven.compiler.source>
11        <maven.compiler.target>1.8</maven.compiler.target>
12        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
13    </properties>
14
15    <dependencies>
16        <!-- Provided dependencies -->
17        <dependency>
18            <groupId>jakarta.platform</groupId>
19            <artifactId>jakarta.jakartaee-api</artifactId>
20            <version>9.1.0</version>
21            <scope>provided</scope>
22        </dependency>
23        <!-- grpc compile dependencies -->
24        <!-- tag::grpc[] -->
25        <dependency>
26            <groupId>io.grpc</groupId>
27            <artifactId>grpc-protobuf</artifactId>
28            <version>1.51.0</version>
29            <scope>provided</scope>
30        </dependency>
31        <!-- end::grpc[] -->
32        <!-- tag::grpc-stub[] -->
33        <dependency>
34            <groupId>io.grpc</groupId>
35            <artifactId>grpc-stub</artifactId>
36            <version>1.51.0</version>
37            <scope>provided</scope>
38        </dependency>
39        <!-- end::grpc-stub[] -->
40        <!-- tag::javax[] -->
41        <dependency>
42            <groupId>javax.annotation</groupId>
43            <artifactId>javax.annotation-api</artifactId>
44            <version>1.3.2</version>
45            <scope>provided</scope>
46        </dependency>
47        <!-- end::javax[] -->
48    </dependencies>
49
50    <build>
51        <finalName>${project.artifactId}</finalName>
52        <!-- tag::osmavenplugin[] -->
53        <extensions>
54            <extension>
55                <groupId>kr.motd.maven</groupId>
56                <artifactId>os-maven-plugin</artifactId>
57                <version>1.7.0</version>
58            </extension>
59        </extensions>
60        <!-- end::osmavenplugin[] -->
61        <plugins>
62            <plugin>
63                <groupId>org.apache.maven.plugins</groupId>
64                <artifactId>maven-compiler-plugin</artifactId>
65                <version>3.10.1</version>
66            </plugin>
67            <!-- tag::protobufmavenplugin[] -->
68            <plugin>
69                <groupId>org.xolstice.maven.plugins</groupId>
70                <artifactId>protobuf-maven-plugin</artifactId>
71                <version>0.6.1</version>
72                <configuration>
73                    <protocArtifact>com.google.protobuf:protoc:3.21.9:exe:${os.detected.classifier}</protocArtifact>
74                    <pluginId>grpc-java</pluginId>
75                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.51.0:exe:${os.detected.classifier}</pluginArtifact>
76                </configuration>
77                <executions>
78                    <execution>
79                        <goals>
80                            <goal>compile</goal>
81                            <goal>compile-custom</goal>
82                        </goals>
83                    </execution>
84                </executions>
85            </plugin>
86            <!-- end::protobufmavenplugin[] -->
87        </plugins>
88    </build>
89</project>

The first few lines define the syntax, package, and option basic configuration of the .proto file. The SystemService service contains the four service calls that you will implement in the coming sections.

The getProperty RPC defines the unary call. In this call, the client service sends a SystemPropertyName message to the server service, which returns a SystemPropertyValue message with the property value. The SystemPropertyName and SystemPropertyValue message types define that the propertyName and propertyValue fields must be string.

The getServerStreamingProperties RPC defines the server streaming call. The client service sends a SystemPropertyPrefix message to the server service. The server service returns a stream of SystemProperty messages. Each SystemProperty message contains propertyName and propertyValue strings.

The getClientStreamingProperties RPC defines the client streaming call. The client service streams SystemPropertyName messages to the server service. The server service returns a SystemProperties message that contains a map of the properties with their respective values.

The getBidirectionalProperties RPC defines the bidirectional streaming call. In this service, the client service streams SystemPropertyName messages to the server service. The server service returns a stream of SystemProperty messages.

To compile the .proto file, the pom.xml Maven configuration file needs the grpc-protobuf, grpc-stub, javax.annotation-api dependencies, and the protobuf-maven-plugin plugin. To install the correct version of the Protobuf compiler automatically, the os-maven-plugin extension is required in the build configuration.

Run the following command to generate the gRPC classes.

mvn -pl systemproto install

Implementing the unary call

Navigate to the start directory.

When you run Open Liberty in dev mode, dev mode listens for file changes and automatically recompiles and deploys your updates whenever you save a new change. Run the following command to start the system service in dev mode:

mvn -pl system liberty:dev

Open another command-line session, navigate to the start directory, and run the following command to start the query service in dev mode:

mvn -pl query liberty:dev

After you see the following message, your Liberty instances are ready in dev mode:

**************************************************************
*    Liberty is running in dev mode.

Dev mode holds your command-line session to listen for file changes. Open another command-line session and navigate to the start directory to continue, or open the project in your editor.

Start by implementing the first service call, the unary call. In this service call, the query client service sends a property to the system server service, which returns the property value. This type of service call resembles a RESTful API.

Create the SystemService class.
system/src/main/java/io/openliberty/guides/system/SystemService.java

SystemService.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2022, 2023 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.system;
 13
 14import java.util.HashMap;
 15import java.util.Map;
 16import java.util.logging.Logger;
 17
 18import io.grpc.stub.StreamObserver;
 19// tag::importGrpcClasses[]
 20import io.openliberty.guides.systemproto.SystemProperties;
 21import io.openliberty.guides.systemproto.SystemProperty;
 22import io.openliberty.guides.systemproto.SystemPropertyName;
 23import io.openliberty.guides.systemproto.SystemPropertyPrefix;
 24import io.openliberty.guides.systemproto.SystemPropertyValue;
 25import io.openliberty.guides.systemproto.SystemServiceGrpc;
 26// end::importGrpcClasses[]
 27
 28// tag::extends[]
 29public class SystemService extends SystemServiceGrpc.SystemServiceImplBase {
 30// end::extends[]
 31
 32    private static Logger logger = Logger.getLogger(SystemService.class.getName());
 33
 34    public SystemService() {
 35    }
 36
 37    // tag::getProperty[]
 38    @Override
 39    public void getProperty(
 40        SystemPropertyName request, StreamObserver<SystemPropertyValue> observer) {
 41
 42        // tag::pName[]
 43        String pName = request.getPropertyName();
 44        // end::pName[]
 45        // tag::pValue[]
 46        String pValue = System.getProperty(pName);
 47        // end::pValue[]
 48        // tag::response[]
 49        SystemPropertyValue value = SystemPropertyValue
 50                                        .newBuilder()
 51                                        .setPropertyValue(pValue)
 52                                        .build();
 53        // end::response[]
 54
 55        // tag::next[]
 56        observer.onNext(value);
 57        // end::next[]
 58        // tag::complete[]
 59        observer.onCompleted();
 60        // end::complete[]
 61
 62    }
 63    // end::getProperty[]
 64
 65    // tag::getServerStreamingProperties[]
 66    @Override
 67    public void getServerStreamingProperties(
 68        SystemPropertyPrefix request, StreamObserver<SystemProperty> observer) {
 69
 70        // tag::prefix[]
 71        String prefix = request.getPropertyPrefix();
 72        // end::prefix[]
 73        System.getProperties()
 74              .stringPropertyNames()
 75              .stream()
 76              // tag::filter[]
 77              .filter(name -> name.startsWith(prefix))
 78              // end::filter[]
 79              .forEach(name -> {
 80                  String pValue = System.getProperty(name);
 81                  // tag::serverMessage[]
 82                  SystemProperty value = SystemProperty
 83                      .newBuilder()
 84                      .setPropertyName(name)
 85                      .setPropertyValue(pValue)
 86                      .build();
 87                  // end::serverMessage[]
 88                  // tag::serverNext1[]
 89                  observer.onNext(value);
 90                  // end::serverNext1[]
 91                  logger.info("server streaming sent property: " + name);
 92               });
 93        // tag::serverComplete[]
 94        observer.onCompleted();
 95        // end::serverComplete[]
 96        logger.info("server streaming was completed!");
 97    }
 98    // end::getServerStreamingProperties[]
 99
100    // tag::getClientStreamingProperties[]
101    @Override
102    public StreamObserver<SystemPropertyName> getClientStreamingProperties(
103        StreamObserver<SystemProperties> observer) {
104
105        // tag::streamObserverClient[]
106        return new StreamObserver<SystemPropertyName>() {
107
108            // tag::clientStreamingMap[]
109            private Map<String, String> properties = new HashMap<String, String>();
110            // end::clientStreamingMap[]
111
112            // tag::receivingProperties[]
113            @Override
114            public void onNext(SystemPropertyName spn) {
115                String pName = spn.getPropertyName();
116                String pValue = System.getProperty(pName);
117                logger.info("client streaming received property: " + pName);
118                properties.put(pName, pValue);
119            }
120            // end::receivingProperties[]
121
122            @Override
123            public void onError(Throwable t) {
124                t.printStackTrace();
125            }
126
127            // tag::clientStreamingCompleted[]
128            @Override
129            public void onCompleted() {
130                SystemProperties value = SystemProperties.newBuilder()
131                                             .putAllProperties(properties)
132                                             .build();
133                observer.onNext(value);
134                observer.onCompleted();
135                logger.info("client streaming was completed!");
136            }
137            // end::clientStreamingCompleted[]
138        };
139        // end::streamObserverClient[]
140    }
141    // end::getClientStreamingProperties[]
142
143    // tag::getBidirectionalProperties[]
144    @Override
145    public StreamObserver<SystemPropertyName> getBidirectionalProperties(
146        StreamObserver<SystemProperty> observer) {
147
148        // tag::streamObserverBidirectional[]
149        return new StreamObserver<SystemPropertyName>() {
150            // tag::receiveBidirectionalProperties[]
151            @Override
152            public void onNext(SystemPropertyName spn) {
153                String pName = spn.getPropertyName();
154                String pValue = System.getProperty(pName);
155                logger.info("bi-directional streaming received: " + pName);
156                // tag::systemPropertyMessage[]
157                SystemProperty value = SystemProperty.newBuilder()
158                                           .setPropertyName(pName)
159                                           .setPropertyValue(pValue)
160                                           .build();
161                // end::systemPropertyMessage[]
162                // tag::serverNext2[]
163                observer.onNext(value);
164                // end::serverNext2[]
165            }
166            // end::receiveBidirectionalProperties[]
167
168            @Override
169            public void onError(Throwable t) {
170                t.printStackTrace();
171            }
172
173            // tag::bidirectionalCompleted[]
174            @Override
175            public void onCompleted() {
176                observer.onCompleted();
177                logger.info("bi-directional streaming was completed!");
178            }
179            // end::bidirectionalCompleted[]
180        };
181        // end::streamObserverBidirectional[]
182    }
183    // end::getBidirectionalProperties[]
184}

The SystemService class extends the SystemServiceGrpc class that is generated by the .proto file. The four types of services defined in the proto file are implemented in this class.

The getProperty() method implements the unary RPC call defined in the .proto file. The getPropertyName() getter method that is generated by gRPC retrieves the property name from the client, and stores it into the pName variable. The System property value is stored into the pValue variable. The gRPC library will create a SystemPropertyValue message, with its type defined in the SystemService.proto file. Then, the message is sent to the client service through the StreamObserver by using its onNext() and onComplete() methods.

Replace the system server configuration file.
system/src/main/liberty/config/server.xml

system/server.xml

 1<?xml version="1.0" encoding="UTF-8" standalone="no"?>
 2<server description="system service">
 3
 4    <featureManager>
 5        <feature>restfulWS-3.0</feature>
 6        <!-- tag::grpc[] -->
 7        <feature>grpc-1.0</feature>
 8        <!-- end::grpc[] -->
 9    </featureManager>
10
11    <!-- tag::grpcConfig[] -->
12    <!-- Due to target="*", this configuration will be applied to every gRPC service 
13         running on the server. This configuration registers a ServerInterceptor -->
14    <grpc target="*"/>
15    <!-- end::grpcConfig[] -->
16
17    <applicationManager autoExpand="true"/>
18
19    <webApplication contextRoot="/" location="guide-grpc-intro-system.war"/>
20
21    <logging consoleLogLevel="INFO"/>
22</server>

Add the grpc feature to the Liberty server configuration file. This feature enables applications running on Liberty to provide gRPC services. Configure the grpc feature with the target attribute. If you want to learn more about configuration for the grpc feature, see the GRPC Server Properties.

Next, implement the corresponding REST endpoint in the query service.

Create the PropertiesResource class.
query/src/main/java/io/openliberty/guides/query/PropertiesResource.java

PropertiesResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2022, 2023 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.query;
 13
 14import java.util.List;
 15import java.util.Properties;
 16import java.util.concurrent.CountDownLatch;
 17import java.util.concurrent.TimeUnit;
 18import java.util.stream.Collectors;
 19import java.util.logging.Logger;
 20
 21import jakarta.enterprise.context.ApplicationScoped;
 22import jakarta.inject.Inject;
 23import jakarta.ws.rs.GET;
 24import jakarta.ws.rs.Path;
 25import jakarta.ws.rs.PathParam;
 26import jakarta.ws.rs.Produces;
 27import jakarta.ws.rs.core.MediaType;
 28
 29import org.eclipse.microprofile.config.inject.ConfigProperty;
 30
 31import io.grpc.ManagedChannel;
 32import io.grpc.ManagedChannelBuilder;
 33import io.grpc.stub.StreamObserver;
 34// tag::grpcImports[]
 35import io.openliberty.guides.systemproto.SystemProperties;
 36import io.openliberty.guides.systemproto.SystemProperty;
 37import io.openliberty.guides.systemproto.SystemPropertyName;
 38import io.openliberty.guides.systemproto.SystemPropertyPrefix;
 39import io.openliberty.guides.systemproto.SystemPropertyValue;
 40import io.openliberty.guides.systemproto.SystemServiceGrpc;
 41import io.openliberty.guides.systemproto.SystemServiceGrpc.SystemServiceBlockingStub;
 42import io.openliberty.guides.systemproto.SystemServiceGrpc.SystemServiceStub;
 43// end::grpcImports[]
 44
 45@ApplicationScoped
 46@Path("/properties")
 47public class PropertiesResource {
 48
 49    private static Logger logger = Logger.getLogger(PropertiesResource.class.getName());
 50
 51    @Inject
 52    @ConfigProperty(name = "system.hostname", defaultValue = "localhost")
 53    String SYSTEM_HOST;
 54
 55    @Inject
 56    @ConfigProperty(name = "system.port", defaultValue = "9080")
 57    int SYSTEM_PORT;
 58
 59    // tag::unary[]
 60    @GET
 61    @Path("/{property}")
 62    @Produces(MediaType.TEXT_PLAIN)
 63    public String getPropertiesString(@PathParam("property") String property) {
 64
 65        // tag::createChannel1[]
 66        ManagedChannel channel = ManagedChannelBuilder
 67                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
 68                                     .usePlaintext().build();
 69        // end::createChannel1[]
 70        // tag::createClient1[]
 71        SystemServiceBlockingStub client = SystemServiceGrpc.newBlockingStub(channel);
 72        // end::createClient1[]
 73        SystemPropertyName request = SystemPropertyName.newBuilder()
 74                                             .setPropertyName(property).build();
 75        SystemPropertyValue response = client.getProperty(request);
 76        channel.shutdownNow();
 77        return response.getPropertyValue();
 78    }
 79    // end::unary[]
 80
 81    // tag::serverStreaming[]
 82    @GET
 83    @Path("/os")
 84    @Produces(MediaType.APPLICATION_JSON)
 85    public Properties getOSProperties() {
 86
 87        // tag::createChannel2[]
 88        ManagedChannel channel = ManagedChannelBuilder
 89                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
 90                                     .usePlaintext().build();
 91        // end::createChannel2[]
 92        // tag::createClient2[]
 93        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
 94        // end::createClient2[]
 95
 96        Properties properties = new Properties();
 97        // tag::countDownLatch1[]
 98        CountDownLatch countDown = new CountDownLatch(1);
 99        // end::countDownLatch1[]
100        SystemPropertyPrefix request = SystemPropertyPrefix.newBuilder()
101                                         .setPropertyPrefix("os.").build();
102        // tag::getServerStreamingProperties[]
103        client.getServerStreamingProperties(
104            request, new StreamObserver<SystemProperty>() {
105
106            // tag::onNext1[]
107            @Override
108            public void onNext(SystemProperty value) {
109                logger.info("server streaming received: "
110                   + value.getPropertyName() + "=" + value.getPropertyValue());
111                properties.put(value.getPropertyName(), value.getPropertyValue());
112            }
113            // end::onNext1[]
114
115            @Override
116            public void onError(Throwable t) {
117                t.printStackTrace();
118            }
119
120            @Override
121            public void onCompleted() {
122                logger.info("server streaming completed");
123                // tag::countDownLatch2[]
124                countDown.countDown();
125                // end::countDownLatch2[]
126            }
127        });
128        // end::getServerStreamingProperties[]
129
130
131        // wait until completed
132        // tag::countDownLatch3[]
133        try {
134            countDown.await(30, TimeUnit.SECONDS);
135        } catch (InterruptedException e) {
136            e.printStackTrace();
137        }
138        // end::countDownLatch3[]
139
140        // tag::closeConnection[]
141        channel.shutdownNow();
142        // end::closeConnection[]
143
144        return properties;
145    }
146    // end::serverStreaming[]
147
148    // tag::clientStreaming[]
149    @GET
150    @Path("/user")
151    @Produces(MediaType.APPLICATION_JSON)
152    public Properties getUserProperties() {
153
154        ManagedChannel channel = ManagedChannelBuilder
155                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
156                                     .usePlaintext().build();
157        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
158        // tag::countDownLatch4[]
159        CountDownLatch countDown = new CountDownLatch(1);
160        // end::countDownLatch4[]
161        Properties properties = new Properties();
162
163        // tag::getClientStreamingProperties[]
164        StreamObserver<SystemPropertyName> stream = client.getClientStreamingProperties(
165            new StreamObserver<SystemProperties>() {
166
167                @Override
168                public void onNext(SystemProperties value) {
169                    logger.info("client streaming received a map that has "
170                        + value.getPropertiesCount() + " properties");
171                    properties.putAll(value.getPropertiesMap());
172                }
173
174                @Override
175                public void onError(Throwable t) {
176                    t.printStackTrace();
177                }
178
179                @Override
180                public void onCompleted() {
181                    logger.info("client streaming completed");
182                    // tag::countDownLatch5[]
183                    countDown.countDown();
184                    // end::countDownLatch5[]
185                }
186            });
187        // end::getClientStreamingProperties[]
188
189        // collect the property names starting with user.
190        // tag::collectUserProperties[]
191        List<String> keys = System.getProperties().stringPropertyNames().stream()
192                                  .filter(k -> k.startsWith("user."))
193                                  .collect(Collectors.toList());
194        // end::collectUserProperties[]
195
196        // send messages to the server
197        keys.stream()
198            // tag::clientMessage1[]
199            .map(k -> SystemPropertyName.newBuilder().setPropertyName(k).build())
200            // end::clientMessage1[]
201            // tag::streamOnNext1[]
202            .forEach(stream::onNext);
203            // end::streamOnNext1[]
204        // tag::clientCompleted1[]
205        stream.onCompleted();
206        // end::clientCompleted1[]
207
208        // wait until completed
209        // tag::countDownLatch6[]
210        try {
211            countDown.await(30, TimeUnit.SECONDS);
212        } catch (InterruptedException e) {
213            e.printStackTrace();
214        }
215        // end::countDownLatch6[]
216
217        channel.shutdownNow();
218
219        return properties;
220    }
221    // end::clientStreaming[]
222
223    // tag::bidirectionalStreaming[]
224    @GET
225    @Path("/java")
226    @Produces(MediaType.APPLICATION_JSON)
227    public Properties getJavaProperties() {
228
229        ManagedChannel channel = ManagedChannelBuilder
230                                      .forAddress(SYSTEM_HOST, SYSTEM_PORT)
231                                      .usePlaintext().build();
232        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
233        Properties properties = new Properties();
234        // tag::countDownLatch7[]
235        CountDownLatch countDown = new CountDownLatch(1);
236        // end::countDownLatch7[]
237
238        // tag::getBidirectionalProperties[]
239        StreamObserver<SystemPropertyName> stream = client.getBidirectionalProperties(
240                new StreamObserver<SystemProperty>() {
241
242                    // tag::onNext2[]
243                    @Override
244                    public void onNext(SystemProperty value) {
245                        logger.info("bidirectional streaming received: "
246                            + value.getPropertyName() + "=" + value.getPropertyValue());
247                        properties.put(value.getPropertyName(),
248                                       value.getPropertyValue());
249                    }
250                    // end::onNext2[]
251
252                    @Override
253                    public void onError(Throwable t) {
254                        t.printStackTrace();
255                    }
256
257                    @Override
258                    public void onCompleted() {
259                        logger.info("bidirectional streaming completed");
260                        // tag::countDownLatch8[]
261                        countDown.countDown();
262                        // end::countDownLatch8[]
263                    }
264                });
265        // end::getBidirectionalProperties[]
266
267        // collect the property names starting with java
268        // tag::collectJavaProperties[]
269        List<String> keys = System.getProperties().stringPropertyNames().stream()
270                                  .filter(k -> k.startsWith("java."))
271                                  .collect(Collectors.toList());
272        // end::collectJavaProperties[]
273
274        // post messages to the server
275        keys.stream()
276              // tag::clientMessage2[]
277              .map(k -> SystemPropertyName.newBuilder().setPropertyName(k).build())
278              // end::clientMessage2[]
279              // tag::streamOnNext2[]
280              .forEach(stream::onNext);
281              // end::streamOnNext2[]
282        // tag::clientCompleted2[]
283        stream.onCompleted();
284        // end::clientCompleted2[]
285
286        // wait until completed
287        // tag::countDownLatch9[]
288        try {
289            countDown.await(30, TimeUnit.SECONDS);
290        } catch (InterruptedException e) {
291            e.printStackTrace();
292        }
293        // end::countDownLatch9[]
294
295        channel.shutdownNow();
296
297        return properties;
298    }
299    // end::bidirectionalStreaming[]
300}

The PropertiesResource class provides RESTful endpoints to interact with the system service. The /query/properties/${property} endpoint uses the unary service call to get the property value from the system service. The endpoint creates a channel, which it uses to create a client by the SystemServiceGrpc.newBlockingStub() API. The endpoint then uses the client to get the property value, shuts down the channel, and immediately returns the value from the system service response.

Replace the query server configuration file.
query/src/main/liberty/config/server.xml

query/server.xml

 1<?xml version="1.0" encoding="UTF-8" standalone="no"?>
 2<server description="query service">
 3
 4    <featureManager>
 5        <feature>restfulWS-3.0</feature>
 6        <feature>jsonp-2.0</feature>
 7        <feature>jsonb-2.0</feature>
 8        <feature>cdi-3.0</feature>
 9        <feature>mpConfig-3.0</feature>
10        <!-- tag::grpc[] -->
11        <feature>grpc-1.0</feature>
12        <!-- end::grpc[] -->
13        <!-- tag::grpcClient[] -->
14        <feature>grpcClient-1.0</feature>
15        <!-- end::grpcClient[] -->
16    </featureManager>
17
18    <variable defaultValue="9081" name="default.http.port"/>
19    <variable defaultValue="9444" name="default.https.port"/>
20
21    <httpEndpoint id="defaultHttpEndpoint"
22                  httpPort="${default.http.port}"
23                  httpsPort="${default.https.port}"
24                  host="*"/>
25
26    <!-- tag::grpcClientConfig[] -->
27    <!-- Due to host="*", this configuration will be applied to every gRPC client call
28         that gets made. This configuration registers a ClientInterceptor, and it directs
29         Cookie headers to get forwarded with any outbound RPC calls, in this case, that
30         enables authorization propagation. -->
31    <grpcClient headersToPropagate="Cookie" host="*"/>
32    <!-- end::grpcClientConfig[] -->
33
34    <applicationManager autoExpand="true"/>
35
36    <webApplication contextRoot="/" location="guide-grpc-intro-query.war"/>
37
38    <logging consoleLogLevel="INFO"/>
39</server>

Add the grpc and grpcClient features to the Liberty server configuration file for the query service. These two features enable gRPC server and client support on Liberty respectively. Configure the grpcClient feature with the headersToPropagate and target attributes. If you want to learn more about grpcClient feature configuration, see the GRPC Client Properties.

Because you are running the system and query services in dev mode, the changes that you made are automatically picked up. You’re now ready to check out your application in your browser.

Point your browser to the http://localhost:9081/query/properties/os.name URL to test out the unary service call. Your operating system name is displayed.

Implementing the server streaming call

In the server streaming call, the query client service provides the /query/properties/os endpoint that sends a message to the system server service. The system service streams any properties that start with os. back to the query service. A channel is created between the query and the system services to stream messages. The channel is closed by the system service only after sending the last message to the query service.

Update the SystemService class to implement the server streaming RPC call.

Replace the SystemService class.
system/src/main/java/io/openliberty/guides/system/SystemService.java

SystemService.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2022, 2023 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.system;
 13
 14import java.util.HashMap;
 15import java.util.Map;
 16import java.util.logging.Logger;
 17
 18import io.grpc.stub.StreamObserver;
 19// tag::importGrpcClasses[]
 20import io.openliberty.guides.systemproto.SystemProperties;
 21import io.openliberty.guides.systemproto.SystemProperty;
 22import io.openliberty.guides.systemproto.SystemPropertyName;
 23import io.openliberty.guides.systemproto.SystemPropertyPrefix;
 24import io.openliberty.guides.systemproto.SystemPropertyValue;
 25import io.openliberty.guides.systemproto.SystemServiceGrpc;
 26// end::importGrpcClasses[]
 27
 28// tag::extends[]
 29public class SystemService extends SystemServiceGrpc.SystemServiceImplBase {
 30// end::extends[]
 31
 32    private static Logger logger = Logger.getLogger(SystemService.class.getName());
 33
 34    public SystemService() {
 35    }
 36
 37    // tag::getProperty[]
 38    @Override
 39    public void getProperty(
 40        SystemPropertyName request, StreamObserver<SystemPropertyValue> observer) {
 41
 42        // tag::pName[]
 43        String pName = request.getPropertyName();
 44        // end::pName[]
 45        // tag::pValue[]
 46        String pValue = System.getProperty(pName);
 47        // end::pValue[]
 48        // tag::response[]
 49        SystemPropertyValue value = SystemPropertyValue
 50                                        .newBuilder()
 51                                        .setPropertyValue(pValue)
 52                                        .build();
 53        // end::response[]
 54
 55        // tag::next[]
 56        observer.onNext(value);
 57        // end::next[]
 58        // tag::complete[]
 59        observer.onCompleted();
 60        // end::complete[]
 61
 62    }
 63    // end::getProperty[]
 64
 65    // tag::getServerStreamingProperties[]
 66    @Override
 67    public void getServerStreamingProperties(
 68        SystemPropertyPrefix request, StreamObserver<SystemProperty> observer) {
 69
 70        // tag::prefix[]
 71        String prefix = request.getPropertyPrefix();
 72        // end::prefix[]
 73        System.getProperties()
 74              .stringPropertyNames()
 75              .stream()
 76              // tag::filter[]
 77              .filter(name -> name.startsWith(prefix))
 78              // end::filter[]
 79              .forEach(name -> {
 80                  String pValue = System.getProperty(name);
 81                  // tag::serverMessage[]
 82                  SystemProperty value = SystemProperty
 83                      .newBuilder()
 84                      .setPropertyName(name)
 85                      .setPropertyValue(pValue)
 86                      .build();
 87                  // end::serverMessage[]
 88                  // tag::serverNext1[]
 89                  observer.onNext(value);
 90                  // end::serverNext1[]
 91                  logger.info("server streaming sent property: " + name);
 92               });
 93        // tag::serverComplete[]
 94        observer.onCompleted();
 95        // end::serverComplete[]
 96        logger.info("server streaming was completed!");
 97    }
 98    // end::getServerStreamingProperties[]
 99
100    // tag::getClientStreamingProperties[]
101    @Override
102    public StreamObserver<SystemPropertyName> getClientStreamingProperties(
103        StreamObserver<SystemProperties> observer) {
104
105        // tag::streamObserverClient[]
106        return new StreamObserver<SystemPropertyName>() {
107
108            // tag::clientStreamingMap[]
109            private Map<String, String> properties = new HashMap<String, String>();
110            // end::clientStreamingMap[]
111
112            // tag::receivingProperties[]
113            @Override
114            public void onNext(SystemPropertyName spn) {
115                String pName = spn.getPropertyName();
116                String pValue = System.getProperty(pName);
117                logger.info("client streaming received property: " + pName);
118                properties.put(pName, pValue);
119            }
120            // end::receivingProperties[]
121
122            @Override
123            public void onError(Throwable t) {
124                t.printStackTrace();
125            }
126
127            // tag::clientStreamingCompleted[]
128            @Override
129            public void onCompleted() {
130                SystemProperties value = SystemProperties.newBuilder()
131                                             .putAllProperties(properties)
132                                             .build();
133                observer.onNext(value);
134                observer.onCompleted();
135                logger.info("client streaming was completed!");
136            }
137            // end::clientStreamingCompleted[]
138        };
139        // end::streamObserverClient[]
140    }
141    // end::getClientStreamingProperties[]
142
143    // tag::getBidirectionalProperties[]
144    @Override
145    public StreamObserver<SystemPropertyName> getBidirectionalProperties(
146        StreamObserver<SystemProperty> observer) {
147
148        // tag::streamObserverBidirectional[]
149        return new StreamObserver<SystemPropertyName>() {
150            // tag::receiveBidirectionalProperties[]
151            @Override
152            public void onNext(SystemPropertyName spn) {
153                String pName = spn.getPropertyName();
154                String pValue = System.getProperty(pName);
155                logger.info("bi-directional streaming received: " + pName);
156                // tag::systemPropertyMessage[]
157                SystemProperty value = SystemProperty.newBuilder()
158                                           .setPropertyName(pName)
159                                           .setPropertyValue(pValue)
160                                           .build();
161                // end::systemPropertyMessage[]
162                // tag::serverNext2[]
163                observer.onNext(value);
164                // end::serverNext2[]
165            }
166            // end::receiveBidirectionalProperties[]
167
168            @Override
169            public void onError(Throwable t) {
170                t.printStackTrace();
171            }
172
173            // tag::bidirectionalCompleted[]
174            @Override
175            public void onCompleted() {
176                observer.onCompleted();
177                logger.info("bi-directional streaming was completed!");
178            }
179            // end::bidirectionalCompleted[]
180        };
181        // end::streamObserverBidirectional[]
182    }
183    // end::getBidirectionalProperties[]
184}

The getServerStreamingProperties() method implements the server streaming RPC call. The getPropertyPrefix() getter method retrieves the property prefix from the client. Properties that start with the prefix are filtered out. For each property, a SystemProperty message is built and streamed to the client through the StreamObserver by using its onNext() method. When all properties are streamed, the service stops streaming by calling the onComplete() method.

Update the PropertiesResource class to implement the /query/properties/os endpoint of the query service.

Replace the PropertiesResource class.
query/src/main/java/io/openliberty/guides/query/PropertiesResource.java

PropertiesResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2022, 2023 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.query;
 13
 14import java.util.List;
 15import java.util.Properties;
 16import java.util.concurrent.CountDownLatch;
 17import java.util.concurrent.TimeUnit;
 18import java.util.stream.Collectors;
 19import java.util.logging.Logger;
 20
 21import jakarta.enterprise.context.ApplicationScoped;
 22import jakarta.inject.Inject;
 23import jakarta.ws.rs.GET;
 24import jakarta.ws.rs.Path;
 25import jakarta.ws.rs.PathParam;
 26import jakarta.ws.rs.Produces;
 27import jakarta.ws.rs.core.MediaType;
 28
 29import org.eclipse.microprofile.config.inject.ConfigProperty;
 30
 31import io.grpc.ManagedChannel;
 32import io.grpc.ManagedChannelBuilder;
 33import io.grpc.stub.StreamObserver;
 34// tag::grpcImports[]
 35import io.openliberty.guides.systemproto.SystemProperties;
 36import io.openliberty.guides.systemproto.SystemProperty;
 37import io.openliberty.guides.systemproto.SystemPropertyName;
 38import io.openliberty.guides.systemproto.SystemPropertyPrefix;
 39import io.openliberty.guides.systemproto.SystemPropertyValue;
 40import io.openliberty.guides.systemproto.SystemServiceGrpc;
 41import io.openliberty.guides.systemproto.SystemServiceGrpc.SystemServiceBlockingStub;
 42import io.openliberty.guides.systemproto.SystemServiceGrpc.SystemServiceStub;
 43// end::grpcImports[]
 44
 45@ApplicationScoped
 46@Path("/properties")
 47public class PropertiesResource {
 48
 49    private static Logger logger = Logger.getLogger(PropertiesResource.class.getName());
 50
 51    @Inject
 52    @ConfigProperty(name = "system.hostname", defaultValue = "localhost")
 53    String SYSTEM_HOST;
 54
 55    @Inject
 56    @ConfigProperty(name = "system.port", defaultValue = "9080")
 57    int SYSTEM_PORT;
 58
 59    // tag::unary[]
 60    @GET
 61    @Path("/{property}")
 62    @Produces(MediaType.TEXT_PLAIN)
 63    public String getPropertiesString(@PathParam("property") String property) {
 64
 65        // tag::createChannel1[]
 66        ManagedChannel channel = ManagedChannelBuilder
 67                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
 68                                     .usePlaintext().build();
 69        // end::createChannel1[]
 70        // tag::createClient1[]
 71        SystemServiceBlockingStub client = SystemServiceGrpc.newBlockingStub(channel);
 72        // end::createClient1[]
 73        SystemPropertyName request = SystemPropertyName.newBuilder()
 74                                             .setPropertyName(property).build();
 75        SystemPropertyValue response = client.getProperty(request);
 76        channel.shutdownNow();
 77        return response.getPropertyValue();
 78    }
 79    // end::unary[]
 80
 81    // tag::serverStreaming[]
 82    @GET
 83    @Path("/os")
 84    @Produces(MediaType.APPLICATION_JSON)
 85    public Properties getOSProperties() {
 86
 87        // tag::createChannel2[]
 88        ManagedChannel channel = ManagedChannelBuilder
 89                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
 90                                     .usePlaintext().build();
 91        // end::createChannel2[]
 92        // tag::createClient2[]
 93        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
 94        // end::createClient2[]
 95
 96        Properties properties = new Properties();
 97        // tag::countDownLatch1[]
 98        CountDownLatch countDown = new CountDownLatch(1);
 99        // end::countDownLatch1[]
100        SystemPropertyPrefix request = SystemPropertyPrefix.newBuilder()
101                                         .setPropertyPrefix("os.").build();
102        // tag::getServerStreamingProperties[]
103        client.getServerStreamingProperties(
104            request, new StreamObserver<SystemProperty>() {
105
106            // tag::onNext1[]
107            @Override
108            public void onNext(SystemProperty value) {
109                logger.info("server streaming received: "
110                   + value.getPropertyName() + "=" + value.getPropertyValue());
111                properties.put(value.getPropertyName(), value.getPropertyValue());
112            }
113            // end::onNext1[]
114
115            @Override
116            public void onError(Throwable t) {
117                t.printStackTrace();
118            }
119
120            @Override
121            public void onCompleted() {
122                logger.info("server streaming completed");
123                // tag::countDownLatch2[]
124                countDown.countDown();
125                // end::countDownLatch2[]
126            }
127        });
128        // end::getServerStreamingProperties[]
129
130
131        // wait until completed
132        // tag::countDownLatch3[]
133        try {
134            countDown.await(30, TimeUnit.SECONDS);
135        } catch (InterruptedException e) {
136            e.printStackTrace();
137        }
138        // end::countDownLatch3[]
139
140        // tag::closeConnection[]
141        channel.shutdownNow();
142        // end::closeConnection[]
143
144        return properties;
145    }
146    // end::serverStreaming[]
147
148    // tag::clientStreaming[]
149    @GET
150    @Path("/user")
151    @Produces(MediaType.APPLICATION_JSON)
152    public Properties getUserProperties() {
153
154        ManagedChannel channel = ManagedChannelBuilder
155                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
156                                     .usePlaintext().build();
157        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
158        // tag::countDownLatch4[]
159        CountDownLatch countDown = new CountDownLatch(1);
160        // end::countDownLatch4[]
161        Properties properties = new Properties();
162
163        // tag::getClientStreamingProperties[]
164        StreamObserver<SystemPropertyName> stream = client.getClientStreamingProperties(
165            new StreamObserver<SystemProperties>() {
166
167                @Override
168                public void onNext(SystemProperties value) {
169                    logger.info("client streaming received a map that has "
170                        + value.getPropertiesCount() + " properties");
171                    properties.putAll(value.getPropertiesMap());
172                }
173
174                @Override
175                public void onError(Throwable t) {
176                    t.printStackTrace();
177                }
178
179                @Override
180                public void onCompleted() {
181                    logger.info("client streaming completed");
182                    // tag::countDownLatch5[]
183                    countDown.countDown();
184                    // end::countDownLatch5[]
185                }
186            });
187        // end::getClientStreamingProperties[]
188
189        // collect the property names starting with user.
190        // tag::collectUserProperties[]
191        List<String> keys = System.getProperties().stringPropertyNames().stream()
192                                  .filter(k -> k.startsWith("user."))
193                                  .collect(Collectors.toList());
194        // end::collectUserProperties[]
195
196        // send messages to the server
197        keys.stream()
198            // tag::clientMessage1[]
199            .map(k -> SystemPropertyName.newBuilder().setPropertyName(k).build())
200            // end::clientMessage1[]
201            // tag::streamOnNext1[]
202            .forEach(stream::onNext);
203            // end::streamOnNext1[]
204        // tag::clientCompleted1[]
205        stream.onCompleted();
206        // end::clientCompleted1[]
207
208        // wait until completed
209        // tag::countDownLatch6[]
210        try {
211            countDown.await(30, TimeUnit.SECONDS);
212        } catch (InterruptedException e) {
213            e.printStackTrace();
214        }
215        // end::countDownLatch6[]
216
217        channel.shutdownNow();
218
219        return properties;
220    }
221    // end::clientStreaming[]
222
223    // tag::bidirectionalStreaming[]
224    @GET
225    @Path("/java")
226    @Produces(MediaType.APPLICATION_JSON)
227    public Properties getJavaProperties() {
228
229        ManagedChannel channel = ManagedChannelBuilder
230                                      .forAddress(SYSTEM_HOST, SYSTEM_PORT)
231                                      .usePlaintext().build();
232        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
233        Properties properties = new Properties();
234        // tag::countDownLatch7[]
235        CountDownLatch countDown = new CountDownLatch(1);
236        // end::countDownLatch7[]
237
238        // tag::getBidirectionalProperties[]
239        StreamObserver<SystemPropertyName> stream = client.getBidirectionalProperties(
240                new StreamObserver<SystemProperty>() {
241
242                    // tag::onNext2[]
243                    @Override
244                    public void onNext(SystemProperty value) {
245                        logger.info("bidirectional streaming received: "
246                            + value.getPropertyName() + "=" + value.getPropertyValue());
247                        properties.put(value.getPropertyName(),
248                                       value.getPropertyValue());
249                    }
250                    // end::onNext2[]
251
252                    @Override
253                    public void onError(Throwable t) {
254                        t.printStackTrace();
255                    }
256
257                    @Override
258                    public void onCompleted() {
259                        logger.info("bidirectional streaming completed");
260                        // tag::countDownLatch8[]
261                        countDown.countDown();
262                        // end::countDownLatch8[]
263                    }
264                });
265        // end::getBidirectionalProperties[]
266
267        // collect the property names starting with java
268        // tag::collectJavaProperties[]
269        List<String> keys = System.getProperties().stringPropertyNames().stream()
270                                  .filter(k -> k.startsWith("java."))
271                                  .collect(Collectors.toList());
272        // end::collectJavaProperties[]
273
274        // post messages to the server
275        keys.stream()
276              // tag::clientMessage2[]
277              .map(k -> SystemPropertyName.newBuilder().setPropertyName(k).build())
278              // end::clientMessage2[]
279              // tag::streamOnNext2[]
280              .forEach(stream::onNext);
281              // end::streamOnNext2[]
282        // tag::clientCompleted2[]
283        stream.onCompleted();
284        // end::clientCompleted2[]
285
286        // wait until completed
287        // tag::countDownLatch9[]
288        try {
289            countDown.await(30, TimeUnit.SECONDS);
290        } catch (InterruptedException e) {
291            e.printStackTrace();
292        }
293        // end::countDownLatch9[]
294
295        channel.shutdownNow();
296
297        return properties;
298    }
299    // end::bidirectionalStreaming[]
300}

The endpoint creates a channel to the system service and a client by using the SystemServiceGrpc.newStub() API. Then, it calls the getServerStreamingProperties() method with an implementation of the StreamObserver interface. The onNext() method receives messages streaming from the server service individually and stores them into the properties placeholder. After all properties are received, the system service shuts down the channel and returns the placeholder. Because the RPC call is asynchronous, a CountDownLatch instance synchronizes the streaming flow.

Point your browser to the http://localhost:9081/query/properties/os URL to test out the server streaming call. The os. properties from the system service are displayed. Observe the output from the consoles running the system and query services.

Implementing the client streaming call

In the client streaming call, the query client service provides the /query/properties/user endpoint, which streams the user properties to the system server service. The system service returns a map of user properties with their values.

Update the SystemService class to implement the client streaming RPC call.

Replace the SystemService class.
system/src/main/java/io/openliberty/guides/system/SystemService.java

SystemService.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2022, 2023 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.system;
 13
 14import java.util.HashMap;
 15import java.util.Map;
 16import java.util.logging.Logger;
 17
 18import io.grpc.stub.StreamObserver;
 19// tag::importGrpcClasses[]
 20import io.openliberty.guides.systemproto.SystemProperties;
 21import io.openliberty.guides.systemproto.SystemProperty;
 22import io.openliberty.guides.systemproto.SystemPropertyName;
 23import io.openliberty.guides.systemproto.SystemPropertyPrefix;
 24import io.openliberty.guides.systemproto.SystemPropertyValue;
 25import io.openliberty.guides.systemproto.SystemServiceGrpc;
 26// end::importGrpcClasses[]
 27
 28// tag::extends[]
 29public class SystemService extends SystemServiceGrpc.SystemServiceImplBase {
 30// end::extends[]
 31
 32    private static Logger logger = Logger.getLogger(SystemService.class.getName());
 33
 34    public SystemService() {
 35    }
 36
 37    // tag::getProperty[]
 38    @Override
 39    public void getProperty(
 40        SystemPropertyName request, StreamObserver<SystemPropertyValue> observer) {
 41
 42        // tag::pName[]
 43        String pName = request.getPropertyName();
 44        // end::pName[]
 45        // tag::pValue[]
 46        String pValue = System.getProperty(pName);
 47        // end::pValue[]
 48        // tag::response[]
 49        SystemPropertyValue value = SystemPropertyValue
 50                                        .newBuilder()
 51                                        .setPropertyValue(pValue)
 52                                        .build();
 53        // end::response[]
 54
 55        // tag::next[]
 56        observer.onNext(value);
 57        // end::next[]
 58        // tag::complete[]
 59        observer.onCompleted();
 60        // end::complete[]
 61
 62    }
 63    // end::getProperty[]
 64
 65    // tag::getServerStreamingProperties[]
 66    @Override
 67    public void getServerStreamingProperties(
 68        SystemPropertyPrefix request, StreamObserver<SystemProperty> observer) {
 69
 70        // tag::prefix[]
 71        String prefix = request.getPropertyPrefix();
 72        // end::prefix[]
 73        System.getProperties()
 74              .stringPropertyNames()
 75              .stream()
 76              // tag::filter[]
 77              .filter(name -> name.startsWith(prefix))
 78              // end::filter[]
 79              .forEach(name -> {
 80                  String pValue = System.getProperty(name);
 81                  // tag::serverMessage[]
 82                  SystemProperty value = SystemProperty
 83                      .newBuilder()
 84                      .setPropertyName(name)
 85                      .setPropertyValue(pValue)
 86                      .build();
 87                  // end::serverMessage[]
 88                  // tag::serverNext1[]
 89                  observer.onNext(value);
 90                  // end::serverNext1[]
 91                  logger.info("server streaming sent property: " + name);
 92               });
 93        // tag::serverComplete[]
 94        observer.onCompleted();
 95        // end::serverComplete[]
 96        logger.info("server streaming was completed!");
 97    }
 98    // end::getServerStreamingProperties[]
 99
100    // tag::getClientStreamingProperties[]
101    @Override
102    public StreamObserver<SystemPropertyName> getClientStreamingProperties(
103        StreamObserver<SystemProperties> observer) {
104
105        // tag::streamObserverClient[]
106        return new StreamObserver<SystemPropertyName>() {
107
108            // tag::clientStreamingMap[]
109            private Map<String, String> properties = new HashMap<String, String>();
110            // end::clientStreamingMap[]
111
112            // tag::receivingProperties[]
113            @Override
114            public void onNext(SystemPropertyName spn) {
115                String pName = spn.getPropertyName();
116                String pValue = System.getProperty(pName);
117                logger.info("client streaming received property: " + pName);
118                properties.put(pName, pValue);
119            }
120            // end::receivingProperties[]
121
122            @Override
123            public void onError(Throwable t) {
124                t.printStackTrace();
125            }
126
127            // tag::clientStreamingCompleted[]
128            @Override
129            public void onCompleted() {
130                SystemProperties value = SystemProperties.newBuilder()
131                                             .putAllProperties(properties)
132                                             .build();
133                observer.onNext(value);
134                observer.onCompleted();
135                logger.info("client streaming was completed!");
136            }
137            // end::clientStreamingCompleted[]
138        };
139        // end::streamObserverClient[]
140    }
141    // end::getClientStreamingProperties[]
142
143    // tag::getBidirectionalProperties[]
144    @Override
145    public StreamObserver<SystemPropertyName> getBidirectionalProperties(
146        StreamObserver<SystemProperty> observer) {
147
148        // tag::streamObserverBidirectional[]
149        return new StreamObserver<SystemPropertyName>() {
150            // tag::receiveBidirectionalProperties[]
151            @Override
152            public void onNext(SystemPropertyName spn) {
153                String pName = spn.getPropertyName();
154                String pValue = System.getProperty(pName);
155                logger.info("bi-directional streaming received: " + pName);
156                // tag::systemPropertyMessage[]
157                SystemProperty value = SystemProperty.newBuilder()
158                                           .setPropertyName(pName)
159                                           .setPropertyValue(pValue)
160                                           .build();
161                // end::systemPropertyMessage[]
162                // tag::serverNext2[]
163                observer.onNext(value);
164                // end::serverNext2[]
165            }
166            // end::receiveBidirectionalProperties[]
167
168            @Override
169            public void onError(Throwable t) {
170                t.printStackTrace();
171            }
172
173            // tag::bidirectionalCompleted[]
174            @Override
175            public void onCompleted() {
176                observer.onCompleted();
177                logger.info("bi-directional streaming was completed!");
178            }
179            // end::bidirectionalCompleted[]
180        };
181        // end::streamObserverBidirectional[]
182    }
183    // end::getBidirectionalProperties[]
184}

The getClientStreamingProperties() method implements client streaming RPC call. This method returns an instance of the StreamObserver interface. Its onNext() method receives the messages from the client individually and stores the property values into the properties map placeholder. When the streaming is completed, the properties placeholder is sent back to the client by the onCompleted() method.

Update the PropertiesResource class to implement the /query/properties/user endpoint of the query service.

Replace the PropertiesResource class.
query/src/main/java/io/openliberty/guides/query/PropertiesResource.java

PropertiesResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2022, 2023 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.query;
 13
 14import java.util.List;
 15import java.util.Properties;
 16import java.util.concurrent.CountDownLatch;
 17import java.util.concurrent.TimeUnit;
 18import java.util.stream.Collectors;
 19import java.util.logging.Logger;
 20
 21import jakarta.enterprise.context.ApplicationScoped;
 22import jakarta.inject.Inject;
 23import jakarta.ws.rs.GET;
 24import jakarta.ws.rs.Path;
 25import jakarta.ws.rs.PathParam;
 26import jakarta.ws.rs.Produces;
 27import jakarta.ws.rs.core.MediaType;
 28
 29import org.eclipse.microprofile.config.inject.ConfigProperty;
 30
 31import io.grpc.ManagedChannel;
 32import io.grpc.ManagedChannelBuilder;
 33import io.grpc.stub.StreamObserver;
 34// tag::grpcImports[]
 35import io.openliberty.guides.systemproto.SystemProperties;
 36import io.openliberty.guides.systemproto.SystemProperty;
 37import io.openliberty.guides.systemproto.SystemPropertyName;
 38import io.openliberty.guides.systemproto.SystemPropertyPrefix;
 39import io.openliberty.guides.systemproto.SystemPropertyValue;
 40import io.openliberty.guides.systemproto.SystemServiceGrpc;
 41import io.openliberty.guides.systemproto.SystemServiceGrpc.SystemServiceBlockingStub;
 42import io.openliberty.guides.systemproto.SystemServiceGrpc.SystemServiceStub;
 43// end::grpcImports[]
 44
 45@ApplicationScoped
 46@Path("/properties")
 47public class PropertiesResource {
 48
 49    private static Logger logger = Logger.getLogger(PropertiesResource.class.getName());
 50
 51    @Inject
 52    @ConfigProperty(name = "system.hostname", defaultValue = "localhost")
 53    String SYSTEM_HOST;
 54
 55    @Inject
 56    @ConfigProperty(name = "system.port", defaultValue = "9080")
 57    int SYSTEM_PORT;
 58
 59    // tag::unary[]
 60    @GET
 61    @Path("/{property}")
 62    @Produces(MediaType.TEXT_PLAIN)
 63    public String getPropertiesString(@PathParam("property") String property) {
 64
 65        // tag::createChannel1[]
 66        ManagedChannel channel = ManagedChannelBuilder
 67                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
 68                                     .usePlaintext().build();
 69        // end::createChannel1[]
 70        // tag::createClient1[]
 71        SystemServiceBlockingStub client = SystemServiceGrpc.newBlockingStub(channel);
 72        // end::createClient1[]
 73        SystemPropertyName request = SystemPropertyName.newBuilder()
 74                                             .setPropertyName(property).build();
 75        SystemPropertyValue response = client.getProperty(request);
 76        channel.shutdownNow();
 77        return response.getPropertyValue();
 78    }
 79    // end::unary[]
 80
 81    // tag::serverStreaming[]
 82    @GET
 83    @Path("/os")
 84    @Produces(MediaType.APPLICATION_JSON)
 85    public Properties getOSProperties() {
 86
 87        // tag::createChannel2[]
 88        ManagedChannel channel = ManagedChannelBuilder
 89                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
 90                                     .usePlaintext().build();
 91        // end::createChannel2[]
 92        // tag::createClient2[]
 93        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
 94        // end::createClient2[]
 95
 96        Properties properties = new Properties();
 97        // tag::countDownLatch1[]
 98        CountDownLatch countDown = new CountDownLatch(1);
 99        // end::countDownLatch1[]
100        SystemPropertyPrefix request = SystemPropertyPrefix.newBuilder()
101                                         .setPropertyPrefix("os.").build();
102        // tag::getServerStreamingProperties[]
103        client.getServerStreamingProperties(
104            request, new StreamObserver<SystemProperty>() {
105
106            // tag::onNext1[]
107            @Override
108            public void onNext(SystemProperty value) {
109                logger.info("server streaming received: "
110                   + value.getPropertyName() + "=" + value.getPropertyValue());
111                properties.put(value.getPropertyName(), value.getPropertyValue());
112            }
113            // end::onNext1[]
114
115            @Override
116            public void onError(Throwable t) {
117                t.printStackTrace();
118            }
119
120            @Override
121            public void onCompleted() {
122                logger.info("server streaming completed");
123                // tag::countDownLatch2[]
124                countDown.countDown();
125                // end::countDownLatch2[]
126            }
127        });
128        // end::getServerStreamingProperties[]
129
130
131        // wait until completed
132        // tag::countDownLatch3[]
133        try {
134            countDown.await(30, TimeUnit.SECONDS);
135        } catch (InterruptedException e) {
136            e.printStackTrace();
137        }
138        // end::countDownLatch3[]
139
140        // tag::closeConnection[]
141        channel.shutdownNow();
142        // end::closeConnection[]
143
144        return properties;
145    }
146    // end::serverStreaming[]
147
148    // tag::clientStreaming[]
149    @GET
150    @Path("/user")
151    @Produces(MediaType.APPLICATION_JSON)
152    public Properties getUserProperties() {
153
154        ManagedChannel channel = ManagedChannelBuilder
155                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
156                                     .usePlaintext().build();
157        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
158        // tag::countDownLatch4[]
159        CountDownLatch countDown = new CountDownLatch(1);
160        // end::countDownLatch4[]
161        Properties properties = new Properties();
162
163        // tag::getClientStreamingProperties[]
164        StreamObserver<SystemPropertyName> stream = client.getClientStreamingProperties(
165            new StreamObserver<SystemProperties>() {
166
167                @Override
168                public void onNext(SystemProperties value) {
169                    logger.info("client streaming received a map that has "
170                        + value.getPropertiesCount() + " properties");
171                    properties.putAll(value.getPropertiesMap());
172                }
173
174                @Override
175                public void onError(Throwable t) {
176                    t.printStackTrace();
177                }
178
179                @Override
180                public void onCompleted() {
181                    logger.info("client streaming completed");
182                    // tag::countDownLatch5[]
183                    countDown.countDown();
184                    // end::countDownLatch5[]
185                }
186            });
187        // end::getClientStreamingProperties[]
188
189        // collect the property names starting with user.
190        // tag::collectUserProperties[]
191        List<String> keys = System.getProperties().stringPropertyNames().stream()
192                                  .filter(k -> k.startsWith("user."))
193                                  .collect(Collectors.toList());
194        // end::collectUserProperties[]
195
196        // send messages to the server
197        keys.stream()
198            // tag::clientMessage1[]
199            .map(k -> SystemPropertyName.newBuilder().setPropertyName(k).build())
200            // end::clientMessage1[]
201            // tag::streamOnNext1[]
202            .forEach(stream::onNext);
203            // end::streamOnNext1[]
204        // tag::clientCompleted1[]
205        stream.onCompleted();
206        // end::clientCompleted1[]
207
208        // wait until completed
209        // tag::countDownLatch6[]
210        try {
211            countDown.await(30, TimeUnit.SECONDS);
212        } catch (InterruptedException e) {
213            e.printStackTrace();
214        }
215        // end::countDownLatch6[]
216
217        channel.shutdownNow();
218
219        return properties;
220    }
221    // end::clientStreaming[]
222
223    // tag::bidirectionalStreaming[]
224    @GET
225    @Path("/java")
226    @Produces(MediaType.APPLICATION_JSON)
227    public Properties getJavaProperties() {
228
229        ManagedChannel channel = ManagedChannelBuilder
230                                      .forAddress(SYSTEM_HOST, SYSTEM_PORT)
231                                      .usePlaintext().build();
232        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
233        Properties properties = new Properties();
234        // tag::countDownLatch7[]
235        CountDownLatch countDown = new CountDownLatch(1);
236        // end::countDownLatch7[]
237
238        // tag::getBidirectionalProperties[]
239        StreamObserver<SystemPropertyName> stream = client.getBidirectionalProperties(
240                new StreamObserver<SystemProperty>() {
241
242                    // tag::onNext2[]
243                    @Override
244                    public void onNext(SystemProperty value) {
245                        logger.info("bidirectional streaming received: "
246                            + value.getPropertyName() + "=" + value.getPropertyValue());
247                        properties.put(value.getPropertyName(),
248                                       value.getPropertyValue());
249                    }
250                    // end::onNext2[]
251
252                    @Override
253                    public void onError(Throwable t) {
254                        t.printStackTrace();
255                    }
256
257                    @Override
258                    public void onCompleted() {
259                        logger.info("bidirectional streaming completed");
260                        // tag::countDownLatch8[]
261                        countDown.countDown();
262                        // end::countDownLatch8[]
263                    }
264                });
265        // end::getBidirectionalProperties[]
266
267        // collect the property names starting with java
268        // tag::collectJavaProperties[]
269        List<String> keys = System.getProperties().stringPropertyNames().stream()
270                                  .filter(k -> k.startsWith("java."))
271                                  .collect(Collectors.toList());
272        // end::collectJavaProperties[]
273
274        // post messages to the server
275        keys.stream()
276              // tag::clientMessage2[]
277              .map(k -> SystemPropertyName.newBuilder().setPropertyName(k).build())
278              // end::clientMessage2[]
279              // tag::streamOnNext2[]
280              .forEach(stream::onNext);
281              // end::streamOnNext2[]
282        // tag::clientCompleted2[]
283        stream.onCompleted();
284        // end::clientCompleted2[]
285
286        // wait until completed
287        // tag::countDownLatch9[]
288        try {
289            countDown.await(30, TimeUnit.SECONDS);
290        } catch (InterruptedException e) {
291            e.printStackTrace();
292        }
293        // end::countDownLatch9[]
294
295        channel.shutdownNow();
296
297        return properties;
298    }
299    // end::bidirectionalStreaming[]
300}

After a connection is created between the two services, the client.getClientStreamingProperties() method is called to get a stream and collect the properties with property names that are prefixed by user.. The method creates a SystemPropertyName message individually and sends the message to the server by the stream::onNext action. When all property names are sent, the onCompleted() method is called to finish the streaming. Again, a CountDownLatch instance synchronizes the streaming flow.

Point your browser to the http://localhost:9081/query/properties/user URL to test the client streaming call. The user. properties from the system service are displayed. Observe the output from the consoles running the system and query services.

Implementing the bidirectional streaming call

In the bidirectional streaming call, the query client service provides the /query/properties/java endpoint, which streams the property names that start with java. to the system server service. The system service streams the property values back to the query service.

Update the SystemService class to implement the bidirectional streaming RPC call.

Replace the SystemService class.
system/src/main/java/io/openliberty/guides/system/SystemService.java

SystemService.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2022, 2023 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.system;
 13
 14import java.util.HashMap;
 15import java.util.Map;
 16import java.util.logging.Logger;
 17
 18import io.grpc.stub.StreamObserver;
 19// tag::importGrpcClasses[]
 20import io.openliberty.guides.systemproto.SystemProperties;
 21import io.openliberty.guides.systemproto.SystemProperty;
 22import io.openliberty.guides.systemproto.SystemPropertyName;
 23import io.openliberty.guides.systemproto.SystemPropertyPrefix;
 24import io.openliberty.guides.systemproto.SystemPropertyValue;
 25import io.openliberty.guides.systemproto.SystemServiceGrpc;
 26// end::importGrpcClasses[]
 27
 28// tag::extends[]
 29public class SystemService extends SystemServiceGrpc.SystemServiceImplBase {
 30// end::extends[]
 31
 32    private static Logger logger = Logger.getLogger(SystemService.class.getName());
 33
 34    public SystemService() {
 35    }
 36
 37    // tag::getProperty[]
 38    @Override
 39    public void getProperty(
 40        SystemPropertyName request, StreamObserver<SystemPropertyValue> observer) {
 41
 42        // tag::pName[]
 43        String pName = request.getPropertyName();
 44        // end::pName[]
 45        // tag::pValue[]
 46        String pValue = System.getProperty(pName);
 47        // end::pValue[]
 48        // tag::response[]
 49        SystemPropertyValue value = SystemPropertyValue
 50                                        .newBuilder()
 51                                        .setPropertyValue(pValue)
 52                                        .build();
 53        // end::response[]
 54
 55        // tag::next[]
 56        observer.onNext(value);
 57        // end::next[]
 58        // tag::complete[]
 59        observer.onCompleted();
 60        // end::complete[]
 61
 62    }
 63    // end::getProperty[]
 64
 65    // tag::getServerStreamingProperties[]
 66    @Override
 67    public void getServerStreamingProperties(
 68        SystemPropertyPrefix request, StreamObserver<SystemProperty> observer) {
 69
 70        // tag::prefix[]
 71        String prefix = request.getPropertyPrefix();
 72        // end::prefix[]
 73        System.getProperties()
 74              .stringPropertyNames()
 75              .stream()
 76              // tag::filter[]
 77              .filter(name -> name.startsWith(prefix))
 78              // end::filter[]
 79              .forEach(name -> {
 80                  String pValue = System.getProperty(name);
 81                  // tag::serverMessage[]
 82                  SystemProperty value = SystemProperty
 83                      .newBuilder()
 84                      .setPropertyName(name)
 85                      .setPropertyValue(pValue)
 86                      .build();
 87                  // end::serverMessage[]
 88                  // tag::serverNext1[]
 89                  observer.onNext(value);
 90                  // end::serverNext1[]
 91                  logger.info("server streaming sent property: " + name);
 92               });
 93        // tag::serverComplete[]
 94        observer.onCompleted();
 95        // end::serverComplete[]
 96        logger.info("server streaming was completed!");
 97    }
 98    // end::getServerStreamingProperties[]
 99
100    // tag::getClientStreamingProperties[]
101    @Override
102    public StreamObserver<SystemPropertyName> getClientStreamingProperties(
103        StreamObserver<SystemProperties> observer) {
104
105        // tag::streamObserverClient[]
106        return new StreamObserver<SystemPropertyName>() {
107
108            // tag::clientStreamingMap[]
109            private Map<String, String> properties = new HashMap<String, String>();
110            // end::clientStreamingMap[]
111
112            // tag::receivingProperties[]
113            @Override
114            public void onNext(SystemPropertyName spn) {
115                String pName = spn.getPropertyName();
116                String pValue = System.getProperty(pName);
117                logger.info("client streaming received property: " + pName);
118                properties.put(pName, pValue);
119            }
120            // end::receivingProperties[]
121
122            @Override
123            public void onError(Throwable t) {
124                t.printStackTrace();
125            }
126
127            // tag::clientStreamingCompleted[]
128            @Override
129            public void onCompleted() {
130                SystemProperties value = SystemProperties.newBuilder()
131                                             .putAllProperties(properties)
132                                             .build();
133                observer.onNext(value);
134                observer.onCompleted();
135                logger.info("client streaming was completed!");
136            }
137            // end::clientStreamingCompleted[]
138        };
139        // end::streamObserverClient[]
140    }
141    // end::getClientStreamingProperties[]
142
143    // tag::getBidirectionalProperties[]
144    @Override
145    public StreamObserver<SystemPropertyName> getBidirectionalProperties(
146        StreamObserver<SystemProperty> observer) {
147
148        // tag::streamObserverBidirectional[]
149        return new StreamObserver<SystemPropertyName>() {
150            // tag::receiveBidirectionalProperties[]
151            @Override
152            public void onNext(SystemPropertyName spn) {
153                String pName = spn.getPropertyName();
154                String pValue = System.getProperty(pName);
155                logger.info("bi-directional streaming received: " + pName);
156                // tag::systemPropertyMessage[]
157                SystemProperty value = SystemProperty.newBuilder()
158                                           .setPropertyName(pName)
159                                           .setPropertyValue(pValue)
160                                           .build();
161                // end::systemPropertyMessage[]
162                // tag::serverNext2[]
163                observer.onNext(value);
164                // end::serverNext2[]
165            }
166            // end::receiveBidirectionalProperties[]
167
168            @Override
169            public void onError(Throwable t) {
170                t.printStackTrace();
171            }
172
173            // tag::bidirectionalCompleted[]
174            @Override
175            public void onCompleted() {
176                observer.onCompleted();
177                logger.info("bi-directional streaming was completed!");
178            }
179            // end::bidirectionalCompleted[]
180        };
181        // end::streamObserverBidirectional[]
182    }
183    // end::getBidirectionalProperties[]
184}

The getBidirectionalProperties() method implements bidirectional streaming RPC call. This method returns an instance of the StreamObserver interface. Its onNext() method receives the messages from the client individually, creates a SystemProperty message with the property name and value, and sends the message back to the client. When the client streaming is completed, the method closes the server streaming by calling the onCompleted() method.

Update the PropertiesResource class to implement of /query/properties/java endpoint of the query service.

Replace the PropertiesResource class.
query/src/main/java/io/openliberty/guides/query/PropertiesResource.java

PropertiesResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2022, 2023 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package io.openliberty.guides.query;
 13
 14import java.util.List;
 15import java.util.Properties;
 16import java.util.concurrent.CountDownLatch;
 17import java.util.concurrent.TimeUnit;
 18import java.util.stream.Collectors;
 19import java.util.logging.Logger;
 20
 21import jakarta.enterprise.context.ApplicationScoped;
 22import jakarta.inject.Inject;
 23import jakarta.ws.rs.GET;
 24import jakarta.ws.rs.Path;
 25import jakarta.ws.rs.PathParam;
 26import jakarta.ws.rs.Produces;
 27import jakarta.ws.rs.core.MediaType;
 28
 29import org.eclipse.microprofile.config.inject.ConfigProperty;
 30
 31import io.grpc.ManagedChannel;
 32import io.grpc.ManagedChannelBuilder;
 33import io.grpc.stub.StreamObserver;
 34// tag::grpcImports[]
 35import io.openliberty.guides.systemproto.SystemProperties;
 36import io.openliberty.guides.systemproto.SystemProperty;
 37import io.openliberty.guides.systemproto.SystemPropertyName;
 38import io.openliberty.guides.systemproto.SystemPropertyPrefix;
 39import io.openliberty.guides.systemproto.SystemPropertyValue;
 40import io.openliberty.guides.systemproto.SystemServiceGrpc;
 41import io.openliberty.guides.systemproto.SystemServiceGrpc.SystemServiceBlockingStub;
 42import io.openliberty.guides.systemproto.SystemServiceGrpc.SystemServiceStub;
 43// end::grpcImports[]
 44
 45@ApplicationScoped
 46@Path("/properties")
 47public class PropertiesResource {
 48
 49    private static Logger logger = Logger.getLogger(PropertiesResource.class.getName());
 50
 51    @Inject
 52    @ConfigProperty(name = "system.hostname", defaultValue = "localhost")
 53    String SYSTEM_HOST;
 54
 55    @Inject
 56    @ConfigProperty(name = "system.port", defaultValue = "9080")
 57    int SYSTEM_PORT;
 58
 59    // tag::unary[]
 60    @GET
 61    @Path("/{property}")
 62    @Produces(MediaType.TEXT_PLAIN)
 63    public String getPropertiesString(@PathParam("property") String property) {
 64
 65        // tag::createChannel1[]
 66        ManagedChannel channel = ManagedChannelBuilder
 67                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
 68                                     .usePlaintext().build();
 69        // end::createChannel1[]
 70        // tag::createClient1[]
 71        SystemServiceBlockingStub client = SystemServiceGrpc.newBlockingStub(channel);
 72        // end::createClient1[]
 73        SystemPropertyName request = SystemPropertyName.newBuilder()
 74                                             .setPropertyName(property).build();
 75        SystemPropertyValue response = client.getProperty(request);
 76        channel.shutdownNow();
 77        return response.getPropertyValue();
 78    }
 79    // end::unary[]
 80
 81    // tag::serverStreaming[]
 82    @GET
 83    @Path("/os")
 84    @Produces(MediaType.APPLICATION_JSON)
 85    public Properties getOSProperties() {
 86
 87        // tag::createChannel2[]
 88        ManagedChannel channel = ManagedChannelBuilder
 89                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
 90                                     .usePlaintext().build();
 91        // end::createChannel2[]
 92        // tag::createClient2[]
 93        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
 94        // end::createClient2[]
 95
 96        Properties properties = new Properties();
 97        // tag::countDownLatch1[]
 98        CountDownLatch countDown = new CountDownLatch(1);
 99        // end::countDownLatch1[]
100        SystemPropertyPrefix request = SystemPropertyPrefix.newBuilder()
101                                         .setPropertyPrefix("os.").build();
102        // tag::getServerStreamingProperties[]
103        client.getServerStreamingProperties(
104            request, new StreamObserver<SystemProperty>() {
105
106            // tag::onNext1[]
107            @Override
108            public void onNext(SystemProperty value) {
109                logger.info("server streaming received: "
110                   + value.getPropertyName() + "=" + value.getPropertyValue());
111                properties.put(value.getPropertyName(), value.getPropertyValue());
112            }
113            // end::onNext1[]
114
115            @Override
116            public void onError(Throwable t) {
117                t.printStackTrace();
118            }
119
120            @Override
121            public void onCompleted() {
122                logger.info("server streaming completed");
123                // tag::countDownLatch2[]
124                countDown.countDown();
125                // end::countDownLatch2[]
126            }
127        });
128        // end::getServerStreamingProperties[]
129
130
131        // wait until completed
132        // tag::countDownLatch3[]
133        try {
134            countDown.await(30, TimeUnit.SECONDS);
135        } catch (InterruptedException e) {
136            e.printStackTrace();
137        }
138        // end::countDownLatch3[]
139
140        // tag::closeConnection[]
141        channel.shutdownNow();
142        // end::closeConnection[]
143
144        return properties;
145    }
146    // end::serverStreaming[]
147
148    // tag::clientStreaming[]
149    @GET
150    @Path("/user")
151    @Produces(MediaType.APPLICATION_JSON)
152    public Properties getUserProperties() {
153
154        ManagedChannel channel = ManagedChannelBuilder
155                                     .forAddress(SYSTEM_HOST, SYSTEM_PORT)
156                                     .usePlaintext().build();
157        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
158        // tag::countDownLatch4[]
159        CountDownLatch countDown = new CountDownLatch(1);
160        // end::countDownLatch4[]
161        Properties properties = new Properties();
162
163        // tag::getClientStreamingProperties[]
164        StreamObserver<SystemPropertyName> stream = client.getClientStreamingProperties(
165            new StreamObserver<SystemProperties>() {
166
167                @Override
168                public void onNext(SystemProperties value) {
169                    logger.info("client streaming received a map that has "
170                        + value.getPropertiesCount() + " properties");
171                    properties.putAll(value.getPropertiesMap());
172                }
173
174                @Override
175                public void onError(Throwable t) {
176                    t.printStackTrace();
177                }
178
179                @Override
180                public void onCompleted() {
181                    logger.info("client streaming completed");
182                    // tag::countDownLatch5[]
183                    countDown.countDown();
184                    // end::countDownLatch5[]
185                }
186            });
187        // end::getClientStreamingProperties[]
188
189        // collect the property names starting with user.
190        // tag::collectUserProperties[]
191        List<String> keys = System.getProperties().stringPropertyNames().stream()
192                                  .filter(k -> k.startsWith("user."))
193                                  .collect(Collectors.toList());
194        // end::collectUserProperties[]
195
196        // send messages to the server
197        keys.stream()
198            // tag::clientMessage1[]
199            .map(k -> SystemPropertyName.newBuilder().setPropertyName(k).build())
200            // end::clientMessage1[]
201            // tag::streamOnNext1[]
202            .forEach(stream::onNext);
203            // end::streamOnNext1[]
204        // tag::clientCompleted1[]
205        stream.onCompleted();
206        // end::clientCompleted1[]
207
208        // wait until completed
209        // tag::countDownLatch6[]
210        try {
211            countDown.await(30, TimeUnit.SECONDS);
212        } catch (InterruptedException e) {
213            e.printStackTrace();
214        }
215        // end::countDownLatch6[]
216
217        channel.shutdownNow();
218
219        return properties;
220    }
221    // end::clientStreaming[]
222
223    // tag::bidirectionalStreaming[]
224    @GET
225    @Path("/java")
226    @Produces(MediaType.APPLICATION_JSON)
227    public Properties getJavaProperties() {
228
229        ManagedChannel channel = ManagedChannelBuilder
230                                      .forAddress(SYSTEM_HOST, SYSTEM_PORT)
231                                      .usePlaintext().build();
232        SystemServiceStub client = SystemServiceGrpc.newStub(channel);
233        Properties properties = new Properties();
234        // tag::countDownLatch7[]
235        CountDownLatch countDown = new CountDownLatch(1);
236        // end::countDownLatch7[]
237
238        // tag::getBidirectionalProperties[]
239        StreamObserver<SystemPropertyName> stream = client.getBidirectionalProperties(
240                new StreamObserver<SystemProperty>() {
241
242                    // tag::onNext2[]
243                    @Override
244                    public void onNext(SystemProperty value) {
245                        logger.info("bidirectional streaming received: "
246                            + value.getPropertyName() + "=" + value.getPropertyValue());
247                        properties.put(value.getPropertyName(),
248                                       value.getPropertyValue());
249                    }
250                    // end::onNext2[]
251
252                    @Override
253                    public void onError(Throwable t) {
254                        t.printStackTrace();
255                    }
256
257                    @Override
258                    public void onCompleted() {
259                        logger.info("bidirectional streaming completed");
260                        // tag::countDownLatch8[]
261                        countDown.countDown();
262                        // end::countDownLatch8[]
263                    }
264                });
265        // end::getBidirectionalProperties[]
266
267        // collect the property names starting with java
268        // tag::collectJavaProperties[]
269        List<String> keys = System.getProperties().stringPropertyNames().stream()
270                                  .filter(k -> k.startsWith("java."))
271                                  .collect(Collectors.toList());
272        // end::collectJavaProperties[]
273
274        // post messages to the server
275        keys.stream()
276              // tag::clientMessage2[]
277              .map(k -> SystemPropertyName.newBuilder().setPropertyName(k).build())
278              // end::clientMessage2[]
279              // tag::streamOnNext2[]
280              .forEach(stream::onNext);
281              // end::streamOnNext2[]
282        // tag::clientCompleted2[]
283        stream.onCompleted();
284        // end::clientCompleted2[]
285
286        // wait until completed
287        // tag::countDownLatch9[]
288        try {
289            countDown.await(30, TimeUnit.SECONDS);
290        } catch (InterruptedException e) {
291            e.printStackTrace();
292        }
293        // end::countDownLatch9[]
294
295        channel.shutdownNow();
296
297        return properties;
298    }
299    // end::bidirectionalStreaming[]
300}

After a connection is created between the two services, the client.getBidirectionalProperties() method is called with an implementation of the StreamObserver interface. The onNext() method receives messages that are streaming from the server individually and stores them into the properties placeholder. Then, collect the properties . For each property name that starts with java., a SystemPropertyName message is created and sent to the server by the stream::onNext action. When all property names are sent, the streaming is ended by calling the onCompleted() method. Again, a CountDownLatch instance synchronizes the streaming flow.

Point your browser to the http://localhost:9081/query/properties/java URL to test the bidirectional streaming call. The java. properties from the system service are displayed. Observe the output from the consoles running the system and query services.

Testing the application

Create the QueryIT class.
query/src/test/java/it/io/openliberty/guides/query/QueryIT.java

QueryIT.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2022, 2023 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License 2.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-2.0/
  8 *
  9 * SPDX-License-Identifier: EPL-2.0
 10 *******************************************************************************/
 11// end::copyright[]
 12package it.io.openliberty.guides.query;
 13
 14import static org.junit.jupiter.api.Assertions.assertEquals;
 15import static org.junit.jupiter.api.Assertions.assertFalse;
 16
 17import java.net.MalformedURLException;
 18
 19import jakarta.json.JsonObject;
 20import jakarta.ws.rs.client.Client;
 21import jakarta.ws.rs.client.ClientBuilder;
 22import jakarta.ws.rs.client.WebTarget;
 23import jakarta.ws.rs.core.Response;
 24
 25import org.junit.jupiter.api.AfterAll;
 26import org.junit.jupiter.api.BeforeAll;
 27import org.junit.jupiter.api.Test;
 28
 29public class QueryIT {
 30
 31    private static final String PORT = System.getProperty("http.port", "9081");
 32    private static final String URL = "http://localhost:" + PORT + "/";
 33    private static Client client;
 34
 35    @BeforeAll
 36    private static void setup() {
 37        client = ClientBuilder.newClient();
 38    }
 39
 40    @AfterAll
 41    private static void teardown() {
 42        client.close();
 43    }
 44
 45    @Test
 46    // tag::getPropertiesString[]
 47    public void testGetPropertiesString() throws MalformedURLException {
 48        WebTarget target = client.target(URL + "query/properties/os.name");
 49        Response response = target.request().get();
 50        assertEquals(200, response.getStatus(),
 51                     "Incorrect response code from " + target.getUri().getPath());
 52        assertFalse(response.readEntity(String.class).isEmpty(),
 53                    "response should not be empty.");
 54        response.close();
 55    }
 56    // end::getPropertiesString[]
 57
 58    @Test
 59    // tag::getOSProperties[]
 60    public void testGetOSProperties() throws MalformedURLException {
 61        WebTarget target = client.target(URL + "query/properties/os");
 62        Response response = target.request().get();
 63        assertEquals(200, response.getStatus(),
 64                     "Incorrect response code from " + target.getUri().getPath());
 65        JsonObject obj = response.readEntity(JsonObject.class);
 66        assertFalse(obj.getString("os.name").isEmpty(),
 67                    "os.name should not be empty.");
 68        response.close();
 69    }
 70    // end::getOSProperties[]
 71
 72    @Test
 73    // tag::getUserProperties[]
 74    public void testGetUserProperties() throws MalformedURLException {
 75        WebTarget target = client.target(URL + "query/properties/user");
 76        Response response = target.request().get();
 77        assertEquals(200, response.getStatus(),
 78                     "Incorrect response code from " + target.getUri().getPath());
 79        JsonObject obj = response.readEntity(JsonObject.class);
 80        assertFalse(obj.getString("user.name").isEmpty(),
 81                    "user.name should not be empty.");
 82        response.close();
 83    }
 84    // end::getUserProperties[]
 85
 86    @Test
 87    // tag::getJavaProperties[]
 88    public void testGetJavaProperties() throws MalformedURLException {
 89        WebTarget target = client.target(URL + "query/properties/java");
 90        Response response = target.request().get();
 91        assertEquals(200, response.getStatus(),
 92                     "Incorrect response code from " + target.getUri().getPath());
 93        JsonObject obj = response.readEntity(JsonObject.class);
 94        assertFalse(obj.getString("java.home").isEmpty(),
 95                    "java.home should not be empty.");
 96        response.close();
 97    }
 98    // end::getJavaProperties[]
 99}

Each test case tests one type of the gRPC calls that you implemented.

The testGetPropertiesString() tests the /query/properties/os.name endpoint and confirms that a response is received.

The testGetOSProperties() tests the /query/properties/os endpoint and confirms that a response is received.

The testGetUserProperties() tests the /query/properties/user endpoint and confirms that a response is received.

The testGetJavaProperties() tests the /query/properties/java endpoint and confirms that a response is received.

Running the tests

Because you started Open Liberty in dev mode, you can run the tests by pressing the enter/return key from the command-line session where you started the query service.

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running it.io.openliberty.guides.query.QueryIT
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.247 s - in it.io.openliberty.guides.query.QueryIT

Results:

Tests run: 4, Failures: 0, Errors: 0, Skipped: 0

When you are done checking out the services, exit dev mode by pressing CTRL+C in the command-line sessions where you ran the system and query services, or by typing q and then pressing the enter/return key. Alternatively, you can run the liberty:stop goal from the start directory in another command-line session for the system and query services:

mvn -pl system liberty:stop
mvn -pl query liberty:stop

Great work! You’re done!

You just developed a Java application that implements four types of gRPC calls with Open Liberty. For more information, see Provide and consume gRPC services on Open Liberty in the Open Liberty docs.

Guide Attribution

Streaming messages between client and server services using gRPC by Open Liberty is licensed under CC BY-ND 4.0

Copied to clipboard
Copy code block
Copy file contents

Prerequisites:

Nice work! Where to next?

What did you think of this guide?

Extreme Dislike Dislike Like Extreme Like

What could make this guide better?

Raise an issue to share feedback

Create a pull request to contribute to this guide

Need help?

Ask a question on Stack Overflow

Like Open Liberty? Star our repo on GitHub.

Star