在 Thrift 处理函数中获取对等地址

发布于 2024-12-07 08:25:38 字数 802 浏览 1 评论 0原文

我正在 ruby​​ 中实现一个小型 thrift (0.6.0) 服务器,以充当另一个协议的代理角色,该协议与单个服务器有多个连接(多个客户端)。我希望能够在服务器端保留每个客户端的数据,并在处理程序函数的多次调用中跟踪“会话”参数。

我目前使用 Thrift::NonblockingServer ,因为 SimpleServer 似乎不允许并发连接。

我知道如何使用 TCPSocket::peeraddrThrift::NonblockingServer::IOManager::Worker::run 创建一个临时 MemoryBufferTransport它读取帧并将其作为输入/输出协议传递到处理器,因此似乎信息不是从那里传递下来的。

有没有一种干净的方法来做到这一点? 我正在考虑重新定义上面提到的 Thrift::NonblockingServer::IOManager::Worker::run 以在附加参数中包含 fd 或其他 ID 来处理或增强原型实例但由于我还必须担心生成的 ruby​​ 代码的一层(class Processor 中的 process_* 方法),它看起来有点重。

我想知道以前是否有人做过类似的事情。

谢谢!

ps 这与 这个 C++ thrift 问题类似的问题

I am implementing a small thrift (0.6.0) server in ruby to play a role of proxy to another protocol with several connections (multiple clients) to a single server. I want to be able and keep per-client data on the server side and track "session" parameters across multiple invocations of handler functions.

I currently use Thrift::NonblockingServer as SimpleServer does not seem to allow concurrent connections.

I know how to use TCPSocket::peeraddr but Thrift::NonblockingServer::IOManager::Worker::run creates a temporary MemoryBufferTransport with the frame it read and passes that as the input/output protocol down to the processor so it seems that the info is not passed down from there.

Is there a clean way to do this?
I was thinking of re-defining the above mentioned Thrift::NonblockingServer::IOManager::Worker::run to also include the fd, or other ID at an additional parameter to process or augment the proto instances but as I also have to worry about one layer of generated ruby code (process_* methods in class Processor) it seems a little heavy.

I wonder if someone did anything like this before.

Thanks!

p.s. this is similar problem to this C++ thrift question

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

帅的被狗咬 2024-12-14 08:25:38

以下是我如何更改 Thrift::NonblockingServer::IOManager::Worker::run 以支持此操作。

一些警告:

  • 正如我在问题中提到的,我不认为它是干净的(如果没有别的事情,我将不得不监视未来的 thrift 版本以了解此功能的更改,这是基于 0.6.0 的)。
  • 这是为了与 ruby​​ 1.8 一起使用而编写的(或者我会得到未翻译的 addr/port 请参阅我的其他问题
  • 我我是一个 ruby​​ 新手..我确信我做了一些“错误”的事情(例如, $connections 应该是 @@connections ConnEntry 或 diff 类?)。
  • 我知道线程本地存储的 Thread.current 哈希具有 名称空间污染问题

首先,在某个中央模块:

module MyThriftExt

  $connections={}

  class ConnEntry
    attr_reader :addr_info, :attr

    def initialize(thrift_fd)
      @addr_info=thrift_fd.handle.peeraddr
      @attr={}
    end

    def self.PreHandle(fd)
      $connections[fd]=ConnEntry.new(fd) unless $connections[fd].is_a? ConnEntry
      # make the connection entry as short-term thread-local variable 
      # (cleared in postHandle)
      Thread.current[:connEntry]=$connections[fd]
    end

    def self.PostHandle()
      Thread.current[:connEntry]=nil
    end

    def to_s()
      "#{addr_info}"
    end   end end


module Thrift   class NonblockingServer
    class IOManager
      alias :old_remove_connection :remove_connection
      def remove_connection(fd)
        $connections.delete fd
        old_remove_connection(fd)
      end

      class Worker
        # The following is verbatim from thrift 0.6.0 except for the two lines 
        # marked with "Added"
        def run
          loop do
            cmd, *args = @queue.pop
            case cmd
            when :shutdown
              @logger.debug "#{self} is shutting down, goodbye"
              break
            when :frame
              fd, frame = args
              begin
                otrans = @transport_factory.get_transport(fd)
                oprot = @protocol_factory.get_protocol(otrans)
                membuf = MemoryBufferTransport.new(frame)
                itrans = @transport_factory.get_transport(membuf)
                iprot = @protocol_factory.get_protocol(itrans)
                MyThriftExt::ConnEntry.PreHandle(fd)    # <<== Added
                @processor.process(iprot, oprot)
                MyThriftExt::ConnEntry.PostHandle       # <<== Added
              rescue => e
                @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}"
              end
            end
          end
        end
      end
    end   
  end
end

然后在处理程序中的任何位置您都可以访问Thread.current[:connEntry].addr_info 用于连接特定数据或在 Thread.current[:connEntry].attr 哈希中存储有关连接的任何内容。

Here is how I went about changing Thrift::NonblockingServer::IOManager::Worker::run to support this.

Few caveats:

  • As I mentioned in the question I don't consider it clean (if nothing else I will have to monitor future thrift versions for changes in this function, this is based on 0.6.0).
  • this is written to work with ruby 1.8 (or I would have gotten the non translated addr/port see my other question)
  • I'm a ruby newbie .. I'm sure I've done some of this "wrong" (for example, should $connections be @@connections in ConnEntry or a diff class?).
  • I am aware that the Thread.current hash for thread-local storage has a namespace pollution issue

First, at some central module:

module MyThriftExt

  $connections={}

  class ConnEntry
    attr_reader :addr_info, :attr

    def initialize(thrift_fd)
      @addr_info=thrift_fd.handle.peeraddr
      @attr={}
    end

    def self.PreHandle(fd)
      $connections[fd]=ConnEntry.new(fd) unless $connections[fd].is_a? ConnEntry
      # make the connection entry as short-term thread-local variable 
      # (cleared in postHandle)
      Thread.current[:connEntry]=$connections[fd]
    end

    def self.PostHandle()
      Thread.current[:connEntry]=nil
    end

    def to_s()
      "#{addr_info}"
    end   end end


module Thrift   class NonblockingServer
    class IOManager
      alias :old_remove_connection :remove_connection
      def remove_connection(fd)
        $connections.delete fd
        old_remove_connection(fd)
      end

      class Worker
        # The following is verbatim from thrift 0.6.0 except for the two lines 
        # marked with "Added"
        def run
          loop do
            cmd, *args = @queue.pop
            case cmd
            when :shutdown
              @logger.debug "#{self} is shutting down, goodbye"
              break
            when :frame
              fd, frame = args
              begin
                otrans = @transport_factory.get_transport(fd)
                oprot = @protocol_factory.get_protocol(otrans)
                membuf = MemoryBufferTransport.new(frame)
                itrans = @transport_factory.get_transport(membuf)
                iprot = @protocol_factory.get_protocol(itrans)
                MyThriftExt::ConnEntry.PreHandle(fd)    # <<== Added
                @processor.process(iprot, oprot)
                MyThriftExt::ConnEntry.PostHandle       # <<== Added
              rescue => e
                @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}"
              end
            end
          end
        end
      end
    end   
  end
end

Then at any point in the handler you can access Thread.current[:connEntry].addr_info for connection specific data or store anything regarding the connection in the Thread.current[:connEntry].attr hash.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文