python:如何在多线程中发送数据包,然后线程自行终止

发布于 2024-07-14 08:45:34 字数 1184 浏览 6 评论 0原文

我有个问题。 我想使用 python 将连续的字节流发送到某个主机一段时间(假设 1 分钟)。

这是到目前为止我的代码:

#! /usr/bin/env python                                                          

import socket
import thread
import time

IP = "192.168.0.2"
PADDING = "a" * 1000 #assume the MTU is slighly above 1000
DATA = PADDING + "this is sentence number = "
PORT = 14444
killed = False
test_time = 60 #60 seconds of testing

def send_data():
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect((IP, PORT))
  count = 1
  starttime = time.clock()
  while elapsed < test_time:
    sent = s.send(DATA + str(count) + "\n")
    if sent == 0: break # assume that if nothing is sent -> connection died
    count = count+1
    elapsed = time.clock() - starttime
    if killed:
      break
  s.close()
  print str(count) + " has been sent"

print "to quit type quit"
thread.start_new_thread(send_data, ())

while True:
  var = raw_input("Enter something: ")
  if var == "quit":
    killed = True

几个问题,除了每次轮询 time.clock 之外,是否有更好的方法让线程在 60 秒后终止? 当我运行这个程序时,它正确发送字节,但是当我输入 quit 时,另一个线程不会死,即使我设置了 var Killed = True。 我想知道这是为什么? var Killed 的范围应该到达另一个线程,对吗?

谢谢

I have a question. I'd like to send a continuous streams of byte to some host for certain amount of time (let's say 1 minute) using python.

Here is my code so far:

#! /usr/bin/env python                                                          

import socket
import thread
import time

IP = "192.168.0.2"
PADDING = "a" * 1000 #assume the MTU is slighly above 1000
DATA = PADDING + "this is sentence number = "
PORT = 14444
killed = False
test_time = 60 #60 seconds of testing

def send_data():
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect((IP, PORT))
  count = 1
  starttime = time.clock()
  while elapsed < test_time:
    sent = s.send(DATA + str(count) + "\n")
    if sent == 0: break # assume that if nothing is sent -> connection died
    count = count+1
    elapsed = time.clock() - starttime
    if killed:
      break
  s.close()
  print str(count) + " has been sent"

print "to quit type quit"
thread.start_new_thread(send_data, ())

while True:
  var = raw_input("Enter something: ")
  if var == "quit":
    killed = True

Few question, is there a better way to let a thread die after 60 seconds other than polling the time.clock every time?
When I run this program, it sends the bytes correctly but when I typed quit the other thread won't die, even though I set the var killed = True. I wonder why is that? the scope of var Killed should reach the other thread right?

