Common model : latency can cause dramatical issues
Reduce latency effects to improve stability and scalability
Work with data streams in a asynchronous and non-blocking way.
A stream is a sequence of 0 to n items, which eventually completes or fails.
A reactive library/framework gives tools to create and manipulate streams in a functional style.
Reactive streams specification
Implementations
Flux and Mono implement Publisher interface
Flux
.just("strings", "to", "test", "reactor", ".") // create Flux with static data
.map(String::toUpperCase) // 1 to 1 transformation
.flatMap(s -> Flux.fromArray(s.split(""))) // 1 to n transformation, can call an async service...
.filter(s -> !"S".equals(s))
.map(s -> {
if (".".equals(s))
throw new IllegalArgumentException("I don't want ending '.' .");
return s;
})
.doOnError(e -> System.err.println("Error : " + e.toString()))
.subscribe(s -> System.out.println(s)) // nothing happens if we don't subscribe
;
More operators and samples on the official doc!
(Can use Spring Initializr through https://start.spring.io or directly in IDE)
@Component
public class FunctionalStyleRouter {
@Bean
public RouterFunction<ServerResponse> route() {
return RouterFunctions
.route(RequestPredicates.GET("/index"),
request -> ServerResponse.ok().build());
}
}
curl http://localhost:8080/index
The HandlerFunction takes an incoming ServerRequest, returns a Mono<ServerResponse>
UserHandler handler = [...]
RouterFunction<ServerResponse> route =
route(GET("/users/{id}").and(accept(APPLICATION_JSON)), handler::getUser)
.andRoute(GET("/users").and(accept(APPLICATION_JSON)), handler::listUsers)
.andRoute(POST("/users"), handler::createUser);
public class HousePrice {
private String city;
private Integer pricePerSquare;
// getters/setters...
Create handler with fake data...
@Component
public class HousePriceHandler {
public Mono<ServerResponse> getAll(ServerRequest request) {
Flux<HousePrice> housePrices =
Flux.just(
new HousePrice("Paris", 9000),
new HousePrice("London", 13000),
new HousePrice("Rio", 11000));
return ok()
.contentType(MediaType.APPLICATION_JSON)
.body(housePrices, HousePrice.class);
}
Create a router to expose your data...
@Bean
public RouterFunction<ServerResponse> route() {
return RouterFunctions
.route(GET("/houseprices"), handler::getAll);
}
... and test it!
curl http://localhost:8080/houseprices | jq
[
{
"city": "Paris",
"pricePerSquare": 9000
},
{
"city": "London",
"pricePerSquare": 13000
},
...
]
We have to use a non blocking API for our application to be reactive
docker run --rm --name mongo -d -p 27017:27017 mongo
de.flapdoodle.embed:de.flapdoodle.embed.mongo
org.springframework.boot:spring-boot-starter-data-mongodb-reactive
@Id
private ObjectId id;
public interface [Model]Repository extends ReactiveMongoRepository<[Model], ObjectId> {
}
@Component
public class DBFiller implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
// do it here...
}
}
Create a new endpoint which delivers infinite stream :
curl http://localhost:8080/streamhouseprices
{"city":"Paris","pricePerSquare":10420}
{"city":"Rio","pricePerSquare":11920}
{"city":"London","pricePerSquare":14170}
{"city":"Paris","pricePerSquare":10430}
{"city":"Rio","pricePerSquare":11930}
{"city":"London","pricePerSquare":14180}
{"city":"Paris","pricePerSquare":10440}
{"city":"Rio","pricePerSquare":11940}
{"city":"London","pricePerSquare":14190}
......
Create a new web client application. We will use Webclient to call our streaming service and render server-sent events.
To do this we use media type text/event-stream :
@Controller
public class HousePricesController {
@GetMapping(value="/ssehouseprices", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<HousePrice> streamData() {
return WebClient
.create("http://localhost:8080")
.get()
.uri("/streamhouseprices")
.retrieve()
.bodyToFlux(HousePrice.class);
}
}
The server-sent events allow to refresh information in browser without making any new request.
<script type="text/javascript">
var evtSource = new EventSource("/ssehouseprices");
evtSource.onmessage = function(e) {
housePrice = JSON.parse(e.data);
// now we can use housePrice.pricePerSquare for example..
}
</script>