How to support an async callback with a timeout in a Swift actor

Posted on 2025-01-25 10:24:40

I have a mix of async/await code (actors) and "legacy" code (Process pipes and callback blocks). My actor's function returns a value immediately while also storing a deferred callback that is invoked once some work completes. The actor should not block while waiting for that work; its purpose is only to support ID generation and internal updates that need to be thread-safe. Ideally I would like to structure the callback so that it is also ergonomic as an "async" variant.
I am using an actor for thread safety. One of the async functions stores a callback which is called once the work is completed or once a timeout has occurred.
Currently I am getting a deadlock as well as "ugly" nesting.

import Foundation

typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?

actor Processor {
  var id = 0 // running request id
  var callbacks = [Int: Block]()
  let process: Process // external process which communicates over stdin/stdout
  init(processPath: String) async throws {
    process = Process()
    process.executableURL = URL(fileURLWithPath: processPath)
    process.standardInput = Pipe()
    process.standardOutput = Pipe()
    (process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = { [weak self] handler in
       // does not support async/await directly, wrapping in a task, perhaps better mechanism?
       let data = handler.availableData // read inside the handler, before hopping to the actor
       Task { [weak self] in
          await self?.handleOutput(data: data)
       }
    }
    try process.run() // launch the external process
  }

  struct Result: Codable {
     let id: Int
     let output: String
  }
  func handleOutput(data: Data) async {
     if let decoded = try? JSONDecoder().decode(Result.self, from: data),
        let callback = pop(id: decoded.id) {
            await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
     }
  }
  func work(text: String, completion: @escaping Block) async -> WorkArgs {
     id += 1 // increment the running id
     let requestID = id // local copy, so the timeout task does not read actor state
     // store the callback
     callbacks[requestID] = completion
     // timeout if the result doesn't come back in a reasonable amount of time.
     // I will perform a check after 1 second, and pop the callback if needed
     // problematic here..., how to wait while also not blocking this function
     Task { [weak self] in
       // invalidate if needed
       try await Task.sleep(nanoseconds: 1_000_000_000)
       // dead lock here:
       if let bl = await self?.pop(id: requestID) {
            print("invalidated the task \(requestID)")
            await bl(nil)
       }
     }
     // call out to the external process with this convention, it will run async and print a result with the same id
     let command = "\(requestID) \(text)\n"
     let d = Data(command.utf8)

     try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
     return WorkArgs(id: requestID, date: Date())
  }
  // pop this callback
  func pop(id: Int) -> Block? {
        return callbacks.removeValue(forKey: id)
  }
}

struct WorkArgs {
  let id: Int
  let date: Date
}

actor IdGen {
    private var id : Int64 = 0
    func next() -> Int64 {
        id += 1
        return id
    }
}

actor CallbackActor {
    var pendingCallbacks = [Int: (String) -> Void]()
    func push(_ key: Int, block: @escaping (String) -> Void) {
        pendingCallbacks[key] = block
    }
    // The key type matches the dictionary, and the return type matches the stored closure type.
    func pop(_ key: Int) -> ((String) -> Void)? {
        return pendingCallbacks.removeValue(forKey: key)
    }
}


Comments (1)

请叫√我孤独 2025-02-01 10:24:40

Before I get to the timeout question, we should probably talk about how to wrap the Process within Swift concurrency.

  • One pattern would be to use AsyncSequence (i.e., an AsyncStream) for stdout:

    actor ProcessWithStream {
        private let process = Process()
        private let stdin = Pipe()
        private let stdout = Pipe()
        private let stderr = Pipe()
        private var buffer = Data()
    
        init(url: URL) {
            process.standardInput = stdin
            process.standardOutput = stdout
            process.standardError = stderr
            process.executableURL = url
        }
    
        func start() throws {
            try process.run()
        }
    
        func terminate() {
            process.terminate()
        }
    
        func send(_ string: String) {
            let data = Data("\(string)\n".utf8)
            stdin.fileHandleForWriting.write(data)
        }
    
        func stream() -> AsyncStream<Data> {
            AsyncStream(Data.self) { continuation in
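                stdout.fileHandleForReading.readabilityHandler = { handler in
                    let data = handler.availableData
                    if data.isEmpty {
                        // Empty data signals end-of-file: stop observing and end the stream.
                        handler.readabilityHandler = nil
                        continuation.finish()
                    } else {
                        continuation.yield(data)
                    }
                }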
                process.terminationHandler = { handler in
                    continuation.finish()
                }
            }
        }
    }
    

    Then you can for await that stream:

    let process = ProcessWithStream(url: url)
    
    override func viewDidLoad() {
        super.viewDidLoad()
    
        Task {
            try await startStream()
            print("done")
        }
    }
    
    @IBAction func didTapSend(_ sender: Any) {
        let string = textField.stringValue
        Task {
            await process.send(string)
        }
        textField.stringValue = ""
    }
    
    func startStream() async throws {
        try await process.start()
        let stream = await process.stream()
    
        for await data in stream {
            if let string = String(data: data, encoding: .utf8) {
                print(string, terminator: "")
            }
        }
    }
    

    This is a simple approach. And it looks fine (because I am printing responses without a terminator).

    But one needs to be careful because readabilityHandler will not always be called with the full Data of some particular output. It might be broken up or split across separate calls to the readabilityHandler.

  • Another pattern would be to use lines, which avoids the problem of readabilityHandler possibly being called multiple times for a given output:

    actor ProcessWithLines {
        private let process = Process()
        private let stdin = Pipe()
        private let stdout = Pipe()
        private let stderr = Pipe()
        private var buffer = Data()
        private(set) var lines: AsyncLineSequence<FileHandle.AsyncBytes>?
    
        init(url: URL) {
            process.standardInput = stdin
            process.standardOutput = stdout
            process.standardError = stderr
            process.executableURL = url
        }
    
        func start() throws {
            lines = stdout.fileHandleForReading.bytes.lines
            try process.run()
        }
    
        func terminate() {
            process.terminate()
        }
    
        func send(_ string: String) {
            let data = Data("\(string)\n".utf8)
            stdin.fileHandleForWriting.write(data)
        }
    }
    

    Then you can do:

    let process = ProcessWithLines(url: url)
    
    override func viewDidLoad() {
        super.viewDidLoad()
    
        Task {
            try await startStream()
            print("done")
        }
    }
    
    @IBAction func didTapSend(_ sender: Any) {
        let string = textField.stringValue
        Task {
            await process.send(string)
        }
        textField.stringValue = ""
    }
    
    func startStream() async throws {
        try await process.start()
        guard let lines = await process.lines else { return }
    
        for try await line in lines {
            print(line)
        }
    }
    

    This avoids the breaking of responses mid-line.
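
If you stay with the readabilityHandler/AsyncStream variant instead, the split-chunk caveat mentioned above could be handled by buffering incoming data and only handing out complete lines. The following is a minimal sketch, not part of the original answer; `LineBuffer` and its `append(_:)` method are hypothetical names, and it assumes the child process terminates each response with a newline (`\n`) and emits UTF-8.

```swift
import Foundation

/// Hypothetical helper: collects chunks and returns only complete,
/// newline-terminated lines. Any trailing partial line stays buffered.
struct LineBuffer {
    private var pending = Data()

    mutating func append(_ chunk: Data) -> [String] {
        pending.append(chunk)
        var lines: [String] = []
        // 0x0A is the ASCII newline byte.
        while let newline = pending.firstIndex(of: 0x0A) {
            let lineData = pending[pending.startIndex..<newline]
            lines.append(String(decoding: lineData, as: UTF8.self))
            // Drop the consumed line together with its newline.
            pending.removeSubrange(pending.startIndex...newline)
        }
        return lines
    }
}
```

Something like this is one plausible use for the otherwise unused `buffer` property in the actors above: feed each `Data` delivered by the stream into the buffer and process whatever complete lines come back.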


You asked:

How to support ... timeout in swift actor

The pattern is to wrap the request in a Task, and then start a separate task that will cancel that prior task after a Task.sleep interval.
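
As a rough illustration of that pattern, here is a minimal sketch. The name `withTimeout` and its parameters are assumptions made for this example, not an existing API. It starts the work in one task and a second "watchdog" task that cancels the first after sleeping.

```swift
import Foundation

/// Hypothetical helper: runs `operation` in its own task and cancels that
/// task if it has not finished within `seconds`.
func withTimeout<T: Sendable>(
    seconds: Double,
    operation: @escaping @Sendable () async throws -> T
) async throws -> T {
    // The task that performs the actual request.
    let work = Task { try await operation() }

    // A separate task that cancels the request after the interval.
    let watchdog = Task {
        try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
        work.cancel()
    }

    // If the work finishes first, the watchdog is no longer needed.
    defer { watchdog.cancel() }
    return try await work.value
}
```

Cancellation in Swift concurrency is cooperative, so this only helps if `operation` actually checks for cancellation (as `Task.sleep` does, by throwing `CancellationError`).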

But this is going to be surprisingly complicated in this case, because you have to coordinate that with the separate Process which will otherwise still proceed, unaware that the Task has been canceled. That can theoretically lead to problems (e.g., the process gets backlogged, etc.).

I would advise integrating the timeout logic in the app invoked by the Process, rather than trying to have the caller handle that. It can be done (e.g. maybe write the process app to capture and handle SIGINT and then the caller can call interrupt on the Process). But it is going to be complicated and, most likely, brittle.
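
To make the caller-side SIGINT idea a little more concrete, here is a small synchronous sketch of what calling `interrupt()` looks like. It is only an illustration under assumptions: `/bin/sleep` stands in for the real child app, the one-second wait stands in for the timeout, and the blocking `Thread.sleep` is used purely for brevity. Because `sleep` does not handle SIGINT, it simply dies; a real child app would need its own signal handler to abandon the current request and keep running.

```swift
import Foundation

// Stand-in for the real child process (assumption for this sketch).
let child = Process()
child.executableURL = URL(fileURLWithPath: "/bin/sleep")
child.arguments = ["30"]
try child.run()

// Stand-in for the timeout: give the child one second to finish.
Thread.sleep(forTimeInterval: 1)

if child.isRunning {
    // Sends SIGINT to the child process.
    child.interrupt()
}

child.waitUntilExit()

// `sleep` has no SIGINT handler, so it ends because of the signal.
print(child.terminationReason == .uncaughtSignal)
```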
