生成多个 IEnumerable

发布于 2024-11-28 06:09:52 字数 1598 浏览 0 评论 0原文

我有一段代码可以对资产进行计算。有数百万个,所以我想计算流中的所有内容。我当前的“管道”如下所示:

我有一个作为数据读取器执行的查询。

然后我的 Asset 类有一个接受 IDataReader 的构造函数;

Public Asset(IdataReader rdr){
  // logic that initiates fields
}

以及将 IDataReader 转换为 IEnumerable的方法。

public static IEnumerable<Asset> ToAssets(IDataReader rdr) {

    // make sure the reader is in the right formt
    CheckReaderFormat(rdr);

    // project reader into IEnumeable<Asset>
    while (rdr.Read()) yield return new Asset(rdr);

}

然后将其传递到执行实际计算的函数中,然后将其投影到 IEnumerable中。

然后获取一个包装器,将答案公开为 IDataReader,然后将其传递给 OracleBulkCopy,并将流写入数据库。

到目前为止,它就像一个魅力。由于设置的原因,我可以将 DataReader 替换为从文件读取的 IEnumerable,或者将结果写入文件等。所有这些都取决于我如何将类/函数串在一起。

现在:我可以计算一些东西,例如,除了正常的答案之外,我还可以有一个 DebugAnswer 类,它还输出一些用于调试的中间数字。所以我想做的是将 IEnumerable 投影到多个输出流中,这样我就可以在这些输出流上放置“侦听器”。这样我就不必多次检查数据。我怎样才能做到这一点?有点像有多个事件,然后仅在附加侦听器时才触发某些代码。

有时我也会写入数据库,但也会写入 zip 文件,只是为了保留结果的备份。那么我想在 IEnumerable 上有 2 个“监听器”。一个项目是作为 IDataReader,另一个项目是直接写入文件。

如何输出多个输出流以及如何在一个输出流上放置多个侦听器?是什么让我能够组成这样的数据流?

编辑

所以我想做一些伪代码:

foreach(Asset in Assets){
   if(DebugListener != null){
     // compute 
     DebugAnswer da = new DebugAnswer {result = 100};
     yield da to DebugListener;  // so instead of yield return yield to that stream

   }

   if(AnswerListener != null){
     // compute basic stuff 
     Answer a = new Answer { bla = 200 };
     yield a to AnswerListener;
   }
}

提前致谢,

Gert-Jan

I have an piece of code that does calculations on assets. There are many millions of those so I want to compute everything in streams. My current 'pipeline' looks like this:

I have a query that is executed as a Datareader.

Then my Asset class has a constructor that accepts an IDataReader;

Public Asset(IdataReader rdr){
  // logic that initiates fields
}

and a method that converts the IDataReader to an IEnumerable<Asset>

public static IEnumerable<Asset> ToAssets(IDataReader rdr) {

    // make sure the reader is in the right formt
    CheckReaderFormat(rdr);

    // project reader into IEnumeable<Asset>
    while (rdr.Read()) yield return new Asset(rdr);

}

That then gets passed into a function that does the actually calculations and then projects it into a IEnumerable<Asnwer>

That then gets a wrapper the exposes the Answers as an IDataReader and that then that gets passed to a OracleBulkCopy and the stream is written to the DB.

So far it works like a charm. Because of the setup I can swap the DataReader for an IEnumerable that reads from a file, or have the results written to a file etc. All depending on how I string the classes/ functions together.

Now: There are several thing I can compute, for instance besides the normal Answer I could have a DebugAnswer class that also outputs some intermediate numbers for debugging. So what I would like to do is project the IEnumerable into several output streams so I can put 'listeners' on those. That way I won't have to go over the data multiple times. How can I do that? Kind of like having several Events and then only fire certain code if there's a listeners attached.

Also sometimes I write to the DB but also to a zipfile just to keep a backup of the results. So then I would like to have 2 'listeners' on the IEnumerable. One that projects is as an IDataReader and another one that writes straight to the file.

How do I output multiple output streams and how can I put multiple listeners on one outputstream? What lets me compose streams of data like that?

edit

so some pseudocode of what I would like to do:

foreach(Asset in Assets){
   if(DebugListener != null){
     // compute 
     DebugAnswer da = new DebugAnswer {result = 100};
     yield da to DebugListener;  // so instead of yield return yield to that stream

   }

   if(AnswerListener != null){
     // compute basic stuff 
     Answer a = new Answer { bla = 200 };
     yield a to AnswerListener;
   }
}

