如何支持Swift Actor中的超时回调的异步回调
我有运行“遗产”代码(过程管道和回调块)的异步/等待(演员)的混合物。我的情况类似于我的演员功能返回值的情况,而一旦完成一些工作,我也会存储递延回调(演员在等待工作完成时不应阻止,演员目的仅是为了支持ID生成和内部。需要螺纹安全的更新)。理想情况下,我想构建回调,也是符合“异步”变体的符合人体工程学的 我正在使用演员进行线程安全。 ASYNC函数之一存储了一个回调,该回调一旦工作完成或暂停后。 目前,僵持和“丑陋”的筑巢。
typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?
actor Processor {
let id = Int
var callbacks = [Int:Block]()
let process : Process // external process which communicates of stdin/stdout
init(processPath: String) async throws {
process = Process()
process.launchPath = processPath
process.standardInput = Pipe()
process.standardOutput = Pipe()
(process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = {[weak self] handler in
// does not support async/await directly, wrapping in a task, perhaps better mechanism?
Task {[weak self] in
await self?.handleOutput(data: handler.availableData)
}
}
}
struct Result: Codable {
id: Int ,
output: String
}
func handleOutput(data: Data) {
if let decoded = try? JSONDecoder().decode(Result.self, from: data),
let id = decoded.id ,
let callback = pop(id: id) {
await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
}
}
func work(text: String, completion: @escaping Block) async -> WorkArgs {
id += 1 // increment the running id
// store the callback
callbacks[id] = completion
// timeout if the result doesn't come back in a reasonable amount of time.
// I will perform a check after 1 second, and pop the callback if needed
// problematic here..., how to wait while also not blocking this function
Task { [weak self] in
// invalidate if needed
try await Task.sleep(nanoseconds: 1_000_000_000)
// dead lock here:
if let bl = await self?.pop(id: id) {
print("invalidated the task \(id)")
await bl(nil)
}
}
// call out to the external process with this convention, it will run async and print a result with the same id
let command = "\(id) \(text)\n"
let d = command(using: .utf8)
try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
}
// pop this callback
func pop(id: Int) -> Block? {
return callbacks.removeValue(forKey: id)
}
}
struct WorkArgs {
let id: Int
let date: Date
}
actor IdGen {
private var id : Int64 = 0
func next() -> Int64 {
id += 1
return id
}
}
actor CallbackActor {
var pendingCallbacks = [Int: (String) -> Void]()
func push(_ key: Int, block: @escaping (String) -> Void) {
pendingCallbacks[key] = block
}
func pop(_ key: Int64) -> AutoCompleteBlock? {
return pendingCallbacks.removeValue(forKey: key)
}
}
I have a mix of async/await (Actors) running "legacy" code (Process Pipes and callback blocks). I have a situation similar to this where my actor's function returns a value, while also storing a deferred callback once some work is completed ( actor should not block while waiting for the work to complete, the actor purpose is only to support id generation and internal updates which need to be threadsafe). Ideally I would like to structure the callback to also be ergonomic as an "async" variant
I am using an actor for thread safety. One of the async functions stores a callback which is called once the work is completed or once a timeout has occurred.
Currently getting a deadlock as well as "ugly" nesting.
typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?
actor Processor {
let id = Int
var callbacks = [Int:Block]()
let process : Process // external process which communicates of stdin/stdout
init(processPath: String) async throws {
process = Process()
process.launchPath = processPath
process.standardInput = Pipe()
process.standardOutput = Pipe()
(process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = {[weak self] handler in
// does not support async/await directly, wrapping in a task, perhaps better mechanism?
Task {[weak self] in
await self?.handleOutput(data: handler.availableData)
}
}
}
struct Result: Codable {
id: Int ,
output: String
}
func handleOutput(data: Data) {
if let decoded = try? JSONDecoder().decode(Result.self, from: data),
let id = decoded.id ,
let callback = pop(id: id) {
await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
}
}
func work(text: String, completion: @escaping Block) async -> WorkArgs {
id += 1 // increment the running id
// store the callback
callbacks[id] = completion
// timeout if the result doesn't come back in a reasonable amount of time.
// I will perform a check after 1 second, and pop the callback if needed
// problematic here..., how to wait while also not blocking this function
Task { [weak self] in
// invalidate if needed
try await Task.sleep(nanoseconds: 1_000_000_000)
// dead lock here:
if let bl = await self?.pop(id: id) {
print("invalidated the task \(id)")
await bl(nil)
}
}
// call out to the external process with this convention, it will run async and print a result with the same id
let command = "\(id) \(text)\n"
let d = command(using: .utf8)
try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
}
// pop this callback
func pop(id: Int) -> Block? {
return callbacks.removeValue(forKey: id)
}
}
struct WorkArgs {
let id: Int
let date: Date
}
actor IdGen {
private var id : Int64 = 0
func next() -> Int64 {
id += 1
return id
}
}
actor CallbackActor {
var pendingCallbacks = [Int: (String) -> Void]()
func push(_ key: Int, block: @escaping (String) -> Void) {
pendingCallbacks[key] = block
}
func pop(_ key: Int64) -> AutoCompleteBlock? {
return pendingCallbacks.removeValue(forKey: key)
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
在解决超时问题之前,我们可能应该讨论如何将
Process
包装在Swift并发中。一种模式是使用
asyncSequence
(即,asyncstream
)作为stdout
:然后您可以
等待
该流:在
覆盖func viewdidload(){
super.viewdidload()
任务 {
尝试等待startstream()
打印(“完成”)
}
}
@ibaction func didtapsend(_ sender:any){
令字符串= textfield.stringvalue
任务 {
等待过程。
}
textfield.stringvalue =“”
}
func startstream()async抛出{
尝试等待process.start()
让流=等待process.stream()
对于流中的等待数据{
如果让字符串=字符串(数据:数据,编码:.UTF8){
打印(字符串,终结器:“”)
}
}
}
这是一种简单的方法。而且看起来不错(因为我在没有终结者的情况下打印响应)。
但需要小心,因为
READABICATIONHANDLER
并不总是使用某些特定输出的完整Data
来调用。它可能会分解或分别通过单独的调用READABICATION HANDLER
。。
另一种模式是使用
lines
,它避免了READABICE HANDLER的问题
对于给定的输出,可能会多次调用:然后您可以做:
在
覆盖func viewdidload(){
super.viewdidload()
任务 {
尝试等待startstream()
打印(“完成”)
}
}
@ibaction func didtapsend(_ sender:any){
令字符串= textfield.stringvalue
任务 {
等待过程。
}
textfield.stringvalue =“”
}
func startstream()async抛出{
尝试等待process.start()
guard let lines =等待process.lines else {return}
为了尝试等待行{
打印(线)
}
}
这避免了中线响应的破裂。
您问:
模式是将请求包装在
任务
中,然后启动一个单独的任务,该任务将在task.sleep 间隔。
但这在这种情况下将非常复杂,因为您必须与单独的
Process
协调该协调,否则它将继续进行,不知道task> task
已取消。从理论上讲,这可能会导致问题(例如,该过程被积压等)。我建议将超时逻辑集成到由
Process
调用的应用程序中,而不是试图将呼叫者处理。可以完成(例如,可以编写流程应用程序以捕获和处理sigint
,然后呼叫者可以调用中断
Process
)。但是它将很复杂,而且很可能是脆弱的。Before I get to the timeout question, we should probably talk about how to wrap the
Process
within Swift concurrency.One pattern would be to use
AsyncSequence
(i.e., anAsyncStream
) forstdout
:Then you can
for await
that stream:This is a simple approach. And it looks fine (because I am printing responses without a terminator).
But one needs to be careful because
readabilityHandler
will not always be called with the fullData
of some particular output. It might be broken up or split across separate calls to thereadabilityHandler
.Another pattern would be to use
lines
, which avoids the problem ofreadabilityHandler
possibly being called multiple times for a given output:Then you can do:
This avoids the breaking of responses mid-line.
You asked:
The pattern is to wrap the request in a
Task
, and then start a separate task that will cancel that prior task after aTask.sleep
interval.But this is going to be surprisingly complicated in this case, because you have to coordinate that with the separate
Process
which will otherwise still proceed, unaware that theTask
has been canceled. That can theoretically lead to problems (e.g., the process gets backlogged, etc.).I would advise integrating the timeout logic in the app invoked by the
Process
, rather than trying to have the caller handle that. It can be done (e.g. maybe write the process app to capture and handleSIGINT
and then the caller can callinterrupt
on theProcess
). But it is going to be complicated and, most likely, brittle.