Streaming updates to a client using Server-Sent Events

duration 30 minutes

Prerequisites:

Learn how to stream updates from a MicroProfile Reactive Messaging service to a front-end client by using Server-Sent Events (SSE).

What you’ll learn

You will learn how to stream messages from a MicroProfile Reactive Messaging service to a front-end client by using Server-Sent Events (SSE).

MicroProfile Reactive Messaging provides an easy way for Java services to send requests to other Java services, and asynchronously receive and process the responses as a stream of events. SSE provides a framework to stream the data in these events to a browser client.

What is SSE?

Server-Sent Events is an API that allows clients to subscribe to a stream of events that is pushed from a server. First, the client makes a connection with the server over HTTP. The server continuously pushes events to the client as long as the connection persists. SSE differs from traditional HTTP requests, which use one request for one response. SSE also differs from Web Sockets in that SSE is unidirectional from the server to the client, and Web Sockets allow for bidirectional communication.

For example, an application that provides real-time stock quotes might use SSE to push price updates from the server to the browser as soon as the server receives them. Such an application wouldn’t need Web Sockets because the data travels in only one direction, and polling the server by using HTTP requests wouldn’t provide real-time updates.

The application that you will build in this guide consists of a frontend service, a bff (backend for frontend) service, and three instances of a system service. The system services periodically publish messages that contain their hostname and current system load. The bff service receives the messages from the system services and pushes the contents as SSE to a JavaScript client in the frontend service. This client uses the events to update a table in the UI that displays each system’s hostname and its periodically updating load. The following diagram depicts the application that is used in this guide:

SSE Diagram

In this guide, you will set up the bff service by creating an endpoint that clients can use to subscribe to events. You will also enable the service to read from the reactive messaging channel and push the contents to subscribers via SSE. After that, you will configure the Kafka connectors to allow the bff service to receive messages from the system services. Finally, you will configure the client in the frontend service to subscribe to these events, consume them, and display them in the UI.

To learn more about the reactive Java services that are used in this guide, check out the Creating reactive Java microservices guide.

Additional prerequisites

You will build and run the services in Docker containers. You can learn more about containerizing services with Docker in the Containerizing microservices guide.

Install Docker and start your Docker environment by following the instructions from Docker.

Getting started

The fastest way to work through this guide is to clone the Git repository and use the projects that are provided inside:

git clone https://github.com/openliberty/guide-reactive-messaging-sse.git
cd guide-reactive-messaging-sse

The start directory contains the starting project that you will build upon.

The finish directory contains the finished project that you will build.

Before you begin, make sure you have all the necessary prerequisites.

Setting up SSE in the bff service

