Dans cette partie, nous allons voir comment Vert.x gère la back-pressure dans un système d’upload de fichier. Mais l’exemple présenté peut se généraliser à de nombreux autres contextes possibles dans Vert.x, grâce à une généralisation de la gestion de la back-pressure.

Service web de chargement de fichier dans un back-end S3

Le besoin traité dans l’exemple comme nous le disions est le chargement de fichiers envoyés à un serveur web vers le stockage S3 de AWS. La première implémentation du besoin est illustrée par le listing 3.1. Les instructions les plus importantes sont celles entre les lignes 5 et 12, où le serveur HTTP récupère tout le contenu du fichier envoyé par le client HTTP et l’envoi au backend S3. 

HttpServer server = vertx.createHttpServer();
 S3Client s3Client = new S3Client();
 server.requestHandler(request -> {
     if (request.method() == PUT && request.path().equals("/upload")) {
         request.bodyHandler(data -> {
             s3Client.put("the-bucket", "the-key", data, s3Response -> {
                 HttpServerResponse response = request.response();
                 response
                         .setStatusCode(s3Response.statusCode())
                         .end();
             });
         });
     }
 });
 
 server.listen(8080);

Listing 3‑1 Chargement de fichier dans S3 par un serveur HTTP

Cette première implémentation de l’upload de fichier présente un inconvénient, c’est le fait de devoir envoyer tout le contenu du fichier en une seule fois au backend S3. Le chargement de fichiers de taille importante entrainera en effet un problème de saturation de mémoire sur le serveur HTTP. Pour remédier à cela, la deuxième implémentation du service d’upload va uploader le fichier vers S3 par chunk, comme le montre le listing 3.2 à la ligne 10.

server.requestHandler(request ->   {
     if (request.method() == PUT && request.path().equals("/upload")) {
         S3ClientRequest s3Request = s3Client.createPutRequest(
                 "the-bucket", "the-key", s3Response -> {
                     HttpServerResponse response = request.response();
                     response
                             .setStatusCode(s3Response.statusCode())
                             .end();
                 });
         request.handler(chunk -> s3Request.write(chunk));
         request.endHandler(v -> s3Request.end());
     }
 });

Listing 3‑2 Streaming du fichier vers S3

Le fait de streamer le fichier vers S3 semble résoudre le problème de risque de saturation de la mémoire. Mais pourtant le streaming lui aussi peut poser un problème : c’est la possibilité que le client S3 ne soutienne pas la cadence et bloque temporairement l’envoi des chunks, ce qui entrainera l’accumulation des chunks envoyés par le client HTTP sur le serveur HTTP et risquera à nouveau de saturer sa mémoire. Pour éviter une telle situation, il faut signaler au client HTTP de stopper l’envoi des chunks lorsque nous ne pouvons plus les transmettre au backend S3, c’est-à-dire appliquer la back-pressure.

Vert.x permet d’appliquer la back-pressure en s’appuyant sur l’implémentation de la back-pressure existant dans de nombreux systèmes et protocoles : TCP, HTTP/2, WebSocket/SockJS, AMQP, les pipes des OS, etc. Dans TCP par exemple, un récepteur indique à l’émetteur la capacité disponible de son buffer, ce qui va permettre à l’émetteur de savoir s’il peut envoyer des données et quelle quantité envoyer. Ces informations de gestion de back-pressure peuvent être remontées et exploitées à un plus haut niveau applicatif. Ainsi nous pouvons vérifier avant chaque envoi de donnée au back-end S3 qu’il est disposé à consommer les données :

request.handler(data -> {
    if (!s3request.writeQueueFull()) {
        s3request.write(data);
    }
});    

Cette vérification permet d’éviter d’être bloqué en attente du retour de l’appel au back-end, mais le chunk reçu n’est alors plus transmis et est perdu. Cette perte n’est pas forcément grave dans certaines applications, comme le traitement de flux audio par exemple, où il est possible de restituer une qualité de son acceptable sans recevoir le flux d’origine dans son intégralité. Mais dans d’autres applications où  tous les chunks sont naturellement requis, que faire lorsque le back-end ne peut temporairement les accepter ? Les bufferiser ? Cela risquerait encore une fois de conduire à une saturation de la mémoire si l’indisponibilité du back-end se prolonge et si le serveur HTTP reçoit beaucoup de données pendant ce temps. La meilleure solution est d’arrêter la réception des données au niveau du serveur HTTP (ligne 5) :

request.handler(data -> {
    s3request.write(data);
    if (s3request.writeQueueFull()) {
        // stop reading from the request
        request.pause();
    }
});

Le client du serveur HTTP est informé de l’arrêt de la réception des données qu’il envoie, à lui de prendre ses dispositions pour gérer cette situation.

Ensuite il faut savoir reprendre la transmission lorsque le back-end devient de nouveau prêt à accepter les données, en réagissant à cet événement (lignes 5, 6) :

request.handler(data -> {
    s3request.write(data);
    if (s3request.writeQueueFull()) {
        request.pause();
        s3request.drainHandler(v -> {
            request.resume();
        });
    }
});

L’appel de la méthode resume sur un flux de lecture permet de reprendre la lecture, si la source du flux, le client HTTP dans notre exemple, sait encore une fois gérer cette instruction comme attendu.

Généralisation des flux dans Vert.x

La gestion de la back-pressure telle que nous venons de le voir dans l’exemple précédent est généralisée par Vert.x aux flux de données grâce aux implémentations  des interfaces ReadStream et WriteStream :

public interface ReadStream<T> {
    void pause();
    void resume();
    void handler(Handler<T> handler);
    void endHandler(Handler<Void> handler);
}    
public interface WriteStream<T> {
    void write(T data);
    boolean writeQueueFull();
    void drainHandler(Handler<Void> handler);
    void end();
}    

On peut trouver les implémentations de ces interfaces pour les systèmes suivants :

·         Communication TCP

·         Communication HTTP

·         WebSocket

·         AsyncFile, une interface pour la manipulation asynchrone des fichiers.

·         Client Kafka

·         SockJS

Et bien sûr, cette liste d’implémentation n’est pas fermée. Nous pouvons implémenter ces interfaces pour tout autre système assimilable à un flux en lecture ou en écriture, et les traiter de façon uniforme dans Vert.x.

Pumping

Vert.x a désigné par pumping l’action de faire écouler un flux en lecture vers un flux en écriture tout en gérant la back-pressure, c’est-à-dire ce que nous avons fait dans notre exemple de transfert de fichier dans S3. C’est en effet un idiome qui revient souvent lorsqu’on manipule des flux, et grâce aux interfaces génériques pour manipuler les flux, l’implémentation de cet idiome dans la classe utilitaire Pump peut être utilisée pour tous les flux :

Pump pump = Pump.pump(request, s3request);
pump.start();
request.endHandler(v -> {
    pump.stop();
    s3Request.end();
});