Introduction to

Reactive programming

in Spring framework

The problematics

Common model : latency can cause dramatical issues

S. Deleuze & B. Clozel - https://speakerdeck.com/sdeleuze/reactive-spring

Reactive applications

Reduce latency effects to improve stability and scalability

S. Deleuze & B. Clozel - https://speakerdeck.com/sdeleuze/reactive-spring

Reactive programming

Work with data streams in a asynchronous and non-blocking way.

A stream is a sequence of 0 to n items, which eventually completes or fails.

A reactive library/framework gives tools to create and manipulate streams in a functional style.

Reactive in Java

Reactive streams specification

http://www.reactive-streams.org/reactive-streams-1.0.2-javadoc/org/reactivestreams/package-summary.html

Implementations

Reactor basics

Flux and Mono implement Publisher interface

  • Mono : 0 to 1 item
  • Flux : 0 to n items

Let's play with Reactor


Flux
    .just("strings", "to", "test", "reactor", ".") // create Flux with static data
    .map(String::toUpperCase)                      // 1 to 1 transformation
    .flatMap(s -> Flux.fromArray(s.split("")))     // 1 to n transformation, can call an async service...
    .filter(s -> !"S".equals(s))
    .map(s -> {
        if (".".equals(s))
            throw new IllegalArgumentException("I don't want ending '.' .");
        return s;
    })
    .doOnError(e -> System.err.println("Error : " + e.toString()))
    .subscribe(s -> System.out.println(s))         // nothing happens if we don't subscribe
    ;
				

More operators and samples on the official doc!

Reactive Spring

  • Support added in Spring 5 / Spring-Boot 2 releases
  • Reactive support based on Reactor library
  • Web support :
    • module : Spring WebFlux
    • server : Netty (default), Undertow, Servlet 3.1+ containers (non blocking IO)
    • client : WebClient is reactive alternative of RestTemplate

Spring Web-MVC vs WebFlux

Go - Init WebFlux project

(Can use Spring Initializr through https://start.spring.io or directly in IDE)

  • Create your first functional router :

@Component
public class FunctionalStyleRouter {
    @Bean
    public RouterFunction<ServerResponse> route() {
        return RouterFunctions
                .route(RequestPredicates.GET("/index"), 
                       request -> ServerResponse.ok().build());
    }
}
					
  • Test :
curl http://localhost:8080/index

Router/Handler for REST API

The HandlerFunction takes an incoming ServerRequest, returns a Mono<ServerResponse>


UserHandler handler = [...]

RouterFunction<ServerResponse> route =
    route(GET("/users/{id}").and(accept(APPLICATION_JSON)), handler::getUser)
        .andRoute(GET("/users").and(accept(APPLICATION_JSON)), handler::listUsers)
        .andRoute(POST("/users"), handler::createUser);
        			

go - Init model and handler


public class HousePrice {
    private String city;
    private Integer pricePerSquare;
    // getters/setters...
					

Create handler with fake data...


@Component
public class HousePriceHandler {
    public Mono<ServerResponse> getAll(ServerRequest request) {
        Flux<HousePrice> housePrices = 
                Flux.just(
                        new HousePrice("Paris", 9000),
                        new HousePrice("London", 13000),
                        new HousePrice("Rio", 11000));
        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(housePrices, HousePrice.class);
    }
					

Create a router to expose your data...


@Bean
public RouterFunction<ServerResponse> route() {
    return RouterFunctions
            .route(GET("/houseprices"), handler::getAll);
}
					

... and test it!

curl http://localhost:8080/houseprices | jq
[
  {
    "city": "Paris",
    "pricePerSquare": 9000
  },
  {
    "city": "London",
    "pricePerSquare": 13000
  },
  ...
]
					

Blocking DB API vs Reactive

We have to use a non blocking API for our application to be reactive

S. Deleuze & B. Clozel - https://speakerdeck.com/sdeleuze/reactive-spring

Let's do it with mongoDB

  • Start mongo on docker
    docker run --rm --name mongo -d -p 27017:27017 mongo
  • Or you can use embedded mongo with dependency
    de.flapdoodle.embed:de.flapdoodle.embed.mongo
  • Add reactive mongo support with artifact
    org.springframework.boot:spring-boot-starter-data-mongodb-reactive
  • Add id for mongo persistence on model...
    
    @Id
    private ObjectId id;
    					
  • ...Create reactive mongo repository
    
    public interface [Model]Repository extends ReactiveMongoRepository<[Model], ObjectId> {
    }
    					
  • Use the repository in the handler instead of the mock
  • Add data in the repository ! Can use CommandLineRunner for example...
    
    @Component
    public class DBFiller implements CommandLineRunner {
    @Override
    	public void run(String... args) throws Exception {
    		// do it here...
    	}
    }
    					
  • Test again!

Infinite streams

Create a new endpoint which delivers infinite stream :

  • Method Flux.interval to create infinite flux
  • Use media type application/stream+json
  • Update randomly your data in repository to see changes
  • Try it!
  • curl http://localhost:8080/streamhouseprices
    {"city":"Paris","pricePerSquare":10420}
    {"city":"Rio","pricePerSquare":11920}
    {"city":"London","pricePerSquare":14170}
    {"city":"Paris","pricePerSquare":10430}
    {"city":"Rio","pricePerSquare":11930}
    {"city":"London","pricePerSquare":14180}
    {"city":"Paris","pricePerSquare":10440}
    {"city":"Rio","pricePerSquare":11940}
    {"city":"London","pricePerSquare":14190}
    ......
    						

Webclient & Server-sent events

Create a new web client application. We will use Webclient to call our streaming service and render server-sent events.

To do this we use media type text/event-stream :


@Controller
public class HousePricesController {
	
	@GetMapping(value="/ssehouseprices", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
	@ResponseBody
	public Flux<HousePrice> streamData() {
		return WebClient
				.create("http://localhost:8080")	
				.get()
				.uri("/streamhouseprices")
				.retrieve()
				.bodyToFlux(HousePrice.class);
	}
}
					

SSE in browser

The server-sent events allow to refresh information in browser without making any new request.


<script type="text/javascript">
	var evtSource = new EventSource("/ssehouseprices");
	evtSource.onmessage = function(e) {
		housePrice = JSON.parse(e.data);
		// now we can use housePrice.pricePerSquare for example..
	}
</script>