BFFResource.java

  1// tag::copyright[]
  2/*******************************************************************************
  3 * Copyright (c) 2020 IBM Corporation and others.
  4 * All rights reserved. This program and the accompanying materials
  5 * are made available under the terms of the Eclipse Public License v1.0
  6 * which accompanies this distribution, and is available at
  7 * http://www.eclipse.org/legal/epl-v10.html
  8 *
  9 * Contributors:
 10 *     IBM Corporation - Initial implementation
 11 *******************************************************************************/
 12// end::copyright[]
 13package io.openliberty.guides.bff;
 14
 15import io.openliberty.guides.models.SystemLoad;
 16
 17import org.eclipse.microprofile.reactive.messaging.Incoming;
 18
 19import javax.enterprise.context.ApplicationScoped;
 20import javax.ws.rs.GET;
 21import javax.ws.rs.Path;
 22import javax.ws.rs.Produces;
 23import javax.ws.rs.core.Context;
 24import javax.ws.rs.core.MediaType;
 25import javax.ws.rs.sse.OutboundSseEvent;
 26import javax.ws.rs.sse.Sse;
 27import javax.ws.rs.sse.SseBroadcaster;
 28import javax.ws.rs.sse.SseEventSink;
 29import java.util.logging.Logger;
 30
 31@ApplicationScoped
 32@Path("/sse")
 33// tag::frontendResource[]
 34public class BFFResource {
 35
 36    private Logger logger = Logger.getLogger(BFFResource.class.getName());
 37
 38    // tag::sse[]
 39    private Sse sse;
 40    // end::sse[]
 41    // tag::broadcaster[]
 42    private SseBroadcaster broadcaster;
 43    // end::broadcaster[]
 44
 45    // tag::subscribeToSystems[]
 46    @GET
 47    @Path("/")
 48    // tag::sseMimeType[]
 49    @Produces(MediaType.SERVER_SENT_EVENTS)
 50    // end::sseMimeType[]
 51    public void subscribeToSystem(
 52        // tag::sseEventSinkParam[]
 53        @Context SseEventSink sink,
 54        // end::sseEventSinkParam[]
 55        // tag::sseParam[]
 56        @Context Sse sse
 57        // end::sseParam[]
 58        ) {
 59
 60        if (this.sse == null || this.broadcaster == null) { 
 61            this.sse = sse;
 62            // tag::newBroadcaster[]
 63            this.broadcaster = sse.newBroadcaster();
 64            // end::newBroadcaster[]
 65        }
 66        
 67        // tag::registerSink[]
 68        this.broadcaster.register(sink);
 69        // end::registerSink[]     
 70        logger.info("New sink registered to broadcaster.");
 71    }
 72    // end::subscribeToSystems
 73
 74    // tag::broadcastData[]
 75    private void broadcastData(String name, Object data) {
 76        // tag::notNull[]
 77        if (broadcaster != null) {
 78        // end::notNull[]
 79            // tag::createEvent[]
 80            // tag::newEventBuilder[]
 81            OutboundSseEvent event = sse.newEventBuilder()
 82            // end::newEventBuilder[]
 83                                        // tag::name[]
 84                                        .name(name)
 85                                        // end::name[]
 86                                        // tag::data[]
 87                                        .data(data.getClass(), data)
 88                                        // end::data[]
 89                                        // tag::mediaType[]
 90                                        .mediaType(MediaType.APPLICATION_JSON_TYPE)
 91                                        // end::mediaType[]
 92                                        // tag::build[]
 93                                        .build();
 94                                        // end::build[]
 95            // end::createEvent[]
 96            // tag::broadcastEvent[]        
 97            broadcaster.broadcast(event);
 98            // end::broadcastEvent[]
 99        } else {
100            logger.info("Unable to send SSE. Broadcaster context is not set up.");
101        }
102    }
103    // end::broadcastData[]
104
105    // tag::systemLoad[]
106    @Incoming("systemLoad")
107    // end::systemLoad[]
108    // tag::getSystemLoadMessage[]
109    public void getSystemLoadMessage(SystemLoad sl)  {
110        logger.info("Message received from system.load topic. " + sl.toString());
111        // tag::broadcastCall[]
112        broadcastData("systemLoad", sl);
113        // end::broadcastCall[]
114    }
115    // end::getSystemLoadMessage[]
116}
117// end::frontendResource[]

microprofile-config.properties

 1mp.messaging.connector.liberty-kafka.bootstrap.servers=localhost:9093
 2
 3# tag::systemLoadChannel[]
 4mp.messaging.incoming.systemLoad.connector=liberty-kafka
 5# tag::systemLoadTopic[]
 6mp.messaging.incoming.systemLoad.topic=system.load
 7# end::systemLoadTopic[]
 8# tag::keyDeserializer[]
 9mp.messaging.incoming.systemLoad.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
10# end::keyDeserializer[]
11# tag::valueDeserializer[]
12mp.messaging.incoming.systemLoad.value.deserializer=io.openliberty.guides.models.SystemLoad$SystemLoadDeserializer
13# end::valueDeserializer[]
14# tag::groupId[]
15mp.messaging.incoming.systemLoad.group.id=bff
16# end::groupId[]
17# end::systemLoadChannel[]

In this section, you will create a REST API for SSE in the bff service. When a client makes a request to this endpoint, the initial connection between the client and server is established and the client is subscribed to receive events that are pushed from the server. Later in this guide, the client in the frontend service uses this endpoint to subscribe to the events that are pushed from the bff service.

Additionally, you will enable the bff service to read messages from the incoming stream and push the contents as events to subscribers via SSE.

Navigate to the start directory to begin.

Create the BFFResource class.
bff/src/main/java/io/openliberty/guides/bff/BFFResource.java

