有多种方式来进行限界上下文之间的集成
任何一个分布式开发者都应该知道一下分布式计算原则:
我们需要一种可靠的方法在两个系统之间传递信息数据,所传递的信息数据的结构应该能被所有的系统所消费有多种方式可以生成信息数据的结构,比如xml,json
当一个限界上下文以URI的方式提供了大量的REST资源时,我们便可以称其为开放主机服务(3)。
开发主机服务:为系统所提供的服务定义一套协议,开放该协议以使其他需要集成的系统能够使用,在有新的集成需求时,对协议进行改进和扩展
本章以SaaSOvation公司将REST原则应用与 身份与访问上下文 为例展开
用户是否扮演某个角色(/tenants/{tenantId}/users/{username}/inRole/{role})
@RestController
@RequestMapping("/tenants/{tenantId}/users")
public class UserSource {
@Resource
private AccessService accessService;
@GetMapping("{username}/inRole/{role}")
public Response getUserInRole(@RequestParam String aTenantId,
@RequestParam String aUsername,
@RequestParam String aRoleName) {
Response response = null;
User user = null;
try{
this.accessService.userInRole(aTenantId, aUsername, aRoleName);
}catch (Exception e) {
// 跳过异常
}
if (user != null) {
response = this.userInRoleResponse(user, aRoleName);
}
return response;
}
}
public class AccessService {
@Transactional(readOnly= true)
public User userInRole(String aTenantId, String aUse rname, String aRoleName) {
User userInRole = null;
TenantId tenantId = new TenantId(new TenantId(aTenantId));
User user = DomainRegistry
.userRepository()
.userWithUsername(tenantId, aUsername);
if (user != null) {
Role role = DomainRegistry
.roleRepository()
.roleNamed(tenantId, aRoleName);
if (role != null) {
GroupMemberService groupMemberService = DomainRegistry.groupMemberService();
if (role.isInRole(user, groupMemberService)) {
userInRole = user;
}
}
}
return userInRole;
}
}
请求的输出
{
"role": "Author", "username": "zoe",
"tenantId": "A94A8298-43B8-4DA0-9917-13FFF9E11 6ED",
"firstName": "Zoe",
"lastName": "Doe",
"emailAddress" : "zoe@saasovation. com"
}
对于客户方来说,虽然身份与访问上下文所提供的JSON展现数据非常有用,但是当我们考虑到DDD的目标时,客户方的限界上下文是不会原封不动地消费这些JSON数据的。在前面的章节中我们已经讲到,如果消费方是协作上下文,该上下文的开发团队对原生的用户和角色信息并不会感兴趣,他们关心的是更加特定于自身领域的角色。此时,单纯地使用User和Role领域对象对他们来说已经不再适用。
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7KQxWz10-1661076224942)(/Users/didi/Library/Application Support/typora-user-images/image-20220819102722736.png)
UserInRoleAdapter、CollaboratorService、CollaboratorTranslator 属于防腐层
CollaboratorService -> 位于 领域层
public interface CollaboratorService {
public Author authorFrom(Tenant aTenant, String anIdentity);
public Creator creatorFrom(Tenant aTenant, String anIdentity);
public Moderator moderatorFrom(Tenant aTenant, String anIdentity);
public Owner ownerFrom(Tenant aTenant, String anIdentity);
public Participant participantFrom(Tenant aTenant, String anIdentity);
}
TranslatingCollaboratorService -> 位于 基础设施层
package com.saasovation.collaboration.infrastructure.services;
import com.saasovation.collaboration.domain.model.collaborator.Author;
public class TranslatingCollaboratorService implements CollaboratorService{
@Override
public Author authorFrom(Tenant aTenant, String anIdentity){
Author author = this.userInRoleAdapter
.toCollaborator(aTenant, anIdentity, "Author",Author.class);
return author;
}
}
UserInRoleAdapter -> 位于 基础设施层
package com.saasovation.collaboration.infrastructure.services;
import org.jboss.resteasy.client.ClientRequest;
import org.jboss.resteasy.client.ClientResponse;
public class UserInRoleAdapter(
public <T extends Collaborator> T toCollaborator(
Tenant aTenant,
String anIdentity, String aRoleName,
Class<T> aCollaboratorClass){
T collaborator = nul1;
try{
ClientRequest request=this.buildRequest(aTenant, anIdentity, aRoleName);
ClientResponse<String>response=request.get(String.class);
if (response.getStatus()==200){
collaborator=new CollaboratorTranslator().toCollaboratorFromRepresentation(
response.getEntity(),
aCollaboratorClass);
} else if (response.getStatus()!=204){
throw new IllegalStateException("There was a problem requesting the user:" +anIdentity+"in role:" +aRoleName+ " with resulting status:"+response.getStatus());
} catch(Throwable t){
throw new IllegalStateException("Failed because:"+t.getMessage(),t);
}
return collaborator;
}
}
CollaboratorTranslator -> 位于 基础设施层
package com.saasovation.collaboration.infrastructure.services;
import java.lang.reflect.Constructor;
import com.saasovation.common.media.RepresentationReader;
public class CollaboratorTranslator{
public CollaboratorTranslator(){
super();
}
public <T extends Collaborator> T toCollaboratorFromRepresentation(String aUserInRoleRepresentation,Class<T> aCollaboratorClass) throws Exception{
RepresentationReader reader=new RepresentationReader(aUserInRoleRepresentation);
String username = reader.stringValue("username");
String firstName = reader.stringValue("firstName");
String lastName = reader.stringValue("lastName");
String emailAddress = reader.stringValue("emailAddress");
T collaborator=this.newCollaborator(username, firstName, lastName, emailAddress,aCollaboratorClass);
return collaborator;
}
private <T extends Collaborator> T newCollaborator(
String aUsername,
String aFirstName,
String aLastName,
String aEmailAddress,
Class<T> aCollaboratorClass) throws Exception{
Constructor<T> ctor=aCollaboratorClass.getConstructor(String.class, String.class, String.class);
T collaborator=ctor.newInstance(aUsername,(aFirstName + " " + aLastName).trim(), aEmailAddress);
return collaborator;
}
}
Author extends Collaborator
package com.saasovation.collaboration.domain.model.collaborator;
public final class Author extends Collaborator{
public Author(String anIdentity, String aName,String anEmailAddress){
super(anIdentity, aName, anEmailAddress);
}
}
接下来看一下应用服务怎么获取Author,然后将其交给forum来开始一个新的Discussion
package com.saasovation.collaboration.application;
public class ForumService{
@Transactional
public Discussion startDiscussion(
String aTenantId,
String aForumId,
String anAuthorId,
String aSubject){
Tenant tenant= new Tenant(aTenantId);
ForumId forumId = new ForumId(aForumId);
Forum forum = this.forum(tenant, forumId);
if (forum ==nu11){
throw new IllegalStateException("Forum does not exist.");
}
Author author=this.collaboratorService.authorFrom(tenant, anAuthorId);
Discussion newDiscussion=forum.startDiscussion(this.forumNavigationService(), author, aSubject);
this.discussionRepository.add(newDiscussion);
return newDiscussion;
}
}
在使用消息进行继承时,任何一个系统都可以获得更高层次的自治性。只要消息基础设施正常工作,即使其中一个交互系统不可用,消息依然可以得到发送和投递。当事件发生时,系统将通过消息机制将这些时间发送到对事件感兴趣的相关方。
本章以Scrum产品和 敏捷项目管理上下文 的交互为例,给出通过消息集成的方式
敏捷项目管理上下文将通过身份与访问上下文来管理不同的角色。在身份与访问上下文中,每一个订阅的租户都会创建2个Role实例:ScrumProductOwner和ScrumTeamMember。每一个需要 扮演某种角色的User都会被指派给相应的Role。在该限界上下文的应用服务中,我们通过以下方式来实现:
AccessService -> 位于 应用层
package com.saasovation.identityaccess.application;
public class AccessService{
@Transactional
public void assignUserToRole(AssignUserToRoleCommand aCommand){
TenantId tenantId=new TenantId(aCommand.getTenantId());
User user=this.userRepository.userWithUsername(tenantId, aCommand.getUsername());
if (user!=null){
Role role=
this.roleRepository.roleNamed(tenantId,aCommand.getRoleName());
if (role != null){
role.assignUser(user);
}
}
}
}
敏捷项目管理上下文又如何知道是谁扮演了ScrumTeamMember或者ScrumProductOwner呢?
答案是:当Role中的 assignUser() 方法执行完毕时,它将发布一个事件
Role
package com.saasovation.identityaccess.domain.model.access;
public class Role extends Entity{
public void assignUser(User aUser){
if (aUser == null){
throw new NullPointerException("User must not be nu11.");
}
if (!this.tenantId().equals (aUser.tenantId())){
throw new IllegalArgumentException("Wrong tenant for this user.");
}
this.group().addUser (aUser);
DomainEventPublisher.instance()
·publish(new UserAssignedToRole(this.tenantId(),
this.name(),
aUser.username(),
aUser.person().name().firstName(),
aUser.person().name().lastName(),
aUser.person().emailAddress()));
}
}
最终 它将被投递到所有感兴趣的相关方。当敏捷项目管理上下文收到该事件时,它将相应地创建一个新的TeamMember或者ProductOwner。
比较好的实现就是通过消息队列实现,比如:rabbitMQ、rocketMQ、kafka等
接下来看一下书中的例子,通过rabbitMQ实现的代码
抽象的监听基类ExchangeListener
package com.saasovation.common.port.adapter.messaging.rabbitmg;
public abstract class ExchangeListener {
private MessageConsumer messageConsumer;
private Queue queue;
public ExchangeListener () {
super ();
}
this.attachToQueue ();
this.registerConsumer ():
protected abstract String exchangeName ();
protected abstract void filteredDispatch(String aType, String aTextMessage);
protected abstract String [] listensToEvents();
protected String queueName () {
return this.getClass ().getSimpleName ();
}
private void attachToQueue () {
Exchange exchange = Exchange.fanOutInstance (ConnectionSettings.instance (), this.exchangeName (), true);
this.queue = Queue.individualExchangeSubscriberInstance (exchange, this.exchangeName () + "." + this.queueName ());
}
private Queue queue () {
return this.queue;
}
private void registerConsumer () {
this.messageConsumer = MessageConsumer.instance (this.queue (), false);
this.messageConsumer.receiveOnly(this.listensToEvents (), new MessageListener (MessageListener.Type. TEXT) {
@Override
public void handleMessage(String aType, String aMessageId, Date aTimestamp, String aTextMessage, long aDeliveryTag,boolean isRedelivery) throws Exception {
filteredDispatch (aType, aTextMessage);
}
}
}
}
核心方法总共有三个:
exchangeName: 交换器的名称
listensToEvents: 事件的全类名
filteredDispatch: 消费事件的核心逻辑
下面看一下UserAssignedToRole对应的监听器TeamMemberEnablerListener
package com.saasovation.agilepm.infrastructure.messaging;
public class TeamMemberEnablerListener extends ExchangeListener{
@Autowired
private TeamService teamService;
public TeamMemberEnablerListener(){
super();
}
@Override
protected String exchangeName(){
return Exchanges.IDENTITY_ACCESS_EXCHANGE_NAME;
}
@Override
protected void filteredDispatch(String aType, String aTextMessage){
NotificationReader reader=new NotificationReader(aTextMessage);
String roleName = reader.eventStringValue ("roleName");
if (!roleName.equals ("ScrumProductOwner")&&!roleName.equals ("ScrumTeamMember")){
return;
}
String emailAddress = reader.eventStringValue ("emailAddress");
String firstName = reader.eventStringValue("firstName");
String lastName = reader.eventStringValue("lastName");
String tenantId=reader.eventStringValue("tenantId.id");
String username=reader.eventStringValue("username");
Date occurredon=reader.occurredon();
if (roleName.equals("ScrumProductOwner")){
this.teamService.enableProductOwner(
new EnableProductOwnerCommand(tenantId,username, firstName, lastName, emailAddress,occurredon));
}else{
this.teamService.enableTeamMember(
new EnableTeamMemberCommand( tenantId,username, firstName, lastName, emailAddress,occurredon));
}
}
@Override
protected String[] listensToEvents(){
return new String[]{"com.saasovation.identityaccess.domain.model. access.UserAssignedToRole"};
}
}
teamService
package com.saasovation.agilepm.application;
public class TeamService {
@Autowired
private ProductOwnerRepository productOwnerRepository;
@Autowired
private TeamMemberRepository teamMemberRepository;
@Transactional
public void enableProductOwner(EnableProductOwnerCommand aCommand){
TenantId tenantId=new TenantId(aCommand.getTenantId());
ProductOwner productOwner=this.productOwnerRepository·productOwnerOfIdentity(tenantId,aCommand.getUsername());
if (productOwner!=nu1l){
productOwner.enable(aCommand.getOccurredon());
}else{
productOwner=new ProductOwner(tenantId,aCommand.getUsername(), aCommand.getFirstName(), aCommand.getLastName(), aCommand.getEmailAddress () ,aCommand.getOccurredon());
this.productOwnerRepository.add(productOwner);
}
}
}
enableProductOwner和enableTeamMember处理逻辑是一样,这里就不写enableTeamMember的实现了
讲到这里通过消息集成限界上下文一个简单的实现就完事了,但是如果仅仅是这样实现还会存在一些问题,下面就让我们一起看一下存在的问题,以及对应的解决方法
例如,在身份与访问上下文中,如果一个管理者错误地将Joe Johnson所扮演的ScrumTeamMember角色解除了,情况会怎么样?当然,我们会收到一个事件通知,然后调用TeamService将Joe Johnson所对应的TeamMember转为失活状态。等一等, 几秒钟之后,该管理者意识到了错误,她真正应该被操作的User是Joe Jones,而不是Joe Johnson。因此,她立即将ScrumTeamMember角色再次指派给Joe Johnson, 然后解除Joe Jones所扮演的ScrumTeamMember角色。之后,敏捷项目管理上下文将 接收到相应的通知,万事大吉。也或者,万事真的就大吉了吗?
对于这个用例来说,我们做出了错误的假设,即假设通知的接收顺序和它们在身份与访问上下文中的产生顺序相同。但是,事实却不总是如此。对于Joe Johnson来说,如果我们先接收到了UserAssignedToRole事件,再接收到 UserUnassignedToRole事件,情况又会如何呢?在所有事件处理完后,Joe Johnson 所对应的TeamMember将依然处于失活状态。此时,有人可能需要向敏捷项目管理上下文的数据库中打些补丁,或者管理者需要玩弄一些小技巧将Joe Johnson重新激活。这种情况是有可能发生的,并且比我们所想象的发生频率更高。那么,我们应该如何避免这种情况呢?
让我们仔细看看传给TeamServiceAPI的命令对象,比如EnableTeam MemberCommand和DisableTeamMemberCommand。这两个命令对象都需要 提供一个Data对象,即occurredOn属性。事实上,所有的命令对象都是如此设计的。我们将使用该occurredOn属性来确保ProductOwner和TeamMember是 以正确的时间顺序来处理命令操作的。对于前面的UserAssignedToRole先于UserUnassignedToRole被接收的情况,我们看看如何处理:
TeamService
package com.saasovation.agilepm.application;
public class TeamService{
@Transactional
public void disableTeamMember(DisableTeamMemberCommand aCommand){
TenantId tenantId=new TenantId(aCommand.getTenantId());
TeamMember teamMember=this.teamMemberRepository.teamMemberofIdentity(tenantId,aCommand.getUsername());
if (teamMember != nul1){
teamMember.disable(aCommand.getOccurredon());
}
}
}
Member
package com.saasovation.agilepm.domain.model.team;
public abstract class Member extends Entity{
private MemberChangeTracker changeTracker;
public void disable(Date asOfDate){
if (this.changeTracker().canToggleEnabling(asOfDate)){
this.setEnabled(false);
this.setChangeTracker(this.changeTracker().enablingOn (asOfDate));
}
}
public void enable(Date asOfDate){
if (this.changeTracker().canToggleEnabling(asofDate)){
this.setEnabled(true);
this.setChangeTracker(
this.changeTracker().enablingOn(asOfDate));
}
}
MemberChangeTracker
package com.saasovation.agilepm.domain.model.team;
public final class MemberChangeTracker implements Serializable {
private Date emailAddressChangedOn;
private Date enablingOn;
private Date nameChangedon;
public boolean canToggleEnabling(Date asOfDate){
return this.enablingOn().before (asOfDate);
}
public MemberChangeTracker enablingOn(Date asofDate){
return new MemberChangeTracker(asOfDate, this.nameChangedOn(), this.emailAddressChangedOn());
}
}
这里我们通过创建产品的用例,来看一下代码如何实现的
1.用户提供Product的描述信息 2.用户希望为该Product创建一个Discussion 3.用户发出创建Product的请求 4.系统创建一个Product,连同Forum和Discussion
ProductService
package com.saasovation.agilepm.application;
public class ProductService{
@Autowired
private ProductRepository productRepository;
@Autowired
private ProductOwnerRepository productOwnerRepository;
@Transactional
public String newProductWithDiscussion(NewProductCommand aCommand){
return this.newProductWith(aCommand.getTenantId(),
aCommand.getProductOwnerId(),
aCommand.getName (),
aCommand.getDescription(),
this.requestDiscussionIfAvailable());
}
}
Product
package com.saasovation.agilepm.domain.model.product;
public class Product extends ConcurrencySafeEntity{
public Product(
TenantId aTenantId,
ProductId aProductId,
ProductOwnerId aProductOwnerId,
String aName,
String aDescription,
DiscussionAvailability aDiscussionAvailability){
this();
this.setTenantId(aTenantId);
this.setProductId(aProductId);
this.setProductOwnerId(aProductOwnerId);
this.setName(aName);
this.setDescription(aDescription);
this.setDiscussion(ProductDiscussion.fromAvailability(aDiscussionAvailability));
DomainEventPublisher.instance()
·publish(new ProductCreated(this.tenantId(),
this.productId(),
this.productOwnerId(),
this.name(),
this.description(),
this.discussion().availability().isRequested()));
}
}
其中DiscussionAvailability参数有三个状态分别是:ADD_ON_NOT_ENABLED, NOT_REQUESTEDN、REQUESTED(代表需要关联Discussion)
ProductDiscussion
package com.saasovation.agilepm.domain.model.product;
public final class ProductDiscussion implements Serializable{
public static ProductDiscussion fromAvailability(DiscussionAvailability anAvailability){
if (anAvailability.isReady()){
throw new IllegalArgumentException("Cannot be created ready.");
}
DiscussionDescriptor descriptor=new DiscussionDescriptor(DiscussionDescriptor.UNDEFINED_ID);
return new ProductDiscussion(descriptor, anAvailability);
}
}
ProductDiscussionRequestedListener
public class ProductDiscussionRequestedListener extends ExchangeListener{
private static final String COMMAND="com.saasovation.collaboration.discussion.CreateExclusiveDiscussion";
@Override
protected void filteredDispatch(String aType,String aTextMessage){
NotificationReader reader=new NotificationReader(aTextMessage);
if (!reader.eventBooleanValue ("requestingDiscussion")){
return;
}
Properties parameters=this.parametersFrom(reader);
PropertiesSerializer serializer=PropertiesSerializer.instance();
String serialization = serializer.serialize(parameters);
String commandId=this.commandIdFrom(parameters);
this.messageProducer()
.send(serialization, MessageParameters.durableText Parameters(COMMAND,commandId,new Date()))
.close();
}
}
ExclusiveDiscussionCreationListener
package com.saasovation.collaboration.infrastructure.messaging;
public class ExclusiveDiscussionCreationListener extends ExchangeListener{
@Autowired
private ForumService forumService;
@Override
protected void filteredDispatch(String aType,String aTextMessage){
NotificationReader reader=new NotificationReader(aTextMessage);
String tenantId = reader.eventStringValue("tenantId");
String exclusiveOwnerId=reader.eventStringValue("exclusiveOwnerId");
String forumSubject = reader.eventStringValue ("forumTitle");
String forumDescription=reader.eventStringValue ("forumDescription");
String discussionSubject=reader.eventStringValue ("discussionSubject") ;
String creatorId =reader.eventStringValue("creatorId");
String moderatorId = reader.eventStringValue("moderatorId");
// forum这里会发布一个时间ForumStarted
forumService.startExclusiveForumWithDiscussion(
tenantId,
creatorId,
moderatorId,
forumSubject,
forumDescription,
discussionSubject,
exclusiveOwnerId);
}
}
DiscussionStartedListener
package com.saasovation.agilepm.infrastructure.messaging;
public class DiscussionStartedListener extends ExchangeListener{
@Autowired
private ProductService productService;
@Override
protected void filteredDispatch(String aType,String aTextMessage){
NotificationReader reader=new NotificationReader(aTextMessage);
String tenantId = reader.eventStringValue("tenant.id");
String productId = reader.eventStringValue ("exclusiveOwner");
String discussionId=reader.eventStringValue("discussionId.id");
productService.initiateDiscussion(new InitiateDiscussionCommand( tenantId,productId, discussionId));
}
}
ProductService
package com.saasovation.agilepm.application;
public class ProductService{
@Autowired
private ProductRepository productRepository;
@Transactional
public void initiateDiscussion (InitiateDiscussionCommand aCommand) {
Product product =productRepository.productofId(
new TenantId (aCommand.getTenantId ()),
new ProductId (aCommand. getProductId ()));
if (product == null) {
throw new IllegalStateException ("Unknown product of tenant id: " + aCommand.getTenantId ()+ " and product id: "+ aCommand. getProductId ());
}
product.initiateDiscussion (new DiscussionDescriptor(aCommand.getDiscussionId ()));
}
Product
package com.saasovation.agilepm.domain.model.product;
public class Product extends ConcurrencySafeEntity{
public void initiateDiscussion(DiscussionDescriptor aDescriptor){
if (aDescriptor==null){
throw new IllegalArgumentException("The descriptor must not be nu11.");
}
if (this.discussion().availability().isRequested()){
this.setDiscussion(this.discussion()
.nowReady(aDescriptor));
DomainEventPublisher.instance()
·publish(new ProductDiscussionInitiated(this.tenantId(),this.productId(), this.discussion()));
}
}
}
通过上面的样例代码,可以看到我们可以通过状态来解决事件顺序的带来的问题,那么我们思考一下,现在代码是否就不存在问题了呢?
答案是否定的,因为如果我们使用过程中,事件消息的中间件出现了问题,我们又应该怎么办呢,这里比较通用的方法是:添加重试和超时机制
创建一个TimeConstrainedProcessTracker监视那些指定完成时间已经过期的处理过程
ProcessTimedOut
public class ProcessTimedOut{
public boolean hasFullyTimedOut(); // 是否属于完全超时还是重试
public boolean allowsRetries(); // 是否允许重试
public int retryCount(); // 当前重试次数
public int totalRetriesPermitted(); // 允许的重试总数
public boolean totalRetriesReached(); // 检验是否达到总重试测试
}
Product维护了长时处理过程的当前状态,当重试间隔抵达,或者处理过程彻底超时时,跟踪器将发布下面事件
ProductDiscussionRequestTimedout
package com.saasovation.agilepm.domain.model.product;
import com.saasovation.common.domain.model.process.ProcessId;
import com.saasovation.common.domain.model.process.ProcessTimedout;
public class ProductDiscussionRequestTimedout extends ProcessTimedOut{
public ProductDiscussionRequestTimedout(String aTenantId, ProcessId aProcessId, int aTotalRetriesPermitted, int aRetryCount){
super(aTenantId, aProcessId,aTotalRetriesPermitted,aRetryCount);
}
}
每一个监听器都可以通过调用ProcessTimedOut的hasFullyTimedOut()方法来确定该事件是否属于完全超时还是重试。如果是重试,那么监听器可以调用ProcessTimedOut的allowsRetries(), totalRetriesPermitted()和 totalRetriesReached(),等方法来获取更多的事件重试信息。 在可以接收重试和超时通知的情况下,我们可以把Product放在一个更好的长时处理过程中。首先,我们需要启动该处理过程,此时我们可以使用既有的ProductDiscussionRequestedListener
ProductDiscussionRequestedListener
package com.saasovation.agilepm.infrastructure.messaging;
public class ProductDiscussionRequestedListener extends ExchangeListener{
@Override
protected void filteredDispatch(String aType,String aTextMessage){
NotificationReader reader=new NotificationReader(aTextMessage);
if (!reader.eventBooleanValue ("requestingDiscussion")){
return;
}
String tenantId = reader.eventStringValue("tenantId.id");
String productId = reader.eventStringValue ("product.id") ;
productService.startDiscussionInitiation(new StartDiscussionInitiationCommand(tenantId,productId)); //将命令发送给协作上下文
}
}
接着我们将ProductService交付于TimeConstrainedProcessTracker的跟踪器来进行处理
ProductService
package com.saasovation.agilepm.application;
public class ProductService {
@Transactional
public void startDiscussionInitiation(StartDiscussionInitiationCommand aCommand){
Product product=productRepository·productofId(
new TenantId(aCommand.getTenantId()),
new ProductId(aCommand.getProductId()));
if (product == null){
throw new IllegalStateException("Unknown product of tenant id:" +aCommand.getTenantId()+ " and product id:"+aCommand.getProductId());
}
String timedOutEventName=ProductDiscussionRequestTimedout.class.getName();
TimeConstrainedProcessTracker tracker=new TimeConstrainedProcessTracker (
product.tenantId().id(),
ProcessId.newProcessId(),
"Create discussion for product:" +product.name(),
new Date(),
5L * 60L * 1000L, // 每五分钟重试一次,
3, // 总共重试3次
timedOutEventName);
processTrackerRepository.add(tracker);
product.setDiscussionInitiationId(tracker.processId().id());
}
}
我们会在后台写一个定时器,定时处理TimeConstrainedProcessTracker容器里面过期的任务
ProcessService
package com.saasovation.agilepm.application;
public class ProcessService{
@Transactional
public void checkForTimedOutProcesses(){
Collection<TimeConstrainedProcessTracker> trackers= processTrackerRepository.allTimedOut () ;
for (TimeConstrainedProcessTracker tracker:trackers){
tracker.informProcessTimedout();
}
}
}
TimeConstrainedProcessTracker容器的informProcessTimedout方法将对重试或者超时进行确认,在确认后,会发布一个ProcessTimedOut的事件
ProductDiscussionRetryListener
package com.saasovation.agilepm.infrastructure.messaging;
public class ProductDiscussionRetryListener extends ExchangeListener{
@Autowired
private ProcessService processService;
@Override
protected String exchangeName(){
return Exchanges.AGILEPM_EXCHANGE_NAME;
}
@Override
protected void filteredDispatch(String aType,String aTextMessage){
Notification notification= NotificationSerializer
.instance()
.deserialize(aTextMessage,Notification.class);
ProductDiscussionRequestTimedout event=notification.event();
if (event.hasFullyTimedOut()){
productService.timeOutProductDiscussionRequest(new TimeOutProductDiscussionRequestCommand(
event.tenantId(),
event.processId().id(),
event.occurredon()));
}else{
productService.retryProductDiscussionRequest(new RetryProductDiscussionRequestCommand(
event.tenantId(),
event.processId().id()));
}
}
@Override
protected String[] listensToEvents(){
return new String[]{"com.saasovation.agilepm.process. ProductDiscussionRequestTimedOut"};
}
}
ProductDiscussionRetryListener监听器支出ProductDiscussionRequestTimedout的事件,并且可以处理重试(retryProductDiscussionRequest)和超时(timeOutProductDiscussionRequest)
接下来先看一下productService里面超时处理逻辑
ProductService
package com.saasovation.agilepm.application;
public class ProductService{
@Transactional
public void timeOutProductDiscussionRequest(TimeOutProductDiscussionRequestCommand aCommand){
ProcessId processId=ProcessId.existingProcessId(aCommand.getProcessId());
TenantId tenantId = new TenantId(aCommand.getTenantId());
Product product=productRepository·productOfDiscussionInitiationId(tenantId,processId.id());
this.sendEmailForTimedOutProcess(product);
product.failDiscussionInitiation();
}
}
如果是超时,productService将会发送邮件给产品负责人,然后Product将被标记为'初始化讨论失败',接下来看一下failDiscussionInitiation的实现
Prodcut
package com.saasovation.agilepm.domain.model.product;
public class Product extends ConcurrencySafeEntity{
public void failDiscussionInitiation(){
if (!this.discussion().availability().isReady()){
this.setDiscussionInitiationId(null);
this.setDiscussion(ProductDiscussion
.fromAvailability(DiscussionAvailability.FAILED));
}
}
}
其次先看一下productService里面重试处理逻辑
ProductService
package com.saasovation.agilepm.application;
public class ProductService{
@Transactional
public void retryProductDiscussionRequest(RetryProductDiscussionRequestCommand aCommand){
ProcessId processId=ProcessId.existingProcessId(aCommand.getProcessId());
TenantId tenantId = new TenantId(aCommand.getTenantId());
Product product=productRepository·productofDiscussionInitiationId(tenantId,processId.id());
if (product == null){
throw new IllegalStateException("Unknown product of tenant id:" +aCommand.getTenantId()+" and discussion initiation id:"+processId.id());
}
this.requestProductDiscussion(new RequestProductDiscussionCommand(aCommand.getTenantId(), product.productId().id()));
}
}
我们需要在讨论成功开启后,productService添加一下新行为
ProductService
package com.saasovation.agilepm.application;
public class ProductService{
@Autowired
private ProductRepository productRepository;
@Transactional
public void initiateDiscussion (InitiateDiscussionCommand aCommand) {
Product product =productRepository.productofId(
new TenantId (aCommand.getTenantId ()),
new ProductId (aCommand. getProductId ()));
if (product == null) {
throw new IllegalStateException ("Unknown product of tenant id: " + aCommand.getTenantId ()+ " and product id: "+ aCommand. getProductId ());
}
product.initiateDiscussion (new DiscussionDescriptor(aCommand.getDiscussionId ()));
// 新添加的行为 (找到监听器中的创建,修改为完成,防止触发重试和超时机制)
TimeConstrainedProcessTracker tracker=this.processTrackerRepository.trackerOfProcessId(
ProcessId.existingProcessId(product.discussionInitiationId()));
tracker.completed();
}
在消息机制不可用时,通知的发布方将不能通过该消息机制发布事件。这种情况将被发布客户端所检测到,此时的客户端可以退一步,减少消息的发送量,等到消息系统可用时再进行正常发送。在这个过程中,如果其中一次发送成功,那么我们便可以认为消息系统已经再次可用了。但是直到那个时候,请确保消息的发送频率小于正常情况。我们可以每隔30秒或者1分钟重试一次。请注意,如果你的系统使用了事件存储,那么你的事件在成功发送之前都将一直位于消息队列中,当消息系统重新可用时,我们可以立即对这些消息进行发送。 对于消息监听器来说,在消息机制不可用时,它将接收不到新的事件通知。当消息系统重新可用时,你的监听器会被自动地重新激活吗,也或许你需要重新进行订阅?如果此时的消息消费方不能自动恢复,那么你需要确保重新注册该消费方。否则,你将发现你的限界上下文不再接收所依赖限界上下文发出的通知,这是你需要避免的。 当然,问题并不总是出自消息机制。考虑以下场景:在一段时间之内,你的限界上下文变得不可用。当它再次可用时,此时的消息系统中已经收集到了大量的未投递的消息。然后,你的限界上下文重新注册消息的消费方,那么要接收并处理完所有未被处理的消息将消耗大量的时间。对于这种情况来说,你将没有什么好做的。当然,你可以增加更多的节点(集群),此时即便其中一个节点不可用,整个系统依然是可用的。此外,有些时候你根本无法避免停机的情况。比如,当你对系统代码的修改需要更新数据库,而你并不能直接向数据库中打补丁时,你便需要一些系统停机时间了。在这种情况下,你的消息处理机制便只能使劲追赶了。
在本章中,我们学习了集成限界上下文的多种方式。 ·你学到了在分布式计算环境中完成系统集成所需要考虑的基本问题。 ·你学习了如何通过REST资源的方式来集成限界上下文。 ·你学到了通过消息集成限界上下文的多个例子,其中包括开发和管理长时处理过程。 ·你学到了在不同限界上下文之间复制信息所面临的挑战,以及如何管理并且避免这些信息。 ·你从简单的例子中学到了很多,然后学习了一些更加复杂的例子,这些例子体现了更高的设计成熟度。
应用程序:
我这里使用的“应用程序”表示那些支撑核心域(2)模型的组件,通常包括领域模型本身、用户界面、内部使用的应用服务和基础设施组件等。至于这些组件中应该包含些什么,这是根据应用程序的不同而不同的,并且有可能受到所用架构(4)的影响。
在图14.1中,我们看不到与架构相关的信息。其中,虚线表示的是依赖注入原则(4),而实线则表示操作分发。比如,基础设施实现了用户界面、应用服务和领域模型中的抽象接口,同时它还将操作分发给应用服务、领域模型和数据存储。
对于如何通过最好的方式将领域对象渲染到用户界面,业界一直存在着争论。很多时候,除了操作所需数据之外,我们还会向用户界面提供一些额外的数据。这是有好处的,因为这些额外的信息可以对用户操作起到帮助作用。这些额外数据还可以包含一些选项数据。因此,用户界面通常都需要渲染多个聚合(10)实例中的属性,尽管用户最终只会修改其中一个聚合实例,请参考图14.2。
一种渲染多个聚合实例的方法便是使用数据传输对象(Data Tranfer Object,DTO)[Fowler,PofEAA]。DTO将包含需要显示的所有属性值。应用服务通过资源库(12)读取所需的聚合实例,然后使用一个DTO组装器(DTOAssemble)[Fowler, P of EAA]将需要显示的属性值映射到DTO中。之后,用户界面组件将访问每一个DTO属性值,并将其渲染到显示界面中。
调停者(中介)设计模式:https://www.runoob.com/design-pattern/mediator-pattern.html
要解决客户端和领域模型之间的耦合问题,我们可以使用调停者模式[Gamma et al.],即双分派(Double-Dispatch)和回调(Callback)。此时,聚合将 通过调停者接口来发布内部状态。客户端将实现调停者接口,然后把实现对象的引用作为参数传给聚合。之后,聚合双分派给调停者以发布自身状态,在这个过程中,聚合并没有向外暴露自身的内部结构。这里的诀窍在于,不要将调停者接口与任何显示规范绑定在一起,而是关注于对所感兴趣的聚合状态的渲染
在没有必要使用DTO时,我们可以使用另一种改进方法。该方法将多个聚合实例中需要显示的数据汇集到一个领域负载对象(Domain Payload Object,DPO)中[Vernon,DPO]。DPO与DTO相似,但是它的优点是可以用于单虚拟机应用架构中。DPO中包含了对整个聚合实例的引用,而不是单独的属性。此时,聚合实例集群可以在多个逻辑层之间传输。应用服务(请参考“应用服务”一节)通过资源库获取到所需聚合实例,然后创建DPO实例,该DPO持有对所有聚合实例的引用。之后,展现组件通过DPO获得聚合实例的引用,再从聚合中访问需要显示的属性。
如果你的程序提供了REST(4)资源,那么你便需要为领域模型创建状态展现以供客户端使用。有一点非常重要:我们应该基于用例来创建状态展现,而不是基于聚合实例。从这一点来看,创建状态展现和DTO是相似的,因为DTO也是基于用例的。然而,更准确的是将一组REST资源看作一个单独的模型—视图模型(View Model)或展现模型(Presentation Model)[Fowler,PM]。我们所创建的 展现模型不应该与领域模型中的聚合状态存在一一对应的关系。否则,你的客户端便需要像聚合本身一样了解你的领域模型。此时,客户端需要紧跟领域模型中行为和状态的变化,你也随之失去了抽象所带来的好处。
与其读取多个聚合实例,然后再通过编程的方式将它们组装到单个容器(DTO或DPO)中,我们可以转而使用用例优化查询。此时,我们可以在资源库中创建一些查询方法,这些方法返回的是所有聚合实例属性的超集。查询方法动态地将查询结果放在一个值对象(6)中,该值对象是特别为当前用例设计的。请注意,你设计的是值对象,而不是DTO,因为此时的查询是特定于领域的,而不是特定于应用程序的。这个用例优化的值对象将被直接用于渲染用户界面。
用例优化查询的动机与CQRS(4)相似。然而,用例优化查询依然会使用资源库,而不会直接与数据库打交道(比如使用SQL)。要了解这两者的不同,请参考资源库(12)中的相关讨论。当然,如果你打算在用例优化查询之路上继续走下去,那么你已经离CQRS很近了,此时考虑转用CQRS也是一种不错的选择。
如果你的应用程序必须支持多种不同类型的客户端,你该怎么办呢?这些客户端可能包括RIA、图形界面、REST服务和消息系统等。另外,各种测试也可以被认为是不同类型的客户端。此时,你的应用服务可以使用一个数据转换器(DataTransformer),然后由客户端来决定需要使用的数据转换器类型。应用层将双分派给数据转换器以生成所需的数据格式。
我们可以将展现模型看成是一种适配器[Gamma et al.]。它根据视图之所需向外提供属性和行为,由此隐藏了领域模型的细节。这也意味着,此时的展现模型不止是向外提供领域对象或DTO的属性,而是在渲染视图时,展现模型将根据模型的状态做出一些决定。比如,要在视图中显示一个特定的控件,这并不会与领域模型中的属性存在直接的关系,而是可以从这些属性中推导得出。我们不会要求领域模型对视图显示属性提供特别的支持,而是将职责分给展现模型。此时,展现模型通过领域模型的状态推导出一些特定于视图的指示器和属性值。
使用展现模型的另一个好处在于,如果聚合不提供JavaBean所规定的getter方法,而用户界面框架恰恰又需要这样的getter方法,那么展现模型可以完成这样的适配转换工作。多数基于Java的Web框架都要求对象提供公有的getter方法,比如getSummary()和getStory()等,但是对领域模型的设计却倾向于使用流畅的、特定于领域的表达式来反映通用语言(1)。此时,我们将使用summary()和story()这样的方法命名,这便与用户界面框架产生了阻抗失配。此时,展现模型可以将summary()方法适配到getSummary()方法,将story(方法适配到getStory()方法, 从而消除模型与视图之间的冲突
应用服务是领域模型的直接客户。应用服务负责用例流的任务协调,每个用例流对应了一个服务方法。在使用ACID数据库时,应用服务还负责控制事务以确保对模型修改的原子提交。另外,应用服务还会处理和安全相关的操作。应用服务应该做成很薄的一层,并且只使用它们来协调对模型的任务操作。
看一个示例接口和实现类,该应用服务用于管理身份与访问上下文中的Tenant
接口TenantIdentityService
package com.saasovation.identityaccess.application;
public interface TenantIdentityService{
public void activateTenant(TenantId aTenantId); // 激活已有Tenant
public void deactivateTenant(TenantId aTenantId); // 禁用已有Tenant
public String offerLimitedRegistrationInvitation(TenantId aTenantId,Date aStartsonDate,Date anUntilDate); // 有限制的邀请其他Tenant
public String offerOpenEndedRegistrationInvitation(TenantId aTenantId); // 无限制的邀请其他Tenant
public Tenant provisionTenant(String aTenantName,
String aTenantDescription,
boolean isActive,
FullName anAdministratorName,
EmailAddress anEmailAddress,
PostalAddress aPostalAddress,
Telephone aPrimaryTelephone,
Telephone aSecondaryTelephone,
String aTimeZone); // 创建Tenant
public Tenant tenant(TenantId aTenantId); // 查询Tenant
}
实现类TenantIdentityService
package com.saasovation.identityaccess.application;
public class TenantIdentityService{
@Transactional
@PreAuthorize ("hasRole ('SubscriberRepresentative')")
public void activateTenant(TenantId aTenantId){
this.nonNullTeant(aTenantId).active();
}
@Transactional
@PreAuthorize ("hasRole ('SubscriberRepresentative')")
public void deactivateTenant(TenantId aTenantId){
this.nonNu11Tenant(aTenantId).deactivate();
}
@Transactional(readonly=true)
public Tenant tenant(TenantId aTenantId){
Tenant tenant=this.tenantRepository().tenantOfId(aTenantId);
return tenant;
}
@Transactional(readonly=true)
@PreAuthorize ("hasRole ('SubscriberRepresentative')")
public Tenant provisionTenant(String aTenantName,
String aTenantDescription,
boolean isActive,
FullName anAdministratorName,
EmailAddress anEmailAddress,
PostalAddress aPostalAddress,
Telephone aPrimaryTelephone,
Telephone aSecondaryTelephone,
String aTimeZone){
return this.tenantProvisioningService·provisionTenant(aTenantName,
aTenantDescription,
isActive,
anAdministratorName,
anEmailAddress,
aPostalAddress,
aPrimaryTelephone,
aSecondaryTelephone,
aTimeZone);
}
private Tenant nonNullTenant(TenantId aTenantId){
Tenant tenant=this.tenant(aTenantId);
if(tenant==null){
throw new IllegalArgumentException("Tenant does not exist.");
}
return tenant;
}
}
Tenant实体是通过aTenantId在资源库获取的,并且在应用服务层添加事务控制
我们可以看到provisionTenant的参数列表总共需要9个参数,有可能以后会更多,我们可以通过命令对象避免这样的多参数的传递,命令对象就是将一个请求封装到一个对象中,从而是的我们对客户端进行参数化,下面看一个简单的命令对象的类
ProvisionTenantCommand
public class ProvisionTenantCommand{
private String tenantName;
private String tenantDescription;
private boolean isActive;
private String administratorFirstName;
private String administratorLastName;
private String emailAddress;
private String primaryTelephone;
private String secondaryTelephone;
private String addressStreetAddress;
private String addressCity;
private String addressStateProvince;
private String addressPostalCode;
private String addressCountryCode;
private String timeZone;
public ProvisionTenantCommand(...){ }
public ProvisionTenantCommand(){
super();
}
public String getTenantName(){
return tenantName;
}
public void setTenantName(String tenantName){
this.tenantName=tenantName;
}
}
经过命令对象封装之后,TenantIdentityService的实现如下了
public class TenantIdentityService{
...
@Transactional
public String provisionTenant (ProvisionTenantCommand aCommand){
return tenant.tenantId().id();
}
...
}
先前,我们讨论到了数据转换器。对于不同类型的客户端,数据转换器将提供客户端所需的特定数据类型。此时,不同的数据转换器将实现一个共有的抽象接口。从客户端的角度,我们可以通过以下方式来使用数据转换器:
TenantData tenantData=tenantIdentityService.provisionTenant(···, myTenantDataTransformer);
TenantPresentationModel tenantPresentationModel= new TenantPresentationModel(tenantData.value());
应用服务被设计成了具有输入和输出的API,而传入数据转换器的目的即在于为客户端生成特定的输出类型。
现在,让我们考虑另一种完全不同的方式:使应用服务返回void类型而不向客户端返回数据。这将如何工作呢?事实上,这正是六边形架构(4)所提倡的,此时我们可以使用端口和适配器的风格。对于本例,我们可以使用单个标准输出端口,然后为不同种类的客户端创建不同的适配器。此时,应用层的provisionTenant()方法将变成:
@Transactional
@PreAuthorize ("hasRole (' SubscriberRepresentative')")
public void provisionTenant(String aTenantName,
String aTenantDescription,
boolean isActive,
FullName anAdministratorName,
EmailAddress anEmailAddress,
PostalAddress aPostalAddress,
Telephone aPrimaryTelephone,
Telephone aSecondaryTelephone,
String aTimeZone){
Tenant tenant= this.tenantProvisioningService·provisionTenant(aTenantName,
aTenantDescription,
isActive,
anAdministratorName,
anEmailAddress,
aPostalAddress,
aPrimaryTelephone,
aSecondaryTelephone,
aTimeZone);
this.tenantIdentityoutputPort().write(tenant);
}
这里的输出端口是一个特殊的命名端口,它位于应用程序的边缘。在使用Spring时,该端口类可以被注入到应用服务中。此时,provisionTenant()方法唯一需要知道的便是调用write()方法把从领域服务中获取到的Tenant实例写到端口中。该端口可以有很多读取器,在使用应用服务之前,我们将这些读取器注册给端口。在write()方法执行后,每一个注册的读取器都会将端口的输出作为自己的输入。在读取数据时,读取器可以使用某些机制对数据进行转换,比如数据转换器。
这并不是一种增加架构复杂性的雕虫小技,而是与其他任何端口和适配器架构一—无论是软件系统,还是硬件设备——具有相同的长处。每一个组件只需要知道读进输入、调用自身行为,最后将输出写到端口中。
在之前的例子中,我们只提到了单个用户界面对应单个领域模型的情况,在现实中还存在单个用户界面对应多个领域模型的情况,下面咱们就一起来看一下,多个领域模型,我们应该怎么做
方案一 | 方案二 | 方案三 |
---|---|---|
一个应用层对应一个领域模型 | 一个应用层对应多个领域模型 | 抽取一个新的限界上下文来聚合多个领域模型,然后应用层接受新的限界上下文 |
基础设施层可以作用于整个架构,今天只看一下应用层如何调用基础设施层
这节主要是讲了管理java bean的容器,因为我们都是使用的spring,就不做多余的结束了
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。