WCF F# - Handling service shutdown gracefully on the client

Posted 2024-11-10 00:43:50

I have some experimental code that is just trying to make a simple scenario work. One client streams data to multiple services. The problem is that if one of the services does not shut down gracefully, I get an EndpointNotFoundException that I can't seem to handle. Below is my attempt at handling it, which fails. Ideally, I would like to remove the failed service channel from the list of channels and continue streaming data to the services that are still up and running. The timer simply gives the services a chance to start up before the data streaming starts.

// let prices = ... (returns a seq of data that is streamed)

type ReplayDataStream(prices) =
    let evt = new Event<_>()

    // Replays each line after its delay and raises PriceChanged for it.
    member x.Replay() =
        async {
            for line, delay in prices do
                do! Async.Sleep(delay)
                evt.Trigger(line)
        }
        |> Async.StartImmediate

    member x.PriceChanged = evt.Publish


let main() =
    let addresses = new ResizeArray<EndpointAddress>()

    let announcementService = new AnnouncementService()

    let createChannels addresses =
        let channels = new ResizeArray<IInputDataService>()
        for (address:EndpointAddress) in addresses do
            let channelFactory = new ChannelFactory<IInputDataService>(new BasicHttpBinding(), address)
            let channel = channelFactory.CreateChannel()
            // When a channel faults, abort it and drop it from the list.
            (channel :?> ICommunicationObject).Faulted.Add(fun x ->
                (channel :?> ICommunicationObject).Abort()
                channels.Remove(channel) |> ignore)
            channels.Add(channel)
        channels

    let sendMessage(args:ElapsedEventArgs) =
        let channels = createChannels addresses
        for financialDataStream in prices do
            let replayDataStreamA = new ReplayDataStream(financialDataStream)
            for channel in channels do
                try
                    //This is where it blows up and the try block isn't catching the exception.
                    replayDataStreamA.PriceChanged.Add(channel.InputStringData)
                with
                | :? EndpointNotFoundException as ex -> Console.WriteLine(ex.ToString())
                | :? CommunicationException as ex -> Console.WriteLine(ex.ToString())
                | :? Exception as ex -> Console.WriteLine(ex.ToString())
                replayDataStreamA.Replay()

    let timer = new System.Timers.Timer()
    timer.Enabled <- true
    timer.AutoReset <- false
    timer.Interval <- 30000.0
    timer.Start()
    timer.Elapsed.Add(sendMessage)

    announcementService.OnlineAnnouncementReceived.Add(fun e ->
        Console.WriteLine(e.EndpointDiscoveryMetadata.Address)
        addresses.Add(e.EndpointDiscoveryMetadata.Address))

    announcementService.OfflineAnnouncementReceived.Add(fun e ->
        Console.WriteLine(e.EndpointDiscoveryMetadata.Address)
        addresses.Remove(e.EndpointDiscoveryMetadata.Address) |> ignore)

    let announcementServiceHost = new ServiceHost(announcementService)
    try
        announcementServiceHost.AddServiceEndpoint(new UdpAnnouncementEndpoint());
        announcementServiceHost.Open();
    with 
    | :? System.ServiceModel.CommunicationException as ex -> Console.WriteLine(ex.ToString())
    | :? System.TimeoutException as ex -> Console.WriteLine(ex.ToString())


    printfn "%s" "Hit any key to close."
    Console.ReadKey() |> ignore

Comments (2)

戈亓 2024-11-17 00:43:50

After rewriting my code in C# it finally dawned on me what I was doing wrong. This is what the PriceChanged event handler should look like. I needed to catch the exception inside the lambda itself. Now I need to write something that actually looks like production code. :)

replayDataStreamA.PriceChanged.Add(fun x ->
    try
        channel.InputStringData x
    with
    | :? System.ServiceModel.CommunicationException as ex -> (channel :?> ICommunicationObject).Abort())

For posterity, here is the entire method:

let sendMessage(args:ElapsedEventArgs) =
    if addresses.Count > 0 then
        for address in addresses do
            let channelFactory = new ChannelFactory<IInputDataService>(new BasicHttpBinding(), address)
            let channel = channelFactory.CreateChannel()
            for financialDataStream in prices do
                let replayDataStreamA = new ReplayDataStream(financialDataStream)
                replayDataStreamA.PriceChanged.Add(fun x ->
                    try
                        channel.InputStringData x
                    with
                    | :? System.ServiceModel.CommunicationException as ex -> (channel :?> ICommunicationObject).Abort())
                replayDataStreamA.Replay()
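
The question also asked how to drop a failed service and keep streaming to the rest. A minimal sketch of that idea, without WCF so it can run on its own: the `sinks` list, the `active` set and the use of `InvalidOperationException` are all hypothetical stand-ins for the real channels and for `CommunicationException`. Each handler catches its own failure and removes its sink, so the remaining subscribers keep receiving data.

```fsharp
open System
open System.Collections.Generic

// Hypothetical stand-ins for WCF channels: a name paired with a send function.
let received = ResizeArray<string>()

let sinks : (string * (string -> unit)) list =
    [ "healthy", (fun line -> received.Add line)
      "broken", (fun _ -> invalidOp "service is down") ]

// Names of the sinks that are still considered alive.
let active = HashSet<string>(sinks |> List.map fst)

let evt = new Event<string>()

for (name, send) in sinks do
    evt.Publish.Add(fun line ->
        // Skip sinks that were dropped after an earlier failure.
        if active.Contains name then
            try
                send line
            with :? InvalidOperationException ->
                // With a real channel, Abort() would be called here as well.
                active.Remove name |> ignore)

for line in [ "p1"; "p2" ] do
    evt.Trigger line

printfn "active: %d, delivered: %d" active.Count received.Count
// prints "active: 1, delivered: 2"
```

Because the exception never leaves the handler, the event keeps invoking the other subscribers.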

罗罗贝儿 2024-11-17 00:43:50

The explanation by Sky Sanders makes a lot of sense and should work for this scenario. Here's a link to the blog.

Providing a subscriber to the Faulted event does not do exactly the same thing as calling channel.Abort() inside an exception handler.

PriceChanged.Add() is the equivalent of PriceChanged += in C#: you're subscribing a handler to the PriceChanged event. A try/with block placed around that call only catches exceptions thrown while subscribing (think of a custom Add/Remove implementation in your event), and that's not what you want. You're looking for a way to handle the exception thrown when InputStringData is called. This thought process naturally leads to your solution.
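
The difference between subscribe time and invoke time can be seen in a small sketch that needs no WCF. Here `failingSend` is a hypothetical stand-in for `channel.InputStringData`, and `InvalidOperationException` stands in for `CommunicationException`; both are assumptions made for illustration.

```fsharp
open System

// Hypothetical stand-in for channel.InputStringData: fails whenever it is invoked.
let failingSend (line: string) : unit =
    raise (InvalidOperationException("endpoint unavailable: " + line))

let evt = new Event<string>()
let caughtAtSubscribe = ref false
let caughtInHandler = ref false

// The outer try/with only covers the act of subscribing, which succeeds.
try
    evt.Publish.Add(fun line ->
        // The inner try/with covers the call itself, which is what fails.
        try
            failingSend line
        with :? InvalidOperationException ->
            caughtInHandler.Value <- true)
with _ ->
    caughtAtSubscribe.Value <- true

// The handler only runs here, long after the outer try/with has finished.
evt.Trigger "tick"

printfn "caught at subscribe: %b, caught in handler: %b" caughtAtSubscribe.Value caughtInHandler.Value
// prints "caught at subscribe: false, caught in handler: true"
```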

In C# production code, place a try/catch block around the point where the event is raised. Catch any exception thrown by a subscriber, flag it with Debug.Assert, and rethrow, warning the developer that all exceptions should be handled subscriber-side. In your code, this means a try/with block around evt.Trigger() that warns and rethrows.
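
A rough sketch of that event-side guard, under a few assumptions: `GuardedEvent` is a hypothetical wrapper name, and the warning is written with `eprintfn` where `Debug.Assert` or `Debug.Fail` could be used instead. The trigger is kept in an ordinary member because `reraise()` is only valid directly inside a `with` handler and cannot be used inside an `async { }` block.

```fsharp
open System

// Hypothetical wrapper: warns and rethrows when a subscriber lets an exception escape.
type GuardedEvent<'T>() =
    let evt = new Event<'T>()

    member x.Publish = evt.Publish

    member x.Trigger(value: 'T) =
        try
            evt.Trigger(value)
        with ex ->
            // Assumed warning channel; Debug.Assert or Debug.Fail would also fit here.
            eprintfn "Warning: a subscriber let %s escape; handle it subscriber-side." (ex.GetType().Name)
            reraise ()

let guarded = GuardedEvent<string>()

// A deliberately misbehaving subscriber that does not handle its own failure.
guarded.Publish.Add(fun line -> if line = "bad" then invalidOp "subscriber bug")

guarded.Trigger "ok"

let rethrown =
    try
        guarded.Trigger "bad"
        false
    with :? InvalidOperationException -> true

printfn "rethrown: %b" rethrown
// prints "rethrown: true" (the warning goes to standard error)
```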

You could expose the async block instead of running it at the point of declaration. This would give you orchestration powers at a higher level, within sendMessage. There's a special API for catching exceptions and handling cancellation and timeouts in one central place that's really worth looking into.
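
The answer does not name the API it has in mind. One candidate that fits the description is `Async.StartWithContinuations`, so the sketch below assumes that choice. `Replay` returns the workflow instead of starting it, and the caller decides in one place what happens on success, failure or cancellation. The sample data, the `finish` helper and the five-second timeout are illustrative assumptions.

```fsharp
open System
open System.Threading

type ReplayDataStream(prices: seq<string * int>) =
    let evt = new Event<string>()

    member x.PriceChanged = evt.Publish

    // Returns the workflow instead of starting it, so the caller can orchestrate it.
    member x.Replay() : Async<unit> =
        async {
            for line, delay in prices do
                do! Async.Sleep(delay)
                evt.Trigger(line)
        }

let stream = ReplayDataStream([ "a", 10; "b", 10 ])

// A subscriber that fails on the second line and does not handle it.
stream.PriceChanged.Add(fun line -> if line = "b" then failwith "send failed")

let outcome = ref "pending"
let finished = new ManualResetEventSlim(false)
let cts = new CancellationTokenSource()
cts.CancelAfter(5000) // assumed timeout for the whole replay

let finish (result: string) =
    outcome.Value <- result
    finished.Set()

// Success, failure and cancellation are all handled in this one call.
Async.StartWithContinuations(
    stream.Replay(),
    (fun () -> finish "completed"),
    (fun (ex: exn) -> finish ("failed: " + ex.Message)),
    (fun (_: OperationCanceledException) -> finish "cancelled"),
    cts.Token)

finished.Wait()
printfn "%s" outcome.Value
// prints "failed: send failed"
```

In sendMessage, the failure continuation would be the natural place to abort the channel and drop it from the list.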