Creating the SSE API endpoint

The subscribeToSystem() method allows clients to subscribe to events via an HTTP GET request to the /bff/sse/ endpoint. The @Produces(MediaType.SERVER_SENT_EVENTS) annotation sets the Content-Type in the response header to text/event-stream. This content type indicates that client requests that are made to this endpoint are to receive Server-Sent Events. Additionally, the method parameters take in an instance of the SseEventSink class and the Sse class, both of which are injected using the @Context annotation. First, the method checks if the sse and broadcaster instance variables are assigned. If these variables aren’t assigned, the sse variable is obtained from the @Context injection and the broadcaster variable is obtained by using the Sse.newBroadcaster() method. Then, the register() method is called to register the SseEventSink instance to the SseBroadcaster instance to subscribe to events.

For more information about these interfaces, see the Javadocs for OutboundSseEvent and OutboundSseEvent.Builder.

Reading from the reactive messaging channel

The getSystemLoadMessage() method receives the message that contains the hostname and the average system load. The @Incoming("systemLoad") annotation indicates that the method retrieves the message by connecting to the systemLoad channel in Kafka, which you configure in the next section.

Each time a message is received, the getSystemLoadMessage() method is called, and the hostname and system load contained in that message are broadcasted in an event to all subscribers.

Broadcasting events

Broadcasting events is handled in the broadcastData() method. First, it checks whether the broadcaster value is null. The broadcaster value must include at least one subscriber or there’s no client to send the event to. If the broadcaster value is specified, the OutboundSseEvent interface is created by using the Sse.newEventBuilder() method, where the name of the event, the data it contains, and the mediaType are set. The OutboundSseEvent interface is then broadcasted, or sent to all registered sinks, by invoking the SseBroadcaster.broadcast() method.

You just set up an endpoint in the bff service that the client in the frontend service can use to subscribe to events. You also enabled the service to read from the reactive messaging channel and broadcast the information as events to subscribers via SSE.

Configuring the Kafka connector for the bff service

microprofile-config.properties

 1mp.messaging.connector.liberty-kafka.bootstrap.servers=localhost:9093
 2
 3# tag::systemLoadChannel[]
 4mp.messaging.incoming.systemLoad.connector=liberty-kafka
 5# tag::systemLoadTopic[]
 6mp.messaging.incoming.systemLoad.topic=system.load
 7# end::systemLoadTopic[]
 8# tag::keyDeserializer[]
 9mp.messaging.incoming.systemLoad.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
10# end::keyDeserializer[]
11# tag::valueDeserializer[]
12mp.messaging.incoming.systemLoad.value.deserializer=io.openliberty.guides.models.SystemLoad$SystemLoadDeserializer
13# end::valueDeserializer[]
14# tag::groupId[]
15mp.messaging.incoming.systemLoad.group.id=bff
16# end::groupId[]
17# end::systemLoadChannel[]

A complete system service is provided for you in the start/system directory. The system service is the producer of the messages that are published to the Kafka messaging system. The periodically published messages contain the system’s hostname and a calculation of the average system load (its CPU usage) for the last minute.

Configure the Kafka connector in the bff service to receive the messages from the system service.

Create the microprofile-config.properties file.
bff/src/main/resources/META-INF/microprofile-config.properties

The bff service uses an incoming connector to receive messages through the systemLoad channel. The messages are then published by the system service to the system.load topic in the Kafka message broker. The key.deserializer and value.deserializer properties define how to deserialize the messages. The group.id property defines a unique name for the consumer group. All of these properties are required by the Apache Kafka Consumer Configs documentation.

Configuring the frontend service to subscribe to and consume events

