Cloudzilla Logo

Getting Started With Quarkus

A data stream is the continuous data sent from a source to a destination. Data Streams are essential in many applications.

This is because some data types come in bits. An example of such data is that from weather forecasting instruments or a live video stream.

Since data streams are fetched continuously, they can be filtered to give the required output.

In this article, you will learn how to use SmallRye Reactive Streams Operator to work with data streams in a Quarkus project.

Table of Contents

Key takeaways

At the end of this article, you will have gained the following knowledge:

  • Setting up SmallRye Reactive Streams Operator in Quarkus
  • Implementing various operations using Reactive Streams Operator in Quarkus

Prerequisites

To follow along, you need:

  • Some knowledge of Java language.
  • A Java IDE such as IntelliJ.
  • A stable internet connection.

Note that we will be using IntelliJ ultimate edition and JDK version 17 in this project.

Create a new Quarkus project

Open the IDE and navigate to file section to create a new project. In the window displayed, input the following:

Name: quarkus-reactive-stream

Group: com.stream

new quarkus project

Select the following as the dependencies:

new quarkus project dependencies

Finally, click on the Finish button.

Folder Structure

The following is the folder structure:

folder structure

Add SmallRye-Reactive dependency to the project

To do this, copy the code below into the pom.xml file in the project under the dependencies tags:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-streams-operators</artifactId>
</dependency>

Reload the project to add it automatically. You can do this by right-clicking on the pom.xml file, and under the Maven section, select Reload project:

Reload project

Rename the ExampleResource class and the file to StreamResource.

Run the application on your terminal to check if the installation was successful:

./mvnw quarkus:dev

The installed dependencies are as shown below:

Installed Quarkus dependencies

To stop the operation, press q or a combination of Ctrl + C.

Reactive Operators

Reactive operators introduce a set of types that allow the creation of Reactive streams.

They are as shown in the table below:

The Reactive Streams Their Reactive Stream Operators
Publisher PublisherBuilder
Processor ProcessorBuilder
Subscriber SubscriberBuilder

All Reactive Stream Operators are terminated using build().

Ways in which one can work with Reactive Streams

There are numerous ways in which one can work with Reactive Streams. These include injecting them into the project as Beans, and integrating them directly into the application.

In this section, we will discuss how to get started with Reactive Streams.

Working with Reactive Streams directly in the application

Create a simple asynchronous stream

This asynchronous stream will be accessed through a GET request.

Inside the StreamResource class, copy the following code:

    @GET
    @Path("/async")
    @Produces(MediaType.TEXT_PLAIN)
    public CompletionStage<String> helloAsync() {
        return ReactiveStreams.of("h","e","l","l","o")
        .map(s -> s.toUpperCase())
                .toList().run().thenApply(l -> l.toString());
    }

The above code will allow the https://localhost:8080/hello/async endpoint to generate an asynchronous stream.

The stream will then:

  • Generate a stream of the characters 'h','e','l','l','o' each being separate from the other
  • Transform them to uppercase using the .map() function
  • Lists all outputs
  • Make them into a continuous string

Run the application. Then, on a separate terminal, run curl http://localhost:8080/hello/asyncs.

The result is as shown in the image below:

Asynchronous stream display

Working on an individual main class file

Create a new folder inside the java/org folder named data. Then, create a new Java class file called Demo.

Next, create a Reactive Stream that outputs the following words, "Hello Dev! Want to code today?".

The code is as shown below:

package org.data;

import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

public class Demo {

    public static void main(String[] args) {

        /* A reactive stream that generates the words in uppercase and displays them */
        ReactiveStreams.of("Hello", "Dev!", "Want", "to", "code", "today?")
                .map(String::toUpperCase) // Transform the words to uppercase
                .filter(s -> s.length() > 1) // Filter the Stream items
                .forEach(word -> System.out.println(">>> " + word)) // Terminal operation
                .run(); // Run it (create the streams, and subscribe to it for output...)

    }
}

To run the application, navigate into the inbuilt terminal and launch a bash script as follows:

./mvnw compile exec:java -Dexec.mainClass=org.data.Demo

The above command will execute the necessary main class. Just reference it quickly using the package name or location.

The output is as follows:

Quick quarkus class run

Working with Reactive Streams as a Bean in the application

This section shall briefly deal with injecting a Java Bean into the application.

Inside the folder containing the ExampleResource file, create a new file and name it StramBean.java.

In the file, add the ApplicationScoped annotation to make it visible throughout the application:

package com.stream;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class StreamBean {


}

Next, add some functions used to create a counter that automatically increases, generates random numbers, and adds them together

AtomicInteger counter = new AtomicInteger();

    Random rand = new Random(); //instance of random class
    int upperbound = 25;

    public int showRandom(){
        //generate random values from 0-24
        int int_random = rand.nextInt(upperbound);
        return int_random;
    }

    public int myNumber(){
        int x;
        x=counter.incrementAndGet() + showRandom();
        return x;
    }

Implement the below the myNumber function:

  • Create a Publisher that outputs a continuous stream in a String form.
  • It shall output a Streamflow that brings out results every 100 milliseconds.
  • It also generates a random number.
  • It then processes the data stream being generated e.g. by use of filter(), takeWhile(), distinct(), and limit() functions. These concepts shall be covered later in the article.
    // It produces a stream
    public Publisher<String> stream() {

    // The output shall be displayed after every 100 millseconds as a continous stream
        return Flowable.interval(100, TimeUnit.MILLISECONDS)
                .map(i -> myNumber())
                .filter(i -> i>30).skip(41).takeWhile(i -> i<100)
                .distinct()
                .limit(5).map(i -> i + 1)
                .map(i -> i.toString());
    }

Import all the needed dependencies into the file using the IDE. Then, inject the Bean into the application by adding the following code under the StreamResource class:

    @Inject
    StreamBean bean;

Next, add an EndPoint to allow the generation of the stream from the Bean when accessed as shown below:

    /* 
     * Create a GET request that produces Server-Sent Events as the output
     * Its output will be the output from the Bean created
    */
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<String> helloStream() {
        return bean.stream();
    }

It produces an output of type SERVER_SENT_EVENTS.

Run the application to see the output that is generated in the terminal.

The above examples show that Reactive streams can also be produced directly or injected as a Bean into the application.

If one uses or wants to work with Camel applications or Vert.X application, check the official docs for further details.

Reactive Operators types and examples

Since we now understand basic aspects of Reactive Streams in a Quarkus application, let's discuss some common operator types and examples.

These operators are classified according to functionality. The table below shows the classification of the SmallRye Reactive Streams Operators:

Category Usages and Examples
Creation of Streams It allows the API to create Streams using the PublisherBuilder
Processing of Streams Operators in this category transform Stream items
Actions in Streams These operators allow one to react to different events happening in an application
Error management of Streams These operators allow recovery after a stream failure

Creation of streams

Operators in this category include the ones that do the following:

  • Creation of empty streams.
  • Creation of streams from elements.
Operator Description Operators
Creation of empty streams This creates an empty Stream. It has no items of any type in it. .empty()
Creating streams from elements This creates a stream of either 0, 1 or n elements. .of(), .ofNullable()
Creation of failing streams These streams are meant to fail .failed()
Creation of streams from CompletionStage This operator creates a stream of either 0 or 1 element produced when the passed CompletionStage is completed. .fromCompletionStage(), .fromCompletionStageNullable()
Creation of streams from collections This operator creates a stream that emits elements from the passed iterable, then sends the completion signal back. .fromIterable()
Wraps a Reactive Stream Publisher The operator creates a stream that emits the elements from the passed Publisher. .fromPublisher()
Generation of infinite streams The operator in this instance creates a stream using the generator method. Then the number of generated elements depends on the request. .generate(), .iterate()

Let's focus on the popular operators highlighted in the above table. We will add them to the 'Demo' class that we created earlier.

Creating streams from elements

Open the file in the IDE and paste the following code into it:

// Utilizes the '.of()' operator to return an output of ten elements
ReactiveStreams.of(0,1,2,3,4,5,6,7,8,9)
        .forEach(number -> System.out.println(">>> " + number))
        .run();

This code generates a stream of many numbers. It then prints them on the console.

Next, create a publisher using the PublisherBuilder() method. Its return type will be of the 'Integer' type. It will be built from the above stream as follows:

PublisherBuilder<Integer> streamOfMany = (PublisherBuilder<Integer>) ReactiveStreams.of(0,1,2,3,4,5,6,7,8,9)
        .forEach(number -> System.out.println(">>> " + number))
        .run();
Generation of infinite streams

One can generate infinite streams by using the .iterate() operator. One can also generate it using the following steps:

In the GreetingResource.java file, add the following code:

    // Gets the output from the Bean
    // The output is of Integer type
    @GET
    @Path("/stream2")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Publisher<Integer> helloStream2() {
        return bean.stream2();
    }

