2 RxJava et les Reactive Extensions

L’origine de RxJava remonte à celle des Reactive Extensions(Rx), une bibliothèque issue des laboratoires de recherche de Microsoft dans le but de simplifier la communication entre applications, particulièrement dans un contexte de Cloud Computing, comme l’explique Erik Meijer dans cette vidéo d’introduction à Rx. Dans un contexte de programmation asynchrone, l’idée est d’abstraire toute source de production de données en un Observable.

Un Observable est une abstraction qui permet de représenter toute source de données comme une collection de type push, i.e. une collection qui pousse ses valeurs vers le consommateur, à l’inverse d’une collection de type pull que le consommateur doit parcourir au moyen d’un itérateur pour lire ses valeurs. Ainsi le résultat d’un appel asynchrone à un service distant, à une base de données, les positions successives du curseur d’une souris, les caractères produits par des frappes sur le clavier, etc., peuvent tous être unifiés avec cette abstraction qu’est un Observable et à laquelle on pourra appliquer toutes les techniques de manipulation d’une collection. Un Observable ne se limite pas à cela bien sûr, car qui parle d’asynchronisme parle de concurrence, aussi l’objectif des Reactive Extensions est également de simplifier la gestion de l’exécution des Observable dans des fils d’exécutions concurrents. Les Observable représentent la partie productrice de données, pour consommer et réagir à la production des données, Rx introduit les Observer.

Un Observer implémente une interface lui permettant de réagir à la publication d’une donnée par l’Observable, à l’occurrence d’une erreur au niveau de l’Observable ou à la fin de la publication de donnée par l’Observable. Rx est basé sur les concepts d’Observable et d’Observer.

Rx d’abord implémenté par Microsoft en .Net en propriétaire, a été mis en open source depuis. Le site reactivex.io est un portail sur l’ensemble des implémentations disponibles de Rx et fait office de site fédérateur de l’ensemble de ces projets. Donc quelle que soit la plateforme ou l’implémentation de Rx utilisée, nous devrions retrouver les mêmes concepts, avec bien sûr éventuellement des spécificités propres dues aux contraintes ou au contraire à la richesse que proposent les différents environnements d’implémentation.

2.1 RxJava

RxJava est l’implémentation Java de Rx effectuée par NetFlix. Vu que nous abordons l’usage de Rx dans un environnement Java, c’est à travers RxJava que nous visiterons les possibilités qu’il offre.

2.2 Marble diagram

Un Observable comme nous le disions est une séquence de valeurs publiée qui peut être manipulée comme une collection, qui peut être finie ou pas, et dont la publication peut se terminer en erreur. Pour simplifier le raisonnement sur le cycle de vie des Observable, une représentation visuelle appelée simplement marble diagramme a été proposée par les concepteurs de Rx, et largement adoptée par la communauté d’utilisateurs.

Figure 2-1 Flux d'événements fini d'un Observable

Figure 2-1 Flux d’événements fini d’un Observable

Figure 2-2-1 Flux d'événements d'un Observable tombé en erreur

Figure 2-2-1 Flux d’événements d’un Observable tombé en erreur

Figure 2-2-2 Flux d’événements infini d'un Observable

Figure 2-2-2 Flux d’événements infini d’un Observable

2.3 Exemples d’instructions RxJava

2.3.1 Création d’un Observable

Observable<Integer> observable = Observable.just(1, 2, 3);

Cette instruction crée un Observable capable d’émettre la séquence de valeurs 1, 2 ,3. Pour écouter cet Observable, c’est-à-dire être notifié de la publication de ses valeurs, il faut effectuer une souscription au flux de l’Observable avec un Observer.

2.3.2 Souscription d’un Observer

La notification d’un Observer par un Observable se fait à travers l’interface Observer qui comporte trois opérations.  Nous pouvons par exemple créer un Observer pour souscrire aux événements du précédent Observable :

Listing 2‑1-1 Objet Observer

Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onNext(Integer v) {
        System.out.println("Valeur : " + v);
    }

    @Override
    public void onCompleted() {
        System.out.println("Fin");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Erreur");
    }
};

L’ordre ainsi que le nombre des appels des méthodes de l’interface Observer par un Observable sont contraints par le contrat suivant :

OnNext* (OnCompleted|OnError) ?     

Ce qui veut dire qu’un Observable peut être notifié zéro, une, plusieurs fois ou indéfiniment de la publication d’une valeur par un Observable, et ensuite être notifié soit de la fin normale du flux, soit de l’occurrence d’une erreur dans la production du flux.

