将多个 observable 合并到一个 observable 数组中
您好,我正在尝试将多个可观察值合并到一个可观察数组中。这是一个在 fsi 中运行的示例。 (抱歉,它很长)
#r "./bin/Debug/System.Reactive.dll"
open System
open System.Reactive.Linq
/// Subscribes to the Observable with all 3 callbacks.
let subscribeComplete next error completed (observable: IObservable<'T>) =
observable.Subscribe(
(fun x -> next x),
(fun e -> error e),
(fun () -> completed()))
/// Subscribes to the Observable with a next and an error-function.
let subscribeWithError next error observable =
subscribeComplete next error (fun () -> ()) observable
/// Subscribes to the Observable with a next-function
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable =
subscribeWithError next ignore observable
/// Static method to generate observable from input functions
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
(resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
Observable.Generate(initState, termCond, iterStep, resSelect, timeSelect)
//maps the given observable with the given function
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
Observable.Select(observable, Func<_,_>(f))
/// Merges two observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> =
Observable.CombineLatest(
obs1, obs2, Func<_,_,_>(fun a b -> a, b))
/// Merges three observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> =
let obs12 =obs1.CombineLatest(obs2, Func<_,_,_>(fun a b -> a, b))
obs12.CombineLatest(obs3, Func<_,_,_>(fun (a,b) c -> a, b, c))
/// Merges four observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> =
let obsNew = combineLatest3 obs1 obs2 obs3
obsNew.CombineLatest(obs4, Func<_,_,_>(fun (a,b,c) d -> a, b, c, d))
// second section generating arrays
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
combineLatest obs1 obs2
|> obsMap (fun (a, b) -> [a; b] |> List.toArray)
let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
combineLatest3 obs1 obs2 obs3
|> obsMap (fun (a, b, c) -> [a; b; c] |> List.toArray)
let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
combineLatest4 obs1 obs2 obs3 obs4
|> obsMap (fun (a, b, c, d) -> [a; b; c; d] |> List.toArray)
let combineLatestListToArray (list: IObservable<'T> List) =
match list.Length with
| 2 -> combineLatestArray list.[0] list.[1]
| 3 -> combineLatest3Array list.[0] list.[1] list.[2]
| 4 -> combineLatest4Array list.[0] list.[1] list.[2] list.[3]
| _ -> failwith "combine latest on unsupported list size"
type FooType =
{ NameVal : string
IdVal : int
RetVal : float }
member x.StringKey() =
x.NameVal.ToString() + ";" + x.IdVal.ToString()
// example code starts here
let rnd = System.Random()
let fooListeners = Collections.Generic.Dictionary()
let AddAFoo (foo : FooType) =
let fooId = foo.StringKey()
if fooListeners.ContainsKey(fooId)
then fooListeners.[fooId]
else
let myObs = ObsGenerate {NameVal = foo.NameVal; IdVal = foo.IdVal; RetVal = foo.RetVal} (fun x -> true) (fun x -> {NameVal = (x.NameVal); IdVal = (x.IdVal); RetVal = (x.RetVal + rnd.NextDouble() - 0.5)}) (fun x -> x) (fun x -> System.TimeSpan.FromMilliseconds(rnd.NextDouble() * 2000.0))
fooListeners.Add(fooId,myObs)
myObs
let fooInit = [6..9]
|> List.map (fun index -> {NameVal = (string index + "st"); IdVal = index; RetVal = (float index + 1.0)})
|> List.map (fun foo -> AddAFoo foo)
let fooValuesArray = fooInit
|> List.map(fun x -> (x |> obsMap (fun x -> x.RetVal)))
|> combineLatestListToArray
let mySub =
fooValuesArray
|> subscribe (fun fooVals -> printfn "fooArray: %A" fooVals)
//execute until here to start example
// execute this last line to unsubscribe
mySub.Dispose()
我对这段代码有两个问题:
-
是否有更智能的方法将可观察值合并到数组? (它变得非常冗长,因为我需要合并更大的数组)
-
我想限制更新。我的意思是,我希望在同一个半秒窗口内发生的所有更新都作为数组上的一个更新进行处理。理想情况下,我希望此窗口仅在第一个更新到来时打开,即如果 2 秒内没有更新到达,则有一个更新到达,然后我们等待并包含进一步的更新 0.5 秒,然后触发可观察值。尽管没有触发任何可观察值,但我不希望它每 0.5 秒定期发布一次。我希望这个描述足够清楚。
更新:我决定接受 F# 答案之一,但我还没有公正地对待 C# 答案。我希望能够尽快正确检查它们。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(5)
对于 1,
List.fold
和List.toArray
的应用程序以及一些Observable
运算符应该可以很好地工作。例如:由于嵌套,如果您有大量 Observables 列表,您可能最终会遇到性能问题,但在您手动编写之前至少值得尝试一下。
对于2,我同意其他答案,对结果应用节流。
For 1, an application of
List.fold
andList.toArray
and a fewObservable
operators should work nicely. Something like:Due to the nesting, you may end up with performance issues if you have a large list of Observables, but it's worth at least trying before you resort to writing it by hand.
For 2, I would agree with the other answers to apply Throttling to the result.
抱歉,这不是 F# - 我希望我有时间学习它 - 但这里有一个可能的 C# 答案。
以下是一组扩展方法,可将最新的
IEnumerable>
组合到IObservable>
:它们可能不会非常高效,但它们确实可以处理任意数量的需要组合的可观察量。
我很希望看到这些方法转换为 F#。
至于你的第二个问题,我不确定我可以回答你到目前为止所说的,因为
CombineLatest
和Throttle
都会丢失值,所以理解它可能是谨慎的在尝试回答之前更详细地了解您的用例。I'm sorry this isn't F# - I wish I had time to learn it - but here's a possible answer in C#.
Here are a set of extension methods that will combine the latest from an
IEnumerable<IObservable<T>>
to anIObservable<IEnumerable<T>>
:They may not be very efficient, but they do handle any number of observables that need to be combined.
I'd be keen to see these methods converted to F#.
As for your second question, I'm not sure I can answer with what you've said so far because
CombineLatest
andThrottle
both lose values so it is probably prudent to understand your use case in more detail before attempting an answer.尽管吉迪恩·恩格尔伯特(Gideon Engelberth)已经用解决问题的可能方法之一回答了您的问题。其他可能的方式可能如下所示,它不使用嵌套。
让我知道这是否解决了您的问题,因为我没有对其进行太多测试:)
注意:这是第一部分,对于第二部分,每个人都已经提到了您需要做什么
更新:
另一个实现:
Although Gideon Engelberth has answered your question with one of the possible way to solve the problem. Other possible way could be something like below, it doesn't use nesting.
Let me know if this solved your problem as I haven't testing it much :)
NOTE: This is for the first part, for second part everyone has already mentioned what you need to do
UPDATE:
Another implementation :
似乎
Observable.Merge()
具有可变数量的IObservables
重载,更接近您想要的。Observable.Buffer()
与时间重载将在此处执行您想要的操作。在“没有事件”的情况下,Buffer 仍然会 OnNext() 一个空列表,让您对该情况做出反应。Seems that
Observable.Merge()
which has overloads for variable number ofIObservables
is closer to what you want.Observable.Buffer()
with the time overloads will do what you want here. In the "no events" situation, Buffer will still OnNext() an empty list, letting you react to that stiuation.这是我能想到的最好的办法。我有一段时间一直想解决这个问题。
因此,一个示例用法是:
不过,您必须对知识进行操作,直到所有可观察量都产生了一个值< /em>。这是我在我的场景中需要的,但可能不适合您的场景。
我担心的是所有 .Concat 的性能。在扩展方法中处理可变集合可能会更高效。没有把握。
抱歉,我不认识 F#。有一天我会抽出时间来解决这个问题。
在获得最终的可观察值后,只需使用
.Throttle
运算符即可完成节流。编辑:这个答案是对神秘性的递归阳。
This is the best I could come up with. I've been wanting to solve this for a while.
So an example usage would be:
You would have to operate on the knowledge, though, that the resulting
IObservable<IEnumerable<T>>
will not fire until all observables have yielded a value. This is what I needed in my scenarios, but might not be appropriate for your scenario.My worry with this is the performance of all of the .Concats. Might be more performant to deal in a mutable collection in the extension method. Not sure.
Sorry, I don't know F#. I'll get around to it one of these days.
Throttling is just done with the
.Throttle
operator after you get your final observable.Edit: This answer is the iterative Ying to Enigmativity's recursive Yang.