我正在尝试Spring数据流,今天我更新到最新版本,因为我无法创建这个简单的示例,它应该简单地记录AMQP消息.
rabbit | log
当我部署这个流并简单地在消耗的队列上发布一个字符串消息时,这是可以的。但是,当它是一个序列化的PoJo时,它就没有了。数据流服务器+的旧版本启动了基于spring 1.5.x的应用程序,它就是这样做的。
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"absolute_path":"/+~JF4472914347363856925.tmp","filename":"+~JF4472914347363856925.tmp","timestamp":1536315010932,"sshd_server":"localhost","sshd_port":22}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=sftp.uploaded, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_consumerQueue=sftp_uploaded, amqp_redelivered=false, id=d3d84d90-53ca-4c39-cdef-8665d35ddcf1, amqp_consumerTag=amq.ctag-8kP4KDjn13oae1Qutmw4IA, contentType=text/json, timestamp=1536315013950}]' to outbound message.
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.7.RELEASE.jar!/:5.0.7.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:60) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:240) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:207) ~[spring-integration-amqp-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar!/:2.0.4.RELEASE]
... 22 common frames omitted
Caused by: java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"absolute_path":"/+~JF4472914347363856925.tmp","filename":"+~JF4472914347363856925.tmp","timestamp":1536315010932,"sshd_server":"localhost","sshd_port":22}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=sftp.uploaded, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_consumerQueue=sftp_uploaded, amqp_redelivered=false, id=d3d84d90-53ca-4c39-cdef-8665d35ddcf1, amqp_consumerTag=amq.ctag-8kP4KDjn13oae1Qutmw4IA, contentType=text/json, timestamp=1536315013950}]' to outbound message.
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:324) ~[spring-cloud-stream-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:351) ~[spring-cloud-stream-2.0.1.RELEASE.jar!/:2.0.1.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:589) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:435) ~[spring-integration-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
... 32 common frames omitted
版本
发布于 2018-09-09 00:25:10
Stream 2.*中的内容类型协商出现了显着的变化和增强。你可以在这里读https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#content-type-management。基本上,我看到的是在堆栈中没有适当的MessageConverter。而且,根据我所看到的,您的内容类型是text/json
,我们没有为其提供转换器。考虑将其更改为application/json
。
https://stackoverflow.com/questions/52220618
复制相似问题