L’Observer créé, il ne reste plus qu’à le souscrire au flux de l’Observable pour recevoir les notifications :

observable.subscribe(observer);

Ainsi nous verrons à la console :

Valeur : 1
Valeur : 2
Valeur : 3
Fin

2.3.3 Souscription avec les lambda

Avec Java 8, il est possible de souscrire auprès d’un Observable en utilisant des lambdas :

Figure 2-3 Implémentation d'un Observer avec des lambdas

Figure 2-3 Implémentation d’un Observer avec des lambdas

Le code de souscription tire directement parti de la concision des lambda.

2.3.4 Transformation des Observable

Comme cela a été dit, un Observable est un flux d’événements qui peut être manipulé comme une collection. La liste des opérateurs applicables aux Observable est consultable sur le site Reactivex.io.

Soit un Observable de type String :

Observable<String> observable = Observable.just("bjr", "DevoXx");

On se fixe comme objectif de transformer les valeurs de cet Observable en minuscules et d’appliquer un filtre sur les valeurs pour ne garder que les mots de plus de trois lettres.

La première transformation est effectuée comme suit :

observable=observable.map(str -> str.toLowerCase());

La deuxième transformation est effectuée comme suit :

observable = observable.filter(str -> str.length() > 3);

Et enfin en souscrivant avec un Observer, nous pouvons afficher le résultat des transformations :

observable.subscribe(System.out::println);

qui est bien entendu la chaine « devoxx »;-)

Pour un besoin de clarté les transformations ont été affectées à chaque fois à une variable, mais vu la simplicité de l’exemple, tous les traitements peuvent être enchainés comme suit :

Observable.just("bjr", "DevoXx").map(str -> str.toLowerCase()).filter(str -> str.length() > 3).subscribe(System.out::println);

 2.4 Observable et programmation fonctionnelle : concept de monade et effets de bord

Les Observable ont les propriétés d’une structure de programmation connue dans le domaine de la programmation fonctionnelle sous le nom de monade.  Une monade est une structure de programmation permettant de construire et d’appliquer un enchainement de traitements sur une valeur encapsulée. Elle montre son utilité lorsque ces traitements doivent prendre en compte des situations d’exception. De façon plus concrète, une monade est une structure de programmation ayant :

  • un type paramétré (Observable<T>)
  • une factory (just(…)))
  • une méthode flatMap permettant de construire une autre monade (Observable.flatMap(…)).

Les Observable sont donc des monades. Tout comme les types Stream et  Optional introduits en Java 8. En tant que tels, ils doivent être manipulés dans l’idéal dans les règles de la programmation fonctionnelle, en évitant d’y produire des effets de bord. Les effets de bord sont les instructions qui ne rentrent pas dans le cadre de la logique de transformation d’une opération(fonction), et particulièrement qui modifient des états dont la portée est en dehors de celle de la fonction. Dans la pratique cela dit, il est difficile de se passer de ce genre d’instructions pour certains besoins ou dans certaines situations. Lorsqu’on se sent obligé d’introduire des effets de bord dans nos Observable, il faut consulter la documentation et suivre les conseils fournis sur la façon de s’y prendre.

2.4.1 Exemple 1 Collecter les valeurs d’un Observable

List<Integer> result = new ArrayList<>();

Observable.just(1, 2, 3)
        .map(i -> result.add(i))
        .subscribe();

La modification de la variable result dans la transformation de l’Observable est une instruction à effet de bord. Plutôt que de procéder de cette façon, on peut faire comme suit :

Observable.just(1, 2, 3).toList().subscribe(result -> {/* Consommation de la liste... */});

 2.4.2 Exemple 2 Logger les valeurs d’un Observable

Observable.just(1, 2, 3).map(i -> {
    logger.info("value : " + i);
    return i;
}).subscribe();

Une instruction de log est une instruction à effet de bord qui ne rentre pas dans le cadre de la logique de transformation de l’Observable. Mais on comprend que l’on puisse avoir besoin d’avoir des traces des valeurs prises par un Observable dans des sorties de log. Dans ces situations, Rx fournit des opérateurs explicites dédiés à l’écriture d’instructions à effet de bord, ainsi nous pouvons tracer l’exécution de notre Observable en invoquant les méthodes dédiées explicitement à cet effet :

Observable.just(1, 2, 3)
        .doOnSubscribe(() -> logger.info("Subscribe"))
        .doOnNext(i -> logger.info("Valeur :" + i))
        .doOnTerminate(() -> logger.info("Terminate"))
        .subscribe();

