udp之订购 react 性扩展事件

kevingrace 阅读:65 2024-05-22 17:00:29 评论:0

我正在多个线程中的UDP上接收消息。每次接待后,我都会提出MessageReceived.OnNext(message)

因为我使用多个线程,所以消息无序引发,这是一个问题。

如何通过消息计数器命令消息的引发?
(让我们说有一个message.counter属性)

必须记住,一条消息可能会在通信中丢失(如果在X消息未填满之后我们有一个埋藏的漏洞,我会提出下一条消息)

必须尽快引发消息(如果收到下一个计数器)

请您参考如下方法:

在说明检测丢失消息的要求时,您没有考虑到最后一条消息未到达的可能性。我添加了一个timeoutDuration,如果在给定时间内没有任何响应,则刷新缓冲的消息-您可能想将其视为错误,请参阅注释以了解如何执行此操作。

我将通过定义具有以下签名的扩展方法来解决此问题:

public static IObservable<TSource> Sort<TSource>( 
    this IObservable<TSource> source, 
    Func<TSource, int> keySelector, 
    TimeSpan timeoutDuration = new TimeSpan(), 
    int gapTolerance = 0) 
  • source是未排序消息流
  • keySelector是从消息中提取int密钥的功能。我假设寻找的第一个键是0;必要时进行修改。
  • 上面讨论了
  • timeoutDuration,如果省略,则没有超时
  • tolerance是等待出现故障的消息时保留的最大消息数。传递0以保存任意数量的消息
  • scheduler是用于超时的调度程序,用于测试目的,如果未指定,则使用默认值。

  • 演练

    我将在此处逐行演示。下面重复完整的实现。

    分配默认计划程序

    首先,如果未提供默认调度程序,则必须分配一个默认调度程序:
    scheduler = scheduler ?? Scheduler.Default; 
    

    安排超时

    现在,如果请求超时,我们将使用一个副本替换源,该副本将简单地终止,如果消息没有到达 OnCompleted,则发送 timeoutDuration
    if(timeoutDuration != TimeSpan.Zero) 
        source = source.Timeout( 
            timeoutDuration, 
            Observable.Empty<TSource>(), 
            scheduler); 
    

    如果您希望发送 TimeoutException,只需删除 Timeout的第二个参数-空流,以选择执行此操作的重载。请注意,我们可以安全地与所有订阅者共享此文件,因此它位于 Observable.Create的调用之外。

    创建订阅处理程序

    我们使用 Observable.Create构建流。每当发生订阅时,即会调用作为 Create的参数的lambda函数,并将调用观察者( o)传递给我们。 Create返回我们的 IObservable<T>,因此我们在这里返回它。
    return Observable.Create<TSource>(o => { ... 
    

    初始化一些变量

    我们将在 nextKey中跟踪下一个预期的键值,并创建一个 SortedDictionary来保存乱序消息,直到可以发送它们为止。
    int nextKey = 0;   
    var buffer = new SortedDictionary<int, TSource>(); 
    

    订阅源并处理消息

    现在,我们可以订阅消息流(可能已应用超时)。首先,我们介绍 OnNext处理程序。下一条消息分配给 x:
    return source.Subscribe(x => { ... 
    

    我们调用 keySelector函数从消息中提取密钥:
    var key = keySelector(x); 
    

    如果消息具有旧密钥(因为它超出了我们对乱序消息的容忍度),我们将删除它并完成此消息(您可能希望采取其他措施):
    // drop stale keys 
    if(key < nextKey) return; 
    

    否则,我们可能会有预期的密钥,在这种情况下,我们可以增加 nextKey发送消息:
    if(key == nextKey) 
    { 
        nextKey++; 
        o.OnNext(x);                     
    } 
    

    或者,我们将来可能会出现故障消息,在这种情况下,必须将其添加到缓冲区中。如果这样做,我们还必须确保缓冲区没有超出存储乱序消息的容限-在这种情况下,我们还将 nextKey碰到缓冲区中的第一个键,因为它是 SortedDictionary,所以方便地使用下一个最低键:
    else if(key > nextKey) 
    { 
        buffer.Add(key, x); 
        if(gapTolerance != 0 && buffer.Count > gapTolerance) 
            nextKey = buffer.First().Key; 
    } 
    

    现在,不管上面的结果如何,我们都需要清空所有准备就绪的键的缓冲区。为此,我们使用辅助方法。请注意,它会调整 nextKey,因此我们必须小心通过引用传递它。我们只需遍历缓冲区读取,删除和发送消息,只要键彼此跟随即可,每次都增加 nextKey:
    private static void SendNextConsecutiveKeys<TSource>( 
        ref int nextKey, 
        IObserver<TSource> observer, 
        SortedDictionary<int, TSource> buffer) 
    { 
        TSource x; 
        while(buffer.TryGetValue(nextKey, out x)) 
        { 
            buffer.Remove(nextKey); 
            nextKey++; 
            observer.OnNext(x);                         
        } 
    } 
    

    处理错误

    接下来,我们提供一个 OnError处理程序-这将通过任何错误,包括Timeout异常(如果您选择采用这种方式)。

    冲洗缓冲区

    最后,我们必须处理 OnCompleted。在这里,我选择清空缓冲区-如果乱序消息阻止了消息并且从未到达,则这是必要的。这就是为什么我们需要超时的原因:
    () => { 
        // empty buffer on completion 
        foreach(var item in buffer) 
            o.OnNext(item.Value);                 
        o.OnCompleted(); 
    }); 
    

    全面实施

    这是完整的实现。
    public static IObservable<TSource> Sort<TSource>( 
        this IObservable<TSource> source, 
        Func<TSource, int> keySelector, 
        int gapTolerance = 0, 
        TimeSpan timeoutDuration = new TimeSpan(), 
        IScheduler scheduler = null) 
    {        
        scheduler = scheduler ?? Scheduler.Default; 
     
        if(timeoutDuration != TimeSpan.Zero) 
            source = source.Timeout( 
                timeoutDuration, 
                Observable.Empty<TSource>(), 
                scheduler); 
     
        return Observable.Create<TSource>(o => { 
            int nextKey = 0;   
            var buffer = new SortedDictionary<int, TSource>(); 
     
            return source.Subscribe(x => { 
                var key = keySelector(x); 
     
                // drop stale keys 
                if(key < nextKey) return; 
     
                if(key == nextKey) 
                { 
                    nextKey++; 
                    o.OnNext(x);                     
                } 
                else if(key > nextKey) 
                { 
                    buffer.Add(key, x); 
                    if(gapTolerance != 0 && buffer.Count > gapTolerance) 
                        nextKey = buffer.First().Key; 
                } 
                SendNextConsecutiveKeys(ref nextKey, o, buffer); 
            }, 
            o.OnError, 
            () => { 
                // empty buffer on completion 
                foreach(var item in buffer) 
                    o.OnNext(item.Value);                 
                o.OnCompleted(); 
            }); 
        }); 
    } 
     
    private static void SendNextConsecutiveKeys<TSource>( 
        ref int nextKey, 
        IObserver<TSource> observer, 
        SortedDictionary<int, TSource> buffer) 
    { 
        TSource x; 
        while(buffer.TryGetValue(nextKey, out x)) 
        { 
            buffer.Remove(nextKey); 
            nextKey++; 
            observer.OnNext(x);                         
        } 
    } 
    

    测试线束

    如果您在控制台应用程序中包含nuget rx-testing,则将在运行以下测试工具时运行以下命令:
    public static void Main() 
    { 
        var tests = new Tests(); 
        tests.Test(); 
    } 
     
    public class Tests : ReactiveTest 
    { 
        public void Test() 
        { 
            var scheduler = new TestScheduler(); 
     
            var xs = scheduler.CreateColdObservable( 
                OnNext(100, 0), 
                OnNext(200, 2), 
                OnNext(300, 1), 
                OnNext(400, 4), 
                OnNext(500, 5), 
                OnNext(600, 3), 
                OnNext(700, 7), 
                OnNext(800, 8), 
                OnNext(900, 9),             
                OnNext(1000, 6), 
                OnNext(1100, 12), 
                OnCompleted(1200, 0)); 
     
            //var results = scheduler.CreateObserver<int>(); 
     
            xs.Sort( 
                keySelector: x => x, 
                gapTolerance: 2, 
                timeoutDuration: TimeSpan.FromTicks(200), 
                scheduler: scheduler).Subscribe(Console.WriteLine); 
     
            scheduler.Start(); 
        } 
    } 
    

    结束语

    这里有各种各样有趣的替代方法。我之所以采用这种主要命令性方法,是因为我认为这是最容易遵循的方法-但您可能可以使用一些花哨的方式对这些恶作剧进行分组。我知道关于Rx始终如一的事实-总是有很多方法可以给猫皮!

    我也不完全满意这里的超时概念-在生产系统中,我想实现一些检查连接性的方法,例如心跳或类似信号。我没有涉及到它,因为显然它将是针对特定应用程序的。此外,之前在这些板上以及其他地方已经讨论过心跳( such as on my blog for example)。


    标签:React
    声明

    1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

    关注我们

    一个IT知识分享的公众号