首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Akka(23): Stream:自定义流构件功能-Custom defined stream processing stages

一个Graph可以用GraphShape和GraphStage两个部分来描述:GraphShape描述了Graph的输入输出端口数量,GraphStage描述数据在流通中的转化处理过程。...akka-stream预先提供了一些基本的形状,包括SourceShape/FlowShape/SinkShape:  /** * A Source [[Shape]] has exactly one...GraphStage描述了数据流构件的行为,通过数据流元素在构件中进出流动方式和在流动过程中的转变来定义流构件的具体功能。...akka-stream还提供了一套更简单的API使用户可以更灵活的对端口进行操作。...对于一对多扩散型和多对一合并型形状的数据流构件akka-stream提供了UniformFanIn和UniformFanOut两种GraphStage。

1.7K80
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    Akka(19): Stream:组合数据流,组合共用-Graph modular composition

    在更高的功能层面上实现Graph的模块化(modular)。...然后我们再使用这个自定义流图模块组建一个完整的闭合流图: import akka.actor._ import akka.stream._ import akka.stream.scaladsl._...注意这个~>符合的使用:akka-stream只提供了对预设定Shape作为连接对象的支持如: def ~>[Out](junction: UniformFanInShape[T, Out]...无法想象如果用纯函数数据流如scalaz-stream应该怎样去实现这么复杂的流程,也可能根本是没有解决方案的。...不同的还有akka-stream的运算是在actor上进行的,除了大家都能对数据流元素进行处理之外,akka-stream还可以通过actor的内部状态来维护和返回运算结果。

    1.1K100

    Akka(24): Stream:从外部系统控制数据流-control live stream from external system

    与这些外界系统对接的意思是在另一个线程中运行的数据流可以接收外部系统推送的事件及做出行为改变的响应。...如果一个外界系统需要控制一个运行中数据流的功能环节GraphStage,首先必须在这个GraphStage内部构建一个控制函数,这样才能接触并更改GraphStage的内部状态。...外部系统可以通过调用这个控制函数来向GraphStage发送信息,控制GraphStage行为。akka-stream是多线程异步模式的程序,所以这个函数只能是一个异步运行的回调callback。...插入了一个正在运行中的数据流中并在最后终止了这个数据流。 另外,一个GraphStage也可以被外界当作一种Actor来进行交流。...与上个例子一样,作为一个GraphStage的内部函数,它可以使用、更新GraphStage内部状态。

    703100

    Akka(17): Stream:数据流基础组件-Source,Flow,Sink简介

    scalaz-stream的运算器是自备的函数式程序,特点是能很好的控制线程使用和进行并行运算。akka-stream的运算器是materializer。...akka-stream的数据流是由三类基础组件组合而成,不同的组合方式代表不同的数据处理及表达功能。三类组件分别是: 1、Source:数据源。...属于数据元素的使用方,主要作用是消耗数据流中的元素。SinkShape是有一个输入端的数据流形状。...在akka-stream里数据流组件一般被称为数据流图(graph)。我们可以用许多数据流图组成更大的stream-graph。...我们上面提过akka-stream是在actor系统里处理数据流元素的。在这个过程中同时可以用actor内部状态来产生运算结果。

    1.7K60

    使用WebSocket在Server类中无法使用Autowired注解进行自动注入

    问题 在SpringBoot项目中使用WebSocket的过程中有其他的业务操作需要注入其它接口来做相应的业务操作,但是在WebSocket的Server类中使用Autowired注解无效,这样注入的对象就是空...,在使用过程中会报空指针异常。...注释:上面说的WebSocket的Server类就是指被@ServerEndpoint注解修饰的类 原因 原因就是在spring容器中管理的是单例的,他只会注入一次,而WebSocket是多对象的,当有新的用户使用的时候...,他就会新创建一个WebSocket对象,这就导致了用户创建的WebSocket对象都不能注入对象了,所以在运行的时候就会发生注入对象为null的情况; 主要的原因就是Spring容器管理的方式不能直接注入...WebSocket中的对象,所以需要调整一下注入方式。

    5.6K60

    Akka(39): Http:File streaming-文件交换

    这种模式首先解决了纯Http大数据通过Multipart传输所必须进行的数据分段操作和复杂的消息属性设定等需要的技术门槛,再者用户还可以很方便的使用Akka-stream对数据进行深度处理,免去了数据转换的麻烦...更重要的是:Akka-http还支持reactive-stream,可以避免由传输速率所产生的种种问题。在本篇我们讨论利用Akka-http进行文件的双向传递。  ...放入了HttpRequest中。...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpEntity.limitableByteSource

    1.3K90

    akka-streams - 从应用角度学习:basic stream parts

    特别是在传统SQL编程中依赖数据关系的join已经不复存在了,groupby、disctict等操作方法也不是所有的分布式数据库都能支持的。而这些操作在具体的数据呈现和数据处理中又是不可缺少的。...这是这次写akka-streams的初衷:希望能通过akka-streams来实现分布式数据处理编程。 先从基本流部件basic stream parts开始,即source,flow,sink。...由于运算值是无法当作流元素传递的,Flow只能是用来对Source传下来的元素进行转换后再传递给Sink,也就是说Flow是由一个或多个处理环节构成的。...所以via(map(_.toString))无法匹配类型。...这项需求可能还必须留在后面的sream-graph章节中讨论解决方案了。不过临时解决方法可以通过运算值M来实现。

    1.1K10

    Akka(26): Stream:异常处理-Exception handling

    在akka-stream的官方文件中都有详细的说明和示范例子。我们在这篇讨论里也没有什么更好的想法和范例,也只能略做一些字面翻译和分析理解的事了。...对于出现异常的stream,Supervisor-Strategy提供了三种处理方法: Stop:终结stream,返回异常 Resume:越过当前元素,继续运行 Restart:重新启动、越过当前元素...、清除任何内部状态 akka-stream的默认异常处理方式是Stop,即立即终止数据流,返回异常。...从下面的运算结果中我们确定了Restart在重启过程中清除了内部状态,也就是说从发生异常的位置开始重新进行计算了: 0 1 4 0 5 12 好了,下面是这次示范涉及的完整源代码: import akka.actor...._ import akka.stream._ import akka.stream.scaladsl._ import scala.concurrent.duration._ object ExceptionHandling

    1.3K80

    Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub

    在现实中我们会经常遇到这样的场景:有一个固定的数据源Source,我们希望按照程序运行状态来接驳任意数量的下游接收方subscriber、又或者我需要在程序运行时(runtime)把多个数据流向某个固定的数据流终端...从akka-stream的技术文档得知:一对多,多对一或多对多类型的复杂数据流组件必须用GraphDSL来设计,产生Graph类型结果。...但akka-stream提供了MergeHub,BroadcastHub和PartitionHub来支持这样的功能需求。 1、MergeHub:多对一合并类型。...下面是以上示范中MergeHub及BroadcastHub示范的源代码: import akka.NotUsed import akka.stream.scaladsl._ import akka.stream...import akka.stream.scaladsl._ import akka.stream._ import akka.actor._ import scala.concurrent.duration

    94680

    Akka(25): Stream:对接外部系统-Integration

    在现实应用中akka-stream往往需要集成其它的外部系统形成完整的应用。这些外部系统可能是akka系列系统或者其它类型的系统。...akka-stream提供了mapAsync+ask模式可以从一个运算中的数据流向外连接某个Actor来进行数据交换。这是一种akka-stream与Actor集成的应用。...说到与Actor集成,联想到如果能把akka-stream中复杂又消耗资源的运算任务交付给Actor,那么我们就可以充分利用actor模式的routing,cluster,supervison等等特殊功能来实现分布式高效安全的运算...在我们这次的测试里只能使用group类型的Router,因为如果需要对routee实现监管supervision的话,pool类型的router在routee终止时会自动补充构建新的routee,如此就避开了监管策略...把这个数据流传给Calculator,这样Calculator就可以向这个运行中的Stream发送数据了。我们会通过这个过程来示范SourceQueue的基本操作。

    2.1K80

    Akka(20): Stream:异步运算,压力缓冲-Async, batching backpressure and buffering

    因为akka-stream已经在上下游环节全部实现了Reactive-Streams-Specification,所以上下游之间可以进行互动,这样就可以在akka-stream里由下游通知上游自身可接收数据的状态来控制上游数据流速...akka-stream的backpressure使用了缓冲区buffer来成批预存及补充数据,这样可以提高数据传输效率。...akka-stream可以通过以下几种方式来设定异步运算使用的缓冲大小: 1、在配置文件中设定默认buffer: akka.stream.materializer.max-input-buffer-size...= 16 2、在ActorMaterializerSetting中宏观层面上设定: val materializer = ActorMaterializer( ActorMaterializerSettings...with failure. */ def fail: OverflowStrategy = Fail } 当akka-stream需要与外界系统进行数据交换时就无法避免数据流上下游速率不匹配的问题了

    89270

    使用Akka HTTP构建微服务:CDC方法

    一般情况下,在开发Web应用程序的时候,从模型和流程定义开始,深入到软件开发中,都是使用TDD(测试驱动开发)方法:先写测试,考虑我们真正想要的,以及我们如何使用它; 但微服务(microservices...正如我所说的,Pact适用于很多平台,在我们的例子中,用Scala编写Consumer和Producer,我们只能使用一个实现:Scala-Pact。...测试环境也有特定的配置; 只是因为我们在同一个项目中同时拥有生产者和客户端,所以并行执行被禁用,所以如果并行执行(我们稍后会看到它),我们可能会在Pact文件生成和使用过程中遇到问题。...如果应用程序很简单,我们可以使用这种方法,如果不是这样,我们可以为这种测试实现特定的测试运行器,但我建议尽可能与生产案例类似。...在主类中使用它非常容易; 只需将其添加为类特征,并将静态值替换为相应的常量即可: MyLibraryAppServer.scala package com.fm.mylibrary.producer.app

    7.5K50

    Akka(43): Http:SSE-Server Sent Event - 服务端主推消息

    比如一个零售店管理平台的服务端在完成了某些数据更新后需要通知各零售门市客户端下载最新数据。...SSE模式的基本原理是服务端统一集中发布消息,各客户端持久订阅服务端发布的消息并从消息的内容中筛选出属于自己应该执行的指令,然后进行相应的处理。...服务端是通过complete以SeverSentEvent类为元素的Source来进行SSE的,如下: import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling...import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.duration.DurationInt...akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import akka.http.scaladsl.model.sse.ServerSentEvent

    1.1K90
    领券