我读取数据从一个Q和传递到执行者通道处理。不,主线程的事务范围不能工作。如何从执行者通道创建一个具有多个下游直接通道的事务。下面是配置
Read data from some Queue ( gateway)
<int:channel id="mainChannel">
<int:interceptors>
<int:wire-tap channel="channel1"/>
<int:wire-tap channel="channel2"/>
<int:wire-tap channel="channel3"/>
</int:interceptors>
</int:channel>
<int:channel id="channel1">
<int:dispatcher task-executor="exec1" />
</int:channel>
<int:channel id="channel2">
<int:dispatcher task-executor="exec2" />
</int:channel>
<int:channel id="channel3">
<int:dispatcher task-executor="exec3" />
</int:channel>
<int:chain id="id" input-channel="Channel1">
<int:header-value-router header-name="headerName">
<int:mapping value="Header1" channel="Channel4"/>
<int:mapping value="Header2" channel="Channel5"/>
<int:mapping value="Header3" channel="Channel6"/>
<int:mapping value="Header4" channel="Channel7"/>
<int:mapping value="Header5" channel="Channel8"/>
<int:mapping value="Header6" channel="Channel9"/>
</int:header-value-router>
</int:chain>
那么Channel4-Channel 9(全直接通道)根据一些业务逻辑将数据存储在DB中。需求是在单个事务中包装从channel1到Channel4-Channel 9的流程。我试着在网上找到,但什么也找不到。
编辑这里是我现在正在做的事情
Read data from some Queue ( gateway)
<int:channel id="mainChannel">
<int:interceptors>
<int:wire-tap channel="channel1"/>
<int:wire-tap channel="channel2"/>
<int:wire-tap channel="channel3"/>
</int:interceptors>
</int:channel>
<int:channel id="channel1">
<int:dispatcher task-executor="exec1" />
</int:channel>
<int:channel id="channel2">
<int:dispatcher task-executor="exec2" />
</int:channel>
<int:channel id="channel3">
<int:dispatcher task-executor="exec3" />
</int:channel>
<int:chain id="id" input-channel="Channel1">
<int:service-activator ref="someRef" method="name1">
<ref bean="txAdvice" />
</int:service-activator>
<int:header-value-router header-name="headerName">
<int:mapping value="Header1" channel="Channel4"/>
<int:mapping value="Header2" channel="Channel5"/>
<int:mapping value="Header3" channel="Channel6"/>
<int:mapping value="Header4" channel="Channel7"/>
<int:mapping value="Header5" channel="Channel8"/>
<int:mapping value="Header6" channel="Channel9"/>
</int:header-value-router>
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<tx:advice id="txAdvice" >
<tx:attributes>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
</int:chain>
Channel4,Channel5 ...Channel9,它们都在DB中插入。如果插入失败事务中的任何一个,我想要的都应该回滚。不过,如果其中一个DB insert失败,则不会回滚事务。
编辑3
<tx:advice id="txAdvice" >
<tx:attributes>
<tx:method name="send" propagation="REQUIRES_NEW"/>
</tx:attributes>
</tx:advice>
<aop:config>
<aop:advisor advice-ref="txAdvice" pointcut="bean(Channel1)"/></aop:config>
编辑4解决了Config
<int:channel id="mainChannel">
<int:interceptors>
<int:wire-tap channel="channel1"/>
<int:wire-tap channel="channel2"/>
<int:wire-tap channel="channel3"/>
</int:interceptors>
<int:service-activator ref="gatewayID" method = "sendToDB" input-channel="channel1"/>
<int:gateway id="gatewayID" service-
interface="com.*.*.*.TransactionalGateway" error-
channel="errorChannel" default-request-channel="OutChannel"/>
<int:chain id="id" input-channel="OutChannel">
<int:header-value-router header-name="headerName">
<int:mapping value="Header1" channel="Channel4"/>
<int:mapping value="Header2" channel="Channel5"/>
<int:mapping value="Header3" channel="Channel6"/>
<int:mapping value="Header4" channel="Channel7"/>
<int:mapping value="Header5" channel="Channel8"/>
<int:mapping value="Header6" channel="Channel9"/>
</int:header-value-router>
</int:chain>
public interface TransactionalGateway {
@Transactional
void sendToDB(Message<?> m);
误差
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#10': Cannot resolve reference to bean 'org.springframework.integration.config.ServiceActivatorFactoryBean#2' while setting bean property 'handler'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#2': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match: [public final java.lang.Class[] com.sun.proxy.$Proxy46.getProxiedInterfaces(), public final void com.sun.proxy.$Proxy46.setTargetSource(org.springframework.aop.TargetSource), public final void com.sun.proxy.$Proxy46.setPreFiltered(boolean), public final boolean com.sun.proxy.$Proxy46.removeAdvisor(org.springframework.aop.Advisor), public final void com.sun.proxy.$Proxy46.removeAdvisor(int) throws org.springframework.aop.framework.AopConfigException, public final boolean com.sun.proxy.$Proxy46.isInterfaceProxied(java.lang.Class), public final void com.sun.proxy.$Proxy46.addAdvice(org.aopalliance.aop.Advice) throws org.springframework.aop.framework.AopConfigException]
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:359)
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:108)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyPropertyValues(AbstractAutowireCapableBeanFactory.java:1522)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1269)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:551)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:481)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:312)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:308)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:738)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
at com.citi.loads.framework.StartLoadsApp.lambda$loadContexts$4(StartLoadsApp.java:133)
at java.util.HashMap$KeySet.forEach(HashMap.java:933)
at com.citi.loads.framework.StartLoadsApp.loadContexts(StartLoadsApp.java:129)
at com.citi.loads.framework.StartLoadsApp.runWithFramework(StartLoadsApp.java:109)
at com.citi.loads.framework.StartLoadsApp.main(StartLoadsApp.java:33)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#2': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match: [public final java.lang.Class[] com.sun.proxy.$Proxy46.getProxiedInterfaces(), public final void com.sun.proxy.$Proxy46.setTargetSource(org.springframework.aop.TargetSource), public final void com.sun.proxy.$Proxy46.setPreFiltered(boolean), public final boolean com.sun.proxy.$Proxy46.removeAdvisor(org.springframework.aop.Advisor), public final void com.sun.proxy.$Proxy46.removeAdvisor(int) throws org.springframework.aop.framework.AopConfigException, public final boolean com.sun.proxy.$Proxy46.isInterfaceProxied(java.lang.Class), public final void com.sun.proxy.$Proxy46.addAdvice(org.aopalliance.aop.Advice) throws org.springframework.aop.framework.AopConfigException]
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:185)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:103)
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1646)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:254)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:351)
... 18 more
Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match: [public final java.lang.Class[] com.sun.proxy.$Proxy46.getProxiedInterfaces(), public final void com.sun.proxy.$Proxy46.setTargetSource(org.springframework.aop.TargetSource), public final void com.sun.proxy.$Proxy46.setPreFiltered(boolean), public final boolean com.sun.proxy.$Proxy46.removeAdvisor(org.springframework.aop.Advisor), public final void com.sun.proxy.$Proxy46.removeAdvisor(int) throws org.springframework.aop.framework.AopConfigException, public final boolean com.sun.proxy.$Proxy46.isInterfaceProxied(java.lang.Class), public final void com.sun.proxy.$Proxy46.addAdvice(org.aopalliance.aop.Advice) throws org.springframework.aop.framework.AopConfigException]
at org.springframework.util.Assert.isNull(Assert.java:113)
at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:499)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:226)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:149)
at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:144)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:60)
at org.springframework.integration.handler.ServiceActivatingHandler.<init>(ServiceActivatingHandler.java:37)
at org.springframework.integration.config.ServiceActivatorFactoryBean.createMethodInvokingHandler(ServiceActivatorFactoryBean.java:57)
at org.springframework.integration.config.AbstractStandardMessageHandlerFactoryBean.createHandler(AbstractStandardMessageHandlerFactoryBean.java:117)
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.createHandlerInternal(AbstractSimpleMessageHandlerFactoryBean.java:184)
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:172)
at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:57)
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:178)
... 23 more
发布于 2021-04-21 18:23:45
事务只是绑定到线程边界,所以您不能启动一个事务,将消息发送到该执行器通道,并期望它扩展到另一侧的该通道的订阅服务器,这将确实是一个不同的线程。
但是,如果您的观点略有不同(与您的问题不太清楚),并且您确实希望当事务使用一个消息并将其分发到路由器中的一个或多个通道时,事务将使用您的chain
启动,那么就可以通过<int:request-handler-advice-chain>
和嵌套的<bean>
配置来实现TransactionHandleMessageAdvice
。
请参阅有关docs中事务的更多信息:https://docs.spring.io/spring-integration/reference/html/transactions.html#transactions
https://stackoverflow.com/questions/67198240
复制相似问题