处理守护线程上的数据库连接
我在自己一直开发的守护进程中处理数据库连接时遇到了问题。我首先用以下代码连接到我的 postgres 数据库:
# Open the initial PostgreSQL connection as `cnx`; on any failure print the
# error and exit.
# NOTE(review): indentation was lost in this paste; the try/except nesting is implied.
try:
# NOTE(review): these two lines only rebind module attributes; presumably they
# do not change how psycopg2 behaves — confirm against the psycopg2 docs.
psycopg2.apilevel = '2.0'
psycopg2.threadsafety = 3
# NOTE(review): this connects to dbname='db', while the main loop later in
# this post connects to dbname='sms' — confirm which one is intended.
cnx = psycopg2.connect( "host='192.168.10.36' dbname='db' user='vas' password='vas'")
except Exception, e:
print "Unable to connect to DB. Error [%s]" % ( e,)
# exit() is called without a status argument.
exit( )
之后我选择数据库中状态 = 0 的所有行:
# Load every row whose status is 0 into `rows` through a DictCursor on `cnx`,
# then close that cursor.
try:
cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
cursor.execute( "SELECT * FROM table WHERE status = 0")
rows = cursor.fetchall( )
cursor.close( )
except Exception, e:
# NOTE(review): the error is only printed; if the query failed, `rows` is not
# assigned by this snippet.
print "Error on sql query [%s]" % ( e,)
然后,如果选中了行,程序就会 fork 并进入以下循环:
# Main loop: (re)connect to the database and, when `rows` is non-empty,
# daemonize and start sender threads that are fed through the global queue `q`.
# NOTE(review): indentation was lost in this paste, so the exact nesting of the
# loops, the try/except and the if/else below is ambiguous.
while 1:
try:
psycopg2.apilevel = '2.0'
psycopg2.threadsafety = 3
cnx = psycopg2.connect( "host='192.168.10.36' dbname='sms' user='vas' password='vas'")
except Exception, e:
print "Unable to connect to DB. Error [%s]" % ( e,)
exit( )
if rows:
# NOTE(review): daemonize() forks after `cnx` has already been opened. The
# answer in this thread states that database handles do not survive fork(),
# so the connection should be opened after this call instead.
daemonize( )
# NOTE(review): this loop and the two loops below all iterate `rows` with
# the same variable `i`.
for i in rows:
try:
global q, l
# Queue bounded by `max_threads`, which is defined outside this snippet.
q = Queue.Queue( max_threads)
# Start one daemon thread per row; each thread gets its own cursor, but every
# cursor is created from the same connection `cnx`.
for i in rows:
cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
t = threading.Thread( target=sender, args=(i, cursor))
t.setDaemon( True)
t.start( )
# Enqueue every row, then wait until each queued item has been acknowledged
# with q.task_done() in sender().
for i in rows:
q.put( i)
q.join( )
except Exception, e:
print "Se ha producido el siguente error [%s]" % ( e,)
exit( )
else:
print "No rows where selected\n"
time.sleep( 5)
我的 daemonize 函数如下所示:
def daemonize():
    """Detach the current process from its launcher with a double fork.

    The caller's process exits in the first fork. The surviving child changes
    directory to "/", starts a new session, clears the umask and forks again;
    the intermediate process exits and the grandchild returns from this
    function.

    If either fork fails, a message is written to stderr and the process
    exits with status 1.

    NOTE: per the answer in this thread, database handles opened before this
    call do not survive the fork; open connections after it returns.
    """
    try:
        pid = os.fork()
        if pid > 0:
            # First parent: hand control back to the shell.
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    os.chdir("/")
    # Become a session leader so the daemon is no longer tied to the
    # launcher's session and controlling terminal. The original omitted this
    # step, which left the double fork without its intended effect.
    os.setsid()
    os.umask(0)

    try:
        pid = os.fork()
        if pid > 0:
            # Session leader exits; the grandchild cannot reacquire a terminal.
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)
线程的目标是 sender 函数:
# Thread target: loop forever taking an item from the global queue `q`, call
# send_to() and record the outcome through cursor `db` (status 1 when send_to()
# returns a truthy value, status 2 otherwise), then mark the queue item done.
# NOTE(review): indentation was lost in this paste; the nesting is implied.
def sender( row, db):
while 1:
# NOTE(review): `item` is fetched but never used; the message fields are read
# from `row`, the value the thread was started with.
item = q.get( )
if send_to( row['to'], row['text']):
# NOTE(review): `sms` is not defined anywhere in this snippet — presumably
# `row` or `item` was meant; verify against the real sender.py.
db.execute( "UPDATE table SET status = 1 WHERE id = %d" % sms['id'])
else:
print "UPDATE table SET status = 2 WHERE id = %d" % sms['id']
db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
# NOTE(review): if this close is inside the while loop, every later iteration
# would execute on an already closed cursor — nesting unclear from the paste.
db.close( )
q.task_done( )
send_to 函数仅打开一个 URL,并根据是否成功返回 true 或 false。
从昨天开始,我不断收到下面这些错误,始终找不到解决办法:
UPDATE outbox SET status = 2 WHERE id = 36
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 525, in __bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 477, in run
self.__target(*self.__args, **self.__kwargs)
File "sender.py", line 30, in sender
db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
File "/usr/lib/python2.6/dist-packages/psycopg2/extras.py", line 88, in execute
return _cursor.execute(self, query, vars, async)
OperationalError: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
I have a problem handling database connections in a daemon I've been working on. I first connect to my Postgres database with:
# Open the initial PostgreSQL connection as `cnx`; on any failure print the
# error and exit.
# NOTE(review): indentation was lost in this paste; the try/except nesting is implied.
try:
# NOTE(review): these two lines only rebind module attributes; presumably they
# do not change how psycopg2 behaves — confirm against the psycopg2 docs.
psycopg2.apilevel = '2.0'
psycopg2.threadsafety = 3
# NOTE(review): this connects to dbname='db', while the main loop later in
# this post connects to dbname='sms' — confirm which one is intended.
cnx = psycopg2.connect( "host='192.168.10.36' dbname='db' user='vas' password='vas'")
except Exception, e:
print "Unable to connect to DB. Error [%s]" % ( e,)
# exit() is called without a status argument.
exit( )
After that, I select all rows in the DB that have status = 0:
# Load every row whose status is 0 into `rows` through a DictCursor on `cnx`,
# then close that cursor.
try:
cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
cursor.execute( "SELECT * FROM table WHERE status = 0")
rows = cursor.fetchall( )
cursor.close( )
except Exception, e:
# NOTE(review): the error is only printed; if the query failed, `rows` is not
# assigned by this snippet.
print "Error on sql query [%s]" % ( e,)
Then, if any rows were selected, the program forks into:
# Main loop: (re)connect to the database and, when `rows` is non-empty,
# daemonize and start sender threads that are fed through the global queue `q`.
# NOTE(review): indentation was lost in this paste, so the exact nesting of the
# loops, the try/except and the if/else below is ambiguous.
while 1:
try:
psycopg2.apilevel = '2.0'
psycopg2.threadsafety = 3
cnx = psycopg2.connect( "host='192.168.10.36' dbname='sms' user='vas' password='vas'")
except Exception, e:
print "Unable to connect to DB. Error [%s]" % ( e,)
exit( )
if rows:
# NOTE(review): daemonize() forks after `cnx` has already been opened. The
# answer in this thread states that database handles do not survive fork(),
# so the connection should be opened after this call instead.
daemonize( )
# NOTE(review): this loop and the two loops below all iterate `rows` with
# the same variable `i`.
for i in rows:
try:
global q, l
# Queue bounded by `max_threads`, which is defined outside this snippet.
q = Queue.Queue( max_threads)
# Start one daemon thread per row; each thread gets its own cursor, but every
# cursor is created from the same connection `cnx`.
for i in rows:
cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
t = threading.Thread( target=sender, args=(i, cursor))
t.setDaemon( True)
t.start( )
# Enqueue every row, then wait until each queued item has been acknowledged
# with q.task_done() in sender().
for i in rows:
q.put( i)
q.join( )
except Exception, e:
print "Se ha producido el siguente error [%s]" % ( e,)
exit( )
else:
print "No rows where selected\n"
time.sleep( 5)
My daemonize function looks like this:
def daemonize():
    """Detach the current process from its launcher with a double fork.

    The caller's process exits in the first fork. The surviving child changes
    directory to "/", starts a new session, clears the umask and forks again;
    the intermediate process exits and the grandchild returns from this
    function.

    If either fork fails, a message is written to stderr and the process
    exits with status 1.

    NOTE: per the answer in this thread, database handles opened before this
    call do not survive the fork; open connections after it returns.
    """
    try:
        pid = os.fork()
        if pid > 0:
            # First parent: hand control back to the shell.
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    os.chdir("/")
    # Become a session leader so the daemon is no longer tied to the
    # launcher's session and controlling terminal. The original omitted this
    # step, which left the double fork without its intended effect.
    os.setsid()
    os.umask(0)

    try:
        pid = os.fork()
        if pid > 0:
            # Session leader exits; the grandchild cannot reacquire a terminal.
            sys.exit(0)
    except OSError as e:
        sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)
