如何在不丢弃 RX 中的值的情况下减慢 Observable 的速度?

发布于 2024-11-16 04:28:10 字数 648 浏览 2 评论 0原文

我的场景: 我有一个计算应该每秒运行一次。运行后,应该等待大约 200 毫秒,让其他东西赶上。如果一秒后计算仍在运行,则应重新启动,但程序应等到计算完成,并在完成后 200ms 开始下一次计算。

我现在这样做的方式:

_refreshFinished = new Subject<bool>();
_autoRefresher = Observable.Interval(TimeSpan.FromMilliseconds(1000))
   .Zip(_refreshFinished, (x,y) => x)
   .Subscribe(x => AutoRefresh(stuff));

这段代码的问题是,我认为没有办法在计算完成后添加延迟。 Delay 方法仅延迟可观察集合的第一个元素。通常这种行为一次是正确的,因为如果你想缓冲每个人,你就必须缓冲无数的元素,但由于延迟对 Autorefesh 的调用 200 毫秒也会延迟 _refreshFinished 的输出 200 毫秒,因此不会有缓冲区开销。 基本上我想要一个 Observable 每隔 MaxTime(some_call,1000ms) 触发一次,然后延迟 200ms 甚至更好,一些动态值。在这一点上,我什至并不真正关心贯穿其中的价值观,尽管将来可能会改变。

我愿意接受任何建议

My scenario:
I have a computation that should be run about once a second. After it is run there should be a wait of about 200ms for other stuff to catch up. If the compuation is still running after a second it should be started a second time, but should the program should wait until it is finished and start the next computation 200ms after finishing.

The way I am doing it now:

_refreshFinished = new Subject<bool>();
_autoRefresher = Observable.Interval(TimeSpan.FromMilliseconds(1000))
   .Zip(_refreshFinished, (x,y) => x)
   .Subscribe(x => AutoRefresh(stuff));

The problem with this code is, that i see no way to put in a delay after a computation finished.
The Delay method only delays the first element of the observable collection. Usually this behaviour is the right once, since you would have to buffer an endless amount of elements if you wanted to buffer everyone, but since delaying the call to Autorefesh by 200ms delays the output of _refreshFinished by 200ms as well there would be no buffer overhead.
Basicly I want an Observable that fires every every MaxTime(some_call,1000ms) then gets delayed by 200ms or even better, some dynamic value. At this point i dont even really care about the values that are running through this, although that might change in the future.

I´m open to any suggestions

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

感受沵的脚步 2024-11-23 04:28:11

这听起来更像是新异步框架的工作 http://msdn.microsoft.com/ zh-cn/vstudio/gg316360

This sounds more like a job for the new async framework http://msdn.microsoft.com/en-us/vstudio/gg316360

稍尽春風 2024-11-23 04:28:11

有一种方法可以做到。这不是最简单的事情,因为等待时间必须根据每个值动态计算,但它有效并且非常通用。

当您使用此代码时,您只需插入应在 YOURCODE 中调用的代码,其他一切都会自动运行。你的代码基本上会被调用每隔Max(yourCodeTime+extraDelay,usualCallTime+extraDelay)。这意味着您的代码不会同时被调用两次,并且应用程序将始终有额外的延迟时间来执行其他操作。
如果有一些更简单/其他的方法可以做到这一点,我很想听听。

double usualCallTime = 1000;
double extraDealy = 100;
var subject = new Subject<double>();
var subscription =
    sub.TimeInterval()
        .Select(x =>
            {
                var processingTime = x.Interval.TotalMilliseconds - x.Value;
                double timeToWait = 
                     Math.Max(0, usualCallTime - processingTime) + extraDelay;
                return Observable.Timer(TimeSpan.FromMilliseconds(timeToWait))
                    .Select(ignore => timeToWait);
            })
        .Switch()
        .Subscribe(x => {YOURCODE();sub.OnNext(x)});
sub.OnNext(0);

private static void YOURCODE()
{
    // do stuff here
    action.Invoke();
}

