How to Share Data Between Threads in Vert.x

Passing and sharing information or data between threads is an important part of multi-threaded programming. It’s even more important in an asynchronous programming architecture when you have to manage the application state.

Vert.x provides synchronous and asynchronous shared data functionality for this need. Shared data allows you to safely share data between different verticles in the same Vert.x instance or across a cluster of Vert.x instances.

Vert.x offers this functionality with Cluster Managers for the clustered environment. As mentioned in How to Run a Vert.x Cluster With Broadcasting Messaging article, the cluster manager is Hazelcast by default in Vert.x, but the cluster managers are pluggable. That means you can use another implementation as the cluster manager if you want. At this point, it is important to remember that the behavior of the distributed data structure depends on the cluster manager that you use.

Now, let’s look at the options provided by shared data functionality:

  • Synchronous shared maps
  • Asynchronous maps
  • Asynchronous locks
  • Asynchronous counters

Other than the first option, you can access the data locally or cluster-wide with these options. The first option only provides local access. We will now focus on the first two of these options.

Local Shared Maps

As mentioned the above, local shared maps are locally accessible. That means that the data can be shared safely between different event loops in the same Vert.x instance.

private void putData(RoutingContext routingContext){
final SharedData sd = vertx.sharedData();
  final LocalMap sharedData = sd.getLocalMap(DEFAULT_LOCAL_MAP_NAME);
  final SimpleData data = Json.decodeValue(routingContext.getBodyAsString(), SimpleData.class);
sharedData.put(data.getKey(), data.getValue());
  routingContext.response()
          .setStatusCode(201)
          .putHeader("content-type", "application/json; charset=utf-8")
          .end(Json.encodePrettily(data));
}

In this example, we obtained a SharedData object through the vertx field and then put an object(SimpleData) that received an HTTP Post request into the LocalMap. The LocalMap is obtained by passing the map name to the getLocalMap method. The SimpleData object is accessible by other event loops in the same Vert.x instance. An example of the use of local shared data can be found in this repository.

Vert.x restricts you to the types of data used with local shared maps for thread safety. You can use either immutable or certain other types that can be copied, like Buffer. In this way, you can ensure that there is no shared access to a mutable state between different threads in your application.

Asynchronous Shared Maps

Asynchronous shared maps allow you to put and retrieve the data locally and cluster-wide. When Vert.x is clustered, you can be put the data from any node and retrieved from the same node or any other node.

This feature is the perfect solution for managing the application state we mentioned at the beginning.

@Override
public void start() throws NoSuchAlgorithmException {
    final Random random = SecureRandom.getInstanceStrong();
    final SharedData sharedData = vertx.sharedData();
    vertx.setPeriodic(3000, h ->
            sharedData.getAsyncMap(DEFAULT_ASYNC_MAP_NAME, res -> {
                if (res.succeeded()) {
                    AsyncMap myAsyncMap = res.result();
                    myAsyncMap.get(DEFAULT_ASYNC_MAP_KEY, asyncDataResult -> {
    LocalDateTime dateTime = LocalDateTime.now();
                        StockExchange stockExchange = new StockExchange(String.join(":", String.valueOf(dateTime.getHour()),
                                String.valueOf(dateTime.getMinute()), String.valueOf(dateTime.getSecond())),
                                Arrays.asList(new StockExchangeData(KODCU_STOCK_NAME, random.nextInt(100)),
                                        new StockExchangeData(JUG_IST_STOCK_NAME, random.nextInt(100))));
    myAsyncMap.put(DEFAULT_ASYNC_MAP_KEY, stockExchange, resPut -> {
                            if (resPut.succeeded()) {
                                log.info("Added data into the map {} ", Json.encodePrettily(stockExchange));
                            } else {
                                log.debug("Failed to add data {} ", Json.encodePrettily(stockExchange));
                            }
                        });
                    });
                } else {
                    log.debug("Failed to get map!");
                }
            }));
}

In this example, we request a map with thegetAsyncMap method that returned an instance of AsyncMap. If the request succeeded, we put an object(StockExchange) that has values generated randomly every three seconds into the map with the same key. Another application in a clustered environment can be accessed by the map with the same value and handle it.

You can find an example of the use of asynchronous shared maps in this repository under the SharedDataProvider and SharedDataReader directories. The SharedDataReader reads the data that’s generated by the SharedMapsProvider in a clustered environment every second and displays it in a chart component on the front-end.

Conclusion

Sharing data between threads is an important issue in multi-threaded programming. Vert.x solves this problem with synchronous and asynchronous shared data functionality. The asynchronous shared data feature is especially important for the clustered environment. With that feature, for example, you can store and distribute session state of your servers in a Vert.x web application. This is a perfect solution for the need that’s managing the application state that we mentioned at the beginning of this article. Happy coding!

References

No Comments

Post a Comment

Comment
Name
Email
Website