生产者-消费者模式-快速回顾在Producer-Consumer模式的最简单实现中,我们有一个线程产生一些数据并将其放入队列,另一个线程消耗队列中的数据。
图1:生产者-消费者模式该管道模式是生产者-消费者模式的变体。这种模式允许消费者也是数据的生产者。此数据将放入第二个队列,另一个消费者将使用它。
图2:管道模式在上面的示例中,我们有一个管道,它执行三个阶段的处理。在第一阶段,产生一些数据,在第二阶段消耗该数据并产生一些其他数据。其他数据将在第三阶段消耗。在上一篇文章中,我举了一个例子,在第一阶段,我们从一些商店(例如数据库)读取文档,在第二阶段我们翻译它们,在第三阶段,我们将翻译后的文档存储到某个目的地商店。在此示例中,第一个队列存储从商店读取的文档,而第二个队列存储已翻译的文档。
图3:数据流中的分支在该图中,在第一阶段中,读取文档。然后,根据文档的语言,它要么排队到西班牙文档的队列中,要么排队到德语文档的队列中。特殊的消费者线程从第一个队列中读取西班牙文档并对其进行翻译。同时,另一个特殊的消费者线程从另一个队列中读取德语文档并进行翻译。这两个消费者也是生产者,因为他们将翻译的文档添加到翻译的文档队列中。特殊的消费者线程读取翻译的文档并将它们写入商店(例如数据库)。
图4:使用管道而不是数据流在这个图中,我们有一个简单的管道。在第一阶段,我们读取一个文档,并将其放在一个队列中,无论其编写语言如何。在第二阶段,我们确定文档语言,并决定是否调用一些代码来翻译西班牙文或其他代码来翻译德语。注意这个阶段的并行度是多少。在图3和图4的两个示例中,我们同时翻译了两个文档。那么数据流方法给我们带来了哪些好处呢?在数据流方法中,最多一个西班牙文档将在任何给定时间处理。
net中的数据流模式
此示例与我在上一篇文章中提供的示例非常相似。我们只为三个队列创建三个BlockingCollection对象(加上一个用于输入队列),然后创建四个任务。第一个从商店中读取文档,并根据其语言将它们放入适当的队列中。这是分支发生的地方。接下来的两个任务从西班牙语和德语文档队列中获取文档,翻译它们,然后将它们放入已翻译的文档队列中。第四个任务从已翻译的文档队列中获取文档并将它们保存到目标存储。
这样,在等待翻译方法完成时,我们不会占用一个线程。但是,我们仍然阻止这两个地方的当前线程:当德语文档队列当前为空时,对germanDocumentsQueue.GetConsumingEnumerable()返回的可枚举上的IEnumerable.MoveNext()进行隐式调用。也就是说,当我们等待空队列变为非空时。队列满时调用translatedDocumentsQueue.Add。
此方法包含13条语句,在注释中正确标记。我现在就解释一下。
在1日声明,我创建将读取的存储文档块。此块是TransformBlock,这意味着此块使用字符串并生成Document对象。在这种情况下,字符串是文档ID。构造函数中的第一个参数是函数的委托,该函数将字符串转换为文档。我们给它一个lambda表达式来调用ReadDocumentFromSourceStore方法。第二个参数允许我们为此块配置更多选项。
这段代码看起来更好,对吧?流逻辑非常清晰。ForEach循环的主体向我们展示了处理单个文档所需的步骤。我们先阅读该文件。然后我们确定它的语言。然后我们根据语言进行分支;我们将TranslateSpanishDocument称为西班牙文档,将TranslateGermanDocument称为德语文档。最后,我们调用SaveDocumentToDestinationStore来保存翻译的文档。
我首先创建四个Dataflow块。该ProcDataflowBlock和AsyncProcDataflowBlock类来自ProceduralDataflow库。这两个类之间的区别在于ProcDataflowBlock类用于CPU绑定操作,AsyncProcDataflowBlock类用于异步操作,例如I/O绑定操作。这与我们创建TPLDataflow块的方式非常相似。
这是ProcessDocument本地函数的修改版本。它在转换方法的调用周围添加了一个try/catch块。如果转换中发生异常,将调用名为DfStoreDocumentInFaultedDocumentsStore的方法将此类文档存储在特殊数据库中。该方法还具有相应的块,具有自己的并行度和队列大小。请注意,try块包围了两个翻译文档的调用(西班牙语和德语翻译方法)。
图5:数据流中的循环在此图中,在块2处理项目之后,处理结果可能会返回到块1的队列,或者它可能会根据某些条件转到最终块队列。现在,假设所有队列都已满。如果没有循环,这可能不是问题,因为最终的块队列最终会因为最后一个块处理某些项而变为非满。但是,对于循环,块1可能正在等待块2的队列变为非满,而块2可能正在等待块1的队列变为非满。这会导致死锁。
领取专属 10元无门槛券
私享最新 技术干货