将多个 observable 合并到一个 observable 数组中

发布于 2024-11-30 09:47:15 字数 5292 浏览 1 评论 0 原文

您好,我正在尝试将多个可观察值合并到一个可观察数组中。这是一个在 fsi 中运行的示例。 (抱歉,它很长)

#r "./bin/Debug/System.Reactive.dll"

open System
open System.Reactive.Linq

/// Subscribes to the Observable with all 3 callbacks.
let subscribeComplete next error completed (observable: IObservable<'T>) = 
    observable.Subscribe(
        (fun x -> next x),
        (fun e -> error e),
        (fun () -> completed()))

/// Subscribes to the Observable with a next and an error-function.
let subscribeWithError next error observable = 
    subscribeComplete next error (fun () -> ()) observable

/// Subscribes to the Observable with a next-function
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable = 
    subscribeWithError next ignore observable

/// Static method to generate observable from input functions
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
        (resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
            Observable.Generate(initState, termCond, iterStep, resSelect, timeSelect)

//maps the given observable with the given function
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
    Observable.Select(observable, Func<_,_>(f))

/// Merges two observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> = 
    Observable.CombineLatest(
        obs1, obs2, Func<_,_,_>(fun a b -> a, b))    

/// Merges three observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> = 
    let obs12 =obs1.CombineLatest(obs2, Func<_,_,_>(fun a b -> a, b))    
    obs12.CombineLatest(obs3, Func<_,_,_>(fun (a,b) c -> a, b, c))    

/// Merges four observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> = 
    let obsNew = combineLatest3 obs1 obs2 obs3
    obsNew.CombineLatest(obs4, Func<_,_,_>(fun (a,b,c) d -> a, b, c, d))    

// second section generating arrays
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
    combineLatest obs1 obs2 
    |> obsMap (fun (a, b) -> [a; b] |> List.toArray)

let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
    combineLatest3 obs1 obs2 obs3 
    |> obsMap (fun (a, b, c) -> [a; b; c] |> List.toArray)

let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
    combineLatest4 obs1 obs2 obs3 obs4
    |> obsMap (fun (a, b, c, d) -> [a; b; c; d] |> List.toArray)

/// Dispatches a list of 2, 3 or 4 observables to the matching fixed-arity
/// combineLatest*Array helper; any other list length raises an exception.
let combineLatestListToArray (list: IObservable<'T> List) =
    match list with
    | [first; second] -> combineLatestArray first second
    | [first; second; third] -> combineLatest3Array first second third
    | [first; second; third; fourth] -> combineLatest4Array first second third fourth
    | _ -> failwith "combine latest on unsupported list size"

type FooType = 
        {   NameVal :   string
            IdVal   :   int
            RetVal  :   float }

        member x.StringKey() =
            x.NameVal.ToString() + ";" + x.IdVal.ToString()


// example code starts here

let rnd = System.Random()

let fooListeners = Collections.Generic.Dictionary()

/// Returns the observable registered under `foo`'s string key, creating and
/// caching a newly generated one when the key is not in `fooListeners` yet.
let AddAFoo (foo : FooType) =
    let key = foo.StringKey()

    match fooListeners.TryGetValue key with
    | true, existing -> existing
    | _ ->
        // Next state: same name and id, RetVal shifted by (NextDouble() - 0.5).
        let step (state : FooType) =
            { state with RetVal = state.RetVal + rnd.NextDouble() - 0.5 }
        // Delay before each element: NextDouble() * 2000 milliseconds.
        let delay (_ : FooType) =
            System.TimeSpan.FromMilliseconds(rnd.NextDouble() * 2000.0)
        // The termination condition is always true, so the sequence never ends.
        let created = ObsGenerate foo (fun _ -> true) step id delay
        fooListeners.Add(key, created)
        created

let fooInit =   [6..9]
                |> List.map (fun index -> {NameVal = (string index + "st"); IdVal = index; RetVal = (float index + 1.0)})     
                |> List.map (fun foo -> AddAFoo foo)

let fooValuesArray =    fooInit
                        |> List.map(fun x -> (x |> obsMap (fun x -> x.RetVal)))
                        |> combineLatestListToArray

let mySub =
    fooValuesArray
    |> subscribe (fun fooVals -> printfn "fooArray: %A" fooVals)

//execute until here to start example

// execute this last line to unsubscribe
mySub.Dispose()

我对这段代码有两个问题:

  1. 是否有更智能的方法将可观察值合并到数组? (它变得非常冗长,因为我需要合并更大的数组)

  2. 我想限制更新。我的意思是,我希望在同一个半秒窗口内发生的所有更新都作为数组上的一个更新进行处理。理想情况下,我希望此窗口仅在第一个更新到来时打开,即如果 2 秒内没有更新到达,则有一个更新到达,然后我们等待并包含进一步的更新 0.5 秒,然后触发可观察值。尽管没有触发任何可观察值,但我不希望它每 0.5 秒定期发布一次。我希望这个描述足够清楚。

更新:我决定接受 F# 答案之一,但我还没有公正地对待 C# 答案。我希望能够尽快正确检查它们。

Hi I am trying to merge a number of observables to an observable array. Here an example that works in fsi. (sorry that it is lengthy)

#r "./bin/Debug/System.Reactive.dll"

open System
open System.Reactive.Linq

/// Subscribes to the Observable with all 3 callbacks.
let subscribeComplete next error completed (observable: IObservable<'T>) = 
    observable.Subscribe(
        (fun x -> next x),
        (fun e -> error e),
        (fun () -> completed()))

/// Subscribes to the Observable with a next and an error-function.
let subscribeWithError next error observable = 
    subscribeComplete next error (fun () -> ()) observable

/// Subscribes to the Observable with a next-function
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable = 
    subscribeWithError next ignore observable

/// Static method to generate observable from input functions
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
        (resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
            Observable.Generate(initState, termCond, iterStep, resSelect, timeSelect)

//maps the given observable with the given function
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
    Observable.Select(observable, Func<_,_>(f))

/// Merges two observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> = 
    Observable.CombineLatest(
        obs1, obs2, Func<_,_,_>(fun a b -> a, b))    

/// Merges three observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> = 
    let obs12 =obs1.CombineLatest(obs2, Func<_,_,_>(fun a b -> a, b))    
    obs12.CombineLatest(obs3, Func<_,_,_>(fun (a,b) c -> a, b, c))    

/// Merges four observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> = 
    let obsNew = combineLatest3 obs1 obs2 obs3
    obsNew.CombineLatest(obs4, Func<_,_,_>(fun (a,b,c) d -> a, b, c, d))    

// second section generating arrays
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
    combineLatest obs1 obs2 
    |> obsMap (fun (a, b) -> [a; b] |> List.toArray)

let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
    combineLatest3 obs1 obs2 obs3 
    |> obsMap (fun (a, b, c) -> [a; b; c] |> List.toArray)

let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
    combineLatest4 obs1 obs2 obs3 obs4
    |> obsMap (fun (a, b, c, d) -> [a; b; c; d] |> List.toArray)

/// Dispatches a list of 2, 3 or 4 observables to the matching fixed-arity
/// combineLatest*Array helper; any other list length raises an exception.
let combineLatestListToArray (list: IObservable<'T> List) =
    match list with
    | [first; second] -> combineLatestArray first second
    | [first; second; third] -> combineLatest3Array first second third
    | [first; second; third; fourth] -> combineLatest4Array first second third fourth
    | _ -> failwith "combine latest on unsupported list size"

type FooType = 
        {   NameVal :   string
            IdVal   :   int
            RetVal  :   float }

        member x.StringKey() =
            x.NameVal.ToString() + ";" + x.IdVal.ToString()


// example code starts here

let rnd = System.Random()

let fooListeners = Collections.Generic.Dictionary()

/// Returns the observable registered under `foo`'s string key, creating and
/// caching a newly generated one when the key is not in `fooListeners` yet.
let AddAFoo (foo : FooType) =
    let key = foo.StringKey()

    match fooListeners.TryGetValue key with
    | true, existing -> existing
    | _ ->
        // Next state: same name and id, RetVal shifted by (NextDouble() - 0.5).
        let step (state : FooType) =
            { state with RetVal = state.RetVal + rnd.NextDouble() - 0.5 }
        // Delay before each element: NextDouble() * 2000 milliseconds.
        let delay (_ : FooType) =
            System.TimeSpan.FromMilliseconds(rnd.NextDouble() * 2000.0)
        // The termination condition is always true, so the sequence never ends.
        let created = ObsGenerate foo (fun _ -> true) step id delay
        fooListeners.Add(key, created)
        created

let fooInit =   [6..9]
                |> List.map (fun index -> {NameVal = (string index + "st"); IdVal = index; RetVal = (float index + 1.0)})     
                |> List.map (fun foo -> AddAFoo foo)

let fooValuesArray =    fooInit
                        |> List.map(fun x -> (x |> obsMap (fun x -> x.RetVal)))
                        |> combineLatestListToArray

let mySub =
    fooValuesArray
    |> subscribe (fun fooVals -> printfn "fooArray: %A" fooVals)

//execute until here to start example

// execute this last line to unsubscribe
mySub.Dispose()

I have two questions about this code:

  1. Is there a smarter way of merging the observables to arrays? (it gets very lengthy as I need to merge larger arrays)

  2. I want to throttle the updates. What I mean by that is that I want all updates that occur within (say) the same half a second window to be handled as one update on the array. Ideally, I want this window to open only when a first update comes in, i.e if no updates arrive in 2 seconds, then one update arrives, then we wait and include further updates for 0.5 seconds and then trigger the observable. I don't want it to publish periodically every 0.5 seconds although no observables are triggered. I hope this description is clear enough.

update: I have decided to accept one of the F# answers, but I haven't done the C# answers justice yet. I hope to be able to check them out properly soon.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5)

戴着白色围巾的女孩 2024-12-07 09:47:15

对于 1,结合使用 List.fold 和 List.toArray 以及一些 Observable 运算符应该可以很好地工作。例如:

/// Combines the latest values of every observable in `observables` into one
/// array per update. The array elements follow the order of the input list.
/// An empty input list yields a single empty array.
let combineLatest observables =
    Observable.Select(
        (observables 
         // The fold conses each new value onto the front of the accumulated
         // list, so the sources are folded last-to-first; without the reversal
         // the resulting array would be in the opposite order to the input.
         |> List.rev
         |> List.fold (fun ol o 
                         -> Observable.CombineLatest(o, ol, (fun t tl -> t :: tl))
                      ) (Observable.Return<_>([]))
        ), 
        List.toArray)

由于嵌套,如果您有大量 Observables 列表,您可能最终会遇到性能问题,但在您手动编写之前至少值得尝试一下。

对于2,我同意其他答案,对结果应用节流。

For 1, an application of List.fold and List.toArray and a few Observable operators should work nicely. Something like:

/// Combines the latest values of every observable in `observables` into one
/// array per update. The array elements follow the order of the input list.
/// An empty input list yields a single empty array.
let combineLatest observables =
    Observable.Select(
        (observables 
         // The fold conses each new value onto the front of the accumulated
         // list, so the sources are folded last-to-first; without the reversal
         // the resulting array would be in the opposite order to the input.
         |> List.rev
         |> List.fold (fun ol o 
                         -> Observable.CombineLatest(o, ol, (fun t tl -> t :: tl))
                      ) (Observable.Return<_>([]))
        ), 
        List.toArray)

Due to the nesting, you may end up with performance issues if you have a large list of Observables, but it's worth at least trying before you resort to writing it by hand.

For 2, I would agree with the other answers to apply Throttling to the result.

做个ˇ局外人 2024-12-07 09:47:15

抱歉,这不是 F# - 我希望我有时间学习它 - 但这里有一个可能的 C# 答案。

以下是一组扩展方法,可将 IEnumerable<IObservable<T>> 中各序列的最新值组合为 IObservable<IEnumerable<T>>:

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IObservable<T> second)
{
    if (first == null) throw new ArgumentNullException("first");
    if (second == null) throw new ArgumentNullException("second");
    return first.CombineLatest(second, (t0, t1) => EnumerableEx.Return(t0).Concat(EnumerableEx.Return(t1)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IObservable<T> second)
{
    if (firsts == null) throw new ArgumentNullException("firsts");
    if (second == null) throw new ArgumentNullException("second");
    return firsts.CombineLatest(second, (t0s, t1) => t0s.Concat(EnumerableEx.Return(t1)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources)
{
    if (sources == null) throw new ArgumentNullException("sources");
    return sources.CombineLatest(() => sources.First().CombineLatest(sources.Skip(1)), () => Observable.Empty<IEnumerable<T>>());
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IEnumerable<IObservable<T>> seconds)
{
    if (first == null) throw new ArgumentNullException("first");
    if (seconds == null) throw new ArgumentNullException("seconds");
    return seconds.CombineLatest(() => first.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => first.Select(t => EnumerableEx.Return(t)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IEnumerable<IObservable<T>> seconds)
{
    if (firsts == null) throw new ArgumentNullException("firsts");
    if (seconds == null) throw new ArgumentNullException("seconds");
    return seconds.CombineLatest(() => firsts.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => firsts);
}

private static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources, Func<IObservable<IEnumerable<T>>> any, Func<IObservable<IEnumerable<T>>> none)
{
    if (sources == null) throw new ArgumentNullException("sources");
    if (any == null) throw new ArgumentNullException("any");
    if (none == null) throw new ArgumentNullException("none");
    return Observable.Defer(() => sources.Any() ? any() : none());
}

它们可能不会非常高效,但它们确实可以处理任意数量的需要组合的可观察量。

我很希望看到这些方法转换为 F#。

至于你的第二个问题,我不确定仅凭你目前所说的内容就能回答,因为 CombineLatest 和 Throttle 都会丢失值,所以在尝试回答之前,更详细地了解您的用例可能更为稳妥。

I'm sorry this isn't F# - I wish I had time to learn it - but here's a possible answer in C#.

Here are a set of extension methods that will combine the latest from an IEnumerable<IObservable<T>> to an IObservable<IEnumerable<T>>:

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IObservable<T> second)
{
    if (first == null) throw new ArgumentNullException("first");
    if (second == null) throw new ArgumentNullException("second");
    return first.CombineLatest(second, (t0, t1) => EnumerableEx.Return(t0).Concat(EnumerableEx.Return(t1)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IObservable<T> second)
{
    if (firsts == null) throw new ArgumentNullException("firsts");
    if (second == null) throw new ArgumentNullException("second");
    return firsts.CombineLatest(second, (t0s, t1) => t0s.Concat(EnumerableEx.Return(t1)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources)
{
    if (sources == null) throw new ArgumentNullException("sources");
    return sources.CombineLatest(() => sources.First().CombineLatest(sources.Skip(1)), () => Observable.Empty<IEnumerable<T>>());
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IEnumerable<IObservable<T>> seconds)
{
    if (first == null) throw new ArgumentNullException("first");
    if (seconds == null) throw new ArgumentNullException("seconds");
    return seconds.CombineLatest(() => first.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => first.Select(t => EnumerableEx.Return(t)));
}

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IEnumerable<IObservable<T>> seconds)
{
    if (firsts == null) throw new ArgumentNullException("firsts");
    if (seconds == null) throw new ArgumentNullException("seconds");
    return seconds.CombineLatest(() => firsts.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => firsts);
}

private static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources, Func<IObservable<IEnumerable<T>>> any, Func<IObservable<IEnumerable<T>>> none)
{
    if (sources == null) throw new ArgumentNullException("sources");
    if (any == null) throw new ArgumentNullException("any");
    if (none == null) throw new ArgumentNullException("none");
    return Observable.Defer(() => sources.Any() ? any() : none());
}

They may not be very efficient, but they do handle any number of observables that need to be combined.

I'd be keen to see these methods converted to F#.

As for your second question, I'm not sure I can answer with what you've said so far because CombineLatest and Throttle both lose values so it is probably prudent to understand your use case in more detail before attempting an answer.

枕花眠 2024-12-07 09:47:15

尽管吉迪恩·恩格尔伯特(Gideon Engelberth)已经用一种可行的方法回答了您的问题,但另一种可行的方式如下所示,它不使用嵌套。

/// Combines the latest value of every observable in `list` into an array,
/// without nesting CombineLatest calls.
///
/// Each subscriber gets its own state, and disposing the returned subscription
/// unsubscribes from all sources (the previous version subscribed eagerly and
/// discarded the IDisposable). Nothing is emitted until every source has
/// produced at least one value, so subscribers never see placeholder defaults.
/// Every emitted array is a fresh copy, in the order of the input list.
let combineLatestToArray (list : IObservable<'T> list) = 
    let subscribe (observer : IObserver<'T array>) =
        let latest = Array.zeroCreate<'T> list.Length
        let hasValue = Array.create list.Length false
        // Number of sources that have not produced a value yet.
        let missing = ref list.Length
        let cb (i:int,v:'T) = 
            latest.[i] <- v
            if not hasValue.[i] then
                hasValue.[i] <- true
                missing.Value <- missing.Value - 1
            if missing.Value = 0 then
                // Copy so later updates do not mutate arrays already handed out.
                observer.OnNext(Array.copy latest)
        // Tag every value with the index of its source, then merge the streams.
        let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
                   |> Observable.Merge
        main.Subscribe(new Action<int * 'T>(cb)
                       ,new Action<exn>(fun e -> observer.OnError(e)) 
                       ,new Action(fun () -> observer.OnCompleted()) )
    Observable.Create(Func<IObserver<'T array>, IDisposable>(subscribe))

让我知道这是否解决了您的问题,因为我没有对其进行太多测试:)
注意:这是第一部分,对于第二部分,每个人都已经提到了您需要做什么

更新:
另一个实现:

/// Combines the latest value of every observable in `list` into an array by
/// draining the merged, index-tagged stream on a background async workflow.
///
/// Nothing is published until every source has produced at least one value.
/// The previous Seq.scan version published its initial all-default array first,
/// because Seq.scan yields the initial state before any input is processed.
/// Every published array is a fresh copy, in the order of the input list.
let combineLatestToArray (list : IObservable<'T> list) = 
    let s = new Subject<'T array>()
    let latest = Array.zeroCreate<'T> list.Length
    let hasValue = Array.create list.Length false
    // Tag every value with the index of its source, then merge the streams.
    let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
               |> Observable.Merge
    async {
        try
            // Number of sources that have not produced a value yet.
            let missing = ref list.Length
            for (i, t) in main.ToEnumerable() do
                latest.[i] <- t
                if not hasValue.[i] then
                    hasValue.[i] <- true
                    missing.Value <- missing.Value - 1
                if missing.Value = 0 then
                    // Copy so later updates do not mutate published arrays.
                    s.OnNext(Array.copy latest)
            s.OnCompleted()
        with
        // A plain pattern catches every exception; `:? Exception` was a
        // type test that always holds.
        | ex -> s.OnError(ex)
    } |> Async.Start
    s :> IObservable<'T array>

Although Gideon Engelberth has already answered your question with one possible way to solve the problem, another possible way could be something like the code below, which doesn't use nesting.

/// Combines the latest value of every observable in `list` into an array,
/// without nesting CombineLatest calls.
///
/// Each subscriber gets its own state, and disposing the returned subscription
/// unsubscribes from all sources (the previous version subscribed eagerly and
/// discarded the IDisposable). Nothing is emitted until every source has
/// produced at least one value, so subscribers never see placeholder defaults.
/// Every emitted array is a fresh copy, in the order of the input list.
let combineLatestToArray (list : IObservable<'T> list) = 
    let subscribe (observer : IObserver<'T array>) =
        let latest = Array.zeroCreate<'T> list.Length
        let hasValue = Array.create list.Length false
        // Number of sources that have not produced a value yet.
        let missing = ref list.Length
        let cb (i:int,v:'T) = 
            latest.[i] <- v
            if not hasValue.[i] then
                hasValue.[i] <- true
                missing.Value <- missing.Value - 1
            if missing.Value = 0 then
                // Copy so later updates do not mutate arrays already handed out.
                observer.OnNext(Array.copy latest)
        // Tag every value with the index of its source, then merge the streams.
        let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
                   |> Observable.Merge
        main.Subscribe(new Action<int * 'T>(cb)
                       ,new Action<exn>(fun e -> observer.OnError(e)) 
                       ,new Action(fun () -> observer.OnCompleted()) )
    Observable.Create(Func<IObserver<'T array>, IDisposable>(subscribe))

Let me know if this solved your problem, as I haven't tested it much :)
NOTE: This is for the first part, for second part everyone has already mentioned what you need to do

UPDATE:
Another implementation :

/// Combines the latest value of every observable in `list` into an array by
/// draining the merged, index-tagged stream on a background async workflow.
///
/// Nothing is published until every source has produced at least one value.
/// The previous Seq.scan version published its initial all-default array first,
/// because Seq.scan yields the initial state before any input is processed.
/// Every published array is a fresh copy, in the order of the input list.
let combineLatestToArray (list : IObservable<'T> list) = 
    let s = new Subject<'T array>()
    let latest = Array.zeroCreate<'T> list.Length
    let hasValue = Array.create list.Length false
    // Tag every value with the index of its source, then merge the streams.
    let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
               |> Observable.Merge
    async {
        try
            // Number of sources that have not produced a value yet.
            let missing = ref list.Length
            for (i, t) in main.ToEnumerable() do
                latest.[i] <- t
                if not hasValue.[i] then
                    hasValue.[i] <- true
                    missing.Value <- missing.Value - 1
                if missing.Value = 0 then
                    // Copy so later updates do not mutate published arrays.
                    s.OnNext(Array.copy latest)
            s.OnCompleted()
        with
        // A plain pattern catches every exception; `:? Exception` was a
        // type test that always holds.
        | ex -> s.OnError(ex)
    } |> Async.Start
    s :> IObservable<'T array>
纸短情长 2024-12-07 09:47:15
  1. 似乎 Observable.Merge() 具有可变数量的 IObservables 重载,更接近您想要的。

  2. Observable.Buffer() 与时间重载将在此处执行您想要的操作。在“没有事件”的情况下,Buffer 仍然会 OnNext() 一个空列表,让您对该情况做出反应。

  1. Seems that Observable.Merge() which has overloads for variable number of IObservables is closer to what you want.

  2. Observable.Buffer() with the time overloads will do what you want here. In the "no events" situation, Buffer will still OnNext() an empty list, letting you react to that situation.

爱情眠于流年 2024-12-07 09:47:15

这是我能想到的最好的办法。我有一段时间一直想解决这个问题。

public static class Extensions
{
    // NOTE: the former overload taking `this Observable observable` was removed.
    // `Observable` is a static class, and static types cannot be used as
    // parameter types (compiler error CS0721), so it could never compile.

    /// <summary>
    /// Combines the latest value of every source into one sequence per update,
    /// in the order of <paramref name="observables"/>.
    /// </summary>
    /// <param name="observables">The source observables to combine.</param>
    /// <returns>
    /// An observable built by folding CombineLatest over the sources, starting
    /// from a single empty sequence.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observables"/> is null.
    /// </exception>
    public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> observables)
    {
        if (observables == null) throw new ArgumentNullException("observables");

        return observables.Aggregate<IObservable<T>, IObservable<IEnumerable<T>>>
        (
            Observable.Return(Enumerable.Empty<T>()),
            (o, n) => o.CombineLatest
            (
                n,
                // A one-element array replaces EnumerableEx.Return(t).
                (list, t) => list.Concat(new[] { t })
            )
        );
    }
}

因此,一个示例用法是:

var obs = new List<IObservable<bool>> 
{ 
    Observable.Return(true), 
    Observable.Return(false), 
    Observable.Return(true) 
};

var result = obs.CombineLatest().Select(list => list.All(x => x));
result.Subscribe(Console.WriteLine);
Console.ReadKey();

不过,您必须清楚一点:在所有可观察量都产生一个值之前,生成的 IObservable<IEnumerable<T>> 不会触发。这是我在我的场景中需要的,但可能不适合您的场景。

我担心的是所有 .Concat 的性能。在扩展方法中处理可变集合可能会更高效。没有把握。

抱歉,我不认识 F#。有一天我会抽出时间来解决这个问题。

在获得最终的可观察值后,只需使用 .Throttle 运算符即可完成节流。

编辑:这个答案是迭代式的“阴”,与 Enigmativity 递归式的“阳”相对应。

This is the best I could come up with. I've been wanting to solve this for a while.

public static class Extensions
{
    // NOTE: the former overload taking `this Observable observable` was removed.
    // `Observable` is a static class, and static types cannot be used as
    // parameter types (compiler error CS0721), so it could never compile.

    /// <summary>
    /// Combines the latest value of every source into one sequence per update,
    /// in the order of <paramref name="observables"/>.
    /// </summary>
    /// <param name="observables">The source observables to combine.</param>
    /// <returns>
    /// An observable built by folding CombineLatest over the sources, starting
    /// from a single empty sequence.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observables"/> is null.
    /// </exception>
    public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> observables)
    {
        if (observables == null) throw new ArgumentNullException("observables");

        return observables.Aggregate<IObservable<T>, IObservable<IEnumerable<T>>>
        (
            Observable.Return(Enumerable.Empty<T>()),
            (o, n) => o.CombineLatest
            (
                n,
                // A one-element array replaces EnumerableEx.Return(t).
                (list, t) => list.Concat(new[] { t })
            )
        );
    }
}

So an example usage would be:

var obs = new List<IObservable<bool>> 
{ 
    Observable.Return(true), 
    Observable.Return(false), 
    Observable.Return(true) 
};

var result = obs.CombineLatest().Select(list => list.All(x => x));
result.Subscribe(Console.WriteLine);
Console.ReadKey();

You would have to operate on the knowledge, though, that the resulting IObservable<IEnumerable<T>> will not fire until all observables have yielded a value. This is what I needed in my scenarios, but might not be appropriate for your scenario.

My worry with this is the performance of all of the .Concats. Might be more performant to deal in a mutable collection in the extension method. Not sure.

Sorry, I don't know F#. I'll get around to it one of these days.

Throttling is just done with the .Throttle operator after you get your final observable.

Edit: This answer is the iterative Ying to Enigmativity's recursive Yang.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文