index.js

 1// tag::initSSE[]
 2function initSSE() {
 3    // tag::eventSource[]
 4    var source = new EventSource('http://localhost:9084/bff/sse', { withCredentials: true });
 5    // end::eventSource[]
 6    // tag::eventListener[]
 7    source.addEventListener(
 8        // tag::systemLoad[]
 9        'systemLoad',
10        // end::systemLoad[]
11        // tag::setHandler[]
12        systemLoadHandler
13        // end::setHandler[]
14    );
15    // end::eventListener[]
16}
17// end::initSSE[]
18
19// tag::systemLoadHandler[]
20function systemLoadHandler(event) {
21    // tag::parse[]
22    var system = JSON.parse(event.data);
23    // end::parse[]
24    if (document.getElementById(system.hostname)) {
25        document.getElementById(system.hostname).cells[1].innerHTML =
26                                        system.loadAverage.toFixed(2);
27    } else {
28        var tableRow = document.createElement('tr');
29        tableRow.id = system.hostname;
30        tableRow.innerHTML = '<td>' + system.hostname + '</td><td>'
31                             + system.loadAverage.toFixed(2) + '</td>';
32        document.getElementById('sysPropertiesTableBody').appendChild(tableRow);
33    }
34}
35// end::systemLoadHandler[]
36

In this section, you will configure the client in the frontend service to subscribe to events and display their contents in a table in the UI.

The front-end UI is a table where each row contains the hostname and load of one of the three system services. The HTML and styling for the UI is provided for you but you must populate the table with information that is received from the Server-Sent Events.

Create the index.js file.
frontend/src/main/webapp/js/index.js

Subscribing to SSE

The initSSE() method is called when the page first loads. This method subscribes the client to the SSE by creating a new instance of the EventSource interface and specifying the http://localhost:9084/bff/sse URL in the parameters. To connect to the server, the EventSource interface makes a GET request to this endpoint with a request header of Accept: text/event-stream.

Because this request comes from localhost:9080 and is made to localhost:9084, it must follow the Cross-Origin Resource Sharing (CORS) specification to avoid being blocked by the browser. To enable CORS for the client, set the withCredentials configuration element to true in the parameters of the EventSource interface. CORS is already enabled for you in the bff service. To learn more about CORS, check out the CORS guide.

Consuming the SSE

The EventSource.addEventListener() method is called to add an event listener. This event listener listens for events with the name of systemLoad. The systemLoadHandler() function is set as the handler function, and each time an event is received, this function is called. The systemLoadHandler() function will take the event object and parse the event’s data property from a JSON string into a JavaScript object. The contents of this object are used to update the table with the system hostname and load. If a system is already present in the table, the load is updated, otherwise a new row is added for the system.

Building and running the application

To build the application, navigate to the start directory and run the following Maven install and package goals from the command line:

mvn -pl models install
mvn package

Run the following command to download or update to the latest Open Liberty Docker image:

docker pull openliberty/open-liberty:full-java11-openj9-ubi

Run the following commands to containerize the frontend, bff, and system services:

docker build -t frontend:1.0-SNAPSHOT frontend/.
docker build -t bff:1.0-SNAPSHOT bff/.
docker build -t system:1.0-SNAPSHOT system/.

Next, use the following startContainers.sh script to start the application in Docker containers:

.\scripts\startContainers.bat
./scripts/startContainers.sh

This script creates a network for the containers to communicate with each other. It also creates containers for Kafka, Zookeeper, the frontend service, the bff service , and three instances of the system service.

The application might take some time to get ready. See the http://localhost:9084/health URL to confirm that the bff microservice is up and running.

Once your application is up and running, open your browser and check out your service by going to http://localhost:9080.

The latest version of most modern web browsers supports Server-Sent Events. The exception is Internet Explorer, which does not support SSE. When you visit the URL, look for a table similar to the following example:

System table

The table contains three rows, one for each of the running system containers. If you can see the loads updating, you know that your bff service is successfully receiving messages and broadcasting them as SSE to the client in the frontend service.

Tearing down the environment

Run the following script to stop the application:

.\scripts\stopContainers.bat
./scripts/stopContainers.sh

Great work! You’re done!

You developed an application that subscribes to Server-Sent Events by using MicroProfile Reactive Messaging, Open Liberty, and Kafka.

Guide Attribution

Streaming updates to a client using Server-Sent Events by Open Liberty is licensed under CC BY-ND 4.0

Copied to clipboard
Copy code block
Copy file contents

Prerequisites:

Nice work! Where to next?

What did you think of this guide?

Extreme Dislike Dislike Like Extreme Like

What could make this guide better?

Raise an issue to share feedback

Create a pull request to contribute to this guide

Need help?

Ask a question on Stack Overflow

Like Open Liberty? Star our repo on GitHub.

Star