Add the code below in the StreamBean.java file:

// Generates infinite loop of numbers each 50 ms
public Publisher<Integer> stream2(){
        return Flowable.interval(50, TimeUnit.MILLISECONDS)
        .map(i -> myNumber());
        }

Run the main application using ./mvnw quarkus:dev. You can access it on a new terminal using curl http://localhost:8080/hello/stream2.

Processing of Streams

As mentioned earlier, this kind of operator transforms Stream items that are transiting. Examples of this kind of operator are shown below:

Operator Description Operators
Creation of a processor The processor is a component from Reactive Streams that is both a Publisher and a Subscriber. It can consume and emit elements. ProcessorBuilder<I, O>
Filtering of elements These are quite popular. They filter items that are transiting in the stream (produces only the desired output) .dropWhile(), .distinct(), .skip(), .filter(), .takeWhile(), .limit()
Composition of asynchronous actions These operators produce a stream for each element of the stream. It then flattens (serializes) the stream that is returned .flatMap(), .flatMapIterable(), .flatMapCompletionStage(), .flatMapRsPublisher()
Transformation of items They produce a value synchronously .map()
Combination of a Processor It forwards the items to a Processor or a ProcessorBuilder() function .via()

Now, let's get into popular and common Stream Processors.

Creation of a processor and Combination of a Processor

As mentioned earlier, its expected outcome is a processor that takes in an input and gives an output.

Inside the 'Demo' class, add the following code. It generates a processor with this format processor(I, O); where I is the input and O the output.

// Create a processor called 'SimpleProcessor'.
ProcessorBuilder<Integer, String> SimpleProcessor = ReactiveStreams
.<Integer>builder().map(i -> Integer.toString(i));

// Combine it to the application, and subscribe to it so that the outcome can be visible
ReactiveStreams.of(10, 20)
.via(SimpleProcessor)
.forEach(x -> System.out.println("-- " + x))
.run(); // ("10", "20")

This code will create a new processor. It also uses it to fetch the input and display it.

Filtering of elements

This type is widely used in Streams. It customizes the items to fit one's needs. The operators include:

Inside the infinite stream in the main application, add the following filters under the 'StreamBean.java' file:

-filter: This operator selects the element using a set of rules.

// Will only output values above 30 but below 60
public Publisher<Integer> stream2(){
        return Flowable.interval(50, TimeUnit.MILLISECONDS)
        .map(i -> myNumber())
        .filter(i -> i>30);//filter
        }

skip: It neglects some elements in its input e.g. .skip(41)

public Publisher<Integer> stream2(){
        return Flowable.interval(50, TimeUnit.MILLISECONDS)
        .map(i -> myNumber())
        .filter(i -> i>30)
        .skip(41);
        }

takeWhile: It acts like the .filter() function in some way. An example is shown below:

public Publisher<Integer> stream2(){
    return Flowable.interval(50, TimeUnit.MILLISECONDS)
    .map(i -> myNumber())
    .filter(i -> i>30).takeWhile(i -> i<100);
    }

distinct: It makes sure that the output items in the stream are distinct from one another.

/* In case the output was 1, 1, 1, 2, 3; then the output is 1,2,3 */
public Publisher<Integer> stream2(){
        return Flowable.interval(50, TimeUnit.MILLISECONDS)
        .map(i -> myNumber())
        .filter(i -> i>30).takeWhile(i -> i<100)
        .distinct(); //distinct
        }

limit: It stops adding elements to the stream after the size reaches the input value.

public Publisher<Integer> stream2(){
        return Flowable.interval(50, TimeUnit.MILLISECONDS)
        .map(i -> myNumber())
        .filter(i -> i>30).takeWhile(i -> i<100)
        .distinct()
        .limit(5).map(i -> i + 1)
        .map(i -> i.toString());
        }
Composition of asynchronous actions

flatMap: It automatically returns a PublisherBuilder() and serializes the elements in the returned stream.

It looks as follows:

ReactiveStreams.of(1, 2, 3)
    .flatMap(i -> ReactiveStreams.of(i, i, i)); // ((1, 1, 1, 2, 2, 2, 3, 3, 3)

ReactiveStreams.of(1, 2, 3)
    .flatMapIterable(i -> Arrays.asList(i, i, i))
     .forEach(x -> System.out.println("[ " + x + " ]"))
    .run(); // (1, 1, 1, 2, 2, 2, 3, 3, 3)
