Event streaming with MongoDB


MongoDB is a really great “NoSQL” database, with a very wide range of applications. In one project that we are developing at SoftwareMill, we used it as a replicated event storage, from which we stream the events to other components.

Introduction

The basic idea is pretty simple (see also Martin Fowler’s article on Event Sourcing). Our system generates a series of events. These events are persisted in the event storage. Other components of the system follow the stream of events and do “something” with them; for example, they can be aggregated and written into a reporting database (which, in turn, resembles CQRS). Such an approach has many advantages:

  • reading and writing of the events is decoupled (asynchronous)
  • any following component may die and then “catch up”, provided it wasn’t down for too long
  • there may be multiple followers. The followers may read the data from slave replicas, for better scalability
  • bursts of event activity have a reduced impact on event sinks; at worst, the reports will get generated slower

The key component here is of course a fast and reliable event storage. The three key features of MongoDB that we used to implement one are:

  • capped collections and tailable cursors
  • fast collection appends
  • replica sets

Collection

As the base, we are using a capped collection, which by definition is size-constrained. If writing a new event would cause the collection to exceed the size limit, the oldest events are overwritten. This gives us something similar to a circular buffer for events. (Plus we are also quite safe from out-of-disk-space errors.)

Until version 2.2, capped collections didn’t have an _id field by default (and hence no index on it). However, as we wanted the events to be written reliably across the replica set, both the _id field and an index on it are mandatory.
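
For illustration, creating such a collection with the Java driver could look more or less like this (the collection name, database and size limit below are example values, not the ones used in the project):

DB db = ... // the connected database

DBCollection events = db.createCollection("events",
   new BasicDBObject("capped", true)        // circular-buffer behaviour
       .append("size", 1024L * 1024 * 1024) // maximum size of the collection, in bytes
       .append("autoIndexId", true));       // create the _id field and an index on it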

Writing events

Writing events is a simple Mongo insert operation; inserts can also be done in batches. Depending on how tolerant we are of event loss, we may use various Mongo write concerns (e.g. waiting for a write confirmation from a single node or from multiple nodes).
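
As a rough sketch (the event’s fields and the choice of WriteConcern.MAJORITY are examples, not the project’s actual settings):

DBCollection events = ... // the capped collection

// An example event document; the payload fields are made up.
DBObject event = BasicDBObjectBuilder.start()
   .add("type", "user-registered")
   .add("createdAt", new Date())
   .get();

// Wait until a majority of the replica set acknowledges the write;
// a weaker write concern trades durability guarantees for speed.
events.insert(event, WriteConcern.MAJORITY);

// Inserts can also be done in batches:
List<DBObject> batch = ... // a list of event documents
events.insert(batch, WriteConcern.MAJORITY);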

All of the events are immutable. Apart from making for nicer, thread-safe Java code, this is a necessity for event streaming: if the events were mutable, how would the event sink know what was updated? Immutability also has good Mongo performance implications. As the data never changes, the documents written to disk never shrink or expand, so there is no need to move blocks on disk. In fact, in a capped collection, Mongo doesn’t allow a document to grow once it has been written.

Reading events

Reading the event stream is a little bit more complex. First of all, there may be multiple readers, each with a different level of advancement in the stream. Secondly, if there are no events in the stream, we would like the reader to wait until some events are available, and avoid active polling. Finally, we would like to process the events in batches, to improve performance.

Tailable cursors solve these problems. To create such a cursor we have to provide a starting point: the id of an event from which we’ll start reading. If no id is provided, the cursor will return events starting from the oldest one available. Thus each reader must store the id of the last event that it has read and processed.

More importantly, tailable cursors can optionally block for some amount of time if no new data is available, solving the active polling problem.

(By the way, the oplog collection that Mongo uses to replicate data across a replica set is also a capped collection. Slave Mongo instances tail this collection, streaming the “events”, which are database operations, and applying them locally in order.)

Reading events in Java

When using the Mongo Java driver, there are a few “catches”. First of all, you need to initialise the cursor. To do that, we need to provide: (1) the starting event id; (2) the order in which we want to read the events (here: natural, that is, the insertion order); and (3) two crucial cursor options: that we want the cursor to be tailable, and that we want it to block if there’s no new data:

// If we have already processed some events, look from (roughly) the last
// received id; otherwise read the stream from the oldest available event.
DBObject query = lastReceivedEventId.isPresent()
   ? BasicDBObjectBuilder.start("_id", BasicDBObjectBuilder
         .start("$gte", lookFrom(lastReceivedEventId.get())).get())
         .get()
   : null;

// $natural: the insertion order of the documents.
DBObject sortBy = BasicDBObjectBuilder.start("$natural", 1).get();

DBCollection collection = ... // must be a capped collection
DBCursor cursor = collection
   .find(query)
   .sort(sortBy)
   .addOption(Bytes.QUERYOPTION_TAILABLE)   // keep the cursor open after the last result
   .addOption(Bytes.QUERYOPTION_AWAITDATA); // block for a while when there is no new data

You may wonder what lookFrom is. In general, we’d like to get all events inserted into the collection after lastReceivedEventId. Unfortunately, that is currently not possible with Mongo. The ids of events (ObjectIds) have timestamps built in, but we can’t fully rely on them, as they might be produced by clients with clock skews etc. Of course, scanning through all of the events would work, but to speed up the process we can use a heuristic. For example, we may start looking for events with an id greater than the id corresponding to the last event’s timestamp minus 10 minutes. Then we need to discard all events until the last-received one is reached. This assumes that clock skews are smaller than 10 minutes, and that events are persisted within 10 minutes of creation.
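
A possible shape of such a lookFrom helper is sketched below; this is only an illustration of the heuristic, not necessarily the code used in the project:

// Assume clock skew and persistence delay are below 10 minutes.
private static final long MARGIN_MILLIS = 10L * 60 * 1000;

static ObjectId lookFrom(ObjectId lastReceivedEventId) {
   // ObjectIds embed their creation time (with second precision);
   // start scanning from 10 minutes before the last received event's timestamp.
   long lastCreatedAt = lastReceivedEventId.getTime();
   return new ObjectId(new Date(lastCreatedAt - MARGIN_MILLIS));
}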

The cursor’s class implements the standard Java Iterator interface, so it’s fairly easy to use. Now we can take care of batching. When iterating over a cursor, the driver receives the data from the Mongo server in batches; so we may simply call hasNext() and next(), as with any other iterator, to receive subsequent elements, and only some of the calls will actually cause network communication with the server.

In the Mongo Java driver, the call that can actually block is hasNext(). If we want to process the events in batches, we need to (a) read the elements as long as they are available, and (b) have some way of knowing, before getting blocked, that there are no more events and that we can process the events already batched. As hasNext() can block, we can’t do this directly.

That’s why we introduced an intermediate queue (a LinkedBlockingQueue). In a separate thread, events read from the cursor are put on the queue as they come. If there are no events, the thread will block on cursor.hasNext(). The blocking queue has an optional size limit, so if it’s full, putting an element will block as well, until space is available. In the event-consumer thread, we first read a single element from the queue in a blocking fashion (using a blocking .poll, so we wait until any event is available). Then we try to drain the whole contents of the queue to a temporary collection (using .drainTo, building the batch; this may yield 0 additional elements, but we always have the first one).
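
A minimal sketch of the two sides, using java.util.concurrent; the queue capacity, the poll timeout and the process(...) handler are assumptions made for this example:

private final BlockingQueue<DBObject> queue = new LinkedBlockingQueue<DBObject>(10000); // optional size limit

// Producer side: runs in its own thread, tails the cursor and feeds the queue.
void produce(DBCursor cursor) throws InterruptedException {
   while (cursor.hasNext()) {     // may block, waiting for new events
       queue.put(cursor.next());  // blocks if the queue is full
   }
}

// Consumer side: blocks until the first event arrives, then drains the rest into a batch.
void consume() throws InterruptedException {
   while (true) {
       DBObject first = queue.poll(10, TimeUnit.SECONDS); // blocking read; the timeout is an example value
       if (first == null) {
           continue;              // nothing arrived within the timeout, try again
       }
       List<DBObject> batch = new ArrayList<DBObject>();
       batch.add(first);
       queue.drainTo(batch);      // add whatever else is already queued (possibly nothing)
       process(batch);            // hypothetical handler for one batch of events
   }
}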

An important thing to mention is that if the collection is empty, Mongo won’t block, so we have to fall back to active polling. We also have to take care of the fact that the cursor may die during this wait; to check for this we verify that cursor.getCursorId() != 0, where 0 is the id of a “dead” cursor. In such a case we simply need to re-instantiate the cursor.
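
Putting these checks together, a more complete version of the producer loop from the previous sketch could look roughly like this; createCursor(...) is a hypothetical helper wrapping the find/sort/addOption code shown earlier, and the sleep interval is an example value:

// Runs in the producer thread; keeps the tail alive across dead cursors.
void tailEvents() throws InterruptedException {
   while (!Thread.currentThread().isInterrupted()) {
       DBCursor cursor = createCursor(lastReceivedEventId); // hypothetical helper, see above
       try {
           while (true) {
               if (cursor.hasNext()) {                  // may block, waiting for new data
                   queue.put(cursor.next());            // hand off to the consumer thread
               } else if (cursor.getCursorId() == 0) {
                   break;                               // the cursor is dead: re-create it
               }
               // otherwise: no new data yet, but the cursor is still alive, so wait again
           }
       } finally {
           cursor.close();
       }
       Thread.sleep(1000); // e.g. the collection was empty: avoid a busy loop before retrying
   }
}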

Summing up

To sum up, we get a very fast event sourcing/streaming solution. It is “self-regulating”: if there’s a peak of event activity, the events will be read by the event sinks with some delay, in large batches. If event activity is low, they will be processed quickly, in small batches.

We’re also using the same Mongo instance for other purposes; having a single DB system to cluster and maintain, for both regular data and events, is certainly great from an ops point of view.

EDIT 25/02/2013: Fixed a bug with re-starting a stream, looking from a given point.

  • Piotr Buda

    If you lose the first events, how do you replay the report generators, i.e. generate a new report based on already existing events? IMO the whole point of event sourcing is to preserve all history in an efficient and explicit manner.

  • http://www.warski.org Adam Warski

    Depends how many events you get – storing the whole history could quickly become way too large. You can always replay the events up until some point back in time – depending on how big the capped collection is.

    If there was a requirement to store all events always, I suppose they would have to get moved to yet another storage (~ archive), which would only be used for recovery/new report generation.

  • Piotr Buda

    Well, I don’t know the requirements, hence my question :) Some dudes who work with events say that the best event storage is the file system ;) and I guess with java.nio and buffers you could implement quite an efficient solution as well. I suppose a file system takes up much less space than Mongo.

  • http://www.warski.org Adam Warski

    One of the requirements is replication & write guarantees – which is harder with a file system (if you consider only software solutions).
    For single-node event storage there are other ready-made solutions; I don’t think you’d have to write your own, unless for fun ;).

    Plus we have some additional indexes on the event storage so we can also run other ad-hoc queries (I know, it’s not “pure” event sourcing).

  • Piotr Buda

    Right, it just hit me that you don’t use events as state storage but rather as a limited history.

    As for regular event storage, if you’re interested then check out geteventstore.com – Greg Young knows how to do such stuff :)

  • http://www.warski.org Adam Warski

    Didn’t know about geteventstore, thanks. There are also a couple of other solutions whose names I can’t exactly remember … RavenDB or https://github.com/eligosource/eventsourced for example.

    We do use events as state storage, we just don’t need to store all of the state forever. It’s enough to store the old data in the reports (and archive the reports). In our particular use-case of course.

    Anyway in our case Mongo works great. And you can always use 10 other solutions … ;)

  • moo

    Good to see that people abuse nosql databases as well as they abuse relational databases. Try a message bus next time

  • http://www.warski.org Adam Warski

    I may be wrong of course, but message buses have rather different use-cases.

    Unless you were thinking about something like Kafka (http://incubator.apache.org/kafka/), but that doesn’t fit our requirements as well as Mongo does.

  • Yuji Kiriki

    Message buses are a SPOF. In many cases an E/S solution is implemented to deal with this.

  • http://www.warski.org Adam Warski

    E/S solution?

    If the message bus is clustered (it is possible to implement a simple replicated MQ on top of Mongo), then you are pretty safe. Or you can use e.g. Kestrel, if losing some messages isn’t a problem.
