3 Modèle de concurrence de RxJava : les Scheduler

Contrairement à ce qu’on pourrait croire, les traitements de Rx ne s’exécutent pas par défaut en concurrence, du moins excepté pour des cas particuliers. Les transformations des Observable et les consommations de flux dans les Observer se font en effet par défaut dans un seul thread, encore une fois excepté dans des cas que nous évoquerons.

3.1 Scheduler

Rx a été conçu pour être agnostique au modèle de threading sous-jacent, en apportant une solution élégante à la manière de transformer un flux de données à pousser vers des consommateurs. Ceci étant, Rx tient compte de la volonté de pouvoir exécuter ces étapes de transformation et de consommation dans plusieurs threads en concurrence, en proposant l’usage des Schedulers. Les Schedulers permettent donc de spécifier à Rx le modèle de concurrence à utiliser pour l’exécution des traitements.

Il existe plusieurs modèles de concurrence dans RxJava, mais nous allons voir uniquement les deux principaux : computation et io. Avant de poursuivre avec la présentation de ces deux Schedulers, voyons d’abord comment est-ce qu’on spécifie un Scheduler sur un Observable.

3.1.1 Observable.observeOn et Observable.subscribeOn

Ces deux méthodes permettent de spécifier le Scheduler à utiliser pour l’exécution des opérations d’un Observable.

La méthode Observable.subscribeOn permet de spécifier le contexte d’exécution initial de l’Observable, qui est également le contexte où les données de l’Observable seront produites/consommées. Il ne doit y avoir qu’une seule déclaration d’Observable.subscribeOn, une seule sera retenue (la plus proche de l’Observable source) même si on en déclarait plusieurs.

La méthode Observable.observeOn est appliquée à l’opérateur dont elle précède l’invocation, il peut donc y en avoir autant qu’il y a d’opérateurs dans la séquence de manipulation de l’Observable. Elle permet de spécifier le contexte d’exécution de l’opérateur auquel elle s’applique.

Une bonne explication des effets d’Observable.observeOn et Observable.subscribeOn est donnée sur ce blog.

3.1.2 Scheduler computation

Ce Scheduler permet de déporter l’exécution des opérateurs qui suivent sa déclaration dans un pool de threads unique dédié, dont la taille par défaut est le nombre de processeurs/cœurs disponible. Comme son nom le suggère, l’usage de ce Scheduler est approprié pour des traitements de calcul, de façon générale pour des traitements faisant un usage intensif des processeurs ou des suites d’instructions ordinaires. Et vu la taille fixe du pool de threads, il est déconseillé d’utiliser ce Scheduler pour des traitements comportant des instructions bloquantes.

Figure 3-1 Changement de contexte d'exécution par un Scheduler

Figure 3-1 Changement de contexte d’exécution par un Scheduler

3.1.3 Scheduler io

Ce Scheduler permet également de déporter l’exécution des opérateurs suivant sa déclaration dans un pool de threads dédié. Comme son nom le suggère, ce Scheduler doit être utilisé pour des traitements impliquant des opérations bloquantes, typiquement de type I/O. Contrairement au pool de threads du  Scheduler  computation, le pool de threads du Scheduler  io n’a pas de taille fixe, il est agrandi au besoin.

Figure 3-2 Changement de contexte d'exécution par un Scheduler

Figure 3-2 Changement de contexte d’exécution par un Scheduler

3.1.4 Fonctions utilisant par défaut un Scheduler

Certaines opérations utilisent par défaut des Schedulers pour changer leur contexte d’exécution, comme par exemple :  Observable.interval(), certaines versions de Observable.delay().

3.2 Rx dans un serveur HTPP

3.2.1 Remplacement des Executor par Rx dans JAX-RS

Comme évoqué dans la problématique d’introduction, le temps de traitement des requêtes peut être optimisé avec l’usage de threads pour paralléliser les traitements. Dans ce qui suit on montre comment RxJava peut être utilisé à la place des Executor pour un besoin de parallélisation. Soit donc le service REST suivant implémenté avec JAX-RS:

Figure 3-3 Service JAX-RX utilisant un Executor

Figure 3-3 Service JAX-RX utilisant un Executor

Grâce au support de l’asynchronisme dans Servet 3.0 et JAX-RS 2.0, les réponses des requêtes peuvent être retournées en dehors des threads de travail.

En utilisant RxJava pour faire le traitement asynchrone, le service s’écrit ainsi :

Figure 3-4 Remplacement de l'Executor par RxJava

Figure 3-4 Remplacement de l’Executor par RxJava

3.2.2 Adoption de l’approche event-driven de Netty

JAX-RS n’est pas réactif et de ce fait ne propose pas un support natif de Rx permettant de traiter les requêtes clientes comme des Observable, nous devons créer nous même les Observable encapsulant ces requêtes.

Les créateurs de RxJava convaincus de la pertinence des architectures réactives se sont intéressés de très près au serveur Netty adoptant l’approche event-driven avec des opérations I/O non bloquantes. A travers des benchmarks, ils ont montré que le serveur Netty présente de meilleurs résultats de performance que le serveur Tomcat durant la montée du nombre de connexions : moins de consommation CPU pour le traitement d’une requête, débit de traitement des requêtes, etc. Les résultats de ces benchmarks en plus de son approche réactive font de Netty un framework approprié pour l’usage de RxJava. RxNetty a été conçu pour faciliter cet usage de Netty avec RxJava.

Voici par exemple comment RxNetty permet de traiter directement une requête HTPP exposée nativement en un Observable :

Figure 3-5 Exemple de (micro)service RxNetty exploitant l'API Rx native

Figure 3-5 Exemple de (micro)service RxNetty exploitant l’API Rx native

Dans le prochain article nous verrons comment RxJava peut être utilisé pour implémenter un service composite dépendant d’autres services. L’implémentation correcte d’une telle opération est loin d’être évidente car elle implique comme nous allons le voir la prise en compte des erreurs des services composés, des latences des appels, etc.

3.3 Références