back to all blogsSee all blog posts

JAX-RS 2.1 reactive extensions with RxJava Backpressure

image of author
John Koehler on Apr 10, 2019
Post available in languages:

Combining reactive programming with JAX-RS 2.1 reactive extensions make for a powerful asynchronous processing that avoids overwhelming the consumer. Before we jump into a sample, let’s define some of the basic concepts:

Reactive programming

An asynchronous programming paradigm that deals with data streams and the propagation of change.

RxJava

A reactive programming API library that enables users to write event-based, asynchronous applications.

Backpressure

The strategy a consumer employs when a producer emits items so fast that the consumer can’t keep up with the flow and there is a build-up of emitted but unconsumed items. There are several types of backpressure strategies:

  • BUFFER Buffers all onNext values until the downstream consumes it.

  • DROP Drops the most recent onNext value if the downstream can’t keep up.

  • ERROR Signals a MissingBackpressureException in case the downstream can’t keep up.

  • LATEST Keeps only the latest onNext value, overwriting any previous value if the downstream can’t keep up.

  • MISSING OnNext events are written without any buffering or dropping.

Let’s see how all of these concepts work together. We’ll follow a sample data collection and retrieval application at: https://github.com/OpenLiberty/sample-reactive-extensions The application data collection is accomplished with a simple loop emitting Integer items. The application data retrieval is accomplished with a reactive extension asynchronous get. The application makes use of RxJava’s Flowable with backpressure to control the maximum amount of data collected for a given customer. Flowable is an emitter of items that implements a backpressure protocol. Flowable was introduced in RxJava2. Flowable comes into play when huge numbers of items are emitted that can not be handled by the consumer.

Controlling the maximum amount of data collected using Flowable with backpressure

The following code shows how Flowable with backpressure is used to control the maximum amount of data to collect. The onBackpressureBuffer method is used to specify the maximum number of data points. The onNext method in the ResourceSubscriber inner class is used to store the collected data. The lambda in the retryWhen method is used to delay the data collection when the maximum number of data points are collected.

   private void collectData(Customer customer) {

      String customerName = customer.getName();
      int backPressure = Customer.FREE_MAX;

      if (customer.getServiceLevel().equals(Customer.BEST_SERVICE_LEVEL)) {
         backPressure = Customer.BEST_MAX;
      } else if (customer.getServiceLevel().equals(Customer.BETTER_SERVICE_LEVEL)) {
         backPressure = Customer.BETTER_MAX;
      }

      System.out.println("collectData: " + customerName + " backPressure " + backPressure);

      Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
                                                      @Override
                                                      public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                                                         for (int i =0;i<DATA_LOOP;i++) {
                                                            e.onNext(i);
                                                         }
                                                         e.onComplete();
                                                      }}, BackpressureStrategy.BUFFER);

      flowable.onBackpressureBuffer(backPressure)
              .observeOn(Schedulers.computation(), true, backPressure)
              .retryWhen(errors -> {
                  AtomicInteger counter = new AtomicInteger();
                  return errors.takeWhile(e -> counter.getAndIncrement() != RETRY_COUNTER)
                               .flatMap(e -> {
                                    System.out.println("collectData: " + customerName + " delay retry :" + counter.get());
                                    return Flowable.timer(1, TimeUnit.SECONDS);
                               });
                  })
              .subscribe(new ResourceSubscriber<Integer>() {
                    @Override
                    public void onNext(Integer id) {
                       synchronized (store) {
                          ArrayList<String> ids = store.get(customerName);
                          ids.add(String.valueOf(id));
                       }
                    }

                    @Override
                    public void onError(Throwable throwable) {
                       System.out.println("collectData: " + customer.getName() + " onError " + throwable.getMessage());
                       dispose();
                    }

                    @Override
                    public void onComplete() {
                       System.out.println("collectData: " + customer.getName() + " onComplete ");
                       dispose();
                    }
                 });
   }

So what does this actually do? First, notice that we create a FlowableOnSubscribe inner class in the Flowable.create method. Its subscribe method loops to simulate data collection and the BackpressureStrategy is set to BUFFER, which buffers the onNext values until they are consumed. We will be using a full buffer to determine the maximum amount of data to collect to prevent overloading the back-end resource, a simulated database in this case. Next, notice that the observeOn method specifies true to delay onError notifications. This allows the threads to continue processing until all the treads in the buffer are completed. Lastly, notice that the onNext method in the ResourceSubscriber is used to update the simulated database with the data points.

Retrieving the collected data in a remote client using JAX-RS 2.1 Reactive Extensions

Let’s say that you want to retrieve the collected data in a remote client. We can use reactive extensions from JAX-RS 2.1. The following code shows how register, and rx is used to asynchronously retrieve the collected data.

   private  void asyncGet(Customer customer) {
      Client c = null;
      try {
         c = ClientBuilder.newBuilder()
                          .executorService(Executors.newSingleThreadExecutor(Executors.defaultThreadFactory()))
                          .register(FlowableRxInvokerProvider.class)
                          .build();

         Flowable<List<String>> flowable = c.target("http://localhost:" + PORT + "/reactive/collecteddatastore/rxget/")
                                            .path(customer.getName())
                                            .request()
                                            .rx(FlowableRxInvoker.class).get(new GenericType<List<String>>() {});

         final Holder<List<String>> holder = new Holder<List<String>>();

         long endTime = System.currentTimeMillis() + TIME_LOOP;
         while (System.currentTimeMillis() < endTime) {

            flowable
               .observeOn(Schedulers.computation(), true)
               .subscribe(v -> {
                             holder.value = v; // onNext
                             if (holder.value.size() > 10) {
                                // Shorten the output for the sample
                                System.out.println("asyncGet: " + customer.getName() + " onNext " + holder.value.get(0) + ", " + holder.value.get(1) + " - " + holder.value.get(holder.value.size() - 1));
                             } else {
                                System.out.println("asyncGet: " + customer.getName() + " onNext " + holder.value);
                             }
                          },
                          throwable -> {
                             System.out.println("asyncGet: " + customer.getName() + " onError " + throwable.getMessage()); // onError
                          },
                          () -> System.out.println("asyncGet: " + customer.getName() + " onCompleted ")); // onCompleted
            sleepFor(GET_COLLECTED_DATA_SLEEP);
         }

         List<String> response = holder.value;
         System.out.println("asyncGet2: " + customer.getName() + " " + response.get(response.size() - 1));
         countDownLatch.countDown();
      } finally {
         if (c != null) {
            c.close();
         }
      }
   }
   @GET
   @Path("/rxget/{customerName}")
   @Produces("application/json")
   public void getCollectedDataList(@Suspended AsyncResponse async, @PathParam("customerName") String customerName) {

      List<String> ids = null;

      synchronized (store) {
         ids = store.get(customerName);
         store.put(customerName, new ArrayList<String>(Arrays.asList(customerName)));
      }

      async.resume(new GenericEntity<List<String>>(ids) {});
   }

First, notice that the FlowableRxInvokerProvider class needs to be registered with the client. Next, notice that the FlowableRxInvoker class is used to access the non-default reactive invoker. Lastly, notice that generics are used to return the list of collected data.

Summary

It’s getting easier to do things asynchronously with REST and Java. JAX-RS 2.1 reactive extensions and RxJava give you some powerful tools in this space. Hopefully now you are more prepared to write and consumer RESTful services with RxJava asynchronously.

If you’ve got any questions or run into any problems, please let us know. Thanks!