首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >聊聊go.cqrs的DomainRepository

聊聊go.cqrs的DomainRepository

原创
作者头像
code4it
修改于 2021-04-09 02:05:22
修改于 2021-04-09 02:05:22
3710
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下go.cqrs的DomainRepository

DomainRepository

代码语言:javascript
复制
// DomainRepository is the interface that all domain repositories should implement.
type DomainRepository interface {
    //Loads an aggregate of the given type and ID
    Load(aggregateTypeName string, aggregateID string) (AggregateRoot, error)

    //Saves the aggregate.
    Save(aggregate AggregateRoot, expectedVersion *int) error
}

DomainRepository定义了Load、Save方法

GetEventStoreCommonDomainRepo

代码语言:javascript
复制
// GetEventStoreCommonDomainRepo is an implementation of the DomainRepository
// that uses GetEventStore for persistence
type GetEventStoreCommonDomainRepo struct {
    eventStore         *goes.Client
    eventBus           EventBus
    streamNameDelegate StreamNamer
    aggregateFactory   AggregateFactory
    eventFactory       EventFactory
}

// Load will load all events from a stream and apply those events to an aggregate
// of the type specified.
//
// The aggregate type and id will be passed to the configured StreamNamer to
// get the stream name.
func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (AggregateRoot, error) {

    if r.aggregateFactory == nil {
        return nil, fmt.Errorf("The common domain repository has no Aggregate Factory.")
    }

    if r.streamNameDelegate == nil {
        return nil, fmt.Errorf("The common domain repository has no stream name delegate.")
    }

    if r.eventFactory == nil {
        return nil, fmt.Errorf("The common domain has no Event Factory.")
    }

    aggregate := r.aggregateFactory.GetAggregate(aggregateType, id)
    if aggregate == nil {
        return nil, fmt.Errorf("The repository has no aggregate factory registered for aggregate type: %s", aggregateType)
    }

    streamName, err := r.streamNameDelegate.GetStreamName(aggregateType, id)
    if err != nil {
        return nil, err
    }

    stream := r.eventStore.NewStreamReader(streamName)
    for stream.Next() {
        switch err := stream.Err().(type) {
        case nil:
            break
        case *url.Error, *goes.ErrTemporarilyUnavailable:
            return nil, &ErrRepositoryUnavailable{}
        case *goes.ErrNoMoreEvents:
            return aggregate, nil
        case *goes.ErrUnauthorized:
            return nil, &ErrUnauthorized{}
        case *goes.ErrNotFound:
            return nil, &ErrAggregateNotFound{AggregateType: aggregateType, AggregateID: id}
        default:
            return nil, &ErrUnexpected{Err: err}
        }

        event := r.eventFactory.GetEvent(stream.EventResponse().Event.EventType)

        //TODO: No test for meta
        meta := make(map[string]string)
        stream.Scan(event, &meta)
        if stream.Err() != nil {
            return nil, stream.Err()
        }
        em := NewEventMessage(id, event, Int(stream.EventResponse().Event.EventNumber))
        for k, v := range meta {
            em.SetHeader(k, v)
        }
        aggregate.Apply(em, false)
        aggregate.IncrementVersion()
    }

    return aggregate, nil

}

// Save persists an aggregate
func (r *GetEventStoreCommonDomainRepo) Save(aggregate AggregateRoot, expectedVersion *int) error {

    if r.streamNameDelegate == nil {
        return fmt.Errorf("The common domain repository has no stream name delagate.")
    }

    resultEvents := aggregate.GetChanges()

    streamName, err := r.streamNameDelegate.GetStreamName(typeOf(aggregate), aggregate.AggregateID())
    if err != nil {
        return err
    }

    if len(resultEvents) > 0 {

        evs := make([]*goes.Event, len(resultEvents))

        for k, v := range resultEvents {
            //TODO: There is no test for this code
            v.SetHeader("AggregateID", aggregate.AggregateID())
            evs[k] = goes.NewEvent("", v.EventType(), v.Event(), v.GetHeaders())
        }

        streamWriter := r.eventStore.NewStreamWriter(streamName)
        err := streamWriter.Append(expectedVersion, evs...)
        switch e := err.(type) {
        case nil:
            break
        case *goes.ErrConcurrencyViolation:
            return &ErrConcurrencyViolation{Aggregate: aggregate, ExpectedVersion: expectedVersion, StreamName: streamName}
        case *goes.ErrUnauthorized:
            return &ErrUnauthorized{}
        case *goes.ErrTemporarilyUnavailable:
            return &ErrRepositoryUnavailable{}
        default:
            return &ErrUnexpected{Err: e}
        }
    }

    aggregate.ClearChanges()

    for k, v := range resultEvents {
        if expectedVersion == nil {
            r.eventBus.PublishEvent(v)
        } else {
            em := NewEventMessage(v.AggregateID(), v.Event(), Int(*expectedVersion+k+1))
            r.eventBus.PublishEvent(em)
        }
    }

    return nil
}

