Package org.eclipse.microprofile.reactive.streams.operators
This provides operators for building stream graphs that consume or produce elements using the
Publisher
, Subscriber
and
Processor
interfaces.
There are four primary classes used for building these graphs:
PublisherBuilder
, which when built produces aPublisher
SubscriberBuilder
, which when built produces aCompletionSubscriber
ProcessorBuilder
, which when built produces aProcessor
CompletionRunner
, which when built produces aCompletionStage
Operations on these builders may change the shape of the builder, for example,
ProcessorBuilder.toList()
changes the builder to a
SubscriberBuilder
, since the processor now has a
termination stage to direct its elements to.
SubscriberBuilder
's are a special case, in that they don't just build a
Subscriber
, they build a
CompletionSubscriber
, which encapsulates both a
Subscriber
and a CompletionStage
of the result of the
subscriber processing. This CompletionStage
will be redeemed with a result when the
stream terminates normally, or if the stream terminates with an error, will be redeemed with an error. The result is
specific to whatever the Subscriber
does, for example, in the case of
ProcessorBuilder.toList()
, the result will be a
List
of all the elements produced by the Processor
, while in the case
of ProcessorBuilder.findFirst()
, it's an
Optional
of the first element of the stream. In some cases, there is no result, in which case the
result is the Void
type, and the CompletionStage
is only useful for
signalling normal or error termination of the stream.
The CompletionRunner
builds a closed graph, in that case
both a Publisher
and Subscriber
have been provided, and
building the graph will run it and return the result of the Subscriber
as a
CompletionStage
.
An example use of this API is perhaps you have a Publisher
of rows from a database, and
you want to output it as a comma separated list of lines to publish to an HTTP client request body, which expects a
Publisher
of ByteBuffer
. Here's how this might be implemented:
Publisher<Row> rowsPublisher = ...; Publisher<ByteBuffer> bodyPublisher = // Create a publisher builder from the rows publisher ReactiveStreams.fromPublisher(rowsPublisher) // Map the rows to CSV strings .map(row -> String.format("%s, %s, %d\n", row.getString("firstName"), row.getString("lastName"), row.getInt("age)) ) // Convert to ByteBuffer .map(line -> ByteBuffer.wrap(line.getBytes("utf-8"))) // Build the publisher .build(); // Make the request HttpClient client = HttpClient.newHttpClient(); client.send( HttpRequest .newBuilder(new URI("http://www.foo.com/")) .POST(BodyProcessor.fromPublisher(bodyPublisher)) .build() );
The documentation for each operator uses marble diagrams to visualize how the operator functions. Each element flowing in and out of the stream is represented as a coloured marble that has a value, with the operator applying some transformation or some side effect, termination and error signals potentially being passed, and for operators that subscribe to the stream, an output value being redeemed at the end.
Below is an example diagram labelling all the parts of the stream.
-
ClassDescriptionA builder for a closed reactive streams graph.CompletionSubscriber<T,
R> A subscriber that redeems a completion stage when it completes.Operators for connecting different graphs together.Operators for completing a stream.Operators for handling errors in streams.Operations for transforming a stream.Operations for peeking at elements and signals in the stream, without impacting the stream itself.ProcessorBuilder<T,R> A builder for aProcessor
.A stream that completes with a single result.A builder for aPublisher
.Primary entry point into the Reactive Streams utility API.Factory interface for providing the implementation of the static factory methods inReactiveStreams
.SubscriberBuilder<T,R> A builder for aSubscriber
and its result.Operations for transforming a stream.