Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >akka-streams - 从应用角度学习:basic stream parts

akka-streams - 从应用角度学习:basic stream parts

作者头像
用户1150956
发布于 2020-09-08 13:05:43
发布于 2020-09-08 13:05:43
1.1K00
代码可运行
举报
运行总次数:0
代码可运行

实际上很早就写了一系列关于akka-streams的博客。但那个时候纯粹是为了了解akka而去学习的,主要是从了解akka-streams的原理为出发点。因为akka-streams是akka系列工具的基础,如:akka-http, persistence-query等都是基于akka-streams的,其实没有真正把akka-streams用起来。这段时间所遇到的一些需求也是通过集合来解决的。不过,现在所处的环境还是逼迫着去真正了解akka-streams的应用场景。现状是这样的:跨入大数据时代,已经有大量的现代IT系统从传统关系数据库转到分布式数据库(非关系数据库)了。不难想象,这些应用的数据操作编程不说截然不同吧,肯定也会有巨大改变。特别是在传统SQL编程中依赖数据关系的join已经不复存在了,groupby、disctict等操作方法也不是所有的分布式数据库都能支持的。而这些操作在具体的数据呈现和数据处理中又是不可缺少的。当然,有很多需求可以通过集合来满足,但涉及到大数据处理我想最好还是通过流处理来实现,因为流处理stream-processing的其中一项特点就是能够在有限的内存空间里处理无限量的数据。所以流处理应该是分布式数据处理的理想方式了。这是这次写akka-streams的初衷:希望能通过akka-streams来实现分布式数据处理编程。

先从基本流部件basic stream parts开始,即source,flow,sink。这几个部件可以组合成一个所谓线性流linear-stream。一个流对数据的处理包括两部分:1、对流中元素进行转变,如:source:Source[Int,NotUsed] = Source(1 to 10).map(i => i.toString),把流里的所有Int转变成String、2、对流内元素进行计算得出运算结果,如:sink: Sink[Int,Future[Int]] = Sink.fold(0)(_ + _)。当我们run这个sink后得出Future[Int],如:res: Future[Int] = src.runWith(sink)。这两项对流元素的操作所产生的结果不同:元素转换得到动态流动的一串元素、运算元素得到一个静态值,这个运算值materialized-value只能在Sink里获取。即使有这样的表示方式:Source[Int,Future[Int]],这是个迷惑,这个运算值只能通过自定义的graph才能得到,也就是说基本组件是没这个功能的。举个具体的例子吧:val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int] 这个表达式貌似可以在Source方获取运算值,再看看Source.maybe[Int]:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  def maybe[T]: Source[T, Promise[Option[T]]] =
    Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]])

可以看出这个Source.maybe是从graph构建的。

上面这个例子里用一个Source对接一个Sink已经组成了一个完整的流,那么Flow是用来干什么的呢?由于运算值是无法当作流元素传递的,Flow只能是用来对Source传下来的元素进行转换后再传递给Sink,也就是说Flow是由一个或多个处理环节构成的。用Flow来分步实现功能是流处理实现并行运算的基本方式,如:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Source(1 to 10).async.via(Flow[Int].map(i => i + 1)).async.runWith(sink)

用async把这个流分割成3个运算发送给3个actor去同时运算。乍看之下map好像是个Flow,它们的作用也似乎相同,也可以对接Source。如:Source(1 to 10).map(_ + 1)。但map和Flow还是有分别的,从类型款式来看Flow[In,Out,M]比起map[A,B]多出来了M,运算值。所以via(map(_.toString))无法匹配类型。那么对于定义带有预先处理环节的Sink就必须用Flow来实现了:ex_sink = Flow[Int].map(_ + 1).to(sink)。

虽然运算值不能像流元素一样流动,但akka-streams提供了机制让用户选择是否返回某个节点的运算值M。系统默认只选择最最左边节点的M,如:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// A source that can be signalled explicitly from the outside
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]

// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler

// A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int]

val stream: RunnableGraph[(Cancellable, Future[Int])] =
  source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)

val stream1: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =
  source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both)

运算值M可以通过viaMat,toMat选择,然后stream.run()获取。akka-streams提供了简便一点的运算方式runWith:指定runWith参数流组件的M为最终运算值。如:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Using runWith will always give the materialized values of the stages added
// by runWith() itself
val r4: Future[Int] = source.via(flow).runWith(sink)
val r5: Promise[Option[Int]] = flow.to(sink).runWith(source)
val r6: (Promise[Option[Int]], Future[Int]) = flow.runWith(source, sink)

