返回介绍

演练:创建数据流管道

发布于 2025-02-23 23:16:17 字数 27794 浏览 0 评论 0 收藏 0

尽管可以使用 DataflowBlock.Receive , DataflowBlock.ReceiveAsync ,和 DataflowBlock.TryReceive 方法来接收来自消息源块,也可以连接消息块来形成数据流管道。 数据流管道是一系列的组件,或数据流块,其中每个执行一个参与更大目标的特定任务。 数据流管道中的每个数据流块会在收到来自另一数据流块的消息时执行工作。 这就好比是汽车制造装配线。 每辆汽车通过装配线时,一站组装车架,下一站则安装引擎,以此类推。 因为装配线可以同时装配多辆汽车,所以比一次装配整辆车拥有更高的产出。

提示

TPL 数据流库( System.Threading.Tasks.Dataflow 命名空间)不是随 .NET Framework 4.5 一起分发的。 若要安装 System.Threading.Tasks.Dataflow 命名空间中,打开你的项目中 Visual Studio 2012,选择 管理 NuGet 包 从项目菜单,然后联机搜索 Microsoft.Tpl.Dataflow 包。

本文档演示下载书籍的数据流管道Iliad of Homer从网站和搜索文本匹配单个单词与该相反单词的第一个单词字符。 本文档中数据流管道的形成包括以下步骤:

  1. 创建参与管道的数据流块。
  2. 连接每个数据流块与管道中的下一个块。 每个块将管道中前一个块的输出作为输入接收。
  3. 对每个数据流块,创建一个延续任务,该延续任务在上一个块完成后将下一个块的状态设置为已完成状态。
  4. 将数据发布到管道的开头。
  5. 将管道的开头标记为已完成。
  6. 等待管道完成所有工作。

先决条件

开始本演练之前,请阅读 数据流 。

创建控制台应用程序

在 Visual Studio 中,创建一个 Visual C# 或 Visual Basic 控制台应用程序项目。 添加对 System.Threading.Tasks.Dataflow.dll 的引用。

或者,创建一个文件并将其命名为 ReverseWords.cs ( ReverseWords.vb 为 Visual Basic),然后在 Visual Studio 命令提示符窗口以编译该项目中运行以下命令。

Visual C#

csc.exe /r:System.Threading.Tasks.Dataflow.dll ReverseWords.cs

Visual Basic

vbc.exe /r:System.Threading.Tasks.Dataflow.dll ReverseWords.vb

将以下代码添加到项目中以创建基本应用程序。

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web 
// and finds all reversed words that appear in that book.
class Program
{
   static void Main(string[] args)
   {
   }
}
Imports System.Collections.Concurrent
Imports System.Linq
Imports System.Net
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all palindromes that appear in that book.
Class Program
   Private Shared Sub Main(args As String())
   End Sub
End Class

创建数据流块

将以下代码添加到 Main 方法以创建参与管道的数据流块。 下表总结了管道的每个成员的角色。

//
// Create the members of the pipeline.
// 

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return new WebClient().DownloadString(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters 
   // with a space character to.
   char[] tokens = text.ToArray();
   for (int i = 0; i < tokens.Length; i++)
   {
    if (!char.IsLetter(tokens[i]))
     tokens[i] = ' ';
   }
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new char[] { ' ' },
    StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words, orders the resulting words alphabetically, 
// and then remove duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words.Where(word => word.Length > 3).OrderBy(word => word)
    .Distinct().ToArray();
});

// Finds all words in the specified collection whose reverse also 
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   // Holds reversed words.
   var reversedWords = new ConcurrentQueue<string>();

   // Add each word in the original collection to the result whose 
   // reversed word also exists in the collection.
   Parallel.ForEach(words, word =>
   {
    // Reverse the work.
    string reverse = new string(word.Reverse().ToArray());

    // Enqueue the word if the reversed version also exists
    // in the collection.
    if (Array.BinarySearch<string>(words, reverse) >= 0 &&
      word != reverse)
    {
     reversedWords.Enqueue(word);
    }
   });

   return reversedWords;
});

