目前,我正在向Azure实例上的IoTHub发送设备消息,然后将所有消息发送到EventHub进行处理。
我的目标是使用Azure协议云网关作为中介来接收批处理的消息,然后在将它们发送出去进行处理之前将它们展开。通过将消息批处理,我可以减少正在传输的数据量,从而降低数据使用成本。一旦数据在云中,它可以进行非压缩,然后正常处理。
在对本地机器上的Gateway进行了一些研究和使用了内置在解决方案中的一些单元测试之后,我看到了如何将消息发送到Gateway/IoTHub。
ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString);
Stopwatch sw = Stopwatch.StartNew();
await this.CleanupDeviceQueueAsync(hubConnectionStringBuilder.HostName, device);
var clientScenarios = new ClientScenarios(hubConnectionStringBuilder.HostName, this.deviceId, this.deviceSas);
var group = new MultithreadEventLoopGroup();
string targetHost = this.tlsCertificate.GetNameInfo(X509NameType.DnsName, false);
var readHandler1 = new ReadListeningHandler(CommunicationTimeout);
Bootstrap bootstrap = new Bootstrap()
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(this.ComposeClientChannelInitializer(targetHost, readHandler1));
IChannel clientChannel = await bootstrap.ConnectAsync(this.ServerAddress, protocolGatewayPort);
this.ScheduleCleanup(() => clientChannel.CloseAsync());
Task testWorkTask = Task.Run(async () =>
{ //Where the messaging actually starts and sends
Tuple<EventData, string>[] ehMessages = await CollectEventHubMessagesAsync(receivers, 2); //Async task for recieving messages back from the IoTHub
Tuple<EventData, string> qos0Event = Assert.Single(ehMessages.Where(x => TelemetryQoS0Content.Equals(x.Item2, StringComparison.Ordinal)));
Tuple<EventData, string> qos1Event = Assert.Single(ehMessages.Where(x => TelemetryQoS1Content.Equals(x.Item2, StringComparison.Ordinal)));
string qosPropertyName = ConfigurationManager.AppSettings["QoSPropertyName"];
var qos0Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS0Content));
qos0Notification.Properties[qosPropertyName] = "0";
qos0Notification.Properties["subTopic"] = "tips";
await serviceClient.SendAsync(this.deviceId, qos0Notification);
var qos1Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS1Content));
qos1Notification.Properties["subTopic"] = "firmware-update";
await serviceClient.SendAsync(this.deviceId, qos1Notification);
var qos2Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content));
qos2Notification.Properties[qosPropertyName] = "2";
qos2Notification.Properties["subTopic"] = "critical-alert";
await serviceClient.SendAsync(this.deviceId, qos2Notification);
var qos2Notification2 = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content2));
qos2Notification2.Properties[qosPropertyName] = "2";
await serviceClient.SendAsync(this.deviceId, qos2Notification2);
});
因此,"ServiceClient“在此单元测试中发送4条消息: qos0Notification、qos1Notification、qos2Notification、qos2Notification2,并使用SendAsync方法发送信息。
SendAsync方法是应用程序基本代码的一部分,对view.The方法不可用,它还接受DeviceId和消息对象。消息对对象有3个重载: Base、Byte或Byte。
一旦网关初始化,它就会使用以下方法接收它的消息:
public override void ChannelRead(IChannelHandlerContext context, object message)
{
var packet = message as Packet;
if (packet == null)
{
CommonEventSource.Log.Warning($"Unexpected message (only `{typeof(Packet).FullName}` descendants are supported): {message}", this.ChannelId);
return;
}
this.lastClientActivityTime = DateTime.UtcNow; // notice last client activity - used in handling disconnects on keep-alive timeout
if (this.IsInState(StateFlags.Connected) || packet.PacketType == PacketType.CONNECT)
{
this.ProcessMessage(context, packet);
}
else
{
if (this.IsInState(StateFlags.ProcessingConnect))
{
Queue<Packet> queue = this.connectPendingQueue ?? (this.connectPendingQueue = new Queue<Packet>(4));
queue.Enqueue(packet);
}
else
{
// we did not start processing CONNECT yet which means we haven't received it yet but the packet of different type has arrived.
ShutdownOnError(context, string.Empty, new InvalidOperationException($"First packet in the session must be CONNECT. Observed: {packet}, channel id: {this.ChannelId}, identity: {this.identity}"));
}
}
}
我觉得这将是打开任何批次消息的最佳场所。一旦我们有了一个消息列表,我们就会将它们发送到ProcessMessage,以确定它是哪种消息,以及如何处理它。
这似乎没有太多的信息,因为它是非常新的。
发布于 2017-02-23 09:16:36
在对本地机器上的Gateway进行了一些研究和使用了内置在解决方案中的一些单元测试之后,我看到了如何将消息发送到Gateway/IoTHub。
这里,ServiceClient
向设备发送消息,而不是向IoTHub发送消息。这类消息是云对设备消息。您可以使用DeviceClient
发送设备到云消息。
我的目标是使用作为中介来接收批处理的消息,然后在发送出去进行处理之前将它们展开。通过将消息批处理,我可以减少正在传输的数据量,从而降低数据使用成本。
您可以通过注入自定义进程来减少传递给IoTHub的数据。您可以添加自定义通道处理程序。
更多信息和示例代码,您可以参考"Microsoft协议网关开发人员指南“。
https://stackoverflow.com/questions/42404223
复制相似问题