logo le blog invivoo blanc

Retour Devoxx France 2016: Architecture découplée grâce aux Reactive Extensions – 6eme partie

30 septembre 2016 | Front-End, Java

Pour lire la première partie de cette série d'articles, cliquez ici.

6 Back-pressure (pression arrière)

Rx est basé sur le tandem Observable/Observer, l’un produisant les données, et l’autre les consommant. Dans le fonctionnement normal, l’Observable produit les données à un rythme adapté à celui auquel l’Observer les consomme.

 

Observable / Observer Flux de production adapté au rythme de consommation

Figure 6-1 Flux de production adapté au rythme de consommation

 

Mais il peut arriver que l’Observable produise les données à un rythme insoutenable par l’Observer. Considérons par exemple le fonctionnement de l’opérateur zip, qui souscrit à plusieurs Observable dont il combine les valeurs. Si l’un des Observable émet indéfiniment des données à un rythme largement supérieur à ceux des autres Observable, une implémentation naïve de opérateur zip pourrait mettre ses éléments dans un buffer, qui à terme provoquerait une erreur de saturation de mémoire(mais heureusement, les implémentations de l’opérateur zip de Rx savent réagir intelligemment à cette situation en faisant justement appel à la back-pressure).

 

Figure 6-2 Consommateur saturé par le flux du producteur

Figure 6-2 Consommateur saturé par le flux du producteur

 

Pour éviter donc qu’un consommateur soit submergé par un Observable trop productif, Rx met à disposition des mécanismes de back-pressure (pression arrière). La back-pressure de façon plus générale est un mécanisme permettant à un composant fortement sollicité dans un système d’émettre une demande de baisse de sollicitation aux composants le sollicitant.

 

Mais avant de recourir à la back-pressure, il faut examiner les autres possibilités de Rx pour réduire le rythme de production des données, en l’occurrence :

 

  • les opérateurs de bufferisation[1] qui permettent de populer un buffer avec les derniers éléments produits par l’Observable et de retourner le contenu de ce buffer en une seule fois à l’Observer,
  • les opérateurs d’échantillonnage qui permettent de sélectionner parmi les éléments produits par un Observable uniquement un échantillon, sur des critères définis.

[1] La bufferisation est l’action de buffériser.

 

Figure 6-4 Opérateur d'échantillonnage

Figure 6-4 Opérateur d’échantillonnage

Figure 6-4 Opérateur d'échantillonnage

Figure 6-4 Opérateur d’échantillonnage

 

Une autre mesure pour limiter la surproduction de données de l’Observable est de bloquer le thread où il s’exécute. C’est une mesure qui n’est pas élégante vis-à-vis du principe de la conception des systèmes réactifs qui sont censés être des systèmes non bloquants.  Rx n’offre aucun mécanisme intégré pour réaliser cette opération. Cette solution ne peut être mise en œuvre facilement que lorsque l’Observable ainsi que tous les opérateurs de transformation plus l’Observer s’exécutent dans le même thread.

Enfin, lorsque l’on juge les solutions précédentes inappropriées et que l’on veut absolument faire de la back-pressure, voici comment on peut procéder :

 

 1         observable.subscribe(new Subscriber<Integer>() {
 2             private boolean emit = false;
 3             private int limit = 5;
 4 
 5             @Override
 6             public void onStart() {
 7                 // request to the producer to emit 5 items                
 8                 request(limit);
 9             }
10 
11             @Override
12             public void onCompleted() {
13                 System.out.println("All items emitted.");
14             }
15 
16             @Override
17             public void onError(Throwable e) {
18                 System.out.println();
19             }
20 
21             @Override
22             public void onNext(Integer item) {
23                 // process item
24                 System.out.println("new item = " + item);
25                 // check if new items must be requested
26                 if (--limit == 0) {
27                     limit = 5;
28                     // request to the producer to emit 5 items
29                     request(limit);
30                 }
31             }
32         });

Dans le code de souscription, on peut demander explicitement à l’Observable le nombre d’éléments qu’on voudrait qu’il émette (lignes 8 et 29).

Mais la back-pressure ne se fait pas par magie, il faut bien que l’Observable soit implémenté de façon à savoir répondre aux requêtes que lui soumet l’Observer sur la cadence de production des données.

 

 

6.1 Références