如何使这个异步方法调用工作?

发布于 2024-10-03 05:33:49 字数 4651 浏览 10 评论 0原文

我试图使用异步方法调用开发方法管道。管道的逻辑如下

  1. 集合中有 n 个数据,必须馈送到管道中的 m 个方法中
  2. 枚举 T 的集合 将
  3. 第一个元素馈送到第一个方法
  4. 获取输出,将其馈送到第二个方法方法异步
  5. 同时将集合的第二个元素喂给第一个方法
  6. 第一个方法完成后,将结果喂给第二个方法(如果第二个方法仍在运行,则将结果放入其队列并开始执行第一个方法的第三个元素)
  7. 当第二个方法执行完成时,从队列中取出第一个元素并执行,依此类推(每个方法都应该异步运行,没有人应该等待下一个完成)
  8. 在第m个方法,执行数据后,将结果存储到列表中
  9. 在第 m 个方法完成第 n 个元素后,将结果列表(n 个结果)返回到第一级。

我想出了如下代码,但它没有按预期工作,结果永远不会返回,而且它没有按应有的顺序执行。

static class Program
    {
        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4 };
            var result = list.ForEachPipeline(Add, Square, Add, Square);
            foreach (var element in result)
            {
                Console.WriteLine(element);
                Console.WriteLine("---------------------");
            }
            Console.ReadLine();
        }

        private static int Add(int j)
        {
            return j + 1;
        }

        private static int Square(int j)
        {
            return j * j;
        }

        internal static void AddNotify<T>(this List<T> list, T item)
        {
            Console.WriteLine("Adding {0} to the list", item);
            list.Add(item);
        }    
    }

    internal class Function<T>
    {
        private readonly Func<T, T> _func;

        private readonly List<T> _result = new List<T>();
        private readonly Queue<T> DataQueue = new Queue<T>();
        private bool _isBusy;
        static readonly object Sync = new object();
        readonly ManualResetEvent _waitHandle = new ManualResetEvent(false);

        internal Function(Func<T, T> func)
        {
            _func = func;
        }

        internal Function<T> Next { get; set; }
        internal Function<T> Start { get; set; }
        internal int Count;

        internal IEnumerable<T> Execute(IEnumerable<T> source)
        {
            var isSingle = true;
            foreach (var element in source) {
                var result = _func(element);
                if (Next != null)
                {
                    Next.ExecuteAsync(result, _waitHandle);
                    isSingle = false;
                }
                else
                    _result.AddNotify(result);
            }
            if (!isSingle)
                _waitHandle.WaitOne();
            return _result;
        }


        internal void ExecuteAsync(T element, ManualResetEvent resetEvent)
        {
            lock(Sync)
            {
                if(_isBusy)
                {
                    DataQueue.Enqueue(element);
                    return;
                }
                _isBusy = true;

                _func.BeginInvoke(element, CallBack, resetEvent);
            }           
        }

        internal void CallBack(IAsyncResult result)
        {
            bool set = false;
            var worker = (Func<T, T>) ((AsyncResult) result).AsyncDelegate;
            var resultElement = worker.EndInvoke(result);
            var resetEvent = result.AsyncState as ManualResetEvent;

            lock(Sync)
            {
                _isBusy = false;
                if(Next != null)
                    Next.ExecuteAsync(resultElement, resetEvent);
                else
                    Start._result.AddNotify(resultElement);

                if(DataQueue.Count > 1)
                {
                    var element = DataQueue.Dequeue();
                    ExecuteAsync(element, resetEvent);
                }
                if(Start._result.Count == Count)
                    set = true;
            }
            if(set)
              resetEvent.Set();
        }
    }

    public static class Pipe
    {
        public static IEnumerable<T> ForEachPipeline<T>(this IEnumerable<T> source, params Func<T, T>[] pipes)
        {
            Function<T> start = null, previous = null;
            foreach (var function in pipes.Select(pipe => new Function<T>(pipe){ Count = source.Count()}))
            {
                if (start == null)
                {
                    start = previous = function;
                    start.Start = function;
                    continue;
                }
                function.Start = start;
                previous.Next = function;
                previous = function;
            }
            return start != null ? start.Execute(source) : null;
        }
    }

你们能帮我让这件事发挥作用吗?如果此设计不适合实际的方法管道,请随时提出不同的设计。

编辑:我必须严格遵守.Net 3.5。

I was trying to develop a method pipeline using asynchronous method invocation. The logic for the pipeline is as follows

  1. There are n data in a collection that have to be fed into m number of methods in a pipeline
  2. Enumerate a collection of T
  3. Feed the first element to the first method
  4. Get the output, feed it to the second method asynchronously
  5. At the same time, feed the second element of the collection to the first method
  6. After completion of the first method, fed the result to the second method (if the second method is still running, put the result into its queue and start executing the third element at first method)
  7. When second method finishes executing take the first element from the queue and execute and so on (every method should run asynchronously, no one should wait for the next to finish)
  8. At the mth method, after executing the data, store the result to a list
  9. After completing nth element at the mth method return the list of the results (n number of results) to the very first level.

