Mercury: Highly-scalable and Distributed Socket Messaging Library

When we want to build a client-to-client or client-to-server messaging application, such as an instant messenger or a system in which IoT devices talk to each other, the first step is usually a server that accepts TCP socket connections and orchestrates the messaging between clients. We then choose a messaging protocol such as XMPP, LEMP, or MQTT. The protocol is independent of the server's technical implementation: before choosing it, we need a server platform that clients can connect to over TCP, identify themselves on, and use to send messages to other clients. Once that platform exists, we implement the protocol on top of it and the messaging platform is complete.

This is where Mercury comes in. Mercury is a Java library for building the distributed part of such a system without having to think about the messaging protocol. It handles TCP client connections, keeps client presence in an Apache Ignite distributed cache, and maintains a cluster that distributes client-to-client messages between multiple Mercury nodes using Protocol Buffers (Google's data interchange format).

Let's look at the basic structure of a Mercury cluster:

[Image: basic structure of a Mercury cluster]

In the cluster above, four Mercury nodes form a cluster. The nodes talk to each other via gRPC, and clients connect to the cluster over TCP sockets. When a new Mercury application starts, it automatically discovers the Mercury cluster and joins it. Scaling a Mercury application is therefore easy: just start a new Mercury node.

Clients must identify themselves after they connect to a Mercury node; a client cannot send or receive messages until it has done so. The main application, which uses Mercury, is responsible for identifying the client (for example, by implementing a login mechanism) and informing Mercury of the result.
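To make the idea of presence-based routing concrete, here is a minimal, single-process sketch. It is not Mercury's implementation: the class and method names are hypothetical, an in-memory map stands in for the distributed presence cache, and node addresses are plain strings. It only shows the decision a node has to make once it knows which node holds the recipient's connection.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: none of these names come from the Mercury API. */
class PresenceRoutingSketch {

    /** Stand-in for the distributed presence cache: client id -> node holding its socket. */
    private final Map<String, String> presence = new ConcurrentHashMap<>();

    /** Called when a client identifies itself on a node. */
    void identify(String clientId, String nodeAddress) {
        presence.put(clientId, nodeAddress);
    }

    /** Called when a client's connection is closed. */
    void disconnect(String clientId) {
        presence.remove(clientId);
    }

    /**
     * Decides how a message that arrived at localNode reaches the client "to".
     * Returns "LOCAL", "FORWARD:<node>" or "NOT_SENT".
     */
    String route(String localNode, String to) {
        String owner = presence.get(to);
        if (owner == null) {
            return "NOT_SENT";           // recipient is unknown or offline
        }
        if (owner.equals(localNode)) {
            return "LOCAL";              // this node holds the recipient's socket
        }
        return "FORWARD:" + owner;       // hand over to the owning node (node-to-node call)
    }

    public static void main(String[] args) {
        PresenceRoutingSketch cluster = new PresenceRoutingSketch();
        cluster.identify("Alice", "127.0.0.1:6666");
        cluster.identify("Bob", "127.0.0.1:6667");
        System.out.println(cluster.route("127.0.0.1:6666", "Bob"));   // FORWARD:127.0.0.1:6667
        System.out.println(cluster.route("127.0.0.1:6666", "Alice")); // LOCAL
        System.out.println(cluster.route("127.0.0.1:6666", "Carol")); // NOT_SENT
    }
}
```

In a real cluster the map would be shared by all nodes, which is the role the article assigns to the Apache Ignite cache, and the forward step would be a call between nodes.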

The demo application on GitHub contains the code needed to build a small sample. Let's examine it:

First, we initialize Mercury:

MercuryConfig mercuryConfig = new MercuryConfig.MercuryConfigBuilder()
    .setGRpcIp("127.0.0.1")
    .setGRpcPort(6666)
    .setServerPort(5555)
    .setClientClass(Client.class)
    .build();
Mercury mercury = new Mercury().init(mercuryConfig);

We need a Client class, which we passed to the config builder and which will hold the client socket connections:

public class Client extends MercuryClient {
    @Override
    protected void handleMessage(String message) {

    }
}

As you can see, the Client class must extend MercuryClient. The handleMessage method is called when a connected client sends a message to the server. We are going to fill in this method step by step.

For example, after a client opens a connection to the server, it must identify itself. Let's define a very simple protocol for client identification: the client must send a message in the following format to identify itself:

id:Alice

Then we handle this message like this:

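@Override
protected void handleMessage(String message) {
  if (message.startsWith("id:")) {
    // Everything after the "id:" prefix is the client's identity.
    // substring avoids the exception that split("id:")[1] throws on a bare "id:".
    String id = message.substring("id:".length()).trim();
    if (!id.isEmpty()) {
      identify(id);
    }
  }
}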

As you may have noticed, we called the identify method of the MercuryClient class to set the client's id. The identify method informs Mercury about this client's identity. Of course, this logic is very basic; many other authentication mechanisms can be implemented, such as checking credentials against a database. But let's keep it simple to understand the fundamentals.

Our client has now identified itself as Alice and is ready to send and receive messages.

At this point, let's start another Mercury node so that we have a two-node Mercury cluster.

// "executor" is assumed to be a thread pool task executor created elsewhere in the application.
MercuryConfig mercuryConfig = new MercuryConfig.MercuryConfigBuilder()
    .setGRpcIp("127.0.0.1")
    .setGRpcPort(6667)
    .setServerPort(5556)
    .setClientClass(Client.class)
    .setMessageThreadPoolTaskExecutor(executor)
    .build();
Mercury mercury = new Mercury().init(mercuryConfig);

As you can see, we changed the gRPC and TCP server ports because we run both nodes on the same local machine.

Now, let's connect to the two Mercury nodes with two different clients via telnet and identify them:

Connect Alice to the node on port 5555:

[Image: Alice's telnet session on port 5555]

Connect Bob to the node on port 5556:

[Image: Bob's telnet session on port 5556]
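If you prefer code to telnet, the same steps can be sketched with a plain Java socket. This is only an illustration: the class name is hypothetical, and it assumes the server accepts line-terminated text messages, which is what a telnet session sends. The loopback method does not need a Mercury node; it starts a throwaway local listener so the client can be tried in isolation.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical telnet-like client; not part of Mercury. */
class TelnetLikeClientSketch implements AutoCloseable {
    private final Socket socket;
    private final BufferedReader in;
    private final OutputStream out;

    TelnetLikeClientSketch(String host, int port) throws IOException {
        socket = new Socket(host, port);
        in = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
        out = socket.getOutputStream();
    }

    /** Sends one CRLF-terminated line, as telnet does when you press Enter. */
    void send(String line) throws IOException {
        out.write((line + "\r\n").getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    /** Blocks until the server sends a full line; returns null if the connection is closed. */
    String receive() throws IOException {
        return in.readLine();
    }

    @Override
    public void close() throws IOException {
        socket.close();
    }

    /** Self-check without a Mercury node: returns what a local listener received. */
    static String loopback(String line) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             TelnetLikeClientSketch client = new TelnetLikeClientSketch(
                     server.getInetAddress().getHostAddress(), server.getLocalPort());
             Socket peer = server.accept();
             BufferedReader peerIn = new BufferedReader(
                     new InputStreamReader(peer.getInputStream(), StandardCharsets.UTF_8))) {
            client.send(line);
            return peerIn.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(loopback("id:Alice"));
    }
}
```

Against the nodes started above, Alice would be new TelnetLikeClientSketch("127.0.0.1", 5555) followed by send("id:Alice"). The connection has to stay open afterwards, because the identity belongs to the connection.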

Now we have two Mercury nodes and two clients, Alice and Bob, connected to them. To send a message from Alice to Bob, we need to define a message format. Let's say a client sends the server a formatted string like the one below to message another client:

send:Bob:Hi Bob!

We handle this format in the handleMessage method of our Client class. The new version of handleMessage looks like this:

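@Override
protected void handleMessage(String message) {
  if (message.startsWith("id:")) {
    String id = message.substring("id:".length()).trim();
    if (!id.isEmpty()) {
      identify(id);
    }
  } else if (message.startsWith("send:")) {
    // Limit the split to 3 parts so that colons in the message body are preserved.
    String[] parts = message.split(":", 3);
    if (parts.length == 3) {
      route(parts[1], parts[2]);
    }
  }
}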
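Because the two commands are plain strings, the parsing can also be pulled out of the handler and checked on its own. The following is one possible sketch, with hypothetical class and method names that are not part of Mercury. It splits with a limit of 3 so that a message body containing colons, such as a time, stays intact.

```java
import java.util.Optional;

/** Hypothetical parser for the demo protocol ("id:<name>" and "send:<to>:<text>"). */
class DemoProtocolSketch {

    /** Parsed form of a "send:<to>:<text>" command. */
    static final class Send {
        final String to;
        final String text;

        Send(String to, String text) {
            this.to = to;
            this.text = text;
        }
    }

    /** Returns the client id of an "id:<name>" command, or empty if it is not a valid one. */
    static Optional<String> parseId(String message) {
        if (!message.startsWith("id:")) {
            return Optional.empty();
        }
        String id = message.substring("id:".length()).trim();
        return id.isEmpty() ? Optional.<String>empty() : Optional.of(id);
    }

    /** Returns recipient and text of a "send:<to>:<text>" command, or empty if malformed. */
    static Optional<Send> parseSend(String message) {
        // At most 3 parts: "send", the recipient, and the rest of the line as the text.
        String[] parts = message.split(":", 3);
        if (parts.length < 3 || !parts[0].equals("send") || parts[1].trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new Send(parts[1].trim(), parts[2]));
    }

    public static void main(String[] args) {
        Send send = parseSend("send:Bob:see you at 10:30").get();
        System.out.println(send.to + " <- " + send.text); // Bob <- see you at 10:30
        System.out.println(parseId("id:Alice").get());    // Alice
        System.out.println(parseSend("send:Bob").isPresent()); // false
    }
}
```

With such a helper, handleMessage only has to call identify or route when the corresponding Optional is present.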

We are building our own messaging protocol here, but it is deliberately simple because we are only trying out the Mercury library. The message sent by Alice is routed to Bob by the route method of the MercuryClient class. We do not need to worry about which Mercury node Bob is connected to or how the message will be delivered to him; we just call route, and Mercury handles the rest. Let's see what happens when we send a message from Alice to Bob in our telnet clients:

Alice's client:

[Image: Alice's telnet session after sending the message]

Bob's client:

[Image: Bob's telnet session showing the received message]

Bob received Alice's message immediately. Of course, this is not magic. The good thing is that we wrote very little code, yet we have a highly scalable, distributed messaging platform. If the load increases, we do not need to do anything complex to grow the system; we just start a new Mercury node on a different machine.

Now let's look at the listeners of the Mercury library:

  1. IOEventListener
  2. ClientEventListener
  3. MessageEventListener

The listeners should be registered with the Mercury instance after initialization:

mercury.getEventBus().register(new DemoIOEventListener());
mercury.getEventBus().register(new DemoClientEventListener());
mercury.getEventBus().register(new DemoMessageEventListener());

Let's start with IOEventListener:

public class DemoIOEventListener extends IOEventListener {
    @Override
    protected void handle(IOEvent event) {
        switch (event.getType()) {
            case CLIENT_CONNECTED:
                break;
            case CLIENT_DISCONNECTED:
                break;
        }
    }
}

As you can see, there are two client IO events to catch: connect and disconnect. When a socket client opens a new connection to the Mercury node, the IOEventListener's handle method is called with the CLIENT_CONNECTED event type, and when the connection is closed, it is called with the CLIENT_DISCONNECTED event type.
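One simple use of these two events is to keep a count of the connections currently open on a node. The sketch below is self-contained and therefore uses a stand-in enum instead of Mercury's own event type; the class name and the enum are assumptions made for illustration. In the demo listener, the two branches of the switch would call the same increment and decrement.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical connection counter; the enum below only mirrors the two IO event types. */
class ConnectionGaugeSketch {

    /** Stand-in for the event types delivered to IOEventListener. */
    enum IoEventType { CLIENT_CONNECTED, CLIENT_DISCONNECTED }

    /** AtomicInteger because events may arrive on different threads. */
    private final AtomicInteger open = new AtomicInteger();

    /** Mirrors the switch in DemoIOEventListener.handle. */
    void handle(IoEventType type) {
        switch (type) {
            case CLIENT_CONNECTED:
                open.incrementAndGet();
                break;
            case CLIENT_DISCONNECTED:
                open.decrementAndGet();
                break;
            default:
                break;
        }
    }

    int openConnections() {
        return open.get();
    }

    public static void main(String[] args) {
        ConnectionGaugeSketch gauge = new ConnectionGaugeSketch();
        gauge.handle(IoEventType.CLIENT_CONNECTED);
        gauge.handle(IoEventType.CLIENT_CONNECTED);
        gauge.handle(IoEventType.CLIENT_DISCONNECTED);
        System.out.println(gauge.openConnections()); // 1
    }
}
```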

Secondly, ClientEventListener:

public class DemoClientEventListener extends ClientEventListener {
    @Override
    protected void handle(ClientEvent event) {
        switch (event.getType()) {
            case IDENTIFIED:
                break;
        }
    }
}

When a client identifies itself, the handle method of ClientEventListener is called by Mercury.

Thirdly, MessageEventListener:

public class DemoMessageEventListener extends MessageEventListener {
    @Override
    protected void handle(MessageEvent event) {
        switch (event.getType()) {
            case SENT:
                break;
            case NOT_SENT:
                break;
        }
    }
}

There are two event types: SENT and NOT_SENT. When a message is routed successfully to a connected and identified client, this listener's handle method is called with the SENT event type. If the message cannot be routed for some reason, for example because the target client is not connected or not identified, the handle method is called with the NOT_SENT event type.
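What an application does with NOT_SENT is up to the application. One possible reaction, sketched here under assumptions, is to park undelivered messages per recipient and hand them back when that client identifies itself. The class and its methods are hypothetical, and the sketch assumes the listener can obtain the recipient id and the message text from the event, which the article does not show. The buffer is also local to one process, so a real cluster would need shared storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical holding area for messages that could not be routed. */
class UndeliveredBufferSketch {

    /** Recipient id -> messages waiting for that recipient, in arrival order. */
    private final Map<String, List<String>> pending = new ConcurrentHashMap<>();

    /** Intended for the NOT_SENT branch of a message listener. */
    void onNotSent(String to, String text) {
        // compute runs atomically per key, so a concurrent drain cannot lose this message.
        pending.compute(to, (key, parked) -> {
            List<String> list = (parked == null) ? new ArrayList<String>() : parked;
            list.add(text);
            return list;
        });
    }

    /** Intended for the IDENTIFIED branch: returns and forgets what was parked for the client. */
    List<String> drain(String clientId) {
        List<String> parked = pending.remove(clientId);
        return (parked == null) ? new ArrayList<String>() : parked;
    }

    public static void main(String[] args) {
        UndeliveredBufferSketch buffer = new UndeliveredBufferSketch();
        buffer.onNotSent("Bob", "Hi Bob!");
        buffer.onNotSent("Bob", "Are you there?");
        System.out.println(buffer.drain("Bob")); // [Hi Bob!, Are you there?]
        System.out.println(buffer.drain("Bob")); // []
    }
}
```

The drained messages could then be passed to route again, one by one, now that the recipient is known to the cluster.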

Keep in mind that this is the initial version of Mercury. It is not complete yet and needs to be improved. You are welcome to use it and open feature requests, so that we can develop and improve Mercury together.

 

 

 

 