// Prints the provided reversed words to the console.  
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
    reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(Function(uri)
                                Console.WriteLine("Downloading '{0}'...", uri)
                                Return New WebClient().DownloadString(uri)
                               End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(Function(text)
                                ' Remove common punctuation by replacing all non-letter characters 
                                ' with a space character to.
                                ' Separate the text into an array of words.
                                Console.WriteLine("Creating word list...")
                                Dim tokens() As Char = text.ToArray()
                                For i As Integer = 0 To tokens.Length - 1
                                   If Not Char.IsLetter(tokens(i)) Then
                                    tokens(i) = " "c
                                   End If
                                Next i
                                text = New String(tokens)
                                Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
                               End Function)

' Removes short words, orders the resulting words alphabetically, 
' and then remove duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(Function(words)
                                  Console.WriteLine("Filtering word list...")
                                  Return words.Where(Function(word) word.Length > 3).OrderBy(Function(word) word).Distinct().ToArray()
                                 End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(Function(words)
                                     ' Holds reversed words.
                                     ' Add each word in the original collection to the result whose 
                                     ' reverse also exists in the collection.
                                     ' Reverse the work.
                                     ' Enqueue the word if the reversed version also exists
                                     ' in the collection.
                                     Console.WriteLine("Finding reversed words...")
                                     Dim reversedWords = New ConcurrentQueue(Of String)()
                                     Parallel.ForEach(words, Sub(word)
                                                  Dim reverse As New String(word.Reverse().ToArray())
                                                  If Array.BinarySearch(Of String)(words, reverse) >= 0 AndAlso word <> reverse Then
                                                   reversedWords.Enqueue(word)
                                                  End If
                                                 End Sub)
                                     Return reversedWords
                                  End Function)

' Prints the provided reversed words to the console.  
Dim printReversedWords = New ActionBlock(Of String)(Sub(reversedWord) Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray())))
成员类型描述
downloadStringTransformBlock<TInput,TOutput>从 Web 下载该书的文本。
createWordListTransformBlock<TInput,TOutput>将该书的文本分成单词的数组。
filterWordListTransformBlock<TInput,TOutput>将短单词从单词数组中移除,按字母顺序对得到的单词排序,并删除重复项。
findReversedWordsTransformManyBlock<TInput,TOutput>查找经过筛选的单词数组集合中所有将字母反转后仍在单词数组中出现的单词。
printReversedWordsActionBlock<TInput>向控制台显示单词及对应的倒序词。

虽然可以将本示例中数据流管道的多个步骤合并为一个步骤,不过本示例阐释了组合多个独立数据流任务来执行较大任务的概念。 示例通过使用 TransformBlock<TInput,TOutput> 使管道的每个成员能对其输入数据执行操作并将结果发送到管道中的下一步骤。 管道的 findReversedWords 成员是一个 TransformManyBlock<TInput,TOutput> 对象,因为该成员会为每个输入生成多个独立输出。 管道的结尾 printReversedWords 是一个 ActionBlock<TInput> 对象,因为它会对其输入执行一个动作,但不产生结果。

形成管道

添加以下代码将每个块与管道中的下一个块连接。

当您调用 LinkTo 方法将源数据流块连接到目标数据流块时,源数据流块会在数据可用时将数据传播到目标块。

//
// Connect the dataflow blocks to form a pipeline.
//

downloadString.LinkTo(createWordList);
createWordList.LinkTo(filterWordList);
filterWordList.LinkTo(findReversedWords);
findReversedWords.LinkTo(printReversedWords);
'
' Connect the dataflow blocks to form a pipeline.
'

downloadString.LinkTo(createWordList)
createWordList.LinkTo(filterWordList)
filterWordList.LinkTo(findReversedWords)
findReversedWords.LinkTo(printReversedWords)

创建完成任务

添加以下代码,使每个数据流块能够在处理了所有数据之后执行一个最终操作。

//
// For each completion task in the pipeline, create a continuation task
// that marks the next block in the pipeline as completed.
// A completed dataflow block processes any buffered elements, but does
// not accept new elements.
//

downloadString.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)createWordList).Fault(t.Exception);
   else createWordList.Complete();
});
createWordList.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)filterWordList).Fault(t.Exception);
   else filterWordList.Complete();
});
filterWordList.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)findReversedWords).Fault(t.Exception);
   else findReversedWords.Complete();
});
findReversedWords.Completion.ContinueWith(t =>
{
   if (t.IsFaulted) ((IDataflowBlock)printReversedWords).Fault(t.Exception);
   else printReversedWords.Complete();
});
'
' For each completion task in the pipeline, create a continuation task
' that marks the next block in the pipeline as completed.
' A completed dataflow block processes any buffered elements, but does
' not accept new elements.
'

