How to Work With Multiple Verticles and Communication in Vert.x

As mentioned in the previous article, Vert.x is an event-driven, non-blocking toolkit designed for asynchronous communication. Most applications consist of several verticles, and those verticles need a way to communicate with each other. In this article, I will explain how you can do this. Let’s get started!

Use of Multiple Verticles

An important point to remember is that a verticle is a unit of deployment. It lets you encapsulate code for different needs, and verticles can run independently of each other. Verticles communicate with each other by sending messages on the event bus. For this example, we have developed two verticles: one is the sender and the other is the receiver. You can access the code for this example from this repository. The start method in the SenderVerticle class is as follows:

@Override
public void start(Future<Void> future) throws Exception {
  final Router router = Router.router(vertx);
  router.route("/").handler(routingContext -> {
    HttpServerResponse response = routingContext.response();
    response.putHeader("content-type", "text/html").end("Hello from non-clustered messenger example!");
  });
  router.post("/send/:message").handler(this::sendMessage);
  
  vertx.createHttpServer().requestHandler(router::accept).listen(config()
       .getInteger("http.server.port", HTTP_PORT), result -> {
     if (result.succeeded()) {
        // Log the port the server actually bound to, which may differ from the default
        log.info("HTTP server running on port " + result.result().actualPort());
        future.complete();
     } else {
        log.error("Could not start a HTTP server", result.cause());
        future.fail(result.cause());
     }
  });
}

The first thing to note in this method is the use of a Router. A Router is one of the core concepts of the Vert.x Web component. In this example, we use this component together with the core component of Vert.x. To do so, we need to add the following dependency to pom.xml in a Maven project:


<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-web</artifactId>
  <version>3.5.3</version>
</dependency>

For a Gradle project, we need to add the following to the build.gradle file:

dependencies {
  compile 'io.vertx:vertx-web:3.5.3'
}

A Router can have zero or more routes. When a Router receives an HTTP request, it finds a matching route for that request and passes the request to that route. In our example, we create two routes. The first matches the root URL, and the second matches the “/send/:message” URL, which has a path parameter. Then, we specify the handlers for these routes, one of which is inline. Each handler is called for every request that matches its route.
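To make the idea of a path parameter more concrete, here is a minimal plain-Java sketch of how a pattern such as “/send/:message” could be matched against a request path. The RouteSketch class and its match method are hypothetical names made up for this illustration. This is not how Vert.x Web implements routing, which also handles HTTP methods, regular expressions, and route ordering.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Vert.x Web.
final class RouteSketch {

    // Matches a request path against a pattern such as "/send/:message".
    // Returns the extracted path parameters, or an empty Optional when the path does not match.
    static Optional<Map<String, String>> match(String pattern, String path) {
        String[] patternParts = pattern.split("/");
        String[] pathParts = path.split("/");
        if (patternParts.length != pathParts.length) {
            return Optional.empty();
        }
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < patternParts.length; i++) {
            String expected = patternParts[i];
            String actual = pathParts[i];
            if (expected.startsWith(":")) {
                // A segment starting with ':' captures a non-empty path segment under that name
                if (actual.isEmpty()) {
                    return Optional.empty();
                }
                params.put(expected.substring(1), actual);
            } else if (!expected.equals(actual)) {
                // A literal segment must match exactly
                return Optional.empty();
            }
        }
        return Optional.of(params);
    }
}
```

With this sketch, matching “/send/hello” against “/send/:message” yields a map in which the key message holds the value hello, while “/send” alone does not match.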

In this way, when we make an HTTP GET request to the root URL (in our case http://localhost:8080), we will see the “Hello from non-clustered messenger example!” sentence. In the other route, we bound the sendMessage method as the handler. When an HTTP POST request arrives that matches the http://localhost:8080/send/:message URL, this handler is called.

private void sendMessage(RoutingContext routingContext){
   final EventBus eventBus = vertx.eventBus();
   final String message = routingContext.request().getParam(PATH_PARAM);
   eventBus.send(ADDRESS, message, reply -> {
      if (reply.succeeded()) {
          log.info("Received reply: " + reply.result().body());
      } else {
          log.info("No reply");
      }
   });
   routingContext.response().end(message);
}

The method receives a RoutingContext object that represents the context for handling a request in Vert.x Web. We use this object to get the request parameter and to end the response. Before that, we get the EventBus object through the vertx field, which is a reference to the Vert.x instance.

As we mentioned in the previous article, the event bus supports three communication modes. We use point-to-point messaging in this example through the eventBus.send method. The form of the send method used here takes three parameters (there are also other forms). The first parameter represents the address. The recipient will access and consume messages via this address. The second parameter represents the message. Its type is Object. Vert.x allows any primitive type, String, or buffer to be sent as a message. If you want to send a different type, you can do this by defining a codec through the MessageCodec interface. The third parameter represents the asynchronous handler for the reply. The consumer can reply to the message to acknowledge that it has been processed, and you can handle this event in the asynchronous handler. In our example, we log whether or not a reply was received.

Now, let’s look at the start method of the ReceiverVerticle class.

@Override
public void start() throws Exception {
   final EventBus eventBus = vertx.eventBus();
   eventBus.consumer(ADDRESS, receivedMessage -> {
     log.debug("Received message: " + receivedMessage.body());
     receivedMessage.reply(DEFAULT_REPLY_MESSAGE);
   });
   log.info("Receiver ready!");
}

The consumer method creates a consumer and registers it against the specified address. The first parameter of the method represents the address, and the second parameter represents the message handler. The handler receives the message object, which we use to log the received message and to reply to it.
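The send-and-reply flow between the two verticles can be sketched without Vert.x at all. The following is a deliberately simplified, single-threaded stand-in. MiniBus and its methods are hypothetical names invented for this illustration, and it assumes one consumer per address, which the real event bus does not require. It only shows the shape of the interaction: the sender passes a reply handler, and the consumer decides when to call it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical, single-threaded stand-in for illustration; not the Vert.x EventBus.
final class MiniBus {

    // One handler per address keeps the sketch short.
    // A handler receives the message body and a callback for sending a reply.
    private final Map<String, BiConsumer<Object, Consumer<Object>>> consumers = new HashMap<>();

    // Registers a handler against an address, replacing any previous one.
    void consumer(String address, BiConsumer<Object, Consumer<Object>> handler) {
        consumers.put(address, handler);
    }

    // Delivers the message to the consumer registered at the address.
    // Returns false when nobody is registered there, so no reply can ever arrive.
    boolean send(String address, Object message, Consumer<Object> replyHandler) {
        BiConsumer<Object, Consumer<Object>> handler = consumers.get(address);
        if (handler == null) {
            return false;
        }
        handler.accept(message, replyHandler);
        return true;
    }
}
```

As a usage example, a receiver could register with bus.consumer("messenger", (body, reply) -> reply.accept("got " + body)), where "messenger" is a made-up address, and a sender could then call bus.send("messenger", "hi", replyBody -> System.out.println(replyBody)). In Vert.x itself, delivery is asynchronous and a missing consumer surfaces as a failed reply rather than a return value.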

Deploying Multiple Verticles

A verticle can be deployed from inside another verticle. In our example, we also want to ensure that all verticles are deployed successfully, and we want to be informed if any deployment fails. For this, we used the CompositeFuture.all method, which is useful when several futures need to be coordinated. The start method in the MessengerLauncher class is as follows:

@Override
public void start(Future<Void> future){
 CompositeFuture.all(deployHelper(ReceiverVerticle.class.getName()),
 deployHelper(SenderVerticle.class.getName())).setHandler(result -> { 
   if(result.succeeded()){
      future.complete();
   } else {
      future.fail(result.cause());
   }
  });
}

The all method accepts several Future objects or a list of them. We used the deployHelper method to provide the Future objects passed to the all method.

private Future<Void> deployHelper(String name){
   final Future<Void> future = Future.future();
   vertx.deployVerticle(name, res -> {
      if(res.failed()){
         log.error("Failed to deploy verticle " + name);
         future.fail(res.cause());
      } else {
         log.info("Deployed verticle " + name);
         future.complete();
      }
   });
   
   return future;
}
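For readers more familiar with the JDK, the coordination pattern above has a rough analogue in CompletableFuture.allOf. The sketch below is a plain-Java illustration, not Vert.x code. AllOfSketch, deploy, and allDeployed are hypothetical names, and the deploy method only simulates a deployment result. One difference worth keeping in mind is that allOf waits for every future to finish, whereas CompositeFuture.all fails as soon as one of its futures fails.

```java
import java.util.concurrent.CompletableFuture;

// Plain-JDK analogue for illustration; Vert.x's CompositeFuture is a separate API.
final class AllOfSketch {

    // Hypothetical stand-in for deployHelper: completes normally or fails with an exception.
    static CompletableFuture<Void> deploy(String name, boolean succeed) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (succeed) {
            future.complete(null);
        } else {
            future.completeExceptionally(new IllegalStateException("Failed to deploy " + name));
        }
        return future;
    }

    // Waits for all deployments and reports true only when none of them failed.
    static boolean allDeployed(CompletableFuture<?>... deployments) {
        return CompletableFuture.allOf(deployments)
                .handle((ignored, error) -> error == null)
                .join();
    }
}
```

Calling allDeployed with two successful deployments reports success, while a single failed deployment makes the combined result fail, which mirrors the succeeded and failed branches in the launcher.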

Conclusion

Vert.x offers a straightforward way of implementing communication between verticles through point-to-point messaging. In the next article, we will look at publish/subscribe messaging, which broadcasts messages between multiple applications in a clustered environment.