Transformation of items

The .map() operator is the most commonly used operator. It's used in the creation of a synchronous stream.

An example is shown below:

        ReactiveStreams.of(1, 2, 3)
        .map(i -> i + 11)
        .forEach(i -> System.out.println("( " + i + " )"))
        .run(); // (12, 13, 14)

Action Reactive Stream operators

The most commonly used operator is the .peek(). It's called for each item.

It does not alter the stream output but instead creates a separate small stream and performs as specified in its arguments. Its usage looks as follows:

ReactiveStreams.of(1, 2, 3)
                .peek(i -> System.out.println("Receiving: " + i))
                .ignore()
                .run();

Reactive Error management operators

Let's now learn about Error management operators. They are essential since Asynchronous streams do not allow the usage of the Java try/catch to catch on errors.

Some of its categories are discussed below:

Name Description Operators
Error management operators based on events These facilitate reaction to various events e.g. when an element is received, an error or when the stream completes. .onErrorResumeWith(), .onErrorResume(), .onErrorResumeWithRsPublisher()
A Terminal operator and computation asynchronous result These Reactive operators act as subscribers. They produce a result that can be computed asynchronously. One can then retrieve a CompletionStage object. '<CompletionStage>'
Cancellation of a stream As long as one is subscribed to a stream, the results of the publisher will continuously be shown. This operator cancels this subscription .cancel()
Ignoring of elements It ignores all elements transiting on the streams .ignore()
Result collection They accumulate the results, then does a batch processing of the whole items. .collect(), .toList() , .reduce()
Getting Streams first element If there are any item in the stream, it returns the first item .findFirst()
Execution of a method for each element This is a terminal operation, unlike .peek(), that executes a method for each stream element. .forEach()
Passing to a Reactive Streams Subscriber It forwards the elements of a stream to a Subscriber or .SubscriberBuilder(). .to()

Let's look at some common stream operators:

Ignoring elements

In the code below, the stream is on, but nothing will be displayed.

ReactiveStreams.of( 1, 2, 3, 4, 5, 6, 7, 8, 9)
                .ignore()
                .run() // Subscribe
                .thenAccept(x -> System.out.println("Streaming is Done!"));
Result collection

It adds up all the streams together after the stream ends.

ReactiveStreams.of(10, 20, 30, 40)
                .collect(Collectors.summingInt(x -> x))
                .run()
                // Produces 100
                .thenAccept(result -> System.out.println("Result is: " + result));
Getting the first element in a stream

This operator can be easily applied as follows:

ReactiveStreams.of(9, 8, 7, 6, 5)
                .findFirst()
                .run()
                // Produces [9]
                .thenAccept(number -> System.out.println(number));
Execution of a method for each element

This operator acts as an iterator in some way, as shown below:

ReactiveStreams.of(9, 8, 7, 6, 5)
                .forEach(z -> System.out.println("The app is Receiving " + z))
                .run();
Passing to a Reactive Streams Subscriber

Lastly, this is how one can pass stream items to a Reactive Subscriber:

        SubscriberBuilder<Integer, Optional<Integer>> SimpleSubscriber = ReactiveStreams.<Integer>builder()
        .map(i -> i + 1)
        .findFirst();

        ReactiveStreams.of(9, 8, 7, 6, 5)
        .to(SimpleSubscriber)
        .run()
        // Produces [10] since 1 is added to the first element
        .thenAccept(optional -> optional.ifPresent(result -> System.out.println("Result: " + result)));

Conclusion

In this article, we have learned:

  • What Reactive Stream Operators are.
  • Setting up SmallRye Reactive Streams Operator in Quarkus
  • Some Operations that can be done using Reactive Streams Operator in Quarkus
  • Classification of the SmallRye Reactive Stream Operators based on functionality

Further reading


Peer Review Contributions by: Odhiambo Paul

Author
Chris Mutua
Chris Mutua is an undergraduate student pursuing a Bachelor of Science in Computer Technology at Jomo Kenyatta University of Agriculture and Technology. He is interested in Network Automation, DevOps, Network Security, Fullstack System Development, and any upcoming trend in technology. He spends most of his time learning new systems, processes, and technologies both in software and hardware systems.
More Articles by Author
Related Articles
Cloudzilla is FREE for React and Node.js projects
No Credit Card Required

Cloudzilla is FREE for React and Node.js projects

Deploy GitHub projects across every major cloud in under 3 minutes. No credit card required.
Get Started for Free