I came up with a code as follows, but it did not work as intended, the result never gets returned and moreover it is not executing in the order as it should be.

static class Program
    {
        static void Main(string[] args)
        {
            var list = new List<int> { 1, 2, 3, 4 };
            var result = list.ForEachPipeline(Add, Square, Add, Square);
            foreach (var element in result)
            {
                Console.WriteLine(element);
                Console.WriteLine("---------------------");
            }
            Console.ReadLine();
        }

        private static int Add(int j)
        {
            return j + 1;
        }

        private static int Square(int j)
        {
            return j * j;
        }

        internal static void AddNotify<T>(this List<T> list, T item)
        {
            Console.WriteLine("Adding {0} to the list", item);
            list.Add(item);
        }    
    }

    internal class Function<T>
    {
        private readonly Func<T, T> _func;

        private readonly List<T> _result = new List<T>();
        private readonly Queue<T> DataQueue = new Queue<T>();
        private bool _isBusy;
        static readonly object Sync = new object();
        readonly ManualResetEvent _waitHandle = new ManualResetEvent(false);

        internal Function(Func<T, T> func)
        {
            _func = func;
        }

        internal Function<T> Next { get; set; }
        internal Function<T> Start { get; set; }
        internal int Count;

        internal IEnumerable<T> Execute(IEnumerable<T> source)
        {
            var isSingle = true;
            foreach (var element in source) {
                var result = _func(element);
                if (Next != null)
                {
                    Next.ExecuteAsync(result, _waitHandle);
                    isSingle = false;
                }
                else
                    _result.AddNotify(result);
            }
            if (!isSingle)
                _waitHandle.WaitOne();
            return _result;
        }


        internal void ExecuteAsync(T element, ManualResetEvent resetEvent)
        {
            lock(Sync)
            {
                if(_isBusy)
                {
                    DataQueue.Enqueue(element);
                    return;
                }
                _isBusy = true;

                _func.BeginInvoke(element, CallBack, resetEvent);
            }           
        }

        internal void CallBack(IAsyncResult result)
        {
            bool set = false;
            var worker = (Func<T, T>) ((AsyncResult) result).AsyncDelegate;
            var resultElement = worker.EndInvoke(result);
            var resetEvent = result.AsyncState as ManualResetEvent;

            lock(Sync)
            {
                _isBusy = false;
                if(Next != null)
                    Next.ExecuteAsync(resultElement, resetEvent);
                else
                    Start._result.AddNotify(resultElement);

                if(DataQueue.Count > 1)
                {
                    var element = DataQueue.Dequeue();
                    ExecuteAsync(element, resetEvent);
                }
                if(Start._result.Count == Count)
                    set = true;
            }
            if(set)
              resetEvent.Set();
        }
    }

    public static class Pipe
    {
        public static IEnumerable<T> ForEachPipeline<T>(this IEnumerable<T> source, params Func<T, T>[] pipes)
        {
            Function<T> start = null, previous = null;
            foreach (var function in pipes.Select(pipe => new Function<T>(pipe){ Count = source.Count()}))
            {
                if (start == null)
                {
                    start = previous = function;
                    start.Start = function;
                    continue;
                }
                function.Start = start;
                previous.Next = function;
                previous = function;
            }
            return start != null ? start.Execute(source) : null;
        }
    }

Can you guys please help me to make this thing work? If this design is not good for an actual method pipeline, please feel free to suggest a different one.

Edit: I have to stick to .Net 3.5 strictly.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

诗酒趁年少 2024-10-10 05:33:49

我没有立即在您的代码中发现问题,但您可能使事情变得过于复杂了。这可能是一种更简单的方法来完成您想做的事情。

public static class Pipe 
{
   public static IEnumerable<T> Execute<T>(
      this IEnumerable<T> input, params Func<T, T>[] functions)
   {
      // each worker will put its result in this array
      var results = new T[input.Count()];

      // launch workers and return a WaitHandle for each one
      var waitHandles = input.Select(
         (element, index) =>
         {
            var waitHandle = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(
               delegate
               {
                  T result = element;
                  foreach (var function in functions)
                  {
                     result = function(result);
                  }
                  results[index] = result;
                  waitHandle.Set();
               });
            return waitHandle;
         });

      // wait for each worker to finish
      foreach (var waitHandle in waitHandles)
      {
          waitHandle.WaitOne();
      }
      return results;
   }
}

这不会像您自己的尝试那样为管道的每个阶段创建锁。我省略了它,因为它看起来没什么用。但是,您可以通过包装如下函数轻松添加它:

var wrappedFunctions = functions.Select(x => AddStageLock(x));

其中 AddStageLock 是这样的:

private static Func<T,T> AddStageLock<T>(Func<T,T> function)
{
   object stageLock = new object();
   Func<T, T> wrappedFunction =
      x =>
      {
         lock (stageLock)
         {
            return function(x);
         }
      };
   return wrappedFunction;
}

edit: Execute 实现可能会比单线程执行,除非为每个单独元素完成的工作使创建等待句柄和在线程池上调度任务的开销相形见绌,要真正从多线程中受益,您需要限制开销; .NET 4 中的 PLINQ 通过对数据进行分区来实现此目的< /a>.

I didn't immediately find the problem in your code, but you might be overcomplicating things a bit. This might be a simpler way to do what you want.

public static class Pipe 
{
   public static IEnumerable<T> Execute<T>(
      this IEnumerable<T> input, params Func<T, T>[] functions)
   {
      // each worker will put its result in this array
      var results = new T[input.Count()];

      // launch workers and return a WaitHandle for each one
      var waitHandles = input.Select(
         (element, index) =>
         {
            var waitHandle = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(
               delegate
               {
                  T result = element;
                  foreach (var function in functions)
                  {
                     result = function(result);
                  }
                  results[index] = result;
                  waitHandle.Set();
               });
            return waitHandle;
         });

      // wait for each worker to finish
      foreach (var waitHandle in waitHandles)
      {
          waitHandle.WaitOne();
      }
      return results;
   }
}

This does not create a lock for each stage of the pipeline as in your own attempt. I've omitted that because it did not seem useful. However, you could easily add it by wrapping the functions like this:

var wrappedFunctions = functions.Select(x => AddStageLock(x));

where AddStageLock is this:

private static Func<T,T> AddStageLock<T>(Func<T,T> function)
{
   object stageLock = new object();
   Func<T, T> wrappedFunction =
      x =>
      {
         lock (stageLock)
         {
            return function(x);
         }
      };
   return wrappedFunction;
}

edit: The Execute implementation will probably be slower than single threaded execution, unless the work to be done for each individual element dwarfs the overhead of creating a wait handle and scheduling a task on the thread pool, To really benefit from multi-threading you need to limit the overhead; PLINQ in .NET 4 does this by partitioning the data.

何以笙箫默 2024-10-10 05:33:49

采取管道方法有什么特殊原因吗? IMO,为每个输入启动一个单独的线程,并将所有函数依次链接起来,编写起来会更简单,执行起来也更快。例如,

function T ExecPipe<T>(IEnumerable<Func<T, T>> pipe, T input)
{
  T value = input;
  foreach(var f in pipe)
  {
    value = f(value);
  }
  return value;
}

var pipe = new List<Func<int, int>>() { Add, Square, Add, Square };
var list = new List<int> { 1, 2, 3, 4 };
foreach(var value in list)
{
  ThreadPool.QueueUserWorkItem(o => ExecPipe(pipe, (int)o), value);
}

现在,谈到您的代码,我相信为了使用 M 阶段实现准确的管道实现,您必须恰好有 M 个线程,因为每个阶段都可以并行执行 - 现在,某些线程可能处于空闲状态,因为 i/p 尚未到达它们。我不确定您的代码是否正在启动任何线程以及特定时间的线程数是多少。

Any particular reason for taking pipe-line approach? IMO, launching a separate thread for each input with all functions chained one after another would be simpler to write and faster to execute. For example,

function T ExecPipe<T>(IEnumerable<Func<T, T>> pipe, T input)
{
  T value = input;
  foreach(var f in pipe)
  {
    value = f(value);
  }
  return value;
}

var pipe = new List<Func<int, int>>() { Add, Square, Add, Square };
var list = new List<int> { 1, 2, 3, 4 };
foreach(var value in list)
{
  ThreadPool.QueueUserWorkItem(o => ExecPipe(pipe, (int)o), value);
}

Now, coming to your code, I believe for accurate pipeline implementation with M stage, you must have exactly M threads as each stage can execute in parallel - now, some threads may be idle because i/p has not reached them. I am not certain if your code is launching any threads and what will be the count of thread at particular time.

微暖i 2024-10-10 05:33:49

为什么不为每次迭代中断一个线程并将结果聚合到锁定资源中。你只需要做。可以使用 PLinq 来实现此目的。
我认为您可能将方法误认为是资源。仅当方法正在处理其中包含共享资源的关键块时,才需要锁定该方法。通过选择一个资源并从那里中断到一个新线程,您就无需管理第二种方法。

IE:方法 X 调用方法 1,然后将值传递给方法 2
Foreach 项目 in arr
异步(方法X(项目));

Why dont you break off a thread for each iteration and aggregate your results in a locking resource. You only need to do. Could use PLinq for this.
I think you might be mistaking methods for resources. You only need lock a method if it is dealing with a critical block with a shared resource in it. By picking a resource off and breaking into a new thread from there, you eliminate the need to manage your second method.

I.E.: Method X Calls Method1 then Passes value into Method2
Foreach item in arr
Async(MethodX(item));

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文