Programmatically Stop Akka Streams

Posted on June 19, 2018

In certain situations we require to stop a reactive stream based on internal or external system conditions. By just using the standard graph components is not possible to configure the stream in this way, but fortunately Akka provides with the GraphStage mechanism to create custom graphs.

As can be seen on the following snippet the code consists on defining the input and output and then override the internal logic overriding createLogic. As the goal of this flow is to interrupt the processing of messages based on the condition passed as parameter createLogic extends TimerGraphLogic, which allows to check the condition using a scheduler. The relevant parts are the following:

class ConditionChecker[A](condition: () => Boolean) extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          push(out, grab(in))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if(condition()) {
            pull(in)
          } else {
            scheduleOnce(None, 2 seconds)
          }
        }
      })

      override protected def onTimer(timerKey: Any): Unit = {
        if(condition())
          pull(in)
        else
          scheduleOnce(None, 2 seconds)
      }
    }
}

Here is some crude support code that was used to run the previous code and check it’s behaviour:

object GraphTest extends App {

  implicit val system = ActorSystem("test-system", ConfigFactory.load("application-test"))
  implicit val mat = ActorMaterializer()

  def stream(start: Int): Stream[Int] = {
    Stream.cons(start, {
      println(s"Generated $start")
      stream(start+1)
    })
  }

  Source.fromIterator(() => stream(1).toIterator)
    .via(new ConditionChecker[Int](() => checkPrice))
    .map(a => {
      Thread.sleep(1000)
      a
    })
    .runForeach(r => println(s"Sink: $r"))

  var run = true

  def reverseAfter5Sec: Unit = {
    Future {
      Thread.sleep(5)
      run = !run
    }.onComplete(_ => reverseAfter5Sec)
  }

  reverseAfter5Sec

  def checkPrice: Boolean = {
    println(s"Returned $run")
    run
  }
}

This implementation is based on the amazing Akka documentation, if you enjoyed the article you should take a look to the very useful examples there.

Tweet