2.5 Comment créer des Observable

Il existe plusieurs méthodes pour créer des Observable, ici nous allons en voir cinq qui sont représentatives de l’ensemble. Par ordre de simplicité d’usage, nous avons :

  1. Observable.just()
  2. Observable.defer()
  3. Observable.fromCallable()
  4. Observable.using()
  5. Observable.create()

2.5.1 Observable.just()

Comme vu à travers les précédents exemples, cet opérateur permet de créer un Observable à partir de valeurs existantes. Il faut surtout retenir que toute expression passée en guise d’argument à la méthode Observable.just() fera l’objet d’une évaluation immédiate.

Ainsi l’expression :

Observable.just(compute()).subscribe();

est équivalente à la suite d’instructions :

int result = compute();
Observable.just(result).subscribe();

 2.5.2 Observable.defer()

Permet de retarder la création d’un Observable jusqu’à ce qu’il y ait une souscription. Ainsi pour retarder la création du précédent Observable jusqu’à la prochaine souscription, on peut écrire :

Observable.defer(() -> Observable.just(compute())).subscribe();

 2.5.3 Observable.fromCallable()

Accepte en argument un Callable qu’il transforme en un Observable qui permettra d’invoquer le Callable lorsqu’il y aura une souscription On peut par exemple retarder l’invocation de la méthode compute() des exemples précédents en utilisant Observable.fromCallable() comme suit :

Observable.fromCallable(() -> compute()).subscribe();

 2.5.4 Observable.using()

Cette factory permet d’associer la durée de vie d’une ressource à celle d’un Observable. Pour cela elle prend en arguments :

  • une factory pour créer la ressource,
  • une autre pour créer l’Observable,
  • et une action à invoquer à la fin de l’exécution de l’Observable associée à la ressource.

Un exemple d’usage pourrait être l’exécution d’une requête sur une base de données où nous aurons besoin de créer une connexion à la base de données, de créer un Observable avec le résultat de la requête sur la base de données et enfin de fermer la connexion après la consommation du résultat de la requête, comme suit :

Observable.using(
        () -> new DB(),
        db -> Observable.just(db.doQuery()),
        db -> db.closeDb())
        .subscribe();

 2.5.5 Observable.create()

Cette factory est assez différente de celles qui viennent d’être vues. Elle permet au développeur d’avoir un contrôle explicite sur l’interaction entre l’Observable et l’Observer. Pour cela, elle accepte comme argument la spécification de ce contrôle qui concerne :

  • le désabonnement de l’Observer,
  • la gestion du back-pressure,
  • la gestion du contrat Rx.

Mais autant cette factory permet un contrôle spécifique de l’interaction avec l’Observer, autant son usage correct nécessite une très bonne compréhension du fonctionnement des Reactive Extensions lorsqu’on s’éloigne des cas les plus simples. Aussi, il est conseillé de s’en passer tant que cela est possible en ayant recours aux autres fabriques qui peuvent servir de solution de substitution dans la plupart des cas.

Figure 2-4 Interaction avec un Observer avec la factory Create

Figure 2-4 Interaction avec un Observer avec la factory Create

Par exemple, pour ne déclencher un Observable que durant une souscription, on peut être tenté d’utiliser la factory Observable.create :

Observable.create(subscriber -> {
    subscriber.onNext(compute());
    subscriber.onCompleted();
}).subscribe();

Mais nous avons vu précédemment que cela était possible avec la factory Observable.fromCallable():

Observable.fromCallable(() -> compute()).subscribe();

On peut également vouloir utiliser la factory Observable.create pour gérer la création, l’utilisation et la libération d’une ressource requise pour la production du flux envoyé à l’Observer, comme suit :

Observable.create(subscriber -> {
    DB db = new DB();
    subscriber.add(Subscriptions.create(() -> db.closeDb()));
    subscriber.onNext(db);
    subscriber.onCompleted();
}).subscribe();

Mais comme vu précédemment, l’utilisation du constructeur Observable.using() est adapté à ce besoin et permet de faire les choses plus simplement :

Observable.using(
        () -> new DB(),
        db -> Observable.just(db.doQuery()),
        db -> db.closeDb())
        .subscribe();

Ainsi se termine cet article de présentation des notions de base de Rx/RxJava. Jusqu’à présent nous avons fait impasse sur le caractère asynchrone des instructions, qui est pourtant l’une des principales propriétés de Rx. Le prochain article abordera la mise œuvre de la concurrence dans Rx.

2.6 Références