akka-streams - 从应用角度学习:basic stream parts

   实际上很早就写了一系列关于akka-streams的博客。但那个时候纯粹是为了了解akka而去学习的,主要是从了解akka-streams的原理为出发点。由于akka-streams是akka系列工具的基础,如:akka-http, persistence-query等都是基于akka-streams的,其实没有真正把akka-streams用起来。这段时间所遇到的一些需求也是经过集合来解决的。不过,如今所处的环境仍是逼迫着去真正了解akka-streams的应用场景。现状是这样的:跨入大数据时代,已经有大量的现代IT系统从传统关系数据库转到分布式数据库(非关系数据库)了。不难想象,这些应用的数据操做编程不说大相径庭吧,确定也会有巨大改变。特别是在传统SQL编程中依赖数据关系的join已经不复存在了,groupby、disctict等操做方法也不是全部的分布式数据库都能支持的。而这些操做在具体的数据呈现和数据处理中又是不可缺乏的。固然,有不少需求能够经过集合来知足,但涉及到大数据处理我想最好仍是经过流处理来实现,由于流处理stream-processing的其中一项特色就是可以在有限的内存空间里处理无限量的数据。因此流处理应该是分布式数据处理的理想方式了。这是此次写akka-streams的初衷:但愿能经过akka-streams来实现分布式数据处理编程。数据库

先从基本流部件basic stream parts开始,即source,flow,sink。这几个部件能够组合成一个所谓线性流linear-stream。一个流对数据的处理包括两部分:一、对流中元素进行转变,如:source:Source[Int,NotUsed] = Source(1 to 10).map(i => i.toString),把流里的全部Int转变成String、二、对流内元素进行计算得出运算结果,如:sink: Sink[Int,Future[Int]] = Sink.fold(0)(_ + _)。当咱们run这个sink后得出Future[Int],如:res: Future[Int] = src.runWith(sink)。这两项对流元素的操做所产生的结果不一样:元素转换获得动态流动的一串元素、运算元素获得一个静态值,这个运算值materialized-value只能在Sink里获取。即便有这样的表示方式:Source[Int,Future[Int]],这是个迷惑,这个运算值只能经过自定义的graph才能获得,也就是说基本组件是没这个功能的。举个具体的例子吧:val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int] 这个表达式貌似能够在Source方获取运算值,再看看Source.maybe[Int]:编程

  def maybe[T]: Source[T, Promise[Option[T]]] = Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]])

能够看出这个Source.maybe是从graph构建的。session

上面这个例子里用一个Source对接一个Sink已经组成了一个完整的流,那么Flow是用来干什么的呢?因为运算值是没法看成流元素传递的,Flow只能是用来对Source传下来的元素进行转换后再传递给Sink,也就是说Flow是由一个或多个处理环节构成的。用Flow来分步实现功能是流处理实现并行运算的基本方式,如:async

Source(1 to 10).async.via(Flow[Int].map(i => i + 1)).async.runWith(sink)

用async把这个流分割成3个运算发送给3个actor去同时运算。乍看之下map好像是个Flow,它们的做用也彷佛相同,也能够对接Source。如:Source(1 to 10).map(_ + 1)。但map和Flow仍是有分别的,从类型款式来看Flow[In,Out,M]比起map[A,B]多出来了M,运算值。因此via(map(_.toString))没法匹配类型。那么对于定义带有预先处理环节的Sink就必须用Flow来实现了:ex_sink = Flow[Int].map(_ + 1).to(sink)。分布式

虽然运算值不能像流元素同样流动,但akka-streams提供了机制让用户选择是否返回某个节点的运算值M。系统默认只选择最最左边节点的M,如:ide

// A source that can be signalled explicitly from the outside
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int] // A flow that internally throttles elements to 1/second, and returns a Cancellable // which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler // A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int] val stream: RunnableGraph[(Cancellable, Future[Int])] = source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both) val stream1: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] = source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both)

运算值M能够经过viaMat,toMat选择,而后stream.run()获取。akka-streams提供了简便一点的运算方式runWith:指定runWith参数流组件的M为最终运算值。如:函数

// Using runWith will always give the materialized values of the stages added // by runWith() itself
val r4: Future[Int] = source.via(flow).runWith(sink) val r5: Promise[Option[Int]] = flow.to(sink).runWith(source) val r6: (Promise[Option[Int]], Future[Int]) = flow.runWith(source, sink)

值得注意的是:咱们能够分别从Source,Sink,Flow开始针对Source runWith(Sink), Sink runWith(Source)及Flow runWith (Source,Sink)。用基础流组件Source,Flow,Sink构成的流是直线型的。也就是说从Source流出的元素会一个不漏的通过Flow进入Sink,不能多也不能少。可能Source.filter会产生疑惑,不过看看filter函数定义就明白了:工具

def filter(p: Out => Boolean): Repr[Out] = via(Filter(p)) @InternalApi private[akka] final case class Filter[T](p: T => Boolean) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.filter override def toString: String = "Filter"

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler { def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private var buffer: OptionVal[T] = OptionVal.none override def preStart(): Unit = pull(in) override def onPush(): Unit =
        try { val elem = grab(in) if (p(elem)) if (isAvailable(out)) { push(out, elem) pull(in) } else buffer = OptionVal.Some(elem) else pull(in) } catch { case NonFatal(ex) => decider(ex) match { case Supervision.Stop => failStage(ex) case _                => pull(in) } } override def onPull(): Unit = buffer match { case OptionVal.Some(value) => push(out, value) buffer = OptionVal.none if (!isClosed(in)) pull(in) else completeStage() case _ => // already pulled
 } override def onUpstreamFinish(): Unit =
        if (buffer.isEmpty) super.onUpstreamFinish() // else onPull will complete
 setHandlers(in, out, this) } }

怎样?够复杂的了吧。很明显,复杂点的流处理须要根据上游元素内容来维护内部状态从而从新构建向下游发送元素的机制。若是想实现join,groupby,distict这些功能就必然对流动元素除转换以外还须要进行增减操做。这项需求可能还必须留在后面的sream-graph章节中讨论解决方案了。不过临时解决方法能够经过运算值M来实现。由于M能够是一个集合,在构建这个M集合时是能够对集合元素进行增减的,下面这段代码示范了一种cassandra数据表groupby的效果:学习

 def getVouchers(terminalid: String, susp: Boolean)(implicit classicSystem: ActorSystem) = { implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra") implicit val ec = classicSystem.dispatcher var stmt = "select * from pos_on_cloud.txn_log where terminal = ? and txndate = ?"
    if (susp) stmt = "select * from pos_on_cloud.txn_hold where terminal = ? and txndate = ?" val source = session.select(stmt,terminalid,LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd"))) val sink = Sink.fold[List[TxnItem],TxnItem](List[TxnItem]()){(acc,txn) =>
      if (acc.isEmpty) txn.copy(price = 1) :: acc else
        if (acc.head.num == txn.num) { if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales) { val nacc = acc.head.copy( price = acc.head.price + 1, qty = acc.head.qty + txn.qty, amount = acc.head.amount + txn.amount, dscamt = acc.head.dscamt + txn.dscamt ) nacc :: acc.drop(1) } else acc } else txn :: acc } for { vchs <- source.map(TxnItem.fromCqlRow).toMat(sink)(Keep.right).run() _ <- session.close(ec) } yield vchs }

固然,基本流组件在流模式数据库读写方面仍是比较高效的,如:大数据

    def futTxns(items: Seq[TxnItem]): Future[Seq[TxnItem]] = Source(items.toSeq) .via( CassandraFlow.create(CassandraWriteSettings.defaults, CQLScripts.insertTxns, statementBinder) ) .runWith(Sink.seq)