Reactive Queue with Akka Reactive Streams

Update 15/09/2014: introduced API changes from akka-streams 0.7.
Update 30/10/2014: introduced API changes from akka-streams 0.9.

Update 15/12/2014: introduced API changes from akka-streams 1.0-M1.

Reactive streams is a recently announced initiative to create a standard for asynchronous stream processing with built-in back-pressure, on the JVM. The working group is formed by companies such as Typesafe, Red Hat, Oracle, Netflix, and others.

One of the early, experimental implementations is based on Akka. Preview version 0.3 includes actor publishers & subscribers, which opens up some new integration possibilities.


To test the new technology, I implemented a very simple Reactive Message Queue. The code is at a PoC stage, lacks error handling and such, but if used properly – works ;).

The queue is reactive, meaning that messages will be delivered to interested parties whenever there’s demand, without polling. Back-pressure is applied both when sending messages (so that senders do not overwhelm the broker), and when receiving messages (so that the broker sends only as many messages as the receivers can consume).

Let’s see how it works!

The queue

First, the queue itself is an actor, and doesn’t know anything about (reactive) streams. The code is in the com.reactmq.queue package. The actor accepts the following actor-messages (the term “message” is overloaded here, so I’ll use plain “message” to mean the messages we send to and receive from the queue, and “actor-messages” to be the Scala class instances sent to actors):

  • SendMessage(content) – sends a message with the specified String content. A reply (SentMessage(id)) is sent back to the sender with the id of the message.
  • ReceiveMessages(count) – signals that the sender (actor) would like to receive up to count messages. The count is added to any previously signalled demand.
  • DeleteMessage(id) – unsurprisingly, deletes a message.

The queue implementation is a simplified version of what’s in ElasticMQ. After a message is received, if it is not deleted (acknowledged) within 10 seconds, it becomes available for receiving again.

When an actor signals demand for messages (by sending ReceiveMessages to the queue actor), it should expect any number of ReceivedMessages(msgs) actor-message replies, containing the received data.
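The bookkeeping described above (cumulative demand, plus redelivery of unacknowledged messages after 10 seconds) can be sketched without any actors. This is only an illustrative model, not the reactmq code: QueueModel, Msg, the explicit now parameter and the millisecond clock are all assumptions made for the example.

```scala
import scala.collection.mutable

// Hypothetical, actor-free model of the queue's bookkeeping (not the reactmq code).
final case class Msg(id: Long, content: String)

final class QueueModel(visibilityMillis: Long = 10000L) {
  private var nextId = 0L
  // id -> (message, earliest time at which it may be delivered)
  private val stored = mutable.LinkedHashMap.empty[Long, (Msg, Long)]
  private var demand = 0

  /** Mirrors SendMessage: stores the content and returns the new id. */
  def send(content: String, now: Long): Long = {
    nextId += 1
    stored(nextId) = (Msg(nextId, content), now)
    nextId
  }

  /** Mirrors ReceiveMessages: adds to the outstanding demand, then delivers what it can. */
  def receive(count: Int, now: Long): List[Msg] = {
    demand += count
    deliver(now)
  }

  /** Delivers visible messages, never more than the outstanding demand. */
  def deliver(now: Long): List[Msg] = {
    val ready = stored.valuesIterator
      .collect { case (m, visibleAt) if visibleAt <= now => m }
      .take(demand)
      .toList
    // Hide delivered messages until the visibility timeout passes.
    ready.foreach(m => stored(m.id) = (m, now + visibilityMillis))
    demand -= ready.size
    ready
  }

  /** Mirrors DeleteMessage: an acknowledged message is never redelivered. */
  def delete(id: Long): Unit = { stored.remove(id); () }
}
```

In the real queue actor the replies are pushed as ReceivedMessages actor-messages whenever new data arrives; here the caller has to invoke deliver explicitly, which keeps the sketch deterministic.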

Going reactive

To create and test our reactive queue, we need three applications:

  • a Sender
  • a Receiver
  • a Broker

We can run any number of Senders and Receivers, but of course we should run only one Broker.

The first thing that we need to do is to connect the Sender with the Broker, and the Receiver with the Broker over a network. We can do that with the Akka reactive StreamTCP extension. Using a bind & outgoingConnection pair, we get a stream of connections on the binding side:

implicit val materializer = FlowMaterializer()
 
// sender:
val serverFlow = StreamTcp().outgoingConnection(sendServerAddress).flow
 
// serverFlow is of type Flow[ByteString, ByteString] and can be used on the
// client to build a flow graph including server-side processing
 
// broker:
StreamTcp().bind(sendServerAddress).connections.foreach { conn =>
    // per-connection logic
}

There’s a different address for sending and receiving messages.

The sender

Let’s look at the per-connection logic of the Sender first.

val source = 
  Source(1.second, 1.second, () => { idx += 1; s"Message $idx from $senderName" })
  .map { msg =>
    logger.debug(s"Sender: sending $msg")
    createFrame(msg)
  }
val sink = OnCompleteSink[ByteString] { t => logger.info("Stream completed"); () }
 
val completeFlow = source.via(serverConnection.flow).to(sink)
completeFlow.run()

We are creating a tick-flow source which produces a new message every second (very convenient for testing). Using the map stream transformer, we are creating a byte-frame with the message (more on that later). Having constructed a source, and a very simple sink (which just logs a message on stream completion), we define a complete flow: from the source, through the server’s flow, back to our sink. And we now have a reactive over-the-network stream of messages, meaning that messages will be sent only when the Broker can accept them. Otherwise back-pressure will be applied all the way up to the tick publisher.

But that’s only a description of what our (very simple) stream should look like; it needs to be materialized using the run method, which will provide concrete implementations of the stream transformation nodes. Currently there’s only one FlowMaterializer, which – again unsurprisingly – uses Akka actors under the hood, to actually create the stream and the flow; the materializer instance is implicit in e.g. the run method call.

[Figure: reactmq actors]

The broker: sending messages

On the other side of the network sits the Broker. Let’s see what happens when a message arrives.

StreamTcp().bind(sendServerAddress).connections.foreach { conn =>
  logger.info(s"Broker: send client connected (${conn.remoteAddress})")
 
  val sendToQueueSubscriber = ActorSubscriber[String](
    system.actorOf(Props(new SendToQueueSubscriber(queueActor))))
 
  // sending messages to the queue, receiving from the client
  val reconcileFrames = new ReconcileFrames()
 
  val sendSink = Flow[ByteString]
    .mapConcat(reconcileFrames.apply)
    .to(Sink(sendToQueueSubscriber))
 
  conn.flow.to(sendSink).runWith(FutureSource(Promise().future))
}

The connections returned by bind are a (reactive) Source: here we simply iterate over each connection, handling the incoming stream of bytes. We re-construct the String instances that were sent using our framing, and finally we direct that stream to a sink: the send-to-queue subscriber.

To connect all the pieces together, we specify that the client-side flow (conn.flow) should be connected to our previously created sink; on the other side, we don’t send anything, so we just provide a never-completed, no-element source.

The SendToQueueSubscriber is a per-connection bridge to the main queue actor. It uses the ActorSubscriber trait from Akka’s Reactive Streams implementation, to automatically manage the demand that should be signalled upstream. Using that trait we can create a reactive-stream-Subscriber[_], backed by an actor – so a fully customisable sink.

class SendToQueueSubscriber(queueActor: ActorRef) extends ActorSubscriber {
 
  private var inFlight = 0
 
  override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
    override def inFlightInternally = inFlight
  }
 
  override def receive = {
    case OnNext(msg: String) =>
      queueActor ! SendMessage(msg)
      inFlight += 1
 
    case SentMessage(_) => inFlight -= 1
  }
}

What an ActorSubscriber needs from us is a way of measuring how many stream items are currently being processed. Here, we are counting the number of messages that have been sent to the queue, but for which we have not yet received an id (so they are still being processed by the queue).

The subscriber receives new messages wrapped in the OnNext actor-message; so OnNext is sent to the actor by the stream, and SentMessage is sent in reply to a SendMessage by the queue actor.
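To make the in-flight accounting more concrete, here is a simplified model of how a "max in flight" rule turns that counter into upstream demand. It is a sketch under assumptions: InFlightModel and its methods are hypothetical names, and Akka’s actual MaxInFlightRequestStrategy may batch its requests differently. The only rule modelled is "never have more than max elements requested or being processed".

```scala
// Simplified, hypothetical model of an in-flight-bounded request strategy.
// Not Akka's implementation; names and rules are assumptions for illustration.
final class InFlightModel(max: Int) {
  private var inFlight = 0  // sent to the queue, no SentMessage reply yet
  private var requested = 0 // demanded from upstream, not yet delivered

  /** How many more elements may be requested from upstream right now. */
  def toRequest: Int = math.max(0, max - inFlight - requested)

  /** Signals demand upstream and remembers how much is outstanding. */
  def request(): Int = {
    val n = toRequest
    requested += n
    n
  }

  /** Corresponds to OnNext: an element arrived and was forwarded to the queue. */
  def onNext(): Unit = {
    require(requested > 0, "element arrived without demand")
    requested -= 1
    inFlight += 1
  }

  /** Corresponds to SentMessage: the queue confirmed one message. */
  def onSent(): Unit = {
    require(inFlight > 0, "confirmation without a message in flight")
    inFlight -= 1
  }
}
```

With max = 10, the model requests ten elements up front, requests nothing while all ten are unconfirmed, and asks for exactly as many new elements as confirmations have come back.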

Receiving

The receiving part is done in a similar way, though it requires some extra steps. First, if you take a look at the Receiver, you’ll see that we are reading bytes from the input stream, re-constructing messages from frames, and sending back the ids, hence acknowledging the message. In reality, we would run some message-processing-logic between receiving a message and sending back the id.

On the Broker side, we create both a source and a sink for each connection.

The source is a stream of messages sent to receivers; the sink is a stream of acknowledged message ids from the receivers, which are simply transformed into DeleteMessage actor-messages sent to the queue actor.

As before, we need to connect all these pieces together: the source sends data via the client-side flow, to the sink.

Similarly to the subscriber, we need a per-connection receiving bridge from the queue actor, to the stream. That’s implemented in ReceiveFromQueuePublisher. Here we are extending the ActorPublisher trait, which lets you fully control the process of actually creating the messages which go into the stream.

In this actor, the Request actor-message is sent by the stream to signal demand. When there’s demand, we request messages from the queue. The queue will eventually respond with one or more ReceivedMessages actor-messages (when there are any messages in the queue); as the number of messages will never exceed the signalled demand, we can safely call the ActorPublisher.onNext method, which sends the given items downstream.
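The demand hand-off described above can be modelled in a few lines. This is not the ReceiveFromQueuePublisher source: PublisherBridgeModel, askQueue and emit are hypothetical stand-ins for, respectively, the bridge actor, sending ReceiveMessages to the queue, and calling onNext.

```scala
// Hypothetical model of the publisher-side bridge; callbacks stand in for
// the queue actor (askQueue) and the downstream stream (emit).
final class PublisherBridgeModel(askQueue: Int => Unit, emit: String => Unit) {
  private var totalDemand = 0L

  def outstandingDemand: Long = totalDemand

  /** Corresponds to Request(n): record the demand and forward it to the queue. */
  def onRequest(n: Int): Unit = {
    totalDemand += n
    askQueue(n)
  }

  /** Corresponds to ReceivedMessages(msgs): emit downstream, within demand. */
  def onReceivedMessages(msgs: List[String]): Unit = {
    require(msgs.size <= totalDemand, "queue sent more than was demanded")
    msgs.foreach(emit)
    totalDemand -= msgs.size
  }
}
```

The require call spells out the invariant the paragraph relies on: because the queue is only ever asked for as many messages as the stream demanded, emitting them can never exceed the outstanding demand.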

Framing

One small detail is that we need a custom framing protocol (thanks to Roland Kuhn for the clarification), as the TCP stream is just a stream of bytes, so we can get arbitrary fragments of the data, which need to be recombined later. Luckily implementing such a framing is quite simple – see the Framing class. Each frame consists of the size of the message, and the message itself.
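As an illustration of such a length-prefixed protocol, here is a minimal sketch on plain byte arrays. It is not the Framing class from the project: the 4-byte big-endian size header, the UTF-8 encoding, and the names FramingSketch and FrameReassembler are assumptions made for the example.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object FramingSketch {
  /** Encodes a message as [4-byte big-endian length][UTF-8 payload]. */
  def createFrame(msg: String): Array[Byte] = {
    val payload = msg.getBytes(UTF_8)
    ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array()
  }
}

/** Stateful decoder: buffers arbitrary chunks and returns every complete message. */
final class FrameReassembler {
  private var buffer = Array.emptyByteArray

  def apply(chunk: Array[Byte]): List[String] = {
    buffer = buffer ++ chunk
    val out = List.newBuilder[String]
    var more = true
    while (more) {
      if (buffer.length >= 4) {
        val size = ByteBuffer.wrap(buffer, 0, 4).getInt
        if (buffer.length >= 4 + size) {
          // A whole frame is available: decode it and drop it from the buffer.
          out += new String(buffer, 4, size, UTF_8)
          buffer = buffer.drop(4 + size)
        } else more = false // payload incomplete, wait for the next chunk
      } else more = false   // header incomplete, wait for the next chunk
    }
    out.result()
  }
}
```

Because apply returns a (possibly empty) list of messages per incoming chunk, a decoder of this shape fits a mapConcat stage: a chunk that ends mid-frame yields nothing, and a chunk that completes several frames yields all of them.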

Summing up

Using Reactive Streams and the Akka implementation it is very easy to create reactive applications with end-to-end back-pressure. The queue above, while missing a lot of features and proofing, won’t allow the Broker to be overloaded by the Senders, nor the Receivers to be overloaded by the Broker. And all that without having to write any of the back-pressure-handling code!

Follow-up posts:

  • Stanislav

    Thanks

  • Tomer Ben David

    Thanks

  • Sean Zhong

    1. QueueActor uses an unbounded queue to store messages from the Sender. Will the queue cause an OOM?

    var messageQueue = mutable.PriorityQueue[InternalMessage]()

    2. In QueueActor’s handleQueueMsg, we call tryReply() inside persistAsync, but tryReply() is not protected against concurrent writes, and there may be multiple persistAsync callbacks running. Is this a problem?

    persistAsync(msg.toMessageAdded) { msgAdded =>
    sender() ! SentMessage(msgAdded.id)
    tryReply()
    }

  • http://www.warski.org/ Adam Warski

    1. Yes, if there are a lot of messages in flight, there will be an OOM. Some off-loading/paging mechanism would be needed.

    2. No, as described in the “Event Sourcing” section (http://doc.akka.io/docs/akka/2.3.4/scala/persistence.html) (“An event handler may close over persistent actor state and mutate it. …”), at most one callback will be handled at a time (it’s an actor message).

  • Alexander Gavrilov

    Adam, what is the simplest way to make this queue bounded? I’ve tried to modify the QueueActorReceive by stashing all SendMessage events if the messageQueue size is too big, and then unstashing all messages after the messageQueue has been drained by the Receiver. But it didn’t work.

  • http://www.warski.org/ Adam Warski

    So you want to save the messages somewhere when the queue is too big? Or drop them? What should be the expected behavior?

  • Alexander Gavrilov

    I want to stop accepting new messages. I’d like to emulate back pressure from a broker to a sender by saturating the TCP buffer.

    I run a broker and a sender. After the queue is full, the sender should eventually stop sending new messages. Then I run a receiver, which should drain the queue, and I expect the sender to resume, but it does not work.

  • http://www.warski.org/ Adam Warski

    I would attempt this by modifying SendToQueueSubscriber, and when the queue is full, signalling this fact to the subscriber actor, and e.g. artificially maxing out the in-flight message count, or providing another InFlightRequestStrategy altogether.