Wednesday, June 12, 2013

Ultra fast reliable messaging in Java

Our systems often need a messaging platform for communication between different servers. Usually we want this platform to be fast (the faster the better) and reliable. There are many popular solutions such as RabbitMQ and HornetQ, as well as commercial products. But I wanted to try something completely different and really fast, so I chose Java-Chronicle! In Peter Lawrey's words: "This library is an ultra low latency, high throughput, persisted, messaging and event driven in memory database. The typical latency is as low as 80 nano-seconds and supports throughput of 5-20 million messages/record updates per second." I will add that it can also persist messages to disk synchronously and replicate them over the network - nice :)

After cloning the project from GitHub we can find the two classes that matter most here: ChronicleSource and ChronicleSink. The first will be our master server, the endpoint from which new excerpts are fetched (in this post you can assume that an excerpt is the same thing as a message). It uses the same data store as the message producer. ChronicleSink connects to the source server and replicates the messages into a new data store, which can live on a remote server because replication works over TCP.

OK, first we have to implement our producer class:
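import java.io.IOException;

import com.higherfrequencytrading.chronicle.Excerpt;
import com.higherfrequencytrading.chronicle.impl.IndexedChronicle;

public class ChronicleProducer {

  private static final int MAX_MESSAGES = 10000000;

  public static void main(String[] args) throws IOException {
    // Data store that ChronicleSource will serve to sinks
    IndexedChronicle chronicle =
        new IndexedChronicle("/tmp/chronicle_in");

    Excerpt excerpt =
        chronicle.createExcerpt();

    for (int i = 1; i <= MAX_MESSAGES; i++) {
      // Reserve 36 bytes for this excerpt
      excerpt.startExcerpt(36);
      excerpt.writeLong(System.nanoTime());
      excerpt.writeBytes("TestMessageInTheBottle");
      excerpt.writeInt(i);
      // Flag the last message so the consumer knows when to stop
      excerpt.writeBoolean(i == MAX_MESSAGES);
      excerpt.finish();
    }

    chronicle.close();
  }
}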
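
The 36 passed to startExcerpt is the capacity reserved for one message. A rough way to see where that number could come from is to lay out the same four fields by hand. The sketch below is plain Java and does not use Java-Chronicle at all; the class name RecordSizeSketch is made up for illustration, and the one-byte length prefix in front of the text is my assumption about the encoding, not something taken from the library sources.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-alone illustration of the record layout; it does not use Java-Chronicle.
class RecordSizeSketch {

  // ASSUMPTION: the text is stored as a one-byte length prefix followed by
  // one byte per character. Only meant for short ASCII texts (under 128 chars).
  static byte[] encode(long timestamp, String text, int index, boolean last) {
    byte[] chars = text.getBytes(StandardCharsets.ISO_8859_1);
    ByteBuffer buffer = ByteBuffer.allocate(8 + 1 + chars.length + 4 + 1);
    buffer.putLong(timestamp);          // 8 bytes
    buffer.put((byte) chars.length);    // 1 byte (assumed length prefix)
    buffer.put(chars);                  // 22 bytes for "TestMessageInTheBottle"
    buffer.putInt(index);               // 4 bytes
    buffer.put((byte) (last ? 1 : 0));  // 1 byte
    return buffer.array();
  }

  public static void main(String[] args) {
    byte[] record = encode(System.nanoTime(), "TestMessageInTheBottle", 1, false);
    System.out.println("record size: " + record.length + " bytes");
  }
}
```

Under that assumption the fields add up to 8 + 1 + 22 + 4 + 1 = 36 bytes, so a longer text would need a larger value in startExcerpt.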

Then we need something to consume our messages:
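import java.io.IOException;

import com.higherfrequencytrading.chronicle.Excerpt;
import com.higherfrequencytrading.chronicle.impl.IndexedChronicle;

public class ChronicleConsumer {

  public static void main(String[] args) throws IOException {
    // Data store that ChronicleSink replicates into
    IndexedChronicle chronicle =
        new IndexedChronicle("/tmp/chronicle_out");

    Excerpt excerpt = chronicle.createExcerpt();

    while (true) {
      while (!excerpt.nextIndex()) {
        // busy-wait until the next excerpt is available
      }
      // Read the fields in the same order the producer wrote them
      long timestamp = excerpt.readLong();
      String message = excerpt.readByteString();
      int index = excerpt.readInt();
      System.out.println(index + " message: "
          + message + " created at timestamp " + timestamp);
      // The producer sets this flag on the last message
      if (excerpt.readBoolean()) {
        break;
      }
    }

    chronicle.close();
  }
}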

Now we can start up all the services!
  1. $ java com.higherfrequencytrading.chronicle.tcp.ChronicleSource /tmp/chronicle_in 8099
  2. $ java com.higherfrequencytrading.chronicle.tcp.ChronicleSink /tmp/chronicle_out localhost 8099
  3. $ java ChronicleConsumer
  4. $ java ChronicleProducer
And how does it work? Let's check from the end :) ChronicleConsumer reads excerpts from /tmp/chronicle_out, which is filled by ChronicleSink. ChronicleSink connects to localhost:8099 and asks for new messages, sending the index of the most recent message it has received. ChronicleSource listens on localhost:8099 and looks in /tmp/chronicle_in for the messages requested by the sink. And /tmp/chronicle_in is filled by ChronicleProducer :) That's all! Extremely easy and ultra fast. The whole cycle (produce -> send -> receive -> consume) takes about 20 seconds for 10 million messages on my i5 with both stores on a single SSD drive, which is roughly 500,000 messages per second end to end.
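
The "send the last index, get back what is newer" exchange between sink and source can be modelled in a few lines. This is only a toy sketch to make the idea concrete: the class ReplicationSketch and its methods are hypothetical names, the two stores are plain in-memory lists, and there is no TCP involved, so it says nothing about how Java-Chronicle actually implements replication.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of index-based replication; all names are hypothetical.
class ReplicationSketch {

  // Plays the role of the source: returns every message stored after lastIndex.
  static List<String> fetchAfter(List<String> sourceStore, int lastIndex) {
    return new ArrayList<>(sourceStore.subList(lastIndex + 1, sourceStore.size()));
  }

  // Plays the role of the sink: reports its latest index and appends the reply.
  static void replicate(List<String> sourceStore, List<String> sinkStore) {
    int lastIndex = sinkStore.size() - 1; // -1 means "nothing received yet"
    sinkStore.addAll(fetchAfter(sourceStore, lastIndex));
  }

  public static void main(String[] args) {
    List<String> in = new ArrayList<>();   // stands in for /tmp/chronicle_in
    List<String> out = new ArrayList<>();  // stands in for /tmp/chronicle_out

    in.add("m0");
    in.add("m1");
    replicate(in, out);  // sink has nothing yet, so it receives m0 and m1

    in.add("m2");
    replicate(in, out);  // sink already has index 1, so it receives only m2

    System.out.println(out);
  }
}
```

Because the sink only ever states how far it has got, it can be restarted at any time and simply continue from its last stored index.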
