首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >向Azure协议网关发送/接收批处理消息

向Azure协议网关发送/接收批处理消息
EN

Stack Overflow用户
提问于 2017-02-22 23:24:26
回答 1查看 1.3K关注 0票数 0

目前,我正在向Azure实例上的IoTHub发送设备消息,然后将所有消息发送到EventHub进行处理。

我的目标是使用Azure协议云网关作为中介来接收批处理的消息,然后在将它们发送出去进行处理之前将它们展开。通过将消息批处理,我可以减少正在传输的数据量,从而降低数据使用成本。一旦数据在云中,它可以进行非压缩,然后正常处理。

在对本地机器上的Gateway进行了一些研究和使用了内置在解决方案中的一些单元测试之后,我看到了如何将消息发送到Gateway/IoTHub。

代码语言:javascript
运行
复制
        ServiceClient serviceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString);

        Stopwatch sw = Stopwatch.StartNew();

        await this.CleanupDeviceQueueAsync(hubConnectionStringBuilder.HostName, device);

        var clientScenarios = new ClientScenarios(hubConnectionStringBuilder.HostName, this.deviceId, this.deviceSas);

        var group = new MultithreadEventLoopGroup();
        string targetHost = this.tlsCertificate.GetNameInfo(X509NameType.DnsName, false);

        var readHandler1 = new ReadListeningHandler(CommunicationTimeout);
        Bootstrap bootstrap = new Bootstrap()
            .Group(group)
            .Channel<TcpSocketChannel>()
            .Option(ChannelOption.TcpNodelay, true)
            .Handler(this.ComposeClientChannelInitializer(targetHost, readHandler1));
        IChannel clientChannel = await bootstrap.ConnectAsync(this.ServerAddress, protocolGatewayPort);
        this.ScheduleCleanup(() => clientChannel.CloseAsync());

        Task testWorkTask = Task.Run(async () =>
        { //Where the messaging actually starts and sends
            Tuple<EventData, string>[] ehMessages = await CollectEventHubMessagesAsync(receivers, 2); //Async task for recieving messages back from the IoTHub
            Tuple<EventData, string> qos0Event = Assert.Single(ehMessages.Where(x => TelemetryQoS0Content.Equals(x.Item2, StringComparison.Ordinal)));
            Tuple<EventData, string> qos1Event = Assert.Single(ehMessages.Where(x => TelemetryQoS1Content.Equals(x.Item2, StringComparison.Ordinal)));

            string qosPropertyName = ConfigurationManager.AppSettings["QoSPropertyName"];

            var qos0Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS0Content));
            qos0Notification.Properties[qosPropertyName] = "0";
            qos0Notification.Properties["subTopic"] = "tips";
            await serviceClient.SendAsync(this.deviceId, qos0Notification);

            var qos1Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS1Content));
            qos1Notification.Properties["subTopic"] = "firmware-update";
            await serviceClient.SendAsync(this.deviceId, qos1Notification);

            var qos2Notification = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content));
            qos2Notification.Properties[qosPropertyName] = "2";
            qos2Notification.Properties["subTopic"] = "critical-alert";
            await serviceClient.SendAsync(this.deviceId, qos2Notification);

            var qos2Notification2 = new Message(Encoding.UTF8.GetBytes(NotificationQoS2Content2));
            qos2Notification2.Properties[qosPropertyName] = "2";
            await serviceClient.SendAsync(this.deviceId, qos2Notification2);
        });

因此,"ServiceClient“在此单元测试中发送4条消息: qos0Notification、qos1Notification、qos2Notification、qos2Notification2,并使用SendAsync方法发送信息。

SendAsync方法是应用程序基本代码的一部分,对view.The方法不可用,它还接受DeviceId和消息对象。消息对对象有3个重载: Base、Byte或Byte。

一旦网关初始化,它就会使用以下方法接收它的消息:

代码语言:javascript
运行
复制
    public override void ChannelRead(IChannelHandlerContext context, object message)
    {
        var packet = message as Packet;
        if (packet == null)
        {
            CommonEventSource.Log.Warning($"Unexpected message (only `{typeof(Packet).FullName}` descendants are supported): {message}", this.ChannelId);
            return;
        }

        this.lastClientActivityTime = DateTime.UtcNow; // notice last client activity - used in handling disconnects on keep-alive timeout

        if (this.IsInState(StateFlags.Connected) || packet.PacketType == PacketType.CONNECT)
        {
            this.ProcessMessage(context, packet);
        }
        else
        {
            if (this.IsInState(StateFlags.ProcessingConnect))
            {
                Queue<Packet> queue = this.connectPendingQueue ?? (this.connectPendingQueue = new Queue<Packet>(4));
                queue.Enqueue(packet);
            }
            else
            {
                // we did not start processing CONNECT yet which means we haven't received it yet but the packet of different type has arrived.
                ShutdownOnError(context, string.Empty, new InvalidOperationException($"First packet in the session must be CONNECT. Observed: {packet}, channel id: {this.ChannelId}, identity: {this.identity}"));
            }
        }
    }

我觉得这将是打开任何批次消息的最佳场所。一旦我们有了一个消息列表,我们就会将它们发送到ProcessMessage,以确定它是哪种消息,以及如何处理它。

这似乎没有太多的信息,因为它是非常新的。

EN

回答 1

Stack Overflow用户

发布于 2017-02-23 09:16:36

在对本地机器上的Gateway进行了一些研究和使用了内置在解决方案中的一些单元测试之后,我看到了如何将消息发送到Gateway/IoTHub。

这里,ServiceClient向设备发送消息,而不是向IoTHub发送消息。这类消息是云对设备消息。您可以使用DeviceClient发送设备到云消息。

我的目标是使用作为中介来接收批处理的消息,然后在发送出去进行处理之前将它们展开。通过将消息批处理,我可以减少正在传输的数据量,从而降低数据使用成本。

您可以通过注入自定义进程来减少传递给IoTHub的数据。您可以添加自定义通道处理程序。

协议网关使用基于流水线的消息处理,使用以下处理程序: TLS加密/解密、MQTT编解码器(编码器和解码器)和MqttIotHubAdapter。定制网关行为的一种方法是在此管道中注入额外的处理程序。通过在MQTT编解码器和MqttIotHubAdapter之间添加一个自定义处理程序,您可以检查、修改、丢弃、延迟或拆分传递的消息。

更多信息和示例代码,您可以参考"Microsoft协议网关开发人员指南“。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42404223

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档