使用异步 Socket.BeginReceive 时如何检测超时?
在 F# 中使用原始套接字编写异步 Ping,以使用尽可能少的线程启用并行请求。不使用“System.Net.NetworkInformation.Ping”,因为它似乎为每个请求分配一个线程。我也对使用 F# 异步工作流程感兴趣。
当目标主机不存在/响应时,下面的同步版本正确地超时,但异步版本挂起。当主机响应时,两者都会起作用。不确定这是 .NET 问题还是 F# 问题...
有什么想法吗?
(注意:该进程必须以管理员身份运行才能允许原始套接字访问)
这会引发超时:
let result = Ping.Ping ( IPAddress.Parse( "192.168.33.22" ), 1000 )
但是,这会挂起:
let result = Ping.AsyncPing ( IPAddress.Parse( "192.168.33.22" ), 1000 )
|> Async.RunSynchronously
这是代码...
module Ping
open System
open System.Net
open System.Net.Sockets
open System.Threading
//---- ICMP Packet Classes
type IcmpMessage (t : byte) =
let mutable m_type = t
let mutable m_code = 0uy
let mutable m_checksum = 0us
member this.Type
with get() = m_type
member this.Code
with get() = m_code
member this.Checksum = m_checksum
abstract Bytes : byte array
default this.Bytes
with get() =
[|
m_type
m_code
byte(m_checksum)
byte(m_checksum >>> 8)
|]
member this.GetChecksum() =
let mutable sum = 0ul
let bytes = this.Bytes
let mutable i = 0
// Sum up uint16s
while i < bytes.Length - 1 do
sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
i <- i + 2
// Add in last byte, if an odd size buffer
if i <> bytes.Length then
sum <- sum + uint32(bytes.[i])
// Shuffle the bits
sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
sum <- sum + (sum >>> 16)
sum <- ~~~sum
uint16(sum)
member this.UpdateChecksum() =
m_checksum <- this.GetChecksum()
type InformationMessage (t : byte) =
inherit IcmpMessage(t)
let mutable m_identifier = 0us
let mutable m_sequenceNumber = 0us
member this.Identifier = m_identifier
member this.SequenceNumber = m_sequenceNumber
override this.Bytes
with get() =
Array.append (base.Bytes)
[|
byte(m_identifier)
byte(m_identifier >>> 8)
byte(m_sequenceNumber)
byte(m_sequenceNumber >>> 8)
|]
type EchoMessage() =
inherit InformationMessage( 8uy )
let mutable m_data = Array.create 32 32uy
do base.UpdateChecksum()
member this.Data
with get() = m_data
and set(d) = m_data <- d
this.UpdateChecksum()
override this.Bytes
with get() =
Array.append (base.Bytes)
(this.Data)
//---- Synchronous Ping
let Ping (host : IPAddress, timeout : int ) =
let mutable ep = new IPEndPoint( host, 0 )
let socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
let packet = EchoMessage()
let mutable buffer = packet.Bytes
try
if socket.SendTo( buffer, ep ) <= 0 then
raise (SocketException())
buffer <- Array.create (buffer.Length + 20) 0uy
let mutable epr = ep :> EndPoint
if socket.ReceiveFrom( buffer, &epr ) <= 0 then
raise (SocketException())
finally
socket.Close()
buffer
//---- Entensions to the F# Async class to allow up to 5 paramters (not just 3)
type Async with
static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,iar,state)), endAction, ?cancelAction=cancelAction)
static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,iar,state)), endAction, ?cancelAction=cancelAction)
//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom
type System.Net.Sockets.Socket with
member this.AsyncSendTo( buffer, offset, size, socketFlags, remoteEP ) =
Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP,
this.BeginSendTo,
this.EndSendTo )
member this.AsyncReceiveFrom( buffer, offset, size, socketFlags, remoteEP ) =
Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP,
this.BeginReceiveFrom,
(fun asyncResult -> this.EndReceiveFrom(asyncResult, remoteEP) ) )
//---- Asynchronous Ping
let AsyncPing (host : IPAddress, timeout : int ) =
async {
let ep = IPEndPoint( host, 0 )
use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
let packet = EchoMessage()
let outbuffer = packet.Bytes
try
let! result = socket.AsyncSendTo( outbuffer, 0, outbuffer.Length, SocketFlags.None, ep )
if result <= 0 then
raise (SocketException())
let epr = ref (ep :> EndPoint)
let inbuffer = Array.create (outbuffer.Length + 256) 0uy
let! result = socket.AsyncReceiveFrom( inbuffer, 0, inbuffer.Length, SocketFlags.None, epr )
if result <= 0 then
raise (SocketException())
return inbuffer
finally
socket.Close()
}
Writing an asynchronous Ping using Raw Sockets in F#, to enable parallel requests using as few threads as possible. Not using "System.Net.NetworkInformation.Ping", because it appears to allocate one thread per request. Am also interested in using F# async workflows.
The synchronous version below correctly times out when the target host does not exist/respond, but the asynchronous version hangs. Both work when the host does respond. Not sure if this is a .NET issue, or an F# one...
Any ideas?
(note: the process must run as Admin to allow Raw Socket access)
This throws a timeout:
let result = Ping.Ping ( IPAddress.Parse( "192.168.33.22" ), 1000 )
However, this hangs:
let result = Ping.AsyncPing ( IPAddress.Parse( "192.168.33.22" ), 1000 )
|> Async.RunSynchronously
Here's the code...
module Ping
open System
open System.Net
open System.Net.Sockets
open System.Threading
//---- ICMP Packet Classes
type IcmpMessage (t : byte) =
let mutable m_type = t
let mutable m_code = 0uy
let mutable m_checksum = 0us
member this.Type
with get() = m_type
member this.Code
with get() = m_code
member this.Checksum = m_checksum
abstract Bytes : byte array
default this.Bytes
with get() =
[|
m_type
m_code
byte(m_checksum)
byte(m_checksum >>> 8)
|]
member this.GetChecksum() =
let mutable sum = 0ul
let bytes = this.Bytes
let mutable i = 0
// Sum up uint16s
while i < bytes.Length - 1 do
sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
i <- i + 2
// Add in last byte, if an odd size buffer
if i <> bytes.Length then
sum <- sum + uint32(bytes.[i])
// Shuffle the bits
sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
sum <- sum + (sum >>> 16)
sum <- ~~~sum
uint16(sum)
member this.UpdateChecksum() =
m_checksum <- this.GetChecksum()
type InformationMessage (t : byte) =
inherit IcmpMessage(t)
let mutable m_identifier = 0us
let mutable m_sequenceNumber = 0us
member this.Identifier = m_identifier
member this.SequenceNumber = m_sequenceNumber
override this.Bytes
with get() =
Array.append (base.Bytes)
[|
byte(m_identifier)
byte(m_identifier >>> 8)
byte(m_sequenceNumber)
byte(m_sequenceNumber >>> 8)
|]
type EchoMessage() =
inherit InformationMessage( 8uy )
let mutable m_data = Array.create 32 32uy
do base.UpdateChecksum()
member this.Data
with get() = m_data
and set(d) = m_data <- d
this.UpdateChecksum()
override this.Bytes
with get() =
Array.append (base.Bytes)
(this.Data)
//---- Synchronous Ping
let Ping (host : IPAddress, timeout : int ) =
let mutable ep = new IPEndPoint( host, 0 )
let socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
let packet = EchoMessage()
let mutable buffer = packet.Bytes
try
if socket.SendTo( buffer, ep ) <= 0 then
raise (SocketException())
buffer <- Array.create (buffer.Length + 20) 0uy
let mutable epr = ep :> EndPoint
if socket.ReceiveFrom( buffer, &epr ) <= 0 then
raise (SocketException())
finally
socket.Close()
buffer
//---- Entensions to the F# Async class to allow up to 5 paramters (not just 3)
type Async with
static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,iar,state)), endAction, ?cancelAction=cancelAction)
static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
Async.FromBeginEnd((fun (iar,state) -> beginAction(arg1,arg2,arg3,arg4,arg5,iar,state)), endAction, ?cancelAction=cancelAction)
//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom
type System.Net.Sockets.Socket with
member this.AsyncSendTo( buffer, offset, size, socketFlags, remoteEP ) =
Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP,
this.BeginSendTo,
this.EndSendTo )
member this.AsyncReceiveFrom( buffer, offset, size, socketFlags, remoteEP ) =
Async.FromBeginEnd( buffer, offset, size, socketFlags, remoteEP,
this.BeginReceiveFrom,
(fun asyncResult -> this.EndReceiveFrom(asyncResult, remoteEP) ) )
//---- Asynchronous Ping
let AsyncPing (host : IPAddress, timeout : int ) =
async {
let ep = IPEndPoint( host, 0 )
use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
let packet = EchoMessage()
let outbuffer = packet.Bytes
try
let! result = socket.AsyncSendTo( outbuffer, 0, outbuffer.Length, SocketFlags.None, ep )
if result <= 0 then
raise (SocketException())
let epr = ref (ep :> EndPoint)
let inbuffer = Array.create (outbuffer.Length + 256) 0uy
let! result = socket.AsyncReceiveFrom( inbuffer, 0, inbuffer.Length, SocketFlags.None, epr )
if result <= 0 then
raise (SocketException())
return inbuffer
finally
socket.Close()
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(7)
詹姆斯,你自己接受的答案有一个我想指出的问题。您只分配一个计时器,这使得 AsyncReceiveEx 返回的异步对象成为有状态的一次性对象。下面是我删减的一个类似示例:
理想情况下,您希望 AsyncReceiveEx 返回的异步的每次“运行”都表现相同,这意味着每次运行都需要自己的计时器和一组引用标志。因此,这个问题很容易解决:
唯一的变化是将 AsyncReceiveEx 的主体放在
async{...}
中,并让最后一行return!
。James, your own accepted answer has a problem I wanted to point out. You only allocate one timer, which makes the async object returned by AsyncReceiveEx a stateful one-time-use object. Here's a similar example that I trimmed down:
Ideally you want every 'run' of the async returned by AsyncReceiveEx to behave the same, which means each run needs its own timer and set of ref flags. This is easy to fix thusly:
The only change is to put the body of AsyncReceiveEx inside
async{...}
and have the last linereturn!
.文档明确指出超时仅适用于同步版本:
http://msdn.microsoft.com/en-us/library/system.net.sockets.socketoptionname.aspx
The docs clearly state that the timeout only applies to the sync versions:
http://msdn.microsoft.com/en-us/library/system.net.sockets.socketoptionname.aspx
经过一番思考,得出以下结论。此代码将 AsyncReceiveEx 成员添加到 Socket,其中包含超时值。它将看门狗定时器的详细信息隐藏在接收方法中......非常整洁且独立。现在这就是我一直在寻找的东西!
请参阅下面的完整异步 ping 示例。
不确定锁是否有必要,但安全总比遗憾好……
这是一个完整的 Ping 示例。为了避免耗尽源端口并防止一次收到太多回复,它一次扫描一个 C 类子网。
After some thought, came up with the following. This code adds an AsyncReceiveEx member to Socket, which includes a timeout value. It hides the details of the watchdog timer inside the receive method... very tidy and self contained. Now THIS is what I was looking for!
See the complete async ping example, further below.
Not sure if the locks are necessary, but better safe than sorry...
Here is a complete Ping example. To avoid running out of source ports and to prevent getting too many replies at once, it scans one class-c subnet at a time.
有几件事...
首先,可以将 .NET
FooAsync
/FooCompleted
模式改编为 F# 异步。 FSharp.Core 库为 WebClient 执行此操作;我想你可以在这里使用相同的模式。这是 WebClient 代码,我认为您可以对
SendAsync
/SendAsyncCancel
/PingCompleted
执行相同的操作(我没有仔细考虑)。其次,将您的方法命名为
AsyncPing
,而不是PingAsync
。 F# 异步方法被命名为AsyncFoo
,而具有事件模式的方法被命名为FooAsync
。我没有仔细查看你的代码来尝试找出错误可能出在哪里。
A couple things...
First, it's possible to adapt the .NET
FooAsync
/FooCompleted
pattern into an F# async. The FSharp.Core library does this for WebClient; I think you can use the same pattern here. Here's the WebClient codeand I think you can do the same for
SendAsync
/SendAsyncCancel
/PingCompleted
(I have not thought it through carefully).Second, name your method
AsyncPing
, notPingAsync
. F# async methods are namedAsyncFoo
, whereas methods with the event pattern are namedFooAsync
.I didn't look carefully through your code to try to find where the error may lie.
这是使用 Async.FromContinuations 的版本。
然而,这并不能解决我的问题,因为它无法扩展。该代码可能对某人有用,因此将其发布在这里。
这不是答案的原因是 System.Net.NetworkInformation.Ping 似乎每个 Ping 使用一个线程和相当多的内存(可能是由于线程堆栈空间)。尝试 ping 整个 B 类网络将耗尽内存并使用 100 个线程,而使用原始套接字的代码仅使用几个线程且内存不足 10Mb。
Here is a version using Async.FromContinuations.
However, this is NOT an answer to my problem, because it does not scale. The code may be useful to someone, so posting it here.
The reason this is not an answer is because System.Net.NetworkInformation.Ping appears to use one thread per Ping and quite a bit of memory (likely due to thread stack space). Attempting to ping an entire class-B network will run out of memory and use 100's of threads, whereas the code using raw sockets uses only a few threads and under 10Mb.
编辑:代码更改为工作版本。
詹姆斯,我修改了你的代码,看起来它和你的版本一样工作,但使用 MailboxProcessor 作为超时处理程序引擎。该代码比您的版本慢 4 倍,但使用的内存少 1.5 倍。
EDIT: code changed to working version.
James, I've modified your code and it seems it work as well as your version, but uses MailboxProcessor as timeout handler engine. The code is 4x times slower then your version but uses 1.5x less memory.
也可能参见:
http://blogs.msdn.com/ pfxteam/archive/2010/05/04/10007557.aspx
Possibly see also:
http://blogs.msdn.com/pfxteam/archive/2010/05/04/10007557.aspx