downloadString.Completion.ContinueWith(Sub(t)
                      If t.IsFaulted Then
                       CType(createWordList, IDataflowBlock).Fault(t.Exception)
                      Else
                       createWordList.Complete()
                      End If
                     End Sub)
createWordList.Completion.ContinueWith(Sub(t)
                      If t.IsFaulted Then
                       CType(filterWordList, IDataflowBlock).Fault(t.Exception)
                      Else
                       filterWordList.Complete()
                      End If
                     End Sub)
filterWordList.Completion.ContinueWith(Sub(t)
                      If t.IsFaulted Then
                       CType(findReversedWords, IDataflowBlock).Fault(t.Exception)
                      Else
                       findReversedWords.Complete()
                      End If
                     End Sub)
findReversedWords.Completion.ContinueWith(Sub(t)
                       If t.IsFaulted Then
                        CType(printReversedWords, IDataflowBlock).Fault(t.Exception)
                       Else
                        printReversedWords.Complete()
                       End If
                      End Sub)

若要在管道中传播完成,每个完成任务都要将下一数据流块设为已完成状态。 例如,当管道的开头设置为已完成状态时,它会处理所有剩余的缓冲信息,然后运行其完成任务,将管道的第二个成员设置为已完成状态。 管道的第二个成员反过来处理所有剩余的缓冲信息,然后运行其完成任务,将管道的第三个成员设置为已完成状态。 此过程将继续,直至管道的所有成员都完成。 此示例使用 delegate 关键字( Function 中为 Visual Basic)来定义延续任务。

将数据发布到管道

添加以下代码以书的 URL 发布Iliad of Homer到数据流管道的开头。

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/files/6130/6130-0.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/files/6130/6130-0.txt")

此示例使用 DataflowBlock.Post 将数据同步发送到管道的开头。 在必须将数据异步发送到数据流节点时,请使用 DataflowBlock.SendAsync 方法。

完成管道活动

添加以下代码将管道的开头标记为已完成。 管道的开头在处理了所有缓冲的消息后运行其延续任务。 此延续任务在管道中传播已完成状态。

// Mark the head of the pipeline as complete. The continuation tasks 
// propagate completion through the pipeline as each part of the 
// pipeline finishes.
downloadString.Complete();
' Mark the head of the pipeline as complete. The continuation tasks 
' propagate completion through the pipeline as each part of the 
' pipeline finishes.
downloadString.Complete()

此示例通过要处理的数据流管道发送一个 URL。 如果要通过管道发送多个输入,请在提交了所有输入后调用 IDataflowBlock.Complete 方法。 如果您的应用程序没有表示数据不再可用或应用程序不必等待管道完成的定义完善的点,则可以忽略此步骤。

等待管道完成

添加以下代码以等待管道完成。 由于本示例使用延续任务来在管道中传播完成,因此当管道的结尾完成时整体操作即已完成。

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

可以同时等待任一线程或多个线程的数据流完成。

完整示例