GetEventStoreCommonDomainRepo定义了eventStore、eventBus、streamNameDelegate、aggregateFactory、eventFactory属性,其Load方法先通过r.aggregateFactory.GetAggregate获取aggregate,再通过r.streamNameDelegate.GetStreamName(aggregateType, id)获取streamName,然后通过r.eventStore.NewStreamReader去遍历event,挨个执行aggregate.Apply(em, false)及aggregate.IncrementVersion();其Save方法先通过aggregate.GetChanges()获取resultEvents,再遍历resultEvents构造goes.Event,之后通过streamWriter.Append写入,然后执行aggregate.ClearChanges(),最后执行r.eventBus.PublishEvent

小结

go.cqrs的DomainRepository定义了Load、Save方法;GetEventStoreCommonDomainRepo实现了DomainRepository接口,其Load方法主要是读取event,然后挨个执行aggregate.Apply;其Save方法主要是将aggregate.GetChanges()转换为event,然后通过streamWriter.Append写入,然后执行aggregate.ClearChanges(),最后执行r.eventBus.PublishEvent。

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
高薪面试题之三.DB必备
40+ 个非常有用的 Oracle 查询语句,主要涵盖了日期操作,获取服务器信息,获取执行状态,计算数据库大小等等方面的查询。这些是所有 Oracle 开发者都必备的技能,所以快快收藏吧!
张哥编程
2024/12/17
3520
一线运维 DBA 五年经验常用 SQL 大全(一)
本文 SQL 均是在运维工作中总结整理而成的,对于运维 DBA 来说可提高很大工作效率,当然如果你全部能够背下来那就牛逼了,如果不能,建议收藏下来慢慢看,每条 SQL 的使用频率都很高,肯定能够帮助到你。
JiekeXu之路
2021/02/23
1.6K0
笔记分享(1) oracle常用查询语句
以下的oracle常用查询笔记是我之前工作中用到过的. 其实常用的查询差不多就是这些.
大大刺猬
2021/04/01
1.5K0
Oracle DBA的SQL编写技能提升宝典(含SQL资源)
背景:要迁移数据库,需要创建与源库相同的表空间,大小与源库相同。由于个别表空间较大,手工添加可能需要写很多的脚本,于是同事通过PL/SQL解决了问题。
数据和云
2021/10/13
1.4K0
Oracle DBA的SQL编写技能提升宝典(含SQL资源)
【DB笔试面试451】Oracle常用日期处理函数有哪些?这些函数可以实现哪些功能?
(5)查询当前数据库日期格式的命令:SELECT SYS_CONTEXT('USERENV','NLS_DATE_FORMAT') FROM DUAL;。
AiDBA宝典
2019/09/30
1.4K0
sql基础知识:日期的常用用法
日期操作 select sysdate,add_months(sysdate,12) from dual; -- + 1 year select sysdate,add_months(sysdate,1) from dual; -- + 1 month select sysdate,to_char(sysdate+7,'yyyy-mm-dd HH24:MI:SS') from dual; -- + 1 week select sysdate,to_char(sysdate+1,'yyyy-mm-dd HH2
用户1154259
2018/01/17
1K0
Oracle定时任务
oracle job 是应用在数据库层面,用来定时执行存储过程或者 SQL 语句的定时器。
overme
2022/01/17
2.9K0
Oracle定时任务
关于Oracle Job定时任务配置讲解
几天前,公司的job调度出现了问题,由于权限管的严,没有查看Oracle 一些重要的数据字典,后面联系DBA,是由于数据库切换到备机时,参数设置不对,导致db job没有正常调度。
星哥玩云
2022/08/17
2.8K0
关于Oracle Job定时任务配置讲解
Oracle Job创建及使用详解
Oracle job有定时执行的功能,可以在指定的时间点或每天的某个时间点自行执行任务。 一、查询系统中的job,可以查询视图 --相关视图 select * from dba_jobs; select * from all_jobs; select * fromuser_jobs; -- 查询字段描述 /* 字段(列) 类型 描述 JOB NUMBER 任务的唯一标示号 LOG_USER VARCHAR2(30) 提交任务的用户 PRIV_USER VARCHAR2(30) 赋予任务权限的用户 SCHEMA_USER VARCHAR2(30) 对任务作语法分析的用户模式 LAST_DATE DATE 最后一次成功运行任务的时间 LAST_SEC VARCHAR2(8) 如HH24:MM:SS格式的last_date日期的小时,分钟和秒 THIS_DATE DATE 正在运行任务的开始时间,如果没有运行任务则为null THIS_SEC VARCHAR2(8) 如HH24:MM:SS格式的this_date日期的小时,分钟和秒 NEXT_DATE DATE 下一次定时运行任务的时间 NEXT_SEC VARCHAR2(8) 如HH24:MM:SS格式的next_date日期的小时,分钟和秒 TOTAL_TIME NUMBER 该任务运行所需要的总时间,单位为秒 BROKEN VARCHAR2(1) 标志参数,Y标示任务中断,以后不会运行 INTERVAL VARCHAR2(200) 用于计算下一运行时间的表达式 FAILURES NUMBER 任务运行连续没有成功的次数 WHAT VARCHAR2(2000) 执行任务的PL/SQL块 CURRENT_SESSION_LABELRAW MLSLABEL 该任务的信任Oracle会话符 CLEARANCE_HI RAW MLSLABEL 该任务可信任的Oracle最大间隙 CLEARANCE_LO RAW MLSLABEL 该任务可信任的Oracle最小间隙 NLS_ENV VARCHAR2(2000) 任务运行的NLS会话设置 MISC_ENV RAW(32) 任务运行的其他一些会话参数 */ -- 正在运行job select * fromdba_jobs_running; 其中最重要的字段就是job这个值就是我们操作job的id号,what 操作存储过程的名称,next_date 执行的时间,interval执行间隔 二、执行间隔interval运行频率
流浪的猫666
2018/06/26
2.1K0
ORACLE常用性能监控SQL【二】
条件为什么block>100,因为一些很小的表,只有几行数据实际大小很小,但是block一次性分配就是5个(11g开始默认一次性分配1M的block大小了,见create table storged的NEXT参数),5个block相对于几行小表数据来说就相差太大了
小小工匠
2021/08/16
4.2K0
oracle日期时间函数总结
常常写 sql 的同学应该会接触到一些 oracle 的日期时间函数, 比如: 財务软件或者人力资源软件须要依照每年, 每季度, 每月, 甚至每一个星期来进行统计.
全栈程序员站长
2022/07/12
1.9K0
Oracle 系统表大全
数据字典dict总是属于Oracle用户sys的。   1、用户:    select username from dba_users;   改口令    alter user spgroup identified by spgtest;   2、表空间:    select * from dba_data_files;    select * from dba_tablespaces;//表空间
一见
2018/08/07
7240
DBA常用SQL语句(3)- cache、undo、索引和等待事件
Undospace=URUPSblocksize + overload(10%), 计算 undo tablespace 大小
Yunjie Ge
2022/04/23
5490
Oracle 最常用功能函数经典汇总
Oracle 最常用功能函数经典汇总 SQL中的单记录函数 1.ASCII 返回与指定的字符对应的十进制数; SQL> select ascii('A') A,ascii('a') a,ascii('0') zero,ascii(' ') space from dual;         A         A      ZERO     SPACE --------- --------- --------- ---------        65        97    
阿新
2018/04/09
1.7K0
Oracle迁移MySQL 8特殊SQL处理 顶
我们创建一个表,并生成两个表分区CUS_PART1,CUS_PART2.关于分区的分类可以参考https://www.cnblogs.com/wnlja/p/3979684.html
算法之名
2020/05/18
1.3K0
Oracle迁移MySQL 8特殊SQL处理
                                                                            顶
Oracle常用语句
INSERT INTO 表名(字段名1, 字段名2, ……) VALUES ( 值1, 值2, ……); INSERT INTO 表名(字段名1, 字段名2, ……) SELECT (字段名1, 字段名2, ……) FROM 另外的表名;
泰斗贤若如
2019/06/19
3.3K0
Oracle根据时间查询
以下SQL,只会查询2021-01-01至2021-1-2 00:00:00的数据
鱼找水需要时间
2023/02/16
2.8K0
Oracle根据时间查询
从迁移开发角度看差异:Oracle vs MySQL
随着近些年来数据库技术发展演进,及国内数据库日益活跃。越来越多的企业将数据库从传统商业数据库迁移到开源或国产数据库平台。本文对比了最为常见的一种情况,从Oracle迁移到MySQL需要关注的一些差异点。这方便应用研发在迁移之初做好必要的评估备。此外,因MySQL生态发展很广泛,很多数据库产品会将MySQL作为兼容的首选。因此,很多其他类型的数据库迁移,也可以参考此文内容。
用户5548425
2021/11/18
1.7K0
从迁移开发角度看差异:Oracle vs MySQL
Oracle SQL调优之表设计
分区表使用与查询频繁而更新数据不频繁的情况,不过要记得加全局索引,而不加分区索引,分区类型:分区分为范围分区、列表分区、HASH分区、组合分区四种,用了分区表,查询时就定位到对应的区,而不用全表,所以查询效率比普通表好,当然有很多细节,还是建议看《收获,不止sql优化》一书
SmileNicky
2019/06/11
6010
oracle: job使用
oracle的job,实际上就是数据库内置的定时任务,类似代码中的Timer功能。下面是使用过程: 这里我们模拟一个场景:定时调用存储过程P_TEST_JOB 向表TEST_JOB_LOG中插入数据 表结构: 1 create table TEST_JOB_LOG 2 ( 3 rec_id NUMBER not null, 4 occr_time DATE 5 ); 6 alter table TEST_JOB_LOG 7 add constraint PK_TEST_JOB primary k
菩提树下的杨过
2018/01/19
9950
相关推荐
高薪面试题之三.DB必备
更多 >
目录
  • DomainRepository
  • GetEventStoreCommonDomainRepo
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
首页
学习
活动
专区
圈层
工具
MCP广场
首页
学习
活动
专区
圈层
工具
MCP广场