The threads target the sender function:
# Thread target: loop forever taking an item from the global queue `q`, call
# send_to() and record the outcome through cursor `db` (status 1 when send_to()
# returns a truthy value, status 2 otherwise), then mark the queue item done.
# NOTE(review): indentation was lost in this paste; the nesting is implied.
def sender( row, db):
while 1:
# NOTE(review): `item` is fetched but never used; the message fields are read
# from `row`, the value the thread was started with.
item = q.get( )
if send_to( row['to'], row['text']):
# NOTE(review): `sms` is not defined anywhere in this snippet — presumably
# `row` or `item` was meant; verify against the real sender.py.
db.execute( "UPDATE table SET status = 1 WHERE id = %d" % sms['id'])
else:
print "UPDATE table SET status = 2 WHERE id = %d" % sms['id']
db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
# NOTE(review): if this close is inside the while loop, every later iteration
# would execute on an already closed cursor — nesting unclear from the paste.
db.close( )
q.task_done( )
The send_to function just opens a URL and returns true or false depending on success.
Since yesterday I keep getting these errors and can't find my way through:
UPDATE outbox SET status = 2 WHERE id = 36
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 525, in __bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 477, in run
self.__target(*self.__args, **self.__kwargs)
File "sender.py", line 30, in sender
db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
File "/usr/lib/python2.6/dist-packages/psycopg2/extras.py", line 88, in execute
return _cursor.execute(self, query, vars, async)
OperationalError: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
数据库句柄无法在 fork() 之后继续使用。您需要在每个子进程中打开一个新的数据库句柄,即在调用 daemonize() 之后再调用 psycopg2.connect。我没有使用过 postgres,但我知道这对于 MySQL 来说绝对是正确的。
Database handles don't survive across fork(). You'll need to open a new database handle in each subprocess, i.e. after you call daemonize(), call psycopg2.connect. I've not used Postgres, but I know this to be definitely true for MySQL.