下面的示例显示此演练的完整代码。

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web 
// and finds all reversed words that appear in that book.
class DataflowReversedWords
{
   static void Main(string[] args)
   {
    //
    // Create the members of the pipeline.
    // 

    // Downloads the requested resource as a string.
    var downloadString = new TransformBlock<string, string>(uri =>
    {
     Console.WriteLine("Downloading '{0}'...", uri);

     return new WebClient().DownloadString(uri);
    });

    // Separates the specified text into an array of words.
    var createWordList = new TransformBlock<string, string[]>(text =>
    {
     Console.WriteLine("Creating word list...");

     // Remove common punctuation by replacing all non-letter characters 
     // with a space character to.
     char[] tokens = text.ToArray();
     for (int i = 0; i < tokens.Length; i++)
     {
      if (!char.IsLetter(tokens[i]))
         tokens[i] = ' ';
     }
     text = new string(tokens);

     // Separate the text into an array of words.
     return text.Split(new char[] { ' ' },
      StringSplitOptions.RemoveEmptyEntries);
    });

    // Removes short words, orders the resulting words alphabetically, 
    // and then remove duplicates.
    var filterWordList = new TransformBlock<string[], string[]>(words =>
    {
     Console.WriteLine("Filtering word list...");

     return words.Where(word => word.Length > 3).OrderBy(word => word)
      .Distinct().ToArray();
    });

    // Finds all words in the specified collection whose reverse also 
    // exists in the collection.
    var findReversedWords = new TransformManyBlock<string[], string>(words =>
    {
     Console.WriteLine("Finding reversed words...");

     // Holds reversed words.
     var reversedWords = new ConcurrentQueue<string>();

     // Add each word in the original collection to the result whose 
     // reversed word also exists in the collection.
     Parallel.ForEach(words, word =>
     {
      // Reverse the work.
      string reverse = new string(word.Reverse().ToArray());

      // Enqueue the word if the reversed version also exists
      // in the collection.
      if (Array.BinarySearch<string>(words, reverse) >= 0 &&
        word != reverse)
      {
         reversedWords.Enqueue(word);
      }
     });

     return reversedWords;
    });

    // Prints the provided reversed words to the console.  
    var printReversedWords = new ActionBlock<string>(reversedWord =>
    {
     Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
    });

    //
    // Connect the dataflow blocks to form a pipeline.
    //

    downloadString.LinkTo(createWordList);
    createWordList.LinkTo(filterWordList);
    filterWordList.LinkTo(findReversedWords);
    findReversedWords.LinkTo(printReversedWords);

    //
    // For each completion task in the pipeline, create a continuation task
    // that marks the next block in the pipeline as completed.
    // A completed dataflow block processes any buffered elements, but does
    // not accept new elements.
    //

    downloadString.Completion.ContinueWith(t =>
    {
     if (t.IsFaulted) ((IDataflowBlock)createWordList).Fault(t.Exception);
     else createWordList.Complete();
    });
    createWordList.Completion.ContinueWith(t =>
    {
     if (t.IsFaulted) ((IDataflowBlock)filterWordList).Fault(t.Exception);
     else filterWordList.Complete();
    });
    filterWordList.Completion.ContinueWith(t =>
    {
     if (t.IsFaulted) ((IDataflowBlock)findReversedWords).Fault(t.Exception);
     else findReversedWords.Complete();
    });
    findReversedWords.Completion.ContinueWith(t =>
    {
     if (t.IsFaulted) ((IDataflowBlock)printReversedWords).Fault(t.Exception);
     else printReversedWords.Complete();
    });

    // Process "The Iliad of Homer" by Homer.
    downloadString.Post("http://www.gutenberg.org/files/6130/6130-0.txt");

    // Mark the head of the pipeline as complete. The continuation tasks 
    // propagate completion through the pipeline as each part of the 
    // pipeline finishes.
    downloadString.Complete();

    // Wait for the last block in the pipeline to process all messages.
    printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/files/6130/6130-0.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System
Imports System.Collections.Concurrent
Imports System.Linq
Imports System.Net
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Friend Class DataflowReversedWords

   Shared Sub Main(ByVal args() As String)
    '
    ' Create the members of the pipeline.
    ' 

    ' Downloads the requested resource as a string.
    Dim downloadString = New TransformBlock(Of String, String)(Function(uri)
                                  Console.WriteLine("Downloading '{0}'...", uri)
                                  Return New WebClient().DownloadString(uri)
                                 End Function)

    ' Separates the specified text into an array of words.
    Dim createWordList = New TransformBlock(Of String, String())(Function(text)
                                    ' Remove common punctuation by replacing all non-letter characters 
                                    ' with a space character to.
                                    ' Separate the text into an array of words.
                                    Console.WriteLine("Creating word list...")
                                    Dim tokens() As Char = text.ToArray()
                                    For i As Integer = 0 To tokens.Length - 1
                                     If Not Char.IsLetter(tokens(i)) Then
                                      tokens(i) = " "c
                                     End If
                                    Next i
                                    text = New String(tokens)
                                    Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
                                   End Function)

    ' Removes short words, orders the resulting words alphabetically, 
    ' and then remove duplicates.
    Dim filterWordList = New TransformBlock(Of String(), String())(Function(words)
                                    Console.WriteLine("Filtering word list...")
                                    Return words.Where(Function(word) word.Length > 3).OrderBy(Function(word) word).Distinct().ToArray()
                                   End Function)

    ' Finds all words in the specified collection whose reverse also 
    ' exists in the collection.
    Dim findReversedWords = New TransformManyBlock(Of String(), String)(Function(words)
                                       ' Holds reversed words.
                                       ' Add each word in the original collection to the result whose 
                                       ' reverse also exists in the collection.
                                       ' Reverse the work.
                                       ' Enqueue the word if the reversed version also exists
                                       ' in the collection.
                                       Console.WriteLine("Finding reversed words...")
                                       Dim reversedWords = New ConcurrentQueue(Of String)()
                                       Parallel.ForEach(words, Sub(word)
                                                    Dim reverse As New String(word.Reverse().ToArray())
                                                    If Array.BinarySearch(Of String)(words, reverse) >= 0 AndAlso word <> reverse Then
                                                       reversedWords.Enqueue(word)
                                                    End If
                                                   End Sub)
                                       Return reversedWords
                                      End Function)

    ' Prints the provided reversed words to the console.  
    Dim printReversedWords = New ActionBlock(Of String)(Sub(reversedWord) Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray())))

    '
    ' Connect the dataflow blocks to form a pipeline.
    '

    downloadString.LinkTo(createWordList)
    createWordList.LinkTo(filterWordList)
    filterWordList.LinkTo(findReversedWords)
    findReversedWords.LinkTo(printReversedWords)
    '
    ' For each completion task in the pipeline, create a continuation task
    ' that marks the next block in the pipeline as completed.
    ' A completed dataflow block processes any buffered elements, but does
    ' not accept new elements.
    '

    downloadString.Completion.ContinueWith(Sub(t)
                        If t.IsFaulted Then
                           CType(createWordList, IDataflowBlock).Fault(t.Exception)
                        Else
                           createWordList.Complete()
                        End If
                       End Sub)
    createWordList.Completion.ContinueWith(Sub(t)
                        If t.IsFaulted Then
                           CType(filterWordList, IDataflowBlock).Fault(t.Exception)
                        Else
                           filterWordList.Complete()
                        End If
                       End Sub)
    filterWordList.Completion.ContinueWith(Sub(t)
                        If t.IsFaulted Then
                           CType(findReversedWords, IDataflowBlock).Fault(t.Exception)
                        Else
                           findReversedWords.Complete()
                        End If
                       End Sub)
    findReversedWords.Completion.ContinueWith(Sub(t)
                           If t.IsFaulted Then
                            CType(printReversedWords, IDataflowBlock).Fault(t.Exception)
                           Else
                            printReversedWords.Complete()
                           End If
                        End Sub)


    ' Process "The Iliad of Homer" by Homer.
    downloadString.Post("http://www.gutenberg.org/files/6130/6130-0.txt")

    ' Mark the head of the pipeline as complete. The continuation tasks 
    ' propagate completion through the pipeline as each part of the 
    ' pipeline finishes.
    downloadString.Complete()

    ' Wait for the last block in the pipeline to process all messages.
    printReversedWords.Completion.Wait()
   End Sub

