作者 | Ivelin Ivanov
来源 | Medium
编辑 | 代码医生团队
Tensorflow激发开发人员在几乎任何想到的领域中尝试他们令人兴奋的AI创意。ML社区中有三个众所周知的因素构成了一个好的深度神经网络模型做了一些神奇的事情。
感兴趣的领域是实时通信。提出可能为RTC应用程序增加价值的实用ML用例很容易。
https://www.telestax.com/blog/robocall-bill-prompts-need-for-value-over-volume/
正如好友Jean Deruelle指出的那样,如果通过新一代通信设备无缝地增强家庭和工作体验而徘徊于环境计算中,则会有更多相邻的用例。所以想构建一个简单的原型并直接将Restcomm连接到Tensorflow。经过几天的研究,意识到没有简单的方法可以将实时流式音频/视频媒体(SIP / RTP)提供给张量流模型。类似于Google Cloud的Speech to Text流gRPC API的东西可能是一个可接受的初始回退,但在开源Tensorflow社区中找不到。
https://github.com/restcomm
https://cloud.google.com/speech-to-text/docs/streaming-recognize
有一些方法可以读取离线音频文件和视频文件,但这与处理实时延迟敏感的媒体流完全不同。
https://github.com/tensorflow/io/tree/master/tensorflow_io/audio
https://github.com/tensorflow/io/tree/master/tensorflow_io/video
最后去了Yong Tang领导的Tensorflow IO项目。TF IO是一个年轻的项目,由Google,IBM和其他人支持的社区不断发展。Yong指出了一个开放的github,用于等待贡献者的实时音频支持。那开始了很好的对话。几个周末之后,已经建立了足够的勇气来承担一个小的编码挑战 - 为PCAP网络捕获文件实施新的Tensorflow数据集。
https://github.com/tensorflow/io
https://github.com/tensorflow/io/issues/50
PCAP文件与实时媒体流密切相关,因为它们是网络活动的精确历史快照。PCAP文件允许在进入媒体处理软件时记录和重放实际网络数据包,包括丢弃的数据包和时间延迟。
https://www.tcpdump.org/#documentation
回到本文的主题,现在将介绍构建TF PcapDataset并将其贡献给Tensorflow IO项目的主要步骤:
1.Fork Tensorflow IO和源代码构建
https://github.com/tensorflow/io#developing
2.查看源树中的相邻数据集,并选择一个最接近pcap的数据集。利用来自文本,cifar和parquet的代码。还有一个关于创建TF操作的文档证明是有帮助的。
https://github.com/tensorflow/io/tree/master/tensorflow_io
3.得到了Stephan Uphoff和Yong的宝贵建议。还有每月电话会议,任何人都可以在项目问题上进行讨论。
https://github.com/suphoff
4.准备好后提交拉取请求。TF IO团队通过调整和修复来满足最佳实践,从而提供相应的响应和支持性指导贡献者。
https://github.com/tensorflow/io/pull/303
第2步结果是花费大部分周末业余时间学习TF基础设施和API的那个。为你分解一下。
基本上,TF是在每个节点处具有操作的图形结构。数据进入图表,操作将数据样本作为输入,处理这些样本并将输出传递给其节点所连接的图形中的下一个操作。下图是官方文档中TF图的示例。
TF Graph示例
操作使用名为tensors的公共数据类型(因此名称为TensorFlow)。术语张量具有数学定义,但张量的数据结构本质上是n维向量:0D标量(数字,字符或字符串),1D标量列表,标量的2D矩阵或向量的更高维向量。
在将数据馈送到TF模型之前,必须对数据进行预处理并将其格式化为Tensor数据结构。这种张量格式要求是由于深度神经网络中广泛使用的线性代数以及这些结构在GPU或TPU上应用计算并行性所能实现的优化。
张量的例子
它有助于理解 TF数据集的好处以及开箱即用的所有便利功能,如批处理,映射,重排,重复。这些功能使得使用有限数据量和计算能力构建和训练TF模型变得更加容易和高效。
数据集和其他TF操作可以用C ++或Python构建。我选择了C ++路由,这样我就可以学习一些TF C ++框架。然后我用Python包装它们。将来,我计划编写一些纯Python数据集,这应该会更容易一些。
看一下TF IO数据集的源代码文件结构。
TF IO pcap数据集的源代码目录结构
Tensorflow使用Bazel作为构建系统,Google于2015年开源。以下是PcapDataset BUILD文件。它声明了动态pcap库的公共名称(_pcap_ops.so)。列出要从(pcap_input.cc和pcap_ops.cc)构建的两个源文件。并声明构建所需的一些TF依赖项。
licenses(["notice"]) # Apache 2.0
package(default_visibility = ["//visibility:public"])
cc_binary(
name = "python/ops/_pcap_ops.so",
srcs = [
"kernels/pcap_input.cc",
"ops/pcap_ops.cc",
],
copts = [
"-pthread",
"-std=c++11",
"-DNDEBUG",
],
linkshared = 1,
deps = [
"//tensorflow_io/core:dataset_ops",
"@local_config_tf//:libtensorflow_framework",
"@local_config_tf//:tf_header_lib",
],
)
pcap数据集的主Bazel BUILD文件
下一个重要的源文件是pcap_ops.cc在其中声明将在TF运行时环境中注册并可在TF应用程序中使用的TF操作。
#include "tensorflow/core/framework/common_shape_fns.h"
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/shape_inference.h"
namespace tensorflow {
REGISTER_OP("PcapInput")
.Input("source: string")
.Output("handle: variant")
.Attr("filters: list(string) = []")
.Attr("columns: list(string) = []")
.Attr("schema: string = ''")
.SetShapeFn([](shape_inference::InferenceContext* c) {
c->set_output(0, c->MakeShape({c->UnknownDim()}));
return Status::OK();
});
REGISTER_OP("PcapDataset")
.Input("input: T")
.Input("batch: int64")
.Output("handle: variant")
.Attr("output_types: list(type) >= 1")
.Attr("output_shapes: list(shape) >= 1")
.Attr("T: {string, variant} = DT_VARIANT")
.SetIsStateful()
.SetShapeFn([](shape_inference::InferenceContext* c) {
c->set_output(0, c->MakeShape({}));
return Status::OK();
});
} // namespace tensorflow
这里的大部分代码都是样板文件。它说正在引入一个PcapInput操作系统,它可以读取pcap文件和一个由PcapInput填充的PcapDataset操作系统。片刻之间的关系将在一段时间内变得更加明显。
从开始贡献工作直到它被TF主分支接受的时间开始,在基础TF 2.0框架中引入了几个简化,减少了文件中的样板代码。怀疑在不久的将来会有更多的这些简化。
核心TF团队了解到,为了吸引更多的贡献者社区,降低进入门槛很重要。新的贡献者应该只能专注于他们正在编写的新的新代码,而不会出现与TF环境交互的细节,直到他们为此做好准备。
包中的下一个文件是pcap_input.cc。这就是大部分繁重发生的地方。花了很多时间编写和测试这个文件。
https://github.com/tensorflow/io/blob/master/tensorflow_io/pcap/kernels/pcap_input.cc
它有一个声明PcapDataset,PcapInput和PcapInputStream之间关系的部分。将看到每个人做了什么。
REGISTER_UNARY_VARIANT_DECODE_FUNCTION(PcapInput, "tensorflow::data::PcapInput");
REGISTER_KERNEL_BUILDER(Name("PcapInput").Device(DEVICE_CPU),
FileInputOp<PcapInput>);
REGISTER_KERNEL_BUILDER(Name("PcapDataset").Device(DEVICE_CPU),
FileInputDatasetOp<PcapInput, PcapInputStream>);
PcapInputStream包含从原始pcap文件读取的大部分逻辑并将其转换为张量。为了获得输入的味道,这里是使用CocoaPacketAnalyzer查看的测试http.pcap文件的屏幕截图。
https://github.com/tensorflow/io/blob/master/tests/test_pcap/http.pcap
http.pcap的CocoaPacketAnalyzer视图
跳过特定于pcap文件的逻辑,并指出从原始二进制文件数据到张量转换的一些定义元素。
// read packets from the file up to record_to_read or end of file
while ((*record_read) < record_to_read) {
int64 record_count = 0;
double packet_timestamp;
string packet_data_buffer;
Status status = state.get()->ReadRecord(packet_timestamp, &packet_data_buffer, record_count);
if (!(status.ok() || errors::IsOutOfRange(status))) {
return status;
}
if (record_count > 0) {
Tensor timestamp_tensor = (*out_tensors)[0];
timestamp_tensor.flat<double>()(*record_read) = packet_timestamp;
Tensor data_tensor = (*out_tensors)[1];
data_tensor.flat<string>()(*record_read) = std::move(packet_data_buffer);
(*record_read) += record_count;
} else {
// no more records available to read
// record_count == 0
break;
}
}
从pcap文件中读取数据包记录并转换为张量
此ReadRecord行从pcap文件读取下一个pcap数据包并填充两个本地变量:packet_timestamp double和packet_data_buffer string。
ReadRecord(packet_timestamp, &packet_data_buffer, record_count);
如果成功填充了新的pcap记录,则将标量放置到相应的张量占位符中。得到的输出张量的形状是具有两列的矩阵。一列保存每个读取pcap数据包的时间戳标量。另一列将相应的分组数据保存为字符串。输出张量(矩阵)中的每一行对应一个pcap数据包。
处理pcap文件输入到TF张量输出
Tensor timestamp_tensor = (*out_tensors)[0];
timestamp_tensor.flat<double>()(*record_read) = packet_timestamp;
Tensor data_tensor = (*out_tensors)[1];
data_tensor.flat<string>()(*record_read) = std::move(packet_data_buffer);
out_tensors是在从PcapDataset请求新批处理时准备的占位符张量。这是在这里完成的; 在读循环之前。
https://github.com/tensorflow/io/blob/master/tensorflow_io/pcap/kernels/pcap_input.cc#L157
packet_timestamp标量使用类型化的flat函数放置在第一列(index [0])和(* record_read)行。packet_data_buffer分别位于第二列(index [1])和相同(* record_read)行。
这涵盖了C ++代码的关键元素。现在看一下Python文件。
顶部pcap目录级别的_init_.py指示TF Python文档生成器如何遍历python代码并提取API参考文档。可以在此处阅读有关文档最佳实践的更多信息。
https://github.com/tensorflow/io/blob/master/tensorflow_io/pcap/__init__.py
https://github.com/Debian/tensorflow/blob/master/tensorflow/docs_src/community/documentation.md#generating-python-api-documentation
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tensorflow.python.util.all_util import remove_undocumented
from tensorflow_io.pcap.python.ops.pcap_ops import PcapDataset
_allowed_symbols = [
"PcapDataset",
]
remove_undocumented(__name__, allowed_exception_list=_allowed_symbols)
上面的代码指示Pyhton API文档生成器专注于PcapDataset类并忽略此模型中的其他代码。
接下来,pcap_ops.py包装C ++ DataSet操作并使其可供Python应用程序使用。
https://github.com/tensorflow/io/blob/master/tensorflow_io/pcap/python/ops/pcap_ops.py
"""PcapDataset"""
import tensorflow as tf
from tensorflow_io.core.python.ops import data_ops as data_ops
from tensorflow_io import _load_library
pcap_ops = _load_library('_pcap_ops.so')
class PcapDataset(data_ops.Dataset):
"""A pcap Dataset. Pcap is a popular file format for capturing network packets.
"""
def __init__(self, filenames, batch=None):
"""Create a pcap Reader.
Args:
filenames: A `tf.string` tensor containing one or more filenames.
"""
batch = 0 if batch is None else batch
dtypes = [tf.float64, tf.string]
shapes = [
tf.TensorShape([]), tf.TensorShape([])] if batch == 0 else [
tf.TensorShape([None]), tf.TensorShape([None])]
super(PcapDataset, self).__init__(
pcap_ops.pcap_dataset,
pcap_ops.pcap_input(filenames),
batch, dtypes, shapes)
C ++动态库导入如下:
from tensorflow_io import _load_library
pcap_ops = _load_library('_pcap_ops.so')
数据集构造函数的主要作用之一是提供有关其生成的数据集张量类型的元数据。首先它必须描述单个数据样本中的张量类型。PcapDataset样本是两个标量的向量。一个用于tf.float64类型的pcap数据包时间戳,另一个用于类型为tf.string的数据包数据。
dtypes = [tf.float64, tf.string]
批量是通过神经网络的一个前向/后向传递中的训练示例的数量。在例子中,当定义批次的大小时也定义了张量的形状。
当多个pcap数据包在一个批处理中分组时,时间戳(tf.float64)和数据(tf.string)都是一维张量,形状为tf.TensorShape([batch])。
由于事先不知道总样本的数量,并且总样本可能不能被批量大小整除,宁愿将形状设置为tf.TensorShape([None])以给更多的灵活性。
批量大小为0是一种特殊情况,其中每个单个张量的形状退化为tf.TensorShape([])或0-D标量张量。
shapes = [
tf.TensorShape([]), tf.TensorShape([])] if batch == 0 else [
tf.TensorShape([None]), tf.TensorShape([None])]
只需要一个测试用例。test_pcap_eager.py在从http.pcap采样时执行PcapDataset。
https://github.com/tensorflow/io/blob/master/tests/test_pcap/http.pcap
dataset = pcap_io.PcapDataset(url_filenames, batch=1)
packets_total = 0
for v in dataset:
(packet_timestamp, packet_data) = v
if packets_total == 0:
assert packet_timestamp.numpy()[0] == 1084443427.311224 # we know this is the correct value in the test pcap file
assert len(packet_data.numpy()[0]) == 62 # we know this is the correct packet data buffer length in the test pcap file
packets_total += 1
assert packets_total == 43 # we know this is the correct number of packets in the test pcap file
测试代码很简单。迭代所有pcap数据包,并根据已知常量测试第一个中的值。
要构建PcapDataset并运行其测试,使用了本地io目录中的以下行:
$ bazel build -s --verbose_failures //tensorflow_io/pcap/...
$ pytest tests/test_pcap_eager.py
希望这可以帮助构建自己的自定义数据集。希望会考虑将它贡献给TF社区,以加速开源AI的进展。