There is a way to do it. Its not the easiest thing ever, since the wait time has to be dynamicly calculated on each value but it works and is pretty generic.

When you use thise code you can just insert the code that should be called in YOURCODE and everything else works automaticly. You code will be basicly be called every Max(yourCodeTime+extraDelay,usualCallTime+extraDelay). This means yourCode wont be called twice at the same time and the app will always have extraDelay of time to do other stuff.
If there is some easier/other way to do this i would ove to hear it.

double usualCallTime = 1000;
double extraDealy = 100;
var subject = new Subject<double>();
var subscription =
    sub.TimeInterval()
        .Select(x =>
            {
                var processingTime = x.Interval.TotalMilliseconds - x.Value;
                double timeToWait = 
                     Math.Max(0, usualCallTime - processingTime) + extraDelay;
                return Observable.Timer(TimeSpan.FromMilliseconds(timeToWait))
                    .Select(ignore => timeToWait);
            })
        .Switch()
        .Subscribe(x => {YOURCODE();sub.OnNext(x)});
sub.OnNext(0);

private static void YOURCODE()
{
    // do stuff here
    action.Invoke();
}
终难愈 2024-11-23 04:28:11

如果我正确理解您的问题,您有一个长时间运行的计算函数,如下所示:

static String compute()
{
    int t = 300 + new Random().Next(1000);
    Console.Write("[{0}...", t);
    Thread.Sleep(t);
    Console.Write("]");
    return Guid.NewGuid().ToString();
}

并且您希望每秒至少调用一次该函数,但没有重叠调用,并且调用之间的恢复时间至少为 200 毫秒。下面的代码适用于这种情况。

我从一种更实用的方法开始(使用 Scan()Timestamp()),更多地采用 Rx 风格——因为我正在寻找一个好的 Rx 练习—— - 但最终,这种非聚合方法更加简单。

static void Main()
{
    TimeSpan period = TimeSpan.FromMilliseconds(1000);
    TimeSpan recovery = TimeSpan.FromMilliseconds(200);

    Observable
        .Repeat(Unit.Default)
        .Select(_ =>
        {
            var s = DateTimeOffset.Now;
            var x = compute();
            var delay = period - (DateTimeOffset.Now - s);
            if (delay < recovery)
                delay = recovery;

            Console.Write("+{0} ", (int)delay.TotalMilliseconds);

            return Observable.Return(x).Delay(delay).First();
        })
        .Subscribe(Console.WriteLine);
}

这是输出:

[1144...]+200 a7cb5d3d-34b9-4d44-95c9-3e363f518e52
[1183...]+200 359ad966-3be7-4027-8b95-1051e3fb20c2
[831...]+200 f433b4dc-d075-49fe-9c84-b790274982d9
[766...]+219 310c9521-7bee-4acc-bbca-81c706a4632a
[505...]+485 0715abfc-db9b-42e2-9ec7-880d7ff58126
[1244...]+200 30a3002a-924a-4a64-9669-095152906d85
[1284...]+200 e5b1cd79-da73-477c-bca0-0870f4b5c640
[354...]+641 a43c9df5-53e8-4b58-a0df-7561cf4b0483
[1094...]+200 8f25019c-77a0-4507-b05e-c9ab8b34bcc3
[993...]+200 840281bd-c8fd-4627-9324-372636f8dea3

[编辑:此示例使用 Rx 2.0(RC) 2.0.20612.0]

If I understand your problem correctly, you have a long-running compute function such as this:

static String compute()
{
    int t = 300 + new Random().Next(1000);
    Console.Write("[{0}...", t);
    Thread.Sleep(t);
    Console.Write("]");
    return Guid.NewGuid().ToString();
}

And you want to call this function at least once per second but without overlapping calls, and with a minimum 200ms recovery time between calls. The code below works for this situation.

I started with a more functional approach (using Scan() and Timestamp()), more in the style of Rx--because I was looking for a good Rx exercise--but in the end, this non-aggregating approach was just simpler.