End Class

' Sample output:
'Downloading 'http://www.gutenberg.org/files/6130/6130-0.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

后续步骤

此示例发送一个通过数据流管道处理的 URL。 如果要通过管道发送多个输入值,可以将并行的形式引入应用程序,这与零件在汽车厂中移动的方式类似。 当管道的第一个成员将其结果发送给第二个成员时,它可以在第二个成员处理第一个结果时并行处理另一个项。

通过使用数据流管道实现的并行称为粗粒度的并行因为它通常由的较少量的较大任务组成。 你还可以使用更细粒度的并行较小的短时间运行任务中数据流管道。 在本示例中,管道的 findReversedWords 成员使用 Parallel.ForEach 方法来并行处理工作列表中的多个项。 在粗粒度的管道中使用细粒度并行可以提高总吞吐量。

此外可以将源数据流块连接到多个目标块以创建数据流网络。 LinkTo 方法采用一个 Predicate<T> 对象,该对象定义了目标块是否根据其值来接受每个消息。 大多数充当源的数据流块类型按目标块连接的顺序向所有已连接的目标块提供消息,直到其中一个块接受此消息。 通过使用此筛选机制,您可以创建已连接数据流块的系统,指示某些数据通过一条路径,其他数据通过另一条路径。 有关使用筛选创建数据流网络的示例,请参阅 演练: 在 Windows 窗体应用程序中使用数据流 。

另请参阅

数据流

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文