logo le blog invivoo blanc

Vert.x Basics (5/5) : l’event bus

11 juin 2018 | Java | 0 comments

Nous avons déjà vu l’event bus de Vert.x à l’œuvre dans l’exemple d’introduction. C’est l’une des briques fondamentales de Vert.x, en tant que composant permettant de faire communiquer les différentes parties d’une application Vert.x, quel que soit le langage d’implémentation de Vert.x.

Il est important de retenir que l’event bus opère au niveau du cluster Vert.x. Mais il reste possible d’interagir avec en dehors du cluster, et cela même pour des applications non Vert.x, grâce aux adaptateurs appelés bridge. TCP bridge par exemple est l’un de ces adaptateurs avec lequel on peut interagir grâce à un protocole basique qui spécifie le format des messages à échanger.

L’event bus supporte les patterns de communications classiques des bus de message, à savoir le publish/suscribe et la communication point-to-point avec une possibilité d’envoi de réponse qui peut conduire à l’établissement d’une conversation.

Une seule instance de l’event bus est associée à une instance de Vert.x, on l’obtient comme suit :

EventBus eventBus = vertx.eventBus();

Que ce soit pour le point-to-point ou le publish/subscribe, le principe de communication consiste à envoyer le message à destination d’une adresse indiquée comme chaine de caractères. Les consommateurs du message doivent s’enregistrer auprès de l’event bus avec la même adresse utilisée pour l’envoi du message, en spécifiant le handler qui traitera les messages délivrés :

         vertx.eventBus().consumer("GreetingService", message -> {
             System.out.println("message reçu: " + message.body());
         });

Communication point-to-point avec possibilité de réponse

Le message sera délivré à un seul consommateur enregistré à l’adresse s’il y’en a plusieurs :

String address = "GreetingService";
String message = "Need a message";
eventBus.<JsonObject>send(address, message);

La communication point-to-point permettant l’établissement de conversation entre l’émetteur et le récepteur du message, ce dernier peut répondre à la réception du message comme suit :

vertx.eventBus().consumer("GreetingService", message -> {
    JsonObject reply = new JsonObject().put("message", "Hello World!");
    message.reply(reply);
});

Le consommateur décide à son tour d’envoyer un message à l’émetteur, et ce dernier peut être réécrit pour consommer à son tour ce message :

String address = "GreetingService";
String message = "Need a message";
eventBus.<JsonObject>send(address, message, reply -> {
    if (reply.succeeded()) {
        String body = reply.result().body().getString("message");
        request.response().end(body);
    } else {
        request.response().setStatusCode(500).end();
    }
});

La conversation entre les deux parties peut se poursuivre ainsi autant qu’elles le voudront.

Communication publish/subscribe

Dans le publish/subscribe, le message envoyé est délivré à tous les consommateurs enregistrés à l’adresse :

String address = "GreetingService";
String message = "Need a message";
eventBus.publish(address, message);

Types des messages échangés sur l’event bus

L’event bus supporte par défaut l’échange de messages typés par les types suivants :

Pour échanger des données d’autres types, il faut utiliser un codec de message. Il s’agit d’un convertisseur où il faut implémenter la transformation du message en bytes pour l’envoi sur le réseau. Et la reconstruction du message à partir des bytes à la sortie du réseau, en somme le processus de marshalling/unmarshalling.

Proxy des services communiquant à travers l’event bus

Lorsqu’un service Vert.x écoute sur l’event bus, il suit le pattern illustré dans le listing 2.12 pour traiter les messages reçus.

  1. Examiner dans le message reçu l’action du service à invoquer.
  2. Invoquer l’action du service en question, en lui passant le message reçu.
  3. Dans l’action invoquée :
  4. Extraire du message le paramètre attendu.
  5. Effectuer le traitement.
  6. Retourner le résultat du traitement à l’émetteur du message.
   switch (message.headers().get("action")) {
    case "all-pages":
        fetchAllPages(message);
        break;
    case "get-page":
        fetchPage(message);
        break;
    case "create-page":
        createPage(message);
        break;
    case "save-page":
        savePage(message);
        break;
    case "delete-page":
        deletePage(message);
        break;
    default:
        message.fail(ErrorCodes.BAD_ACTION.ordinal(), "Bad action: " + action);
        
}    
 
private void deletePage(Message<JsonObject> message) {
    JsonArray data = new JsonArray().add(message.body().getString("id"));
    // ...
    message.reply(new JsonObject().put("result", "ok"));
}

Listing 2‑12 Traitement des messages par un service écoutant sur l’event bus

En respectant des règles dans la création des signatures des actions (méthodes) du service et dans la création de la structure des messages échangés entre les clients et le service, il est possible d’extraire les parties génériques du pattern de traitement que nous venons de voir dans du code généré automatiquement.

C’est cette possibilité que nous fournit Vert.x à travers la génération d’un proxy du service pour les clients et la génération d’un handler côté serveur pour gérer le dispatching des messages vers les actions du service invoqué. Pour générer ce proxy et ce handler, il faut annoter l’interface du service de l’annocation @ProxyGen.

@ProxyGen
 public interface WikiDatabaseService {
     // (...)
     WikiDatabaseService savePage(int id, String markdown,
                                  Handler<AsyncResult<Void!>>resultHandler);
    
     WikiDatabaseService deletePage(int id,
                                    Handler<AsyncResult<Void!>>resultHandler);
  
    static WikiDatabaseService createProxy(Vertx vertx, String address) {
        return new WikiDatabaseServiceVertxEBProxy(vertx, address);
    }
    // (...)
}

Ainsi côté client, le proxy peut être récupéré pour échanger avec le service, en précisant l’adresse de la communication :

dbService = WikiDatabaseService.createProxy(vertx, "wikidb.queue");

Côté serveur, il faut implémenter l’interface du service et enregistrer l’implémentation pour la rendre accessible au handler généré pour la gestion du dispatching des messages reçus sur l’event bus :

WikiDatabaseService service = new SomeDatabaseServiceImpl();
// Register the implementation
ProxyHelper.registerService(WikiDatabaseService.class, vertx, service, "wikidb.queue");