static void Main()
{
    TimeSpan period = TimeSpan.FromMilliseconds(1000);
    TimeSpan recovery = TimeSpan.FromMilliseconds(200);

    Observable
        .Repeat(Unit.Default)
        .Select(_ =>
        {
            var s = DateTimeOffset.Now;
            var x = compute();
            var delay = period - (DateTimeOffset.Now - s);
            if (delay < recovery)
                delay = recovery;

            Console.Write("+{0} ", (int)delay.TotalMilliseconds);

            return Observable.Return(x).Delay(delay).First();
        })
        .Subscribe(Console.WriteLine);
}

Here's the output:

[1144...]+200 a7cb5d3d-34b9-4d44-95c9-3e363f518e52
[1183...]+200 359ad966-3be7-4027-8b95-1051e3fb20c2
[831...]+200 f433b4dc-d075-49fe-9c84-b790274982d9
[766...]+219 310c9521-7bee-4acc-bbca-81c706a4632a
[505...]+485 0715abfc-db9b-42e2-9ec7-880d7ff58126
[1244...]+200 30a3002a-924a-4a64-9669-095152906d85
[1284...]+200 e5b1cd79-da73-477c-bca0-0870f4b5c640
[354...]+641 a43c9df5-53e8-4b58-a0df-7561cf4b0483
[1094...]+200 8f25019c-77a0-4507-b05e-c9ab8b34bcc3
[993...]+200 840281bd-c8fd-4627-9324-372636f8dea3

[edit: this sample uses Rx 2.0(RC) 2.0.20612.0]

一口甜 2024-11-23 04:28:11

假设您有一个现有的 'IObservable' ,那么以下内容将起作用

var delay = TimeSpan.FromSeconds(1.0);
var actual = source.Scan(
    new ConcurrentQueue<object>(),
    (q, i) =>
        {
            q.Enqueue(i);
            return q;
        }).CombineLatest(
            Observable.Interval(delay),
            (q, t) =>
                {
                    object item;
                    if (q.TryDequeue(out item))
                    {
                        return item;
                    }

                    return null;
                }).Where(v => v != null);

'actual' 是您的结果可观察值。但请记住,如果它还不是热的,上面的代码已将其转换为热可观察值。所以你不会被调用“OnCompleted”。

Suppose you have an existing 'IObservable' , then the following will work

var delay = TimeSpan.FromSeconds(1.0);
var actual = source.Scan(
    new ConcurrentQueue<object>(),
    (q, i) =>
        {
            q.Enqueue(i);
            return q;
        }).CombineLatest(
            Observable.Interval(delay),
            (q, t) =>
                {
                    object item;
                    if (q.TryDequeue(out item))
                    {
                        return item;
                    }

                    return null;
                }).Where(v => v != null);

'actual' is your resultant observable. But keep in mind that the above code has turned that into a Hot observable if it wasn't hot already. So you won't get 'OnCompleted' called.

小霸王臭丫头 2024-11-23 04:28:10

Observable.Generate() 有许多重载,可以让您动态调整创建下一个项目的时间。

例如

IScheduler schd = Scheduler.TaskPool;
var timeout = TimeSpan.FromSeconds(1);
var shortDelay = TimeSpan.FromMilliseconds(200);
var longerDelay = TimeSpan.FromMilliseconds(500);
Observable.Generate(schd.Now, 
                    time => true, 
                    time => schd.Now, 
                    time => new object(), // your code here
                    time => schd.Now.Subtract(time) > timeout  ? shortDelay : longerDelay ,
                    schd);

Observable.Generate() has a number of overloads which will let you dynamically adjust the time in which the next item is created.

For instance

IScheduler schd = Scheduler.TaskPool;
var timeout = TimeSpan.FromSeconds(1);
var shortDelay = TimeSpan.FromMilliseconds(200);
var longerDelay = TimeSpan.FromMilliseconds(500);
Observable.Generate(schd.Now, 
                    time => true, 
                    time => schd.Now, 
                    time => new object(), // your code here
                    time => schd.Now.Subtract(time) > timeout  ? shortDelay : longerDelay ,
                    schd);
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文