首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

PlayScala 2.5.x - 实现完全异步非阻塞的流数据导出

介绍 从Play2.5.x开始,Play使用Akka Streams实现流处理,废弃了之前的Enumerator/Iteratee Api。...虽然官方已经更新了JavaStream的文档,但是ScalaStream的文档仍然没有更新,已经提了issue,希望能尽快得到反馈。...结合Play和ReactiveMongo二者的流处理功能,我们可以很方便地实现完全异步非阻塞的报表导出功能。...实现 由于ReactiveMongo暂时还没有提供Akka Streams的流处理实现,所以无法直接通过map/flatMap直接返回一个Stream写回响应: @Singleton class TestStreamController...数据传递过程如下: foldBulks(读取查询结果) -> sourceActor(收集查询结果) -> source(生产者) -> Ok.chunked(消费者) 下面是浏览器中看到的效果: ?

85140
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    实时同步MongoDB Oplog开发指南

    Capped Collections MongoDB有一种特殊的Collection叫Capped collections,它的插入速度非常快,基本和磁盘的写入速度差不多,并且支持按照插入顺序高效的查询操作...Capped collections的使用限制: 如果更新数据,你需要为之创建索引以防止collection scan; 更新数据时,文档的大小不能改变。...位: 前32位是time_t值,表示从epoch时间至今的秒数 后32位是ordinal值,该值是一个顺序增长的序数,表示某一秒内的第几次操作 开始同步Oplog 在开始同步Oplog之前,我们需要注意以下几点...: 由于Oplog不使用索引,所以初始查询代价可能很大 当Oplog数据量很大时,可以保存ts,系统重启时利用该ts可以减少首次查询开销 oplogReplay标志可以显著加快包含ts条件过滤的查询,但是只对...的Akka Stream实现有bug,如果首次查询没有数据返回,则会持续发送查询请求,大约每秒中发送几十次至几百次请求,因为Oplog的查询开销很大,最终会导致MongoDB内存溢出。

    2.6K80

    Akka 指南 之「第 3 部分: 使用设备 Actors」

    这还允许我们在不存在写入部分的时候测试 Actor 的查询部分,因为设备 Actor 可以报告空结果。 从设备 Actor 获取当前温度的协议很简单。Actor: 等待当前温度的请求。...如果我们依赖消息的成功处理,那么一旦订单提交给负责验证它、处理它并将其放入数据库的内部 API,Actor 就会报告成功。不幸的是,在调用 API 之后,可能会立即发生以下任何情况: 主机可能崩溃。...我们只希望在订单被实际完全处理和持久化后报告成功。唯一能够报告成功的实体是应用程序本身,因为只有它对所需的域保证最了解。...我们的设备 Actor 有责任为给定查询的响应使用相同的 ID 参数,这将使它看起来像下面这样。...我们已经看到,Akka 不保证这些消息的传递,并将其留给应用程序以提供成功通知。在我们的情况下,一旦我们更新了上次的温度记录,例如TemperatureRecorded,我们希望向发送方发送确认。

    59530

    修复 Flink Kubernetes 资源分配慢 兼谈如何贡献开源社区

    而 TaskManager 的日志则没有异常,均为向 ResourceManager 注册成功,但是向新作业的 JobManager 注册时发生超时造成的被迫退出,日志日下: 2020-10-11 21...后面我们通过短时间快速查询多个 IP 的主机名时,确认 DNS 反应速度会变的异常缓慢(后续了解到是云 DNS 做了反查频率限制导致的),而正是服务器迟迟不返回造成 Flink Akka Dispatcher...把 DNS 反向解析功能下放到 getter 方法中,在首次访问时进行主机名获取和保存。 经过验证,两种方法均可解决本文提到的资源分配缓慢的问题。...注意请务必按照模板里的 Checklist 做逐项检查和填写,否则会影响 Review。 Review 和修改 当一个 Pull Request 提交后,Bot 会介入进行自动化构建,并随后更新结果。...如果构建失败,则需要仔细检查是不是代码风格未通过校验(例如 JavaDoc 编写不规范,每句话后没有加句号、有未使用的 import、换行不规范等问题),或者文档未更新(文档编辑后需要进入flink-docs

    2.6K41

    你有必要了解一下Flink底层RPC使用的框架和原理

    Akka介绍 由于Flink底层Rpc是基于Akka实现,我们先了解下Akka的基本使用。 Akka是一个开发并发、容错和可伸缩应用的框架。...它是Actor Model的一个实现,和Erlang的并发模型很像。在Actor模型中,所有的实体被认为是独立的actors。actors和其他actors通过发送异步消息通信。...创建Akka系统 Akka系统的核心ActorSystem和Actor,若需构建一个Akka系统,首先需要创建ActorSystem,创建完ActorSystem后,可通过其创建Actor(注意:Akka...如下代码展示了如何配置一个Akka系统。 // 1....总结 RPC框架是Flink任务运行的基础,Flink整个RPC框架基于Akka实现,并对Akka中的ActorSystem、Actor进行了封装和使用,文章主要分析了Flink底层RPC通信框架的实现和相关流程

    2.4K30

    Akka 指南 之「第 5 部分: 查询设备组」

    明确地: 设备 Actor 返回一个不需要状态更改的读取 记录温度,更新单个字段 设备组 Actor 通过添加或删除映射中的条目来维护组成员身份 在本部分中,我们将使用一个更复杂的示例。...查询到达后启动的 Actor 可以被忽略。 如果快照中的某个 Actor 在查询期间停止而没有应答,我们将向查询消息的发送者报告它停止的事实。...scheduleOnce的返回值是Cancellable,如果查询及时成功完成,可以使用它取消定时器。在查询开始时,我们需要询问每个设备 Actor 当前的温度。...如果没有,我们将查询结果发送给原始请求者并停止查询 Actor。否则,我们需要更新repliesSoFar和stillWaiting结构并等待更多的消息。...为此,我们建议你查看以下技术,看看哪些适合你: 「Akka HTTP」是一个 HTTP 服务和客户端的库,使发布和使用 HTTP 端点(endpoints)成为可能。

    1.1K20

    Akka 指南 之「持久化」

    持久化 Actor 的createReceiveRecover方法通过处理Evt和SnapshotOffer消息来定义在恢复过程中如何更新状态。...成功完成这些操作后,将调用内部回调(一旦日志确认了它们所持续的事件是持久的)。只有在成功地调用了所有这些处理程序之后,才能将下一个命令传递给持久性 Actor。...警告:如果你使用「持久性查询」,查询结果可能会丢失日志中已删除的消息,这取决于日志插件中如何实现删除。...对于严重的故障(如恢复或持久化事件失败),在调用故障处理程序后将停止持久性 Actor。...下面的示例强调了消息如何到达 Actor 的邮箱,以及在使用persist()时它们如何与其内部存储机制交互。

    3.5K30

    CQRS架构简介

    这些Event Handler可能是更新Q端的ReadDB,也可能是发送邮件,也可能是调用外部系统的接口。...ES的方式持久化; 持久化完成后,更新actor的状态; 更新状态完成后,再处理mailbox中的下一个消息; 从上面的过程,我们可以看出,akka框架本质上也实现了避免资源竞争的原则,因为每个actor...所以,我也是认为,事件不必异步持久化,完全可以像akka框架那样,产生的事件先同步持久化,完成后再更新actor的状态即可。...上面说了,akka框架的核心工作原理,以及其他一些方面,比如akka会确保一个actor实例在集群中只有一个。这点其实也是和本文说的一样,也是为了避免资源竞争,包括它的mailbox也是一样。...之前我设计ENode时,没了解过akka框架,后来我学习后,发现和ENode的思想是如此接近,呵呵。

    1.6K20

    大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目

    CustomerService.scala 中定义一个方法根据 id 修改用户(更新用户)的方法 和 // 根据 id 查找用户信息 的方法   // 根据 id 查找用户信息   def findCustomerById...注意:需要如图勾选,update snapshots,而且不需要联网,如果使用 maven 解决依赖后,仍然 pom.xml 有误,则只需要重启下 idea, 或者动一下 pom.xml 文件(不用改)...3、当 B Actor 在 receive 方法中接收到消息,需要回复时,可以通过 sender() 获取到发送 Actor 的代理对象。 如何理解 Actor 的 receive 方法被调用?...一个小技巧:网络不通时,如何确定是哪一个路由(ip地址)出现问题?答:使用 tracert 指令。演示如下: ?... " + id + " 的心跳时间 ")     } WorkerActor.scala 中增加代码       // 当客户端注册成功后,就定义一个定时器,每隔一定时间,发送 SendHeartBeat

    1.9K30

    Akka(2):Actor生命周期管理 - 监控和监视

    任何子级Actor在运算中发生异常后立即将自己和自己的子级Actor运算挂起,并将下一步行动交付给自己的父级Actor决定。...] def props: Props } 注意以上源代码中Backoff.onFailure和Backoff.onStop的使用说明:当一个预设为永生的子级Actor由于某些原因而停止后再重启时用onStop...所以在处理异常时我们应该使用onFailure。 我们看到BackoffSupervior提供了更详细的重启方式支持。...那么如何捕捉Actor终止的信息呢?Akka提供了context.watch和context.unwatch来设置通过ActorRef对任何Actor的终止状态监视,无须父子级别关系要求。...,更新销售额 5、完成销售目标后通知厨房打烊 6、收款员看到厨房打烊后停业关门 另外,可能出现几种异常情况:厨房的大厨可能忙不过来准备特价咖啡、收据打印机有可能卡纸。

    2.5K80

    CQRS架构简介_公司架构图

    这些Event Handler可能是更新Q端的ReadDB,也可能是发送邮件,也可能是调用外部系统的接口。...ES的方式持久化; 持久化完成后,更新actor的状态; 更新状态完成后,再处理mailbox中的下一个消息; 从上面的过程,我们可以看出,akka框架本质上也实现了避免资源竞争的原则,因为每个actor...所以,我也是认为,事件不必异步持久化,完全可以像akka框架那样,产生的事件先同步持久化,完成后再更新actor的状态即可。...上面说了,akka框架的核心工作原理,以及其他一些方面,比如akka会确保一个actor实例在集群中只有一个。这点其实也是和本文说的一样,也是为了避免资源竞争,包括它的mailbox也是一样。...之前我设计ENode时,没了解过akka框架,后来我学习后,发现和ENode的思想是如此接近,呵呵。

    1K20

    通过 Laravel Eloquent 模型实现简单增删改查操作

    实际上,Eloquent 模型类底层的查询也是基于查询构建器来实现的,你可以在模型类上调用所有查询构建器的 Where 查询方法,同样是以流接口的模式构建方法链调用即可。...此外,如果查询的条件是主键 ID 的话,还可以将上述调用简化为通过 find 方法来实现: $user = User::find(1); 返回结果与上面完全一致。...注:除获取单条记录之外,ELoquent 模型类查询返回的结果都是集合类,因此你可以在查询结果上调用集合类的所有方法,还可以自定义模型对应集合类,详情请查看对应官方文档。...,这可以借助查询构建器来实现: Post::where('views', '>', 0)->update(['views' => 100]); 删除数据 通过模型类删除对应数据表记录和更新记录类似,都要先获取对应操作模型实例...,删除对应记录更简单,获取到模型实例后,直接调用其删除方法即可: $post = Post::find(31); $post->delete(); 这样,就完成了 id = 31 对应数据表记录的删除

    8K20

    ThinkPHP5.1学习笔记 数据库操作

    * FROM `think_user` WHERE `status` = 1 select 方法查询结果是一个二维数组,如果结果不存在,返回空数组 如果希望在没有查找到数据后抛出异常可以使用 try...方法的第一个参数的作用和name方法一样,如果需要使用不同的数据库连接,可以使用: db('user','db_config1')->where('id', 1)->find(); 查询某个字段的值可以用...@qq.com') ->find(); 链式查询简单认识 查询规则 通过指向符号 -> 多次连续的调用方法 Db::table('think_user') ->where('status...]; // 笔者在insert()的时候,发现中文内容添加后会在数据库中显示空白 Db::name('user')->insert($data); insert 方法添加数据成功返回添加成功的条数,通常情况返回...* FROM think_user WHERE type=1 AND status=1 这里的where多样运用需要开发者按照自己的喜好和实际的开发需求决定如何使用;如果无法运用就直接使用原生!

    2K10

    爆肝3W字,全网最全爬虫自动化获取企业招标信息,招标网、爱企查...

    执行查询后,cursor.fetchall 方法被调用以获取查询结果的完整列表。这个方法会返回一个列表,其中每个元素都是一个包含查询结果行的元组。...SQL执行:连接成功后,函数创建一个游标对象cursor,并使用cursor.execute(sql)执行传入的SQL语句。...然而,由于前面的异常处理可能导致函数提前返回,这个新创建的游标实际上只有在没有异常发生时才会被使用。...查询公司名称:无论前面的SQL语句执行成功与否(实际上,由于异常处理的存在,如果失败则函数不会执行到这里),函数都会尝试使用新创建的游标执行一个查询语句,从company表中检索所有公司的名称。...结果处理:查询结果通过cursor.fetchall()获取,并使用列表推导式处理成只包含公司名称的列表。 返回值:函数返回处理后的公司名称列表。

    36810

    Java一分钟之-Akka:反应式编程框架

    本文将带你快速入门Akka,探讨其核心概念、常见问题、易错点及如何避免,同时辅以代码示例,让你一分钟内领略Akka的魅力。...每个Actor都有自己的邮箱,通过发送消息而非直接调用方法来与其他Actor通信,这使得并发控制变得简单且安全。此外,Akka提供了故障处理机制,支持Actor的生命周期管理和容错策略。...阻塞Actor 问题描述:在Actor中执行耗时操作(如数据库查询、网络请求)会阻塞该Actor处理其他消息的能力。...错误的消息处理 问题描述:不恰当的消息类型处理可能导致Actor行为异常。 解决方案:在Actor类中实现unhandled方法,捕获未处理的消息类型,并给出合理的响应或日志记录。...合理使用并发工具:如使用ActorSystem.scheduler()安排定时任务,避免直接使用线程池。 监控与日志:充分利用Akka的日志和监控功能,及时发现并解决问题。

    93310

    使用Lagom和Java构建反应式微服务系统

    该接口不仅定义了如何调用和实现服务,还定义了描述如何将接口映射到底层传输协议的元数据。通常,服务描述符,其实现和消费应该与正在使用的传输方式无关,无论是REST,Websockets还是其他传输。...上述服务调用使用严格的消息。 流式传输消息是Source类型的消息。 Source是一种允许异步流式传输和处理消息的Akka流API。 ? 此服务调用具有严格的请求类型和流响应类型。...为了在Lagom中实现这一点,持久性模块促进了使用事件源(ES)和命令查询责任分隔(CQRS)。事件溯源是将所有更改作为域事件捕获的做法,这是事件发生的不可变事实。...当一个事件成功保存时,通过将事件应用到当前状态来更新当前状态。用于更新状态的功能使用BehaviorBuilder的setEventHandler方法进行注册。...在持续新事件和重播事件时都使用事件处理程序。 ? 事件处理程序通常只是更新状态,但它们也可以改变实体的行为,因为可以定义用于处理命令和事件的新功能。

    1.9K50
    领券