扭曲的task.loop和pb auth
学习扭曲。我决定编写一个服务器和客户端,每秒一次共享数据。 写了一个实现,但在我看来,它是不正确的。
# -*- coding: utf-8 -*-
from twisted.spread import pb
from twisted.internet import reactor, task
from twisted.cred import credentials
from win32com.server import factory
class login_send:
def __init__(self):
self.count=0
self.timeout = 1.0
self.factory = pb.PBClientFactory()
reactor.connectTCP("localhost", 8800, self.factory)
def testTimeout(self):
self.count+=1
print self.count
def1 = self.factory.login(credentials.UsernamePassword("test1","bb1b"))
def1.addCallbacks(self.good_connected, self.bad_connected)
def1.addCallback(self.send_data)
def1.addErrback(self.disconnect)
if self.count>10:def1.addBoth(self.disconnect)
def start(self):
l = task.LoopingCall(self.testTimeout)
l.start(self.timeout)
reactor.run()
def good_connected(self, perspective):
print 'good login and password', perspective
return perspective
def bad_connected(self, perspective):
print 'bad login or password', perspective
return perspective
def send_data(self, perspective):
print 'send'
return perspective.callRemote("foo", self.count)
def disconnect(self, perspective):
print 'disconnect'
reactor.stop()
if __name__ == "__main__":
st=login_send()
st.start()
代码:如果登录名和密码为True->发送 self.count,如果登录名或密码为 False ->断开连接,如果 self.count>10 -> 在我看来
,第一个错误是我每次都必须登录。
def1 = self.factory.login(credentials.UsernamePassword("test1", "bb1b"))
如何进行一次授权,并持续每秒发送数据?
简单的测试服务器代码:
from zope.interface import implements
from twisted.spread import pb
from twisted.cred import checkers, portal
from twisted.internet import reactor
class MyPerspective(pb.Avatar):
def __init__(self, name):
self.name = name
def perspective_foo(self, arg):
print "I am", self.name, "perspective_foo(",arg,") called on", self
return arg
class MyRealm:
implements(portal.IRealm)
def requestAvatar(self, avatarId, mind, *interfaces):
if pb.IPerspective not in interfaces:
print 'qqqq'
raise NotImplementedError
return pb.IPerspective, MyPerspective(avatarId), lambda:None
p = portal.Portal(MyRealm())
c = checkers.InMemoryUsernamePasswordDatabaseDontUse(test1="bbb",
user2="pass2")
p.registerChecker(c)
reactor.listenTCP(8800, pb.PBServerFactory(p))
reactor.run()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我相信这应该可以解决问题。
I believe this should do the trick.