Pour revenir à la première partie de cette série d'articles sur les Reactive Extensions, c'est par ici.

7 Leçons apprises en utilisant les Reactive Extensions

Cette dernière partie est consacrée à la présentation des bonnes pratiques, des écueils et des patterns retenus par les orateurs dans leur expérience d’utilisation de RxJava en utilisant les Reactive Extensions.

7.1 RxJava et les Stream Java 8

Même si RxJava fournit un ensemble d’opérateurs très riche pour manipuler les Observable, il peut être parfois raisonnable d’utiliser les Stream Java 8 à la place de ces opérateurs. Les Stream ont l’avantage d’être plus performants pour les traitements sur les données en local. Voici un exemple où l’utilisation d’un Stream dans du Rx est raisonnable :1 Observable.just(1, 2, 3, 4) 2 .toList() 3 .map( 4 // Stream Java 8 5 list -> list.stream() 6 .filter(i -> i % 2 == 0) 7 .collect(Collectors.toList())) 8 .subscribe();

 Un Stream Java 8 est utilisé pour filtrer les éléments de la liste.

On peut tout simplement utiliser des collections de l’API standard dans les instructions RxJava, comme cet exemple qui crée un Observable à partir d’une collection d’Observables :

1         List<Observable<?>> obs = new ArrayList<>(); // collection Java
2         obs.add(Observable.defer(() -> Observable.just(1, 2, 3)));
3         obs.add(Observable.just(4, 5, 6));
4         obs.add(Observable.fromCallable(() -> 7));
5         Observable.merge(obs).subscribe(); // utilisation de la collection Java

Attention !

Cette combinaison de Stream et RxJava doit être utilisée avec parcimonie. Car sinon on a vite fait de se retrouver avec des codes de ce genre où il devient impossible de démêler les appels RxJava des appels de Stream Java 8 :

7.2 Utiliser les lamba pour simplifier les transformations des Observables

Soit l’instruction Rx suivante dans une application Android :

 1         RxView.clicks(startButton).doOnNext(new Action1<Void>() {
 2             @Override
 3             public void call(Void aVoid) {
 4                 logger.debug("--------- GOT A TAP");
 5             }
 6         }).map(new Func1<Void, Integer>() {
 7             @Override
 8             public Integer call(Void aVoid) {
 9                 return 1;
10             }
11         }).subscribe();

En la récrivant en utilisant les lambdas, elle revient à :

1         RxView.clicks(startButton).doOnNext(arg -> logger.debug("--------- GOT A TAP"))
2                 .map(arg -> 1).subscribe();

Si l’environnement de développement le permet, il ne faut donc pas hésiter à utiliser les lambdas pour simplifier l’écriture d’instructions Rx.

7.3 Découper les longues transformations Rx pour améliorer leur lisibilité

Soit l’instruction Rx suivante :

 1         Observable.just(1, 2, 3)
 2                 .flatMap(i -> {
 3                     Observable<String> complex = webService.complexify(i)
 4                             .map(str -> str.toLowerCase())
 5                             .retry(4);
 6                     Observable<String> human = webService.toHumanString(i)
 7                             .filter(str -> str.length() < 3)
 8                             .take(1)
 9                             .onErrorResumeNext(Observable.empty())
10                             .defaultIfEmpty("Oups !");
11                     
12                     return Observable.zip(complex, human, (c, h) -> c + h)
13                             .flatMap(str -> webService.frequencyOf(str))
14                             .flatMapIterable(itr -> itr);
15                 }).subscribe();
16

Sa lisibilité peut être améliorée en extrayant certaines parties dans des méthodes :

Observable.just(1, 2, 3)
        .flatMap(i -> transform(i))
        .subscribe();

public Observable<Object> transform ( int i){
    Observable<String> complex = fetchComplexify(i);
    Observable<String> human = fetchHuman(i);

    return Observable.zip(complex, human, (c, h) -> c + h)
            .flatMap(str -> webService.frequencyOf(str))
            .flatMapIterable(itr -> itr)
}
private Observable<String> fetchHuman(int i) {
    return webService.complexify(i)
            .map(str -> str.toLowerCase())
            .retry(4);
}

private Observable<String> fetchComplexify(int i) {
    return webService.toHumanString(i)
            .filter(str -> str.length() < 3)
            .take(1)
            .onErrorResumeNext(Observable.empty())
            .defaultIfEmpty("Oups !");
}

 7.4 Usage de flatMap

L’opérateur flatMap retourne un Observable construit par une fonction qu’elle accepte en argument. Voici quelques cas remarquables d’utilisation de flatMap lorsqu’on débute avec Rx.

7.4.1 Imbrication de souscriptions

Lorsque nous avons besoin des valeurs d’un Observable pour souscrire aux valeurs d’un autre Observable, on peut avoir le mauvais réflexe d’écrire quelque chose du genre :

Observable.just(1, 2, 3)
        .subscribe(arg -> {
            webservice.call(arg).subscribe(System.out::println);
        });

Mais en faisant cela, outre le fait qu’on s’engage dans du “callback hell”, on se prive de l’usage de la propriété de monade des Observable, car rappelez-vous flatMap est une des propriétés d’une structure monadique. Ainsi en utilisant flatMap, cette imbrication de souscription s’écrira (et devrait) s’écrire comme suit :

Observable.just(1, 2, 3)
        .flatMap(arg -> webservice.call(arg))
        .subscribe(System.out::println);

 7.4.2 Observable infini

Soit un Observable émettant une séquence de valeurs chaque seconde, soit une séquence sans fin, à laquelle on applique l’opérateur flatMap :

Observable.interval(1, TimeUnit.SECONDS)
        .flatMap(tps -> Observable.just(1, 2, 3))
        .subscribe(System.out::println);

 A chaque écoulement de seconde, l’Observer écrit sur la sortie standard la séquence 1, 2, 3. Bien que l’Observer soit notifié par l’opérateur flatMap, il ne faut pas oublier que la source principale d’événement reste:

Observable.interval(1, TimeUnit.SECONDS)

Donc le flux reste infini même avec l’application de l’opérateur flatMap, autrement dit il n’y aura pas d’événement de fin de flux.

7.5 Observable infini et operateur toList

Un piège à éviter est de vouloir appliquer l’opérateur toList à un flux infini:

Observable.interval(1, TimeUnit.SECONDS)
        .toList()
        .subscribe(System.out::println);

 Cette instruction va à coup sûr aboutir à une saturation de mémoire si le code reste en exécution assez longtemps, car il faut savoir que l’opérateur toList essaye de retourner tous les éléments émis par l’Observable source dans une seule liste. Vu que le flux de l’Observable source n’aura pas de fin, l’opérateur toList va continuer à stocker indéfiniment les éléments qu’il va recevoir. En résumer une situation à éviter.

7.6 Cold et Hot Observable

Les Observable peuvent être répartis en deux catégories : Cold et Hot. En fonction du contexte, l’appartenance d’un Observable à l’une ou l’autre des deux catégories peut avoir un impact important.

7.6.1 Cold Observable

Un Cold Observable crée l’émetteur des valeurs à émettre lors des souscriptions. Exemple:

Observable.create(subscriber -> {
    Timer timer = new Timer();
    timer.onTick(() -> subscriber.onNext("tick"));
}).subscribe();

 7.6.2 Hot Observable

A l’inverse, un Hot Observable utilise un émetteur de valeurs existant et actif indépendamment des souscriptions. Exemple:

Timer timer = new Timer();
Observable.create(subscriber -> {
    timer.onTick(() -> subscriber.onNext("tick"));
}).subscribe();

7.6.3 Illustration de l’importance du type Cold ou Hot de l’Observable

Soit un appel de web service qui retourne un Observable:

Observable<String> response = ws.call();

 Et soit l’utilisation suivante de cet Observable:

response
        .map(str -> str.toLowerCase())
        .subscribe(System.out::println);

Si l’Observable est de type Cold, l’appel du Web Service se fera au moment de la souscription, un coût dont le client doit avoir conscience, surtout s’il compte souscrire à l’Observable dans d’autres instructions, chacune des souscriptions aboutissant à un nouvel appel du Web Service.

7.7 Utilisation de MDC dans les instructions RX

Le mécanisme MDC (Mapped Diagnostic Context) bien connu des bibliothèques de journalisation (log4j, logback, slf4j) permet de logger des informations de contexte propres à une hiérarchie de threads (c’est-à-dire que l’information de contexte présente dans le MDC est héritée par les threads fils lors de leur création).

Ce mécanisme de MDC ne marche malheureusement plus tout seul dans certains types d’environnements de multithreading sans intervention du développeur. En effet dans un environnement où un pool de thread est utilisé pour exécuter des tâches, une tâche peut se retrouver à être exécutée dans un thread ayant un MDC avec des valeurs inattendues. Simplement parce qu’au lieu d’être exécutée sur un nouveau thread du pool héritant du MDC du thread courant, la tâche a été exécutée sur un thread du pool déjà utilisé pour l’exécution d’autres tâches ayant modifié son MDC. Ce même phénomène se produit avec RxJava lorsqu’on utilise les Schedulers pour paralléliser les traitements. Il faut utiliser un workaround pour fiabiliser le mécanisme du MDC dans ces cas.

D’abord illustrons concrètement le problème que nous évoquons avec le billet repris du blog http://blog.mabn.pl/2014/11/rxjava-logback-and-mdc-threadlocal.html. Soit la suite d’instructions suivante :

 1         MDC.put("requestId", "AAA");
 2 
 3         Observable.interval(1, TimeUnit.SECONDS, Schedulers.io())
 4                 .subscribe((seq) -> {
 5                     LoggerFactory.getLogger("first_observable").warn(""+seq);
 6                 });
 7 
 8         sleep(2500);
 9 
10         log.info("before changing to BBB");
11         MDC.put("requestId", "BBB");
12         log.info("after changing to BBB");
13 
14         Observable.interval(1, TimeUnit.SECONDS, Schedulers.io())
15                 .take(3)
16                 .subscribe((seq) -> {
17                     LoggerFactory.getLogger("second_observable").warn(""+seq);
18                 });
19 
20         sleep(5500);
21         MDC.put("requestId", "CCC");
22         log.info("after changing to CCC");
23 
24         Observable.interval(1, TimeUnit.SECONDS, Schedulers.io())
25                 .subscribe((seq) -> {
26                     LoggerFactory.getLogger("third_observable").warn(""+seq);
27                 });

 et la sortie correspondante :

 1         WARN  AAA [2014-11-10 02:57:15,568] first_observable: 0
 2 // as expected - first io scheduler thread inherited MDC 
 3         WARN  AAA [2014-11-10 02:57:16,566] first_observable: 1
 4         INFO  AAA [2014-11-10 02:57:17,067] main_thread: before changing to BBB
 5         INFO  BBB [2014-11-10 02:57:17,067] main_thread: after changing to BBB
 6 // main thread changed its requestId. First observable still uses the old one, but...
 7         WARN  AAA [2014-11-10 02:57:17,566] first_observable: 2
 8         WARN  BBB [2014-11-10 02:57:18,074] second_observable: 0
 9 // newly created observable got the new one
10         WARN  AAA [2014-11-10 02:57:18,566] first_observable: 3
11         WARN  BBB [2014-11-10 02:57:19,074] second_observable: 1
12         WARN  AAA [2014-11-10 02:57:19,566] first_observable: 4
13         WARN  AAA [2014-11-10 02:57:20,566] first_observable: 5
14         WARN  AAA [2014-11-10 02:57:21,567] first_observable: 6
15         WARN  AAA [2014-11-10 02:57:22,566] first_observable: 7
16         INFO  CCC [2014-11-10 02:57:22,574] main_thread: after changing to CCC
17         WARN  AAA [2014-11-10 02:57:23,566] first_observable: 8
18         WARN  BBB [2014-11-10 02:57:23,576] third_observable: 0
19 // A-ha! second observable released a thread, so third reuses the thread with “BBB” requestId
20         WARN  AAA [2014-11-10 02:57:24,566] first_observable: 9
21         WARN  BBB [2014-11-10 02:57:24,576] third_observable: 1
22         WARN  AAA [2014-11-10 02:57:25,566] first_observable: 10
23         WARN  BBB [2014-11-10 02:57:25,576] third_observable: 2

Le plus intéressant commence à la ligne 18 des messages de log, où on constate que l’Observable «third_observable» est exécuté avec la valeur «BBB» dans le MDC, alors que d’après la ligne 21 du code, on s’entendrait à ce que le MDC contienne plutôt «CCC». L’explication de ce qui s’est passé est simplement qu’avant le début de l’exécution de l’Observable «third_observable», l’Observable «second observable» se terminait, libérant le thread qui l’exécutait, et c’est ce thread ayant « BBB » dans son MDC que le Scheduler va utiliser pour exécuter l’Observable « third_observable ».

Le workaround proposé notamment dans la continuité de l’article du blog est d’étendre le hook des Scheduler pour y injecter nous même le MDC du thread courant dans le contexte de la prochaine action à exécuter par le Scheduler. Bien sûr, il faudra ensuite créer un Scheduler qui saura extraire le MDC que nous aurons injecté pour l’utiliser correctement.

1     static class RequestContextSchedulersHook extends RxJavaSchedulersHook {
 2         @Override
 3         public Action0 onSchedule(Action0 action) {
 4             return super.onSchedule(new WrappedAction0(action));
 5         }
 6 
 7         static class WrappedAction0 implements Action0 {
 8             private final Action0 actual;
 9             private Map<String, String> mdcContextMap;
10 
11             public WrappedAction0(Action0 actual) {
12                 this.actual = actual;
13                 mdcContextMap = MDC.getCopyOfContextMap();
14             }
15 
16             @Override
17             public void call() {
18                 MDC.setContextMap(mdcContextMap);
19                 try {
20                     actual.call();
21                 } finally {
22                     MDC.clear();
23                 }
24             }
25         }
26     }
27 
28     RxJavaPlugins.getInstance().registerSchedulersHook(new RequestContextSchedulersHook());

 7.8 Enrichissement des exceptions des Observable: OnErrorThrowable

Dans un contexte d’exécution asynchrone, les messages des exceptions doivent être particulièrement riches pour faciliter les investigations. Or dans un traitement comme dans l’instruction suivante, il n’y a aucune information sur le contexte de la valeur émise lorsque l’erreur se produit :

Observable.interval(1, TimeUnit.SECONDS)
        .flatMap(i -> {
            try {
                RandomException.mayThrowException();
                return Observable.empty();
            } catch (IOException e) {
                return Observable.error(e);
            }
        }).subscribe(System.out::println, e -> { /* ... */ });

La valeur émise par l’Observable au moment de l’erreur est pourtant une information très utile sur la cause de l’erreur. Raison pour laquelle il existe dans RxJava des méthodes dédiées pour faciliter la capture et la lecture de cette valeur en cas d’erreur (lignes 8 et 12) :

 1             Observable.interval(1, TimeUnit.SECONDS)
 2                     .flatMap(i -> {
 3                         try {
 4                             RandomException.mayThrowException();
 5                             return Observable.empty();
 6                         } catch (IOException e) {
 7                             // association de la valeur courante de l'Observable à l'exception
 8                             return Observable.error(OnErrorThrowable.addValueAsLastCause(e, i));
 9                         }
10                     }).subscribe(System.out::println, e -> {
11                 // récupération de la valeur de l'Observation lorsque l'erreur s'est produite
12                 Object value = OnErrorThrowable.from(e).getValue();
13             });

 7.9 Enrichissement sémantique de la pluralité des valeurs des Observable

Lorsque nous avons affaire à une API étrangère retournant un Observable, nous souhaiterions bien avoir une idée du nombre d’éléments émis par l’Observer, au moins savoir si l’Observer émet zéro, un ou plusieurs éléments. Une bonne documentation à jour permettra bien sûr de répondre à ce souci. Mais un typage approprié sera encore plus convenable. Ainsi RxJava propose deux spécialisations (pas au sens POO) des Observable pour répondre à ce besoin de clarté.

7.9.1 Single

Un Single est comme un Observable, avec la particularité qu’elle n’émet que l’une des deux notifications : succès ou erreur.

Single = onSuccess | onError

L’opération suivante retourne un Single qui n’émettra donc qu’une valeur ou une erreur:

Single<String> get();

Un Single comporte une méthode toObservable() qui permet de le transformer en un Observable émettant les mêmes événements.

7.9.2 Completable

Un Completable est aussi comme un Observable, avec la particularité de n’émettre qu’un événement de complétion ou d’erreur, mais pas de valeur.

Completable = onCompleted | onError

L’opération suivante qui ne retourne pas de valeur est donc convenablement typé avec un Completable :

Completable perform();

Un Completable comporte des méthodes toObservable() et toSingle() qui permettent de le transformer respectivement en un Observable et un Single émettant les mêmes événements que lui.

7.9.3 Désabonnement aux Observables

La souscription à un Observable retourne un objet Subscription qui permet de mettre fin à l’abonnement lorsqu’on le souhaite. Dans l’exemple suivant :

Subscription interval = Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(System.out::println);

interval.unsubscribe(); // désabonnement

 le désabonnement qui est correctement géré par le résultat de l’opérateur interval  mettra fin à l’émission des valeurs. Après un désabonnement, l’Observer désabonné ne recevra pas de notification de fin de flux.

7.9.3.1 CompositeSubscription pour grouper la gestion des souscriptions

1             Subscription flux1 = Observable.interval(1, TimeUnit.SECONDS)
2                     .subscribe(System.out::println);
3             Subscription flux2 = Observable.interval(100, TimeUnit.MILLISECONDS)
4                     .subscribe(System.err::println);
5             
6             flux1.unsubscribe();
7             flux2.unsubscribe();

 Ce type de besoin de désabonnement peut être simplifié avec l’usage d’un objet CompositeSubscription. Avec l’exemple précédent, cela donne :

Subscription flux1 = Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(System.out::println);
Subscription flux2 = Observable.interval(100, TimeUnit.MILLISECONDS)
        .subscribe(System.err::println);
CompositeSubscription subscription = new CompositeSubscription();
subscription.add(flux1);
subscription.add(flux2);

subscription.unsubscribe();

C’est une fonction qui s’avère particulièrement pratique dans un environnement tel que celui d’Android où le cycle de vie des applications peut conduire à des abonnements et désabonnements massifs:

 1             private CompositeSubscription subscription;
 2             
 3             @Override
 4             public void onResume() {
 5                 Subscription interval = Observable.interval(1, TimeUnit.SECONDS)
 6                         .subscribe(/* ... */);
 7                 Subscription click = clicks()
 8                         .subscribe(/*...*/);
 9                 subscription = new CompositeSubscription();
10                 subscription.add(interval);
11                 subscription.add(click);
12             }
13             @Override
14             public void onPause() {
15                 subscription.unsubscribe();
16             }

 7.9.3.2 Action de l’Observable lors du désabonnement

Nous l’avons déjà vu, l’Observable peur spécifier l’action à exécuter lors du désabonnement d’un Observer.

Avec la méthode de création create :

1             Observable.create(subscriber -> {
2                 subscriber.add(new Subscription() {
3                     @Override
4                     public void unsubscribe() {
5                         // fermeture du flux
6                     }
7                 });
8             });

Encore une fois, la méthode de création create n’étant à utiliser que quand on ne peut faire autrement, il faudra penser à la présence de la factory using lorsqu’on a besoin de réagir à une action de désabonnement :

1             Observable.using(
2                     () -> new DB(),
3                     db -> Observable.just(db.doQuery()),
4                     db -> db.closeDb()  // fermeture des ressources
5             ).subscribe();

 8 Conclusion

Cette université était une très bonne occasion d’avoir une large vision de ce qu’est RxJava et au passage les Reactive Extensions. Après l’introduction de RxJava comme alternative aux CompletableFuture pour faire de la programmation concurrente, nous avons vu les principes de base de RxJava, à travers le couple Observable/Observer et l’utilisation d’un certain nombre de constructeurs représentatifs d’Observable. Nous avons ensuite découvert comment enfin faire réellement de la programmation asynchrone avec RxJava (et Rx aussi) avec la présentation de la notion de Scheduler. Nous avons constaté que la spécification JAX-RS en l’état actuel ne supportait pas nativement Rx, et que pour bénéficier d’une telle intégration native, il faut faire l’effort de voir au-delà de Java EE les nouvelles alternatives que sont les frameworks orientés Microservices tels que Netty/RxNetty ou Vert.x. Nous avons abordé la composition de services avec RxJava, le sujet le plus représentatif du thème de la conférence (« Architecture découplée grâce aux Reactive Extensions »), en découvrant comment la gestion des erreurs peut être améliorée entre les Observable et les Observer, comment RxJava permet de gérer entre autres le nombre de tentatives d’exécution d’une opération où la durée d’exécution maximale d’une opération. Toujours dans le cadre de la composition de services, Hytrix avec l’implémentation du Circuit Breaker a été introduit comme alternative spécialisée pour la gestion des erreurs des services composés par le service composite. Ensuite nous avons eu une brève introduction à la notion de back-pressure dans RxJava. La dernière partie de la session a porté sur les leçons apprises par les orateurs dans leur expérience d’utilisation de RxJava.

Ici donc prend fin la restitution de ce que j’ai appris sur Rx et RxJava en particulier durant cette université de Devoxx France. Cet exercice était intéressant car n’ayant pas d’expérience avec Rx, j’ai essayé de remonter à son origine, ce qui m’a permis de découvrir pas mal de choses, comme par exemple le fait que Rx existait depuis 2009 chez Microsoft où elle a été créée. J’ai aussi appris le concours de circonstances ayant permis  la naissance de RxJava. C’est Jafar Husain, un ingénieur de Netflix qui avait découvert Rx lorsqu’il travaillait chez Microsoft qui a poussé l’adoption de cette technologie auprès de son collègue Ben Christensen, celui-là même qui se laissera convaincre de l’utilité de Rx et lancera le développement de l’implémentation en Java qui sera RxJava. Une très bonne ressource que je conseillerais pour un apprentissage de RxJava est le livre Reactive Programming with RxJava de Tomasz Nurkiewicz et Ben Christensen. Le livre décrit bien entendu l’essentiel de l’API de RxJava, mais dans un contexte d’utilisation dans la vraie vie, donc en évoquant des problématiques connus qui peuvent être résolus par tel type d’usage où au contraires être engendrés par des usages déconseillés.  Le livre explique aussi bien comment introduire RxJava dans une application existante non construite suivant le paradigme réactif(des conseils que je commence personnellement à mettre en pratique) que comment créer une application sur une stack reactive. Le livre traite aussi des questions qu’on va se poser inévitablement lorsqu’on commence à s’intéresser à  RxJava: apport par rapport à l’API standard de Java: Stream, Completable Future, etc. Pleins d’autres sujets intéressants et importants sont traités avec sérieux dans le livre.

RxJava n’est pas (ou plus?) la seule bibliothèque pour le traitement de flux asynchrone en Java. Je vais juste mentionner les deux autres bibliothèques dont on ne peut s’empêcher de voir mentionner lorsqu’on s’intéresse à ce sujet: Akka Streams et Reactor. Ces deux projets ainsi que la version 2 de RxJava implémentent tous la spécification Reactive Streams, qui comme expliqué sur son site, est une initiative visant à standardiser le traitement de flux asynchrones en Java.

Les systèmes réactifs de façon générale sont en train d’être adoptés par le monde du développement. L’orientation de Java EE 8 vers ce paradigme confirme cette adoption, ainsi que la pertinence de ce paradigme. Il est donc tout à fait raisonnable de s’intéresser aux activités autour de ce courant.