一个Graph可以用GraphShape和GraphStage两个部分来描述:GraphShape描述了Graph的输入输出端口数量,GraphStage描述数据在流通中的转化处理过程。...akka-stream预先提供了一些基本的形状,包括SourceShape/FlowShape/SinkShape: /** * A Source [[Shape]] has exactly one...GraphStage描述了数据流构件的行为,通过数据流元素在构件中进出流动方式和在流动过程中的转变来定义流构件的具体功能。...akka-stream还提供了一套更简单的API使用户可以更灵活的对端口进行操作。...对于一对多扩散型和多对一合并型形状的数据流构件akka-stream提供了UniformFanIn和UniformFanOut两种GraphStage。
所有基础组件Core-Graph都必须定义Shape来描述它的输入输出端口,定义GraphStage中的GraphStateLogic来描述对数据流元素具体的读写方式。...import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import scala.collection.immutable...因为继承了Shape类所以必须实现Shape类的抽象函数。...所以我们最好使用akka-stream提供现成的pull,push来重写抽象函数onPull,onPush。...._ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.ActorAttributes._ import akka.stream.stage
在更高的功能层面上实现Graph的模块化(modular)。...然后我们再使用这个自定义流图模块组建一个完整的闭合流图: import akka.actor._ import akka.stream._ import akka.stream.scaladsl._...注意这个~>符合的使用:akka-stream只提供了对预设定Shape作为连接对象的支持如: def ~>[Out](junction: UniformFanInShape[T, Out]...无法想象如果用纯函数数据流如scalaz-stream应该怎样去实现这么复杂的流程,也可能根本是没有解决方案的。...不同的还有akka-stream的运算是在actor上进行的,除了大家都能对数据流元素进行处理之外,akka-stream还可以通过actor的内部状态来维护和返回运算结果。
与这些外界系统对接的意思是在另一个线程中运行的数据流可以接收外部系统推送的事件及做出行为改变的响应。...如果一个外界系统需要控制一个运行中数据流的功能环节GraphStage,首先必须在这个GraphStage内部构建一个控制函数,这样才能接触并更改GraphStage的内部状态。...外部系统可以通过调用这个控制函数来向GraphStage发送信息,控制GraphStage行为。akka-stream是多线程异步模式的程序,所以这个函数只能是一个异步运行的回调callback。...插入了一个正在运行中的数据流中并在最后终止了这个数据流。 另外,一个GraphStage也可以被外界当作一种Actor来进行交流。...与上个例子一样,作为一个GraphStage的内部函数,它可以使用、更新GraphStage内部状态。
刚好,在这篇讨论里我们希望能介绍一些Akka-Stream和外部系统集成对接的实际用例,把Slick数据库数据载入连接到Akka-Stream形成streaming-dataset应该是一个挺好的想法。...enqueue代表akka-stream向scalaz-stream-fs2发送数据,可以用akka-stream的Sink构件来实现: class FS2Gate[T](q: fs2.async.mutable.Queue...GraphStage描述了对上游每一个元素的enqueue动作。...._ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.stage._ import slick.basic.DatabasePublisher...import akka._ import fs2._ import akka.stream.stage.
scalaz-stream的运算器是自备的函数式程序,特点是能很好的控制线程使用和进行并行运算。akka-stream的运算器是materializer。...akka-stream的数据流是由三类基础组件组合而成,不同的组合方式代表不同的数据处理及表达功能。三类组件分别是: 1、Source:数据源。...属于数据元素的使用方,主要作用是消耗数据流中的元素。SinkShape是有一个输入端的数据流形状。...在akka-stream里数据流组件一般被称为数据流图(graph)。我们可以用许多数据流图组成更大的stream-graph。...我们上面提过akka-stream是在actor系统里处理数据流元素的。在这个过程中同时可以用actor内部状态来产生运算结果。
问题 在SpringBoot项目中使用WebSocket的过程中有其他的业务操作需要注入其它接口来做相应的业务操作,但是在WebSocket的Server类中使用Autowired注解无效,这样注入的对象就是空...,在使用过程中会报空指针异常。...注释:上面说的WebSocket的Server类就是指被@ServerEndpoint注解修饰的类 原因 原因就是在spring容器中管理的是单例的,他只会注入一次,而WebSocket是多对象的,当有新的用户使用的时候...,他就会新创建一个WebSocket对象,这就导致了用户创建的WebSocket对象都不能注入对象了,所以在运行的时候就会发生注入对象为null的情况; 主要的原因就是Spring容器管理的方式不能直接注入...WebSocket中的对象,所以需要调整一下注入方式。
这种模式首先解决了纯Http大数据通过Multipart传输所必须进行的数据分段操作和复杂的消息属性设定等需要的技术门槛,再者用户还可以很方便的使用Akka-stream对数据进行深度处理,免去了数据转换的麻烦...更重要的是:Akka-http还支持reactive-stream,可以避免由传输速率所产生的种种问题。在本篇我们讨论利用Akka-http进行文件的双向传递。 ...放入了HttpRequest中。...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpEntity.limitableByteSource
特别是在传统SQL编程中依赖数据关系的join已经不复存在了,groupby、disctict等操作方法也不是所有的分布式数据库都能支持的。而这些操作在具体的数据呈现和数据处理中又是不可缺少的。...这是这次写akka-streams的初衷:希望能通过akka-streams来实现分布式数据处理编程。 先从基本流部件basic stream parts开始,即source,flow,sink。...由于运算值是无法当作流元素传递的,Flow只能是用来对Source传下来的元素进行转换后再传递给Sink,也就是说Flow是由一个或多个处理环节构成的。...所以via(map(_.toString))无法匹配类型。...这项需求可能还必须留在后面的sream-graph章节中讨论解决方案了。不过临时解决方法可以通过运算值M来实现。
版本号:maven-resources-plugin:3.1.0 bootstrap.yml spring: application: name: ...
在akka-stream的官方文件中都有详细的说明和示范例子。我们在这篇讨论里也没有什么更好的想法和范例,也只能略做一些字面翻译和分析理解的事了。...对于出现异常的stream,Supervisor-Strategy提供了三种处理方法: Stop:终结stream,返回异常 Resume:越过当前元素,继续运行 Restart:重新启动、越过当前元素...、清除任何内部状态 akka-stream的默认异常处理方式是Stop,即立即终止数据流,返回异常。...从下面的运算结果中我们确定了Restart在重启过程中清除了内部状态,也就是说从发生异常的位置开始重新进行计算了: 0 1 4 0 5 12 好了,下面是这次示范涉及的完整源代码: import akka.actor...._ import akka.stream._ import akka.stream.scaladsl._ import scala.concurrent.duration._ object ExceptionHandling
在现实中我们会经常遇到这样的场景:有一个固定的数据源Source,我们希望按照程序运行状态来接驳任意数量的下游接收方subscriber、又或者我需要在程序运行时(runtime)把多个数据流向某个固定的数据流终端...从akka-stream的技术文档得知:一对多,多对一或多对多类型的复杂数据流组件必须用GraphDSL来设计,产生Graph类型结果。...但akka-stream提供了MergeHub,BroadcastHub和PartitionHub来支持这样的功能需求。 1、MergeHub:多对一合并类型。...下面是以上示范中MergeHub及BroadcastHub示范的源代码: import akka.NotUsed import akka.stream.scaladsl._ import akka.stream...import akka.stream.scaladsl._ import akka.stream._ import akka.actor._ import scala.concurrent.duration
在现实应用中akka-stream往往需要集成其它的外部系统形成完整的应用。这些外部系统可能是akka系列系统或者其它类型的系统。...akka-stream提供了mapAsync+ask模式可以从一个运算中的数据流向外连接某个Actor来进行数据交换。这是一种akka-stream与Actor集成的应用。...说到与Actor集成,联想到如果能把akka-stream中复杂又消耗资源的运算任务交付给Actor,那么我们就可以充分利用actor模式的routing,cluster,supervison等等特殊功能来实现分布式高效安全的运算...在我们这次的测试里只能使用group类型的Router,因为如果需要对routee实现监管supervision的话,pool类型的router在routee终止时会自动补充构建新的routee,如此就避开了监管策略...把这个数据流传给Calculator,这样Calculator就可以向这个运行中的Stream发送数据了。我们会通过这个过程来示范SourceQueue的基本操作。
因为akka-stream已经在上下游环节全部实现了Reactive-Streams-Specification,所以上下游之间可以进行互动,这样就可以在akka-stream里由下游通知上游自身可接收数据的状态来控制上游数据流速...akka-stream的backpressure使用了缓冲区buffer来成批预存及补充数据,这样可以提高数据传输效率。...akka-stream可以通过以下几种方式来设定异步运算使用的缓冲大小: 1、在配置文件中设定默认buffer: akka.stream.materializer.max-input-buffer-size...= 16 2、在ActorMaterializerSetting中宏观层面上设定: val materializer = ActorMaterializer( ActorMaterializerSettings...with failure. */ def fail: OverflowStrategy = Fail } 当akka-stream需要与外界系统进行数据交换时就无法避免数据流上下游速率不匹配的问题了
前面我们全面介绍了在akka-cluster环境下实现的CQRS写端write-side。简单来说就是把发生事件描述作为对象严格按发生时间顺序写入数据库。...,我们无法控制akka传递消息的过程。...这个EventEnvelope类定义如下: /** * Event wrapper adding meta data for the events in the result stream of...refresh-interval可以在配置文件中设置,如下面的cassandra-plugin配置: cassandra-query-journal { # Implementation class...我们可以run这个stream把数据读入一个集合里,然后可以在任何一个线程里用这个集合演算业务逻辑(如我们前面提到的写端状态转变维护过程),可以用router/routee模式来实现一种在集群节点中负载均衡式的分配
虽然这次的restapi是围绕着数据库表的CRUD操作设计的,但文件类数据在服务端与客户端之间的交换其实也很常用,特别是多媒体类如图片等文件类型。...akka-http是以akka-stream为核心的,使用了大量的akka-stream功能。...自带了ByteString的Marshaller,可以实现数据格式自动转换,在网络传输中不需要增加什么数据格式转换动作。...系统之外的线程池来进行FileIO操作,可以避免影响akka系统的运行效率。...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.server.Directives._ import
akka-http提供了丰富的Marshaller来实现自动的数据转换,但在编译时要提供Marshaller的隐式实例implicit instance,所以用类参数是无法通过编译的。...} package com.datatech.restapi import akka.actor._ import akka.stream._ import akka.http.scaladsl.common...akka-stream提供的功能。..." %% "akka-http" % "10.1.8", "com.typesafe.akka" %% "akka-stream" % "2.5.23", "com.pauldijou" %...._ import akka.stream._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._
一般情况下,在开发Web应用程序的时候,从模型和流程定义开始,深入到软件开发中,都是使用TDD(测试驱动开发)方法:先写测试,考虑我们真正想要的,以及我们如何使用它; 但微服务(microservices...正如我所说的,Pact适用于很多平台,在我们的例子中,用Scala编写Consumer和Producer,我们只能使用一个实现:Scala-Pact。...测试环境也有特定的配置; 只是因为我们在同一个项目中同时拥有生产者和客户端,所以并行执行被禁用,所以如果并行执行(我们稍后会看到它),我们可能会在Pact文件生成和使用过程中遇到问题。...如果应用程序很简单,我们可以使用这种方法,如果不是这样,我们可以为这种测试实现特定的测试运行器,但我建议尽可能与生产案例类似。...在主类中使用它非常容易; 只需将其添加为类特征,并将静态值替换为相应的常量即可: MyLibraryAppServer.scala package com.fm.mylibrary.producer.app
不过因为是Akka-http的配套库,在将来Akka-http的持续发展中具有一定的优势,所以我们还是用它来进行下面的示范。 下面就让我们开始写些代码吧。...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives...下面是这部分客户端的完整代码: import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http...complete { futofNames } } } } } 在客户端试运行返回结果显示...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.model
比如一个零售店管理平台的服务端在完成了某些数据更新后需要通知各零售门市客户端下载最新数据。...SSE模式的基本原理是服务端统一集中发布消息,各客户端持久订阅服务端发布的消息并从消息的内容中筛选出属于自己应该执行的指令,然后进行相应的处理。...服务端是通过complete以SeverSentEvent类为元素的Source来进行SSE的,如下: import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling...import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.duration.DurationInt...akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import akka.http.scaladsl.model.sse.ServerSentEvent
领取专属 10元无门槛券
手把手带您无忧上云