Nous avons vu que la programmation réactive est au cœur de Vert.x. Nous expliquions que la programmation réactive est un style de programmation asynchrone où l’on écrit des instructions dont l’exécution est déclenchée par l’occurrence d’événements. Et c’est en effet de cette façon qu’on écrit une application Vert.x. Un projet a émergé ces dernières années pour faire de la programmation réactive, il s’agit des Reactive Extensions (Rx) qui ont vu le jour chez Microsoft en 2009. Elles ont été implémentées dans plusieurs langages de programmation (ReactiveX), dont Java avec le projet RxJava. Voyant la popularité de ce dernier pour faire de la programmation réactive, Vert.x a réfléchi à l’intérêt de générer automatiquement des versions RxJava de son API, versions dites Rxified.

Comment écrit-on du RxJava ?

Avant de voir des exemples d’API Vert.x Rxified, expliquons très brièvement en quoi consiste la programmation réactive avec RxJava. (A savoir que j’en parle plus dans mon compte rendu sur l’université de David Wursteisen et Brice Dutheil sur RxJava durant Devoxx France 2016).
RxJava s’articule autour de deux concepts : Observable et Observer. Un Observable est une abstraction qui représente un flux de données. Et à laquelle peut s’appliquer plusieurs opérateurs pour produire à chaque fois un autre Observable. Pour consommer le flux produit par l’Observable, un Observer souscrit auprès de lui. Ensuite nous convertissons les String en majuscules et enfin nous souscrivons un Observer auquel l’Observable publiera ses données.

1         Observable<String> observable = Observable.fromIterable(List.of("C", "C++", "Java", "Scala", "Kotlin"));
2         observable.map(String::toUpperCase).subscribe(item -> System.out.println(item), throwable -> System.err.print(throwable), () -> System.out.println("Terminated"));

Exemples d’API Vert.x Rxified

Les API Rxified sont automatiquement générées par Vert.x au même titre que les API générées automatiquement par Vert.x vers les langages supportés : Groovy, Ruby, JavaScript, etc, moyennant l’annotation de l’API avec @VertxGen.
Les API Rxified de Vert.x sont générées dans les packages où le préfix io.vertx est remplacé par le préfix io.vertx.rxjava. Par exemple pour écrire une classe de verticle Rxified :

1 class HelloWorldRx extends io.vertx.rxjava.core.AbstractVerticle{

Maintenant voyons les versions Rxified de quelques API importantes.

Rxified ReadStream

L’interface io.vertx.rxjava.core.streams.ReadStream comporte la méthode toObservable pour retourner la version RxJava de l’objet. Exemple : (dans tous les exemples de cette section, nous utilisons les versions Rxified des API) :

1         HttpServer httpServer = vertx.createHttpServer();
2         ReadStream<HttpServerRequest> requestReadStream = httpServer.requestStream();
3         Observable<HttpServerRequest> requestObservable = requestReadStream.toObservable();

L’Observable obtenu est utilisé comme n’importe quel Observable RxJava:

1         requestObservable.subscribe(request -> {request.response().end("Hello World");},
2                 error -> System.err.println(error)
3         );

Rxified HttpServerRequest

La classe io.vertx.rxjava.core.http.HttpServerRequest comporte la méthode toObservable qui retourne l’Observable équivalent de l’objet sous-jacent :

1         HttpServer httpServer = vertx.createHttpServer();
2         ReadStream<HttpServerRequest> requestReadStream = httpServer.requestStream();
3         Observable<HttpServerRequest> requestObservable = requestReadStream.toObservable();
4         requestObservable.subscribe(request -> { 
5             Observable<io.vertx.rxjava.core.buffer.Buffer> buffer = request.toObservable();
6                        // ...

Rxified Handler

Nous avons parlé des interfaces Handler et AsyncResult utilisées pour la gestion des appels asynchrones. Les versions Rxified des méthodes qui transmettent un AsyncResult comme résultat au Handler qui leur est transmis vont retourner un Single de RxJava, qui est comme un Observable mais n’émettant qu’une seule valeur.
Exemple, la version non Rxified pour envoyer un message sur l’event bus et attendre une réponse :

1            Handler<AsyncResult<Message<JsonObject>>> asyncResultHandler = reply -> {
 2                 if (reply.succeeded()) {
 3                     String message = reply.result().body().getString("message");
 4                     System.out.println(message);
 5                 } else {
 6                     System.out.println("error");
 7                 }
 8             };
 9             
10             vertx.eventBus().<JsonObject>send("GreetingService", "I want a greeting message", asyncResultHandler);

Et la version Rxified :

1         Single<Message<JsonObject>> single = vertx.eventBus().<JsonObject>rxSend("GreetingService", "I want a greeting message");
2 
3         single.subscribe(reply -> {
4             String message = reply.body().getString("message");
5             System.out.println(message);
6         }, error -> System.out.println("error"));

RxJava 2

A l’époque de ce talk, Vert.x ne supportait pas encore RxJava 2 (qui était encore relativement récent), mais c’est chose faite avec la dernière version de Vert.x (3..5) qui vient de sortir pendant que j’écris ces lignes. Cette version contient d’autres nouveautés très intéressantes que je mentionnerai juste après.
RxJava 2 est donc la nouvelle version de RxJava qui est sortie fin 2016, grâce à la contribution majeure de David Karnock, qui est d’ailleurs le lead du projet depuis plus d’un an (je ne sais pas quand exactement 😉), car il semble que la contribution de Netflix qui est à l’origine du projet ait considérablement diminué.
Les différences par rapport à la version 1.0 sont nombreuses et bien énumérées sur cette page : What’s different in 2.0. Je vais juste mentionner les quelques points que j’ai retenus.

Implémentation de la spécification Reactive Streams

RxJava 2 implémente donc la spécification Reactive Streams, qui est « une initiative pour standardiser le traitement de flux asynchrone » dans le monde Java, JavaScript et dans les protocoles réseaux. La spécification pour la JVM est incluse dans Java pour la première fois dans la release Java 9, dans la classe java.util.concurrent.Flow. Le fait de standardiser le traitement des flux va surtout permettre aux différentes librairies de programmation réactive de pouvoir interopérer entre elles. Le site des Reactive Streams liste les implémentations reconnues de la spécification : http://www.reactive-streams.org/announce-1.0.0.

Observable et Flowable

RxJava 2 a introduit le nouveau type de flux Flowable en plus d’Observable pour distinguer clairement le traitement des flux nécessitant ou non de la back-pressure.
Avant RxJava 2, ces deux types de flux sont en effet représentés dans RxJava par l’unique type Observable. Ce qui a pour inconvénient d’obliger le développeur à gérer parfois des contraintes liées à la back-pressure alors même qu’elle n’est pas utilisée, typiquement des exceptions liées à la back-pressure déclarées dans la signature de certaines opérations.
En adressant les deux types de flux dans des classes distinctes dans RxJava 2, on peut à présent utiliser Observable pour gérer plus simplement les flux ne nécessitant pas de back-pressure, et recourir à Flowable lorsqu’on a besoin de faire de la back-pressure.

Valeurs nulles

Les valeurs nulles ne sont plus supportées dans les flux et provoquent des exceptions.
On peut aussi signaler une amélioration de performance de RxJava 2 dans les opérations de transformation des flux comme on peut le constater avec les résultats des benchmarks que David Karnock a l’habitude de publier.

Shakespeare plays crabble

Benchmark des librairies de programmation réactive par David Karnok.