Thanks in advance,

Gert-Jan

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

2024-12-05 06:09:52

您所描述的内容听起来有点像响应式框架通过 IObservable 接口提供的内容,但我不确定它是否允许多个订阅者访问单个订阅流。

更新

如果您查看IObservable的文档 ,它有一个很好的示例,说明如何使用单个对象的多个订阅者来完成您正在做的事情。

What you're describing sounds sort of like what the Reactive framework provides via the IObservable interface, but I don't know for sure whether it allows multiple subscribers to a single subscription stream.

Update

If you take a look at the documentation for IObservable, it has a pretty good example of how to do the sort of thing you're doing, with multiple subscribers to a single object.

箹锭⒈辈孓 2024-12-05 06:09:52

使用 Rx 重写的示例:

// The stream of assets
IObservable<Asset> assets = ...

// The stream of each asset projected to a DebugAnswer
IObservable<DebugAnswer> debugAnswers = from asset in assets
                                        select new DebugAnswer { result = 100 };

// Subscribe the DebugListener to receive the debugAnswers
debugAnswers.Subscribe(DebugListener);

// The stream of each asset projected to an Anwer
IObservable<Answer> answers = from asset in assets
                              select new Answer { bla = 200 };

// Subscribe the AnswerListener to receive the answers
answers.Subscribe(AnswerListener);

Your example rewritten using Rx:

// The stream of assets
IObservable<Asset> assets = ...

// The stream of each asset projected to a DebugAnswer
IObservable<DebugAnswer> debugAnswers = from asset in assets
                                        select new DebugAnswer { result = 100 };

// Subscribe the DebugListener to receive the debugAnswers
debugAnswers.Subscribe(DebugListener);

// The stream of each asset projected to an Anwer
IObservable<Answer> answers = from asset in assets
                              select new Answer { bla = 200 };

// Subscribe the AnswerListener to receive the answers
answers.Subscribe(AnswerListener);
怪异←思 2024-12-05 06:09:52

这正是响应式扩展(成为.NET的一部分)的工作从 4.0 开始,在 3.5 中作为库提供)。

This is exactly the job for Reactive Extensions (became part of .NET since 4.0, available as a library in 3.5).

你与昨日 2024-12-05 06:09:52

您不需要多个“侦听器”,您只需要非破坏性甚至不一定可转换的管道组件。

IEnumerable<T> PassThroughEnumerable<T>(IEnumerable<T> source, Action<T> action) {
    foreach (T t in source) {
       Action(t);
       yield return t;
    }    
}

或者,当您在管道中处理时,只需引发一些要使用的事件。如果需要,您可以异步它们:

static IEnumerable<Asset> ToAssets(IDataReader rdr) {
   CheckReaderFormat(rdr);
   var h = this.DebugAsset;
   while (rdr.Read()) {
      var a = new Asset(rdr);
      if (h != null) h(a);
      yield return a;
   }
}

public event EventHandler<Asset> DebugAsset;

You don't need multiple "listeners", you just need pipeline components that aren't destructive or even necessarily transformable.

IEnumerable<T> PassThroughEnumerable<T>(IEnumerable<T> source, Action<T> action) {
    foreach (T t in source) {
       Action(t);
       yield return t;
    }    
}

Or, as you're processing in the pipeline just raise some events to be consumed. You can async them if you want:

static IEnumerable<Asset> ToAssets(IDataReader rdr) {
   CheckReaderFormat(rdr);
   var h = this.DebugAsset;
   while (rdr.Read()) {
      var a = new Asset(rdr);
      if (h != null) h(a);
      yield return a;
   }
}

public event EventHandler<Asset> DebugAsset;
失去的东西太少 2024-12-05 06:09:52

如果我的理解正确,应该可以替换或装饰包装器。 WrapperDecorator 可能会将调用转发到普通的 OracleBulkCopy (或您正在使用的任何内容)并添加一些自定义调试代码。

这对你有帮助吗?

马蒂亚斯

If I got you right, it should be possible to replace or decorate the wrapper. The WrapperDecorator may forward calls to the normal OracleBulkCopy (or whatever you're using) and add some custom debug code.

Does that help you?

Matthias

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文