How can I run Net::SSH and AMQP in the same EventMachine reactor?
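Some background: Gerrit exposes an event stream through SSH. It's a cute trick, but I need to convert those events into AMQP messages. I've tried to do this with ruby-amqp and Net::SSH, but it doesn't seem as if the AMQP sub-component is being run at all.
I'm fairly new to EventMachine. Can someone point out what I am doing incorrectly? The answer to "Multiple servers in a single EventMachine reactor" didn't seem applicable. The program, also available in a gist for easier access, is: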
#!/usr/bin/env ruby
require 'rubygems'
require 'optparse'
require 'net/ssh'
require 'json'
require 'yaml'
require 'amqp'
require 'logger'

trap(:INT) { puts; exit }

options = {
  :logs => 'kili.log',
  :amqp => {
    :host => 'localhost',
    :port => '5672',
  },
  :ssh => {
    :host => 'localhost',
    :port => '22',
    :user => 'nobody',
    :keys => '~/.ssh/id_rsa',
  }
}

optparse = OptionParser.new do |opts|
  opts.banner = "Usage: kili [options]"
  opts.on( '--amqp_host HOST', 'The AMQP host kili will connect to.') do |a|
    options[:amqp][:host] = a
  end
  opts.on( '--amqp_port PORT', 'The port for the AMQP host.') do |ap|
    options[:amqp][:port] = ap
  end
  opts.on( '--ssh_host HOST', 'The SSH host kili will connect to.') do |s|
    options[:ssh][:host] = s
  end
  opts.on( '--ssh_port PORT', 'The SSH port kili will connect on.') do |sp|
    options[:ssh][:port] = sp
  end
  opts.on( '--ssh_keys KEYS', 'Comma delimited SSH keys for user.') do |sk|
    options[:ssh][:keys] = sk
  end
  opts.on( '--ssh_user USER', 'SSH user for host.') do |su|
    options[:ssh][:user] = su
  end
  opts.on( '-l', '--log LOG', 'The log location of Kili') do |log|
    options[:logs] = log
  end
  opts.on( '-h', '--help', 'Display this screen' ) do
    puts opts
    exit
  end
end
optparse.parse!

log = Logger.new(options[:logs])
log.level = Logger::INFO

amqp = options[:amqp]
sshd = options[:ssh]
queue = EM::Queue.new

EventMachine.run do
  AMQP.connect(:host => amqp[:host], :port => amqp[:port]) do |connection|
    log.info "Connected to AMQP at #{amqp[:host]}:#{amqp[:port]}"
    channel = AMQP::Channel.new(connection)
    exchange = channel.topic("traut", :auto_delete => true)
    queue.pop do |msg|
      log.info("Pulled #{msg} out of queue.")
      exchange.publish(msg[:data], :routing_key => msg[:route]) do
        log.info("On route #{msg[:route]} published:\n#{msg[:data]}")
      end
    end
  end

  Net::SSH.start(sshd[:host], sshd[:user],
                 :port => sshd[:port], :keys => sshd[:keys].split(',')) do |ssh|
    log.info "SSH connection to #{sshd[:host]}:#{sshd[:port]} as #{sshd[:user]} made."
    channel = ssh.open_channel do |ch|
      ch.exec "gerrit stream-events" do |ch, success|
        abort "could not stream gerrit events" unless success

        # "on_data" is called when the process writes something to
        # stdout
        ch.on_data do |c, data|
          json = JSON.parse(data)
          if json['type'] == 'change-merged'
            project = json['change']['project']
            route = "com.carepilot.event.code.review.#{project}"
            msg = {:data => data, :route => route}
            queue.push(msg)
            log.info("Pushed #{msg} into queue.")
          else
            log.info("Ignoring event of type #{json['type']}")
          end
        end

        # "on_extended_data" is called when the process writes
        # something to stderr
        ch.on_extended_data do |c, type, data|
          log.error(data)
        end

        ch.on_close { log.info('Connection closed') }
      end
    end
  end
end
Comments (2)
Net::SSH is not asynchronous, so the block you pass to EventMachine.run() never reaches its end and control never returns to the reactor. As a result, the AMQP code never starts. I would suggest running your SSH code within another thread.
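To make the "another thread" suggestion a little more concrete, here is a minimal sketch that uses only Ruby's standard library. The `stream_events` helper and the `SAMPLE_EVENTS` lines are hypothetical stand-ins for the blocking Net::SSH session and Gerrit's output, and the main-thread loop stands in for the reactor side that would publish to AMQP. It is meant to show the shape of the hand-off, not a drop-in fix.

```ruby
require 'json'
require 'thread'

# Hypothetical sample of what `gerrit stream-events` might emit, one JSON
# object per line (field names taken from the question's code).
SAMPLE_EVENTS = [
  '{"type":"comment-added","change":{"project":"demo"}}',
  '{"type":"change-merged","change":{"project":"demo"}}'
].freeze

# Stand-in for the blocking SSH session: filters events and hands the
# interesting ones to a thread-safe queue.
def stream_events(lines, out)
  lines.each do |line|
    event = JSON.parse(line)
    next unless event['type'] == 'change-merged'
    route = "com.carepilot.event.code.review.#{event['change']['project']}"
    out << { :data => line, :route => route }
  end
ensure
  out << :done # sentinel so the consumer knows the stream has ended
end

events = Queue.new

# The blocking work runs in its own thread, so the consumer below is
# never starved by it.
producer = Thread.new { stream_events(SAMPLE_EVENTS, events) }

# Stand-in for the reactor side: consume messages as they arrive.
published = []
while (msg = events.pop) != :done
  published << msg
end
producer.join

puts "would publish #{published.size} message(s)"
```

In the original program the thread body would be the `Net::SSH.start` call, and the consumer would be the `queue.pop` callback running inside the reactor. As far as I know, `EM::Queue#push` schedules its work onto the reactor thread, so it should be safe to call from the SSH thread, but that is worth checking against the EventMachine version in use.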
If you go back to EventMachine, give em-ssh (https://github.com/simulacre/em-ssh) a try.