在python中创建可中断进程

发布于 2024-10-10 05:52:23 字数 842 浏览 9 评论 0原文

我正在创建一个 python 脚本,它解析一个大的(但简单的)CSV。

需要一些时间来处理。我希望能够中断 CSV 的解析,以便我可以在稍后阶段继续。

目前我有这个 - 它位于一个更大的类中:(未完成)

编辑:

我有一些更改的代码。但系统将解析超过 300 万行。

def parseData(self)
    reader = csv.reader(open(self.file))
    for id, title, disc in reader:
        print "%-5s %-50s %s" % (id, title, disc)
        l = LegacyData()
        l.old_id = int(id)
        l.name = title
        l.disc_number = disc
        l.parsed = False
        l.save()

这是旧代码。

def parseData(self):
        #first line start
        fields = self.data.next()
        for row in self.data:
            items = zip(fields, row)
            item = {}
            for (name, value) in items:
                item[name] = value.strip()
            self.save(item)

谢谢你们。

I'm creating a python script of which parses a large (but simple) CSV.

It'll take some time to process. I would like the ability to interrupt the parsing of the CSV so I can continue at a later stage.

Currently I have this - of which lives in a larger class: (unfinished)

Edit:

I have some changed code. But the system will parse over 3 million rows.

def parseData(self)
    reader = csv.reader(open(self.file))
    for id, title, disc in reader:
        print "%-5s %-50s %s" % (id, title, disc)
        l = LegacyData()
        l.old_id = int(id)
        l.name = title
        l.disc_number = disc
        l.parsed = False
        l.save()

This is the old code.

def parseData(self):
        #first line start
        fields = self.data.next()
        for row in self.data:
            items = zip(fields, row)
            item = {}
            for (name, value) in items:
                item[name] = value.strip()
            self.save(item)

Thanks guys.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

双手揣兜 2024-10-17 05:52:23

如果在 Linux 下,请按 Ctrl-Z 并停止正在运行的进程。输入“fg”将其恢复并从停止的地方开始。

If under linux, hit Ctrl-Z and stop the running process. Type "fg" to bring it back and start where you stopped it.

冰魂雪魄 2024-10-17 05:52:23

您可以使用 signal 来捕获事件。这是一个解析器的模型,可以在 Windows 上捕获 CTRL-C 并停止解析:

import signal, tme, sys

def onInterupt(signum, frame):
    raise Interupted()

try:
    #windows
    signal.signal(signal.CTRL_C_EVENT, onInterupt)
except:
    pass

class Interupted(Exception): pass
class InteruptableParser(object):

    def __init__(self, previous_parsed_lines=0):
        self.parsed_lines = previous_parsed_lines

    def _parse(self, line):
        # do stuff
        time.sleep(1) #mock up
        self.parsed_lines += 1
        print 'parsed %d' % self.parsed_lines

   def parse(self, filelike):
        for line in filelike:
            try:
                self._parse(line)
            except Interupted:
                print 'caught interupt'
                self.save()
                print 'exiting ...'
                sys.exit(0)

    def save(self):
        # do what you need to save state
        # like write the parse_lines to a file maybe
        pass

parser = InteruptableParser()
parser.parse([1,2,3])

但无法测试它,因为我目前在 Linux 上。

You can use signal to catch the event. This is a mockup of a parser than can catch CTRL-C on windows and stop parsing:

import signal, tme, sys

def onInterupt(signum, frame):
    raise Interupted()

try:
    #windows
    signal.signal(signal.CTRL_C_EVENT, onInterupt)
except:
    pass

class Interupted(Exception): pass
class InteruptableParser(object):

    def __init__(self, previous_parsed_lines=0):
        self.parsed_lines = previous_parsed_lines

    def _parse(self, line):
        # do stuff
        time.sleep(1) #mock up
        self.parsed_lines += 1
        print 'parsed %d' % self.parsed_lines

   def parse(self, filelike):
        for line in filelike:
            try:
                self._parse(line)
            except Interupted:
                print 'caught interupt'
                self.save()
                print 'exiting ...'
                sys.exit(0)

    def save(self):
        # do what you need to save state
        # like write the parse_lines to a file maybe
        pass

