Reading and graphing data read from huge files

We have pretty large files, on the order of 1-1.5 GB combined (mostly log files), with raw data that is easily parseable to a CSV, which is subsequently supposed to be graphed to generate a set of graph images.

Currently, we are using bash scripts to turn the raw data into a CSV file, with just the numbers that need to be graphed, and then feeding it into a gnuplot script. But this process is extremely slow. I tried to speed up the bash scripts by replacing some piped cuts, trs, etc. with a single awk command; although this improved the speed, the whole thing is still very slow.

So, I am starting to believe there are better tools for this process. I am currently looking to rewrite it in Python + NumPy or R. A friend of mine suggested using the JVM; if I am to do that, I will use Clojure, but I am not sure how the JVM will perform.

I don't have much experience dealing with this kind of problem, so any advice on how to proceed would be great. Thanks.

Edit: Also, I will want to store (to disk) the generated intermediate data, i.e. the CSV, so I don't have to regenerate it should I decide I want a different-looking graph.

Edit 2: The raw data files have one record per line, with fields separated by a delimiter (|). Not all fields are numbers. Each field I need in the output CSV is obtained by applying a certain formula to the input record, which may use multiple fields from the input data. The output CSV will have 3-4 fields per line, and I need graphs that plot fields 1-2, 1-3, and 1-4 in (maybe) a bar chart. I hope that gives a better picture.
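For illustration, a minimal sketch of that transformation in Python (the field indices and the two formulas here are made up; the real ones come from our logs):

import csv

def derive(record):
    # split the pipe-delimited record; indices and formulas are placeholders
    fields = record.rstrip("\n").split("|")
    key = fields[0]                       # e.g. a timestamp-like key
    a, b = float(fields[3]), float(fields[4])
    total = a + b                         # hypothetical formula 1
    ratio = a / (b or 1.0)                # hypothetical formula 2 (guard against 0)
    return [key, total, ratio]

with open("raw.log") as src, open("derived.csv", "w", newline="") as dst:
    writer = csv.writer(dst)
    for line in src:
        writer.writerow(derive(line))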

Edit 3: I have modified @adirau's script a little, and it seems to be working pretty well. I have come far enough that I am reading data, sending it to a pool of processor threads (pseudo-processing: appending the thread name to the data), and aggregating it into an output file through another collector thread.
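A rough sketch of the pipeline's shape (the batch size, file names, and the name-tagging pseudo-processing are placeholders for the real work):

import threading
import queue
from itertools import islice

NUM_WORKERS = 4
work_q = queue.Queue(maxsize=64)   # reader -> workers
out_q = queue.Queue(maxsize=64)    # workers -> collector

def worker():
    while True:
        batch = work_q.get()
        if batch is None:              # shutdown signal
            out_q.put(None)            # tell the collector this worker is done
            break
        name = threading.current_thread().name
        # pseudo-processing: tag each line with this worker's name
        out_q.put(["%s|%s" % (name, line) for line in batch])

def collector(path, n_workers):
    done = 0
    with open(path, "w") as f:
        while done < n_workers:
            item = out_q.get()
            if item is None:
                done += 1
            else:
                f.writelines(item)

pool = [threading.Thread(target=worker, name="w%d" % i) for i in range(NUM_WORKERS)]
coll = threading.Thread(target=collector, args=("out.csv", NUM_WORKERS))
coll.start()
for t in pool:
    t.start()

with open("raw.log") as f:
    while True:
        batch = list(islice(f, 1000))  # 1000 lines per work item
        if not batch:
            break
        work_q.put(batch)
for _ in pool:                         # one shutdown signal per worker
    work_q.put(None)
for t in pool:
    t.join()
coll.join()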

PS: I am not sure about the tagging of this question, feel free to correct it.

2 Answers

飘然心甜 2024-11-03 18:00:57

Python sounds like a good choice because it has a good threading API (the implementation is questionable, though), plus matplotlib and pylab. I'm missing some more specs from your end, but maybe this could be a good starting point for you: matplotlib: async plotting with threads.
I would go with a single thread handling the bulk disk I/O reads and queueing synchronously to a pool of threads for the data processing (if you have fixed record lengths, things may get faster by precomputing read offsets and passing just the offsets to the thread pool). With the disk I/O thread, I would mmap the datasource files, read a predefined number of bytes, plus one more read to eventually grab the last bytes up to the end of the current line; that byte count should be chosen somewhere near your average line length. Next comes feeding the pool via the queue, and the data processing/plotting that takes place in the thread pool. I don't have a good picture here (of what exactly you are plotting), but I hope this helps.

EDIT: there's file.readlines([sizehint]) to grab multiple lines at once; it may not be that fast, though, since the docs say it uses readline() internally.
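For example, a reader loop that batches roughly 64 KB of whole lines per queue item might look like this (q is the shared queue from the skeleton below; the file name is a placeholder):

with open("huge.log") as f:
    while True:
        batch = f.readlines(64 * 1024)  # whole lines totalling roughly 64 KB
        if not batch:
            break
        q.put(batch)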

EDIT: a quick code skeleton

import sys
import mmap
import threading
from threading import Thread
from collections import deque


class processor(Thread):
    """
        processor gets a batch of data at a time from the diskio thread
    """
    def __init__(self, q):
        Thread.__init__(self, name="plotter")
        self._queue = q

    def run(self):
        # get batched data; a None batch is the shutdown signal
        while True:
            databuf = self._queue.get()
            if databuf is None:
                break
            for data in self.feed(databuf):
                self.plot(data)
            # sanitizer exceptions following, maybe

    def parseline(self, line):
        """ return a data struct ready for plotting
        (lines read from the mmap are bytes, so decode as needed) """
        raise NotImplementedError

    def feed(self, databuf):
        # we yield one-at-a-time datastructs ready-to-go for plotting
        for line in databuf:
            yield self.parseline(line)

    def plot(self, data):
        """integrate
        https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads
        maybe
        """


class sharedq(object):
    """a bounded blocking queue; the standard library's
    queue.Queue(maxsize=...) does the same job and may serve you better"""
    def __init__(self, maxsize=8192):
        self.queue = deque()
        self.barrier = threading.RLock()
        self.read_c = threading.Condition(self.barrier)
        self.write_c = threading.Condition(self.barrier)
        self.msz = maxsize

    def put(self, item):
        with self.barrier:
            while len(self.queue) >= self.msz:
                self.write_c.wait()
            self.queue.append(item)
            self.read_c.notify()

    def get(self):
        with self.barrier:
            while not self.queue:
                self.read_c.wait()
            item = self.queue.popleft()
            self.write_c.notify()
            return item


q = sharedq()
# target batch size, in bytes, for each queue item;
# choose it somewhere near your average line length times the lines per batch
numbytes = 1024
pool = [processor(q) for _ in range(8)]
for p in pool:
    p.start()

for fn in sys.argv[1:]:
    with open(fn, "r+b") as f:
        mm = mmap.mmap(f.fileno(), 0)
        # mmap has no readlines(), so batch whole lines up to ~numbytes by hand
        batch, size = [], 0
        while True:
            line = mm.readline()
            if not line:
                break
            batch.append(line)
            size += len(line)
            if size >= numbytes:
                q.put(batch)
                batch, size = [], 0
        if batch:
            q.put(batch)

# cleanup: one shutdown signal per worker, then wait for them to drain the queue
for p in pool:
    q.put(None)
for p in pool:
    p.join()

厌倦 2024-11-03 18:00:57

I think Python + NumPy would be the most efficient way, regarding both speed and ease of implementation.
NumPy is highly optimized, so the performance is decent, and Python would ease the algorithm-implementation part.

This combo should work well for your case, provided you optimize the loading of the file into memory: try to find the sweet spot between data blocks that are not too large, but large enough to minimize read and write cycles, because that is what will slow the program down.
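A minimal sketch of that kind of chunked NumPy pipeline (the chunk size, column indices, and formulas are placeholders, not something from the question):

import numpy as np

CHUNK_BYTES = 32 * 1024 * 1024  # tune: large enough to amortize I/O, small enough for RAM

def process_chunk(lines):
    # split the pipe-delimited records and pull out the numeric columns
    rows = [ln.rstrip("\n").split("|") for ln in lines]
    a = np.array([float(r[3]) for r in rows])
    b = np.array([float(r[4]) for r in rows])
    # vectorized "formulas" applied to the whole chunk at once
    return np.column_stack((a + b, a / np.where(b == 0, 1.0, b)))

with open("raw.log") as src, open("derived.csv", "w") as dst:
    while True:
        lines = src.readlines(CHUNK_BYTES)  # whole lines up to roughly CHUNK_BYTES
        if not lines:
            break
        np.savetxt(dst, process_chunk(lines), fmt="%.6g", delimiter=",")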

If you feel this needs more speeding up (which I sincerely doubt), you could use Cython to accelerate the sluggish parts.