Thanks

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(7

心房的律动 2024-07-21 08:45:34

我推荐使用线程模块。 更大的好处是使用 InterruptableThread 来终止线程。 您不必使用标志来终止您的线程,但如果您从父线程在此线程上调用terminate(),则会发生异常。 您可以处理异常,也可以不处理异常。

import threading, ctypes

class InterruptableThread(threading.Thread):
@classmethod
def _async_raise(cls, tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

def raise_exc(self, excobj):
    assert self.isAlive(), "thread must be started"
    for tid, tobj in threading._active.items():
        if tobj is self:
            self._async_raise(tid, excobj)
            return

def terminate(self):
    self.raise_exc(SystemExit)

编辑:
您可以使用等待 1 分钟的另一个线程来重写代码,然后杀死另一个线程

def send_data:
    IP = ...
    # other vars

    ...
    s = socket.socket(.....)

    # no killed checking
    # no time checking
    # just do your work here
    ...
    s.close()


my_thread = InterruptableThread(target=send_data)
my_thread.start()

def one_minute_kill(who):
   time.sleep(60)
   who.terminate()

killer_thread = InterruptableThread(target=one_minute_kill, args=[my_thread])
killer.start()

print "to quit type quit"
while my_thread.isAlive():
  if raw_input("Enter something: ") == "quit":
    my_thread.terminate()

I recommned using threading module. Even more benefit is to use InterruptableThread for terminating the thread. You do not have to use flag for terminating your thread but exception will occur if you call terminate() on this thread from parent. You can handle exception or not.

import threading, ctypes

class InterruptableThread(threading.Thread):
@classmethod
def _async_raise(cls, tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

def raise_exc(self, excobj):
    assert self.isAlive(), "thread must be started"
    for tid, tobj in threading._active.items():
        if tobj is self:
            self._async_raise(tid, excobj)
            return

def terminate(self):
    self.raise_exc(SystemExit)

EDIT:
You can rewrite your code like this using another thread that is waiting 1 minute and then killing your other thread

def send_data:
    IP = ...
    # other vars

    ...
    s = socket.socket(.....)

    # no killed checking
    # no time checking
    # just do your work here
    ...
    s.close()


my_thread = InterruptableThread(target=send_data)
my_thread.start()

def one_minute_kill(who):
   time.sleep(60)
   who.terminate()

killer_thread = InterruptableThread(target=one_minute_kill, args=[my_thread])
killer.start()

print "to quit type quit"
while my_thread.isAlive():
  if raw_input("Enter something: ") == "quit":
    my_thread.terminate()
你爱我像她 2024-07-21 08:45:34

我不知道如何使用“线程”模块执行此操作,但我可以使用“线程”模块执行此操作。 我认为这段代码可以实现你想要的。

有关线程模块的文档:
http://docs.python.org/library/threading.html

#!/usr/bin/python

import time
from threading import Thread
import threading
import sys

test_time = 10
killed = False

class SillyThread( threading.Thread ):
    def run(self):
        global killed
        starttime = time.time()
        counter = 0
        while (time.time() - starttime) < test_time:
            if killed:
                break
            counter = counter + 1
            time.sleep(0.1)
        print "I did %d loops" % counter

class ManageThread( threading.Thread ):
    def run(self):
        global killed
        while True:
            var = raw_input("Enter something: ")
            if var == "quit":
                killed = True
                break
        print "Got var [%s]" % var

silly = SillyThread()
silly.start()
ManageThread().start()
Thread.join(silly)
print "bye bye"
sys.exit(0)

请注意,我使用 time.time() 而不是 time.clock()。 time.clock() 给出 Unix 上经过的处理器时间(参见 http://docs.python.org /library/time.html)。 我认为 time.clock() 应该在任何地方都有效。 我将 test_time 设置为 10 秒,因为我一分钟都没有耐心。

如果我让它运行整整 10 秒,会发生以下情况:

leif@peacock:~/tmp$ ./test.py
Enter something: I did 100 loops
bye bye

如果我输入“退出”,会发生以下情况:

leif@peacock:~/tmp$ ./test.py
Enter something: quit
Got var [quit]
I did 10 loops
bye bye

希望这会有所帮助。

I don't know how to do this with the "thread" module, but I can do it with the "threading" module. I think this code accomplishes what you want.

For documentation on the threading module:
http://docs.python.org/library/threading.html

#!/usr/bin/python

import time
from threading import Thread
import threading
import sys

test_time = 10
killed = False

class SillyThread( threading.Thread ):
    def run(self):
        global killed
        starttime = time.time()
        counter = 0
        while (time.time() - starttime) < test_time:
            if killed:
                break
            counter = counter + 1
            time.sleep(0.1)
        print "I did %d loops" % counter

class ManageThread( threading.Thread ):
    def run(self):
        global killed
        while True:
            var = raw_input("Enter something: ")
            if var == "quit":
                killed = True
                break
        print "Got var [%s]" % var

silly = SillyThread()
silly.start()
ManageThread().start()
Thread.join(silly)
print "bye bye"
sys.exit(0)

Note that I use time.time() instead of time.clock(). time.clock() gives elapsed processor time on Unix (see http://docs.python.org/library/time.html). I think time.clock() should work everywhere. I set my test_time to 10 seconds because I don't have the patience for a minute.

Here's what happens if I let it run the full 10 seconds:

leif@peacock:~/tmp$ ./test.py
Enter something: I did 100 loops
bye bye

Here's what happens if I type 'quit':

leif@peacock:~/tmp$ ./test.py
Enter something: quit
Got var [quit]
I did 10 loops
bye bye

Hope this helps.

时光倒影 2024-07-21 08:45:34

如上所述,使用 threading 模块,使用起来更加方便,并且提供了几个同步原语。 它还提供了一个在指定时间后运行的 Timer 类。

如果你只是想让程序退出,你可以简单地将发送线程设置为守护进程。 您可以通过在调用 start() 之前调用 setDaemon(True) 来完成此操作(2.6 可能会使用守护进程属性)。 只要非守护线程正在运行,Python 就不会退出。

As mentioned above, use the threading module, it is much easier to use and provides several synchronization primitives. It also provides a Timer class that runs after a specified amount of time.

If you just want the program to exit, you can simply make the sending thread a daemon. You do this by calling setDaemon(True) before calling start() (2.6 might use a daemon attribute instead). Python won't exit so long as a non-daemon thread is running.

够运 2024-07-21 08:45:34

无需线程即可轻松完成此操作。 例如,使用 Twisted,您只需设置一个定时调用和一个生产者:

from twisted.internet.protocol import ClientFactory, Protocol
from twisted.internet import reactor

class Noisy(Protocol):
    def __init__(self, delay, data):
        self.delay = delay
        self.data = data

    def stop(self):
        self.transport.unregisterProducer()
        self.transport.loseConnection()
        reactor.stop()

    def resumeProducing(self):
        self.transport.write(self.data)

    def connectionMade(self):
        self.transport.registerProducer(self, False)
        reactor.callLater(self.delay, self.stop)

factory = ClientFactory()
factory.protocol = lambda: Noisy(60, "hello server")
reactor.connectTCP(host, port, factory)
reactor.run()

与线程方法相比,这具有多种优点。 它不依赖于守护线程,因此您实际上可以清理网络连接(例如,必要时发送关闭消息),而不是依赖平台来销毁它。 它为您处理所有实际的低级网络代码(您最初的示例在 socket.send 返回 0 的情况下做了错误的事情;此代码将正确处理这种情况)。 您也不必依赖 ctypes 或晦涩的 CPython API 在另一个线程中引发异常(因此它可以移植到更多版本的 Python,并且实际上可以立即中断阻塞的发送,这与其他一些建议的方法不同)。

You can do this pretty easily without threads. For example, using Twisted, you just set up a timed call and a producer:

from twisted.internet.protocol import ClientFactory, Protocol
from twisted.internet import reactor

class Noisy(Protocol):
    def __init__(self, delay, data):
        self.delay = delay
        self.data = data

    def stop(self):
        self.transport.unregisterProducer()
        self.transport.loseConnection()
        reactor.stop()

    def resumeProducing(self):
        self.transport.write(self.data)

    def connectionMade(self):
        self.transport.registerProducer(self, False)
        reactor.callLater(self.delay, self.stop)

factory = ClientFactory()
factory.protocol = lambda: Noisy(60, "hello server")
reactor.connectTCP(host, port, factory)
reactor.run()

This has various advantages over the threaded approach. It doesn't rely on daemon threads, so you can actually clean up the network connection (eg, to send a close message if necessary) rather than relying on the platform to destroy it. It handles all the actual low level networking code for you (your original example is doing the wrong thing in the case of socket.send returning 0; this code will handle that case properly). You also don't have to rely on ctypes or the obscure CPython API for raising an exception in another thread (so it's portable to more versions of Python and can actually interrupt a blocked send immediately, unlike some of the other suggested approaches).

画离情绘悲伤 2024-07-21 08:45:34

确保“退出”正常工作,并添加一个小字体来测试输入是否正常工作。

if var == "quit":
 print "Hey we got quit"

Ensure that the "quit" is working correctly and add a small print to test that the input is working.

if var == "quit":
 print "Hey we got quit"
2024-07-21 08:45:34

变量 elapsed 未初始化。 在 while 循环上方将其设置为零。

The variable elapsed is not initialized. Set it to zero above the while loop.

偏爱自由 2024-07-21 08:45:34

测试 killed 的范围很容易:

>>> import thread
>>> killed = False
>>> import time
>>> def test():
...  while True:
...   time.sleep(1)
...   if killed:
...     print 'Dead.'
...     break
... 
>>> thread.start_new_thread(test, ())
25479680
>>> time.sleep(3)
>>> killed = True
>>> Dead.

It's easy to test the scope of killed:

>>> import thread
>>> killed = False
>>> import time
>>> def test():
...  while True:
...   time.sleep(1)
...   if killed:
...     print 'Dead.'
...     break
... 
>>> thread.start_new_thread(test, ())
25479680
>>> time.sleep(3)
>>> killed = True
>>> Dead.
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文