parser = InteruptableParser()
parser.parse([1,2,3])

Can't test it though as I'm on linux at the moment.

累赘 2024-10-17 05:52:23

我这样做的方式:

将实际的处理代码放在一个类中,在该类上我将实现 Pickle 协议(http://docs.python.org/library/pickle.html)(基本上,编写正确的代码) __getstate____setstate__ 函数)

此类将接受文件名、保留打开的文件和 CSV 读取器实例作为实例成员。 __getstate__ 方法将保存当前文件位置,setstate 将重新打开文件,将其转发到正确的位置,并创建一个新的读取器。

我将在 __iter__ 方法中执行实际工作,该方法在处理每行后会生成一个外部函数。

这个外部函数将运行一个“主循环”,监视中断输入(套接字、键盘、文件系统上特定文件的状态等)——一切都很安静,它只会调用处理器的下一次迭代。如果发生中断,它会将处理器状态pickle到磁盘上的特定文件中。

当启动时,程序只需检查是否有保存的执行,如果有,则使用 pickle 检索执行器对象,然后恢复主循环。

这是一些(未经测试的)代码 - IEA 非常简单:

from cPickle import load, dump
import csv
import os, sys

SAVEFILE = "running.pkl"
STOPNOWFILE = "stop.now"

class Processor(object):
    def __init__(self, filename):
        self.file = open(filename, "rt")
        self.reader = csv.reader(self.file)
    def __iter__(self):
        for line in self.reader():
            # do stuff
            yield None
    def __getstate__(self):
        return (self.file.name, self.file.tell())
    def __setstate__(self, state):
        self.file = open(state[0],"rt")
        self.file.seek(state[1])
        self.reader = csv.reader(self.File)

def check_for_interrupts():
    # Use your imagination here!  
    # One simple thing would e to check for the existence of an specific file
    # on disk.
    # But you go all the way up to instantiate a tcp server and listen to 
    # interruptions on the network
    if os.path.exists(STOPNOWFILE): 
        return True
    return False

def main():
    if os.path.exists(SAVEFILE):
        with open(SAVEFILE) as savefile:
            processor = load(savefile)
        os.unlink(savefile)
    else:
        #Assumes the name of the .csv file to be passed on the command line
        processor = Processor(sys.argv[1])
    for line in processor:
        if check_for_interrupts():
            with open(SAVEFILE, "wb") as savefile:
                dump(processor)
            break

if __name__ == "__main__":
    main()

The way I'd do it:

Puty the actual processing code in a class, and on that class I'd implement the Pickle protocol (http://docs.python.org/library/pickle.html ) (basically, write proper __getstate__ and __setstate__ functions)

This class would accept the filename, keep the open file, and the CSV reader instance as instance members. The __getstate__ method would save the current file position, and setstate would reopen the file, forward it to the proper position, and create a new reader.

I'd perform the actuall work in an __iter__ method, that would yeld to an external function after each line was processed.

This external function would run a "main loop" monitoring input for interrupts (sockets, keyboard, state of an specific file on the filesystem, etc...) - everything being quiet, it would just call for the next iteration of the processor. If an interrupt happens, it would pickle the processor state to an specific file on disk.

When startingm the program just has to check if a there is a saved execution, if so, use pickle to retrieve the executor object, and resume the main loop.

Here goes some (untested) code - the iea is simple enough:

from cPickle import load, dump
import csv
import os, sys

SAVEFILE = "running.pkl"
STOPNOWFILE = "stop.now"

class Processor(object):
    def __init__(self, filename):
        self.file = open(filename, "rt")
        self.reader = csv.reader(self.file)
    def __iter__(self):
        for line in self.reader():
            # do stuff
            yield None
    def __getstate__(self):
        return (self.file.name, self.file.tell())
    def __setstate__(self, state):
        self.file = open(state[0],"rt")
        self.file.seek(state[1])
        self.reader = csv.reader(self.File)

def check_for_interrupts():
    # Use your imagination here!  
    # One simple thing would e to check for the existence of an specific file
    # on disk.
    # But you go all the way up to instantiate a tcp server and listen to 
    # interruptions on the network
    if os.path.exists(STOPNOWFILE): 
        return True
    return False

def main():
    if os.path.exists(SAVEFILE):
        with open(SAVEFILE) as savefile:
            processor = load(savefile)
        os.unlink(savefile)
    else:
        #Assumes the name of the .csv file to be passed on the command line
        processor = Processor(sys.argv[1])
    for line in processor:
        if check_for_interrupts():
            with open(SAVEFILE, "wb") as savefile:
                dump(processor)
            break

if __name__ == "__main__":
    main()
心安伴我暖 2024-10-17 05:52:23

我的完整代码

我遵循了带有标志的@jsbueno的建议 - 但我没有将另一个文件保存在类中作为变量:

我创建一个类 - 当我调用它时要求任何输入,然后开始另一个进程执行我的操作工作。当它循环时 - 如果我按下一个键,则会设置该标志,并且仅在为下一个解析调用循环时才检查该标志。因此我不会终止当前的操作。
在数据库中为我调用的数据中的每个对象添加一个 process 标志意味着我可以随时启动它并从中断处继续。

class MultithreadParsing(object):
    
    process = None
    process_flag = True
    
    def f(self):
        print "\nMultithreadParsing has started\n"
        while self.process_flag:
            ''' get my object from database '''
            legacy = LegacyData.objects.filter(parsed=False)[0:1]
            
            if legacy:
                print "Processing: %s %s" % (legacy[0].name, legacy[0].disc_number)
                for l in legacy:
                    ''' ... Do what I want it to do ...'''
                sleep(1)
            else:
                self.process_flag = False
                print "Nothing to parse"
                
        
    
    def __init__(self):
        self.process = Process(target=self.f)
        self.process.start()
        print self.process
        a = raw_input("Press any key to stop \n")
        print "\nKILL FLAG HAS BEEN SENT\n"
        
        if a:
            print "\nKILL\n"
            self.process_flag = False

感谢你们所有的帮助(尤其是你们的@jsbueno)——如果没有你们,我就不会得到这门课的想法。

My Complete Code

I followed the advice of @jsbueno with a flag - but instead of another file, I kept it within the class as a variable:

I create a class - when I call it asks for ANY input and then begins another process doing my work. As its looped - if I were to press a key, the flag is set and only checked when the loop is called for my next parse. Thus I don't kill the current action.
Adding a process flag in the database for each object from the data I'm calling means I can start this any any time and resume where I left off.

class MultithreadParsing(object):
    
    process = None
    process_flag = True
    
    def f(self):
        print "\nMultithreadParsing has started\n"
        while self.process_flag:
            ''' get my object from database '''
            legacy = LegacyData.objects.filter(parsed=False)[0:1]
            
            if legacy:
                print "Processing: %s %s" % (legacy[0].name, legacy[0].disc_number)
                for l in legacy:
                    ''' ... Do what I want it to do ...'''
                sleep(1)
            else:
                self.process_flag = False
                print "Nothing to parse"
                
        
    
    def __init__(self):
        self.process = Process(target=self.f)
        self.process.start()
        print self.process
        a = raw_input("Press any key to stop \n")
        print "\nKILL FLAG HAS BEEN SENT\n"
        
        if a:
            print "\nKILL\n"
            self.process_flag = False

Thanks for all you help guys (especially yours @jsbueno) - if it wasn't for you I wouldn't have got this class idea.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文