值得注意的是:我们可以分别从Source,Sink,Flow开始针对Source runWith(Sink), Sink runWith(Source)及Flow runWith (Source,Sink)。用基础流组件Source,Flow,Sink构成的流是直线型的。也就是说从Source流出的元素会一个不漏的经过Flow进入Sink,不能多也不能少。可能Source.filter会产生疑惑,不过看看filter函数定义就明白了:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def filter(p: Out => Boolean): Repr[Out] = via(Filter(p))

@InternalApi private[akka] final case class Filter[T](p: T => Boolean) extends SimpleLinearGraphStage[T] {
  override def initialAttributes: Attributes = DefaultAttributes.filter

  override def toString: String = "Filter"

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {
      def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

      private var buffer: OptionVal[T] = OptionVal.none

      override def preStart(): Unit = pull(in)
      override def onPush(): Unit =
        try {
          val elem = grab(in)
          if (p(elem))
            if (isAvailable(out)) {
              push(out, elem)
              pull(in)
            } else
              buffer = OptionVal.Some(elem)
          else pull(in)
        } catch {
          case NonFatal(ex) =>
            decider(ex) match {
              case Supervision.Stop => failStage(ex)
              case _                => pull(in)
            }
        }

      override def onPull(): Unit =
        buffer match {
          case OptionVal.Some(value) =>
            push(out, value)
            buffer = OptionVal.none
            if (!isClosed(in)) pull(in)
            else completeStage()
          case _ => // already pulled
        }

      override def onUpstreamFinish(): Unit =
        if (buffer.isEmpty) super.onUpstreamFinish()
      // else onPull will complete

      setHandlers(in, out, this)
    }
}

怎样?够复杂的了吧。很明显,复杂点的流处理需要根据上游元素内容来维护内部状态从而重新构建向下游发送元素的机制。如果想实现join,groupby,distict这些功能就必然对流动元素除转换之外还需要进行增减操作。这项需求可能还必须留在后面的sream-graph章节中讨论解决方案了。不过临时解决方法可以通过运算值M来实现。因为M可以是一个集合,在构建这个M集合时是可以对集合元素进行增减的,下面这段代码示范了一种cassandra数据表groupby的效果:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 def getVouchers(terminalid: String, susp: Boolean)(implicit classicSystem: ActorSystem) = {
    implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra")
    implicit val ec = classicSystem.dispatcher
    var stmt = "select * from pos_on_cloud.txn_log where terminal = ? and txndate = ?"
    if (susp) stmt = "select * from pos_on_cloud.txn_hold where terminal = ? and txndate = ?"
    val source  = session.select(stmt,terminalid,LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")))
    val sink = Sink.fold[List[TxnItem],TxnItem](List[TxnItem]()){(acc,txn) =>
      if (acc.isEmpty) txn.copy(price = 1) :: acc
      else
        if (acc.head.num == txn.num) {
          if (txn.salestype == SALESTYPE.itm &&
            txn.txntype == TXNTYPE.sales) {
            val nacc = acc.head.copy(
              price = acc.head.price + 1,
              qty = acc.head.qty + txn.qty,
              amount = acc.head.amount + txn.amount,
              dscamt = acc.head.dscamt + txn.dscamt
            )
            nacc :: acc.drop(1)
          } else acc
        }
        else txn :: acc
    }
    for {
      vchs <- source.map(TxnItem.fromCqlRow).toMat(sink)(Keep.right).run()
      _ <- session.close(ec)
    } yield vchs
  }

当然,基本流组件在流模式数据库读写方面还是比较高效的,如:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    def futTxns(items: Seq[TxnItem]): Future[Seq[TxnItem]] = Source(items.toSeq)
      .via(
        CassandraFlow.create(CassandraWriteSettings.defaults,
          CQLScripts.insertTxns,
          statementBinder)
      )
      .runWith(Sink.seq)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-09-07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Akka(26): Stream:异常处理-Exception handling
   akka-stream是基于Actor模式的,所以也继承了Actor模式的“坚韧性(resilient)”特点,在任何异常情况下都有某种整体统一的异常处理策略和具体实施方式。在akka-stre
用户1150956
2018/01/05
1.3K0
Akka(17): Stream:数据流基础组件-Source,Flow,Sink简介
    在大数据程序流行的今天,许多程序都面临着共同的难题:程序输入数据趋于无限大,抵达时间又不确定。一般的解决方法是采用回调函数(callback-function)来实现的,但这样的解决方案很容易
用户1150956
2018/01/05
1.7K0
Akka(25): Stream:对接外部系统-Integration
   在现实应用中akka-stream往往需要集成其它的外部系统形成完整的应用。这些外部系统可能是akka系列系统或者其它类型的系统。所以,akka-stream必须提供一些函数和方法来实现与各种不
用户1150956
2018/01/05
2.1K0
Akka(36): Http:Client-side-Api,Client-Connections
   Akka-http的客户端Api应该是以HttpRequest操作为主轴的网上消息交换模式编程工具。我们知道:Akka-http是搭建在Akka-stream之上的。所以,Akka-http在客
用户1150956
2018/01/05
1.1K0
Akka(20): Stream:异步运算,压力缓冲-Async, batching backpressure and buffering
   akka-stream原则上是一种推式(push-model)的数据流。push-model和pull-model的区别在于它们解决问题倾向性:push模式面向高效的数据流下游(fast-dow
用户1150956
2018/01/05
9210
alpakka-kafka(1)-producer
alpakka项目是一个基于akka-streams流处理编程工具的scala/java开源项目,通过提供connector连接各种数据源并在akka-streams里进行数据处理。alpakka-kafka就是alpakka项目里的kafka-connector。对于我们来说:可以用alpakka-kafka来对接kafka,使用kafka提供的功能。或者从另外一个角度讲:alpakka-kafka就是一个用akka-streams实现kafka功能的scala开发工具。
用户1150956
2021/03/02
1K0
Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub
  在现实中我们会经常遇到这样的场景:有一个固定的数据源Source,我们希望按照程序运行状态来接驳任意数量的下游接收方subscriber、又或者我需要在程序运行时(runtime)把多个数据流向某
用户1150956
2018/01/05
9590
Akka(18): Stream:组合数据流,组件-Graph components
   akka-stream的数据流可以由一些组件组合而成。这些组件统称数据流图Graph,它描述了数据流向和处理环节。Source,Flow,Sink是最基础的Graph。用基础Graph又可以组合
用户1150956
2018/01/05
1.2K0
alpakka-kafka(2)-consumer
alpakka-kafka-consumer的功能描述很简单:向kafka订阅某些topic然后把读到的消息传给akka-streams做业务处理。在kafka-consumer的实现细节上,为了达到高可用、高吞吐的目的,topic又可用划分出多个分区partition。分区是分布在kafka集群节点broker上的。由于一个topic可能有多个partition,对应topic就会有多个consumer,形成一个consumer组,共用统一的groupid。一个partition只能对应一个consumer、而一个consumer负责从多个partition甚至多个topic读取消息。kafka会根据实际情况将某个partition分配给某个consumer,即partition-assignment。所以一般来说我们会把topic订阅与consumer-group挂钩。这个可以在典型的ConsumerSettings证实:
用户1150956
2021/03/02
6400
Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages
    从总体上看:akka-stream是由数据源头Source,流通节点Flow和数据流终点Sink三个框架性的流构件(stream components)组成的。这其中:Source和Sink是
用户1150956
2018/01/05
1.8K0
akka-grpc - 基于akka-http和akka-streams的scala gRPC开发工具
关于grpc,在前面的scalaPB讨论里已经做了详细的介绍:google gRPC是一种全新的RPC框架,在开源前一直是google内部使用的集成工具。gRPC支持通过http/2实现protobuf格式数据交换。protobuf即protocol buffer,是google发明的一套全新的序列化传输协议serialization-protocol,是二进制编码binary-encoded的,相对java-object,XML,Json等在空间上占有优势,所以数据传输效率更高。由于gRPC支持http/2协议,可以实现双向通讯duplex-communication,解决了独立request/response交互模式在软件编程中的诸多局限。这是在系统集成编程方面相对akka-http占优的一个亮点。protobuf格式数据可以很方便的转换成 json格式数据,支持对外部系统的的开放协议数据交换。这也是一些人决定选择gRPC作为大型系统微服务集成开发工具的主要原因。更重要的是:用protobuf和gRPC进行client/server交互不涉及任何http对象包括httprequest,httpresponse,很容易上手使用,而且又有在google等大公司内部的成功使用经验,用起来会更加放心。
用户1150956
2020/08/25
2.1K0
Akka(19): Stream:组合数据流,组合共用-Graph modular composition
   akka-stream的Graph是一种运算方案,它可能代表某种简单的线性数据流图如:Source/Flow/Sink,也可能是由更基础的流图组合而成相对复杂点的某种复合流图,而这个复合流图本身
用户1150956
2018/01/05
1.1K0
Akka(19): Stream:组合数据流,组合共用-Graph modular composition
SDP(12): MongoDB-Engine - Streaming
   在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。这个MongoDB-connector里包含了MongoSource,MongoFlow,MongoSink。我们只使用MongoSource,其它两个我们直接用mapAsyc来创造。下面是MongoSource的定义: object MongoSource { def apply(query: Observable[Document]): Sour
用户1150956
2018/04/02
1.4K0
Akka-CQRS(8)- CQRS Reader Actor 应用实例
前面我们已经讨论了CQRS-Reader-Actor的基本工作原理,现在是时候在之前那个POS例子里进行实际的应用示范了。
用户1150956
2019/06/24
1.6K0
Akka(27): Stream:Use case-Connecting Slick-dbStream & Scalaz-stream-fs2
 在以前的博文中我们介绍了Slick,它是一种FRM(Functional Relation Mapper)。有别于ORM,FRM的特点是函数式的语法可以支持灵活的对象组合(Query Compos
用户1150956
2018/01/05
9030
Akka(21): Stream:实时操控:人为中断-KillSwitch
KillSwitch是Akka中用于实现多播和流处理的工具。它通过将多个数据流共享一个单一的KillSwitch来控制多个数据流的终止。使用KillSwitch时,多个数据流可以共享一个单一的KillSwitch,并通过调用KillSwitch的shutdown()方法来同时终止所有数据流的执行。此外,KillSwitch还可以通过调用abort()方法来终止数据流的执行,并抛出异常。在Akka中,使用KillSwitch可以有效地管理多个数据流,并确保所有数据流都按照预期执行。
用户1150956
2018/01/05
8600
Akka(37): Http:客户端操作模式
该文章是一篇关于技术社区和编辑人员如何参与社区管理、贡献技术文档并解决技术问题的文章。主要介绍了技术社区中编辑人员的工作职责和流程,包括技术社区的建立、文档的编辑和管理、技术问题的解决、社区沟通和贡献度量等方面的内容。文章还探讨了技术社区中的编辑人员如何与其他社区成员、管理团队和利益相关者进行协作和沟通,以确保社区的健康发展和成长。
用户1150956
2018/01/05
1.2K0
Akka-Cluster(5)- load-balancing with backoff-supervised stateless computation - 无状态任务集群节点均衡分配
 分布式程序运算是一种水平扩展(scale-out)运算模式,其核心思想是能够充分利用服务器集群中每个服务器节点的计算资源,包括:CPU、内存、硬盘、IO总线等。首先对计算任务进行分割,然后把细分的任务分派给各节点去运算。细分的任务相互之间可以有关联或者各自为独立运算,使用akka-cluster可以把任务按照各节点运算资源的负载情况进行均匀的分配,从而达到资源的合理充分利用以实现运算效率最大化的目的。如果一项工作可以被分割成多个独立的运算任务,那么我们只需要关注如何合理地对细分任务进行分配以实现集群节点的负载均衡,这实际上是一种对无需维护内部状态的运算任务的分配方式:fire and forget。由于承担运算任务的目标actor具体的部署位置是由算法决定的,所以我们一般不需要控制指定的actor或者读取它的内部状态。当然,如果需要的话我们还是可以通过嵌入消息的方式来实现这样的功能。
用户1150956
2019/01/07
1.5K0
PICE(3):CassandraStreaming - gRPC-CQL Service
  在上一篇博文里我们介绍了通过gRPC实现JDBC数据库的streaming,这篇我们介绍关于cassandra的streaming实现方式。如果我们需要从一个未部署cassandra的节点或终端上读取cassandra数据,可以用gRPC来搭建一个数据桥梁来连接这两端。这时cassandra这端就是gRPC-Server端,由它提供cassandra的数据服务。
用户1150956
2018/07/31
1.2K0
Akka(41): Http:DBTable-rows streaming - 数据库表行交换
  在前面一篇讨论里我们介绍了通过http进行文件的交换。因为文件内容是以一堆bytes来表示的,而http消息的数据部分也是byte类型的,所以我们可以直接用Source[ByteString,_]
用户1150956
2018/01/05
1.6K0
推荐阅读
相关推荐
Akka(26): Stream:异常处理-Exception handling
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验