在 Linux 中使用 Python 将文件写入 USB 记忆棒?

发布于 2024-11-26 16:10:40 字数 3052 浏览 4 评论 0原文

我在写入文件时遇到的麻烦比我预期的要多得多。我有一台小型单板计算机,在带有 Angstrom 嵌入式发行版的 ARM 处理器上运行。我正在用 python 为其编写一个应用程序。应用程序应检测 USB 记忆棒,然后将文件转储到其中。我遇到的问题是,即使我的脚本似乎正确写入了文件,并且文件似乎在 Linux 中使用“ls”和“cat”,当我拔下 USB 记忆棒并尝试查看Windows 或其他 Linux 发行版中的文件是空的,或者更常见的是根本不存在。我的应用程序是多线程的。我创建 /media/mymntpnt 作为根(目录)。

任何帮助都会受到赞赏。

以下是我的代码片段:

这部分是我用来识别 USB 记忆棒的部分:

class MediaScanner():
    def __init__(self):
        self.lok = thread.allocate_lock()
        self.running = True
        self.started = False

    def register_cb(self,func):
        self.cb = func

    def start(self):
        if not self.started:
            thread.start_new_thread(self.scan_thread,())

    def scan_thread(self):
        self.quit = False
        self.started = True
        last_devices = []
        while self.running:
            devices = self.scan_media()
            if (devices != last_devices):
                self.cb(devices) #call the callback as its own thread
            last_devices = devices

            time.sleep(0.1)

        self.quit = True    

    def stop(self):
        self.running = False
        while(not self.quit):
            pass
        return True

    def is_running(self):
        return self.running


    def scan_media(self):
        with self.lok:
            partitionsFile = open("/proc/partitions")
            lines = partitionsFile.readlines()[2:]#Skips the header lines
            devices = []
            for line in lines:
                words = [x.strip() for x in line.split()]
                minorNumber = int(words[1])
                deviceName = words[3]
                if minorNumber % 16 == 0:
                    path = "/sys/class/block/" + deviceName
                    if os.path.islink(path):
                        if os.path.realpath(path).find("/usb") > 0:
                            devices.append('/dev/'+deviceName)

            partitionsFile.close()

            return devices

这是我编写文件的脚本示例:

from mediascanner import *
from util import *
from database import *
from log import *

ms = MediaScanner()

devices = ms.scan_media()

print devices

if devices:
    db = TestDatabase(init_tables=False)

    data = db.get_all_test_data()



    for device in devices:
        print device
        mount_partition(get_partition(device))
        write_and_verify('/media/mymntpnt/test_'+get_log_file_name(),flatten_tests(data))
        unmount_partition()

以下是我的实用函数:

def write_and_verify(f_n,data):
    f = file(f_n,'w')
    f.write(data)
    f.flush()
    f.close()
    f = file(f_n,'r')
    verified = f.read()
    f.close()
    return  verified == data and f.closed


def get_partition(dev):
    os.system('fdisk -l %s > output' % dev)
    f = file('output')
    data = f.read()
    print data
    f.close()
    return data.split('\n')[-2].split()[0].strip()

def mount_partition(partition):
    os.system('mount %s /media/mymntpnt' % partition)


def unmount_partition():
    os.system('umount /media/mymntpnt')

I'm having a lot more trouble than expected writing to a file than I expected. I have a small Single Board computer running on an arm processor with the Angstrom embedded distribution. I'm writing an application for it with python. The application should detect the USB stick and then dump the file to it. The issue I'm running into is that even when my script appears to write the file correctly, and the file appears to be there using "ls" and "cat" in Linux, when I remove the USB stick and try to look at the file in Windows or another linux distro it's empty or more often not there at all. My application is multithreaded. I create /media/mymntpnt as root (directory.)

Any help is appreciated.

Here are some snippets of my code:

This part is what I use to identify the USB stick:

class MediaScanner():
    def __init__(self):
        self.lok = thread.allocate_lock()
        self.running = True
        self.started = False

    def register_cb(self,func):
        self.cb = func

    def start(self):
        if not self.started:
            thread.start_new_thread(self.scan_thread,())

    def scan_thread(self):
        self.quit = False
        self.started = True
        last_devices = []
        while self.running:
            devices = self.scan_media()
            if (devices != last_devices):
                self.cb(devices) #call the callback as its own thread
            last_devices = devices

            time.sleep(0.1)

        self.quit = True    

    def stop(self):
        self.running = False
        while(not self.quit):
            pass
        return True

    def is_running(self):
        return self.running


    def scan_media(self):
        with self.lok:
            partitionsFile = open("/proc/partitions")
            lines = partitionsFile.readlines()[2:]#Skips the header lines
            devices = []
            for line in lines:
                words = [x.strip() for x in line.split()]
                minorNumber = int(words[1])
                deviceName = words[3]
                if minorNumber % 16 == 0:
                    path = "/sys/class/block/" + deviceName
                    if os.path.islink(path):
                        if os.path.realpath(path).find("/usb") > 0:
                            devices.append('/dev/'+deviceName)

            partitionsFile.close()

            return devices

And here's an example of my script to write the file:

from mediascanner import *
from util import *
from database import *
from log import *

ms = MediaScanner()

devices = ms.scan_media()

print devices

if devices:
    db = TestDatabase(init_tables=False)

    data = db.get_all_test_data()



    for device in devices:
        print device
        mount_partition(get_partition(device))
        write_and_verify('/media/mymntpnt/test_'+get_log_file_name(),flatten_tests(data))
        unmount_partition()

And here are my utility functions:

def write_and_verify(f_n,data):
    f = file(f_n,'w')
    f.write(data)
    f.flush()
    f.close()
    f = file(f_n,'r')
    verified = f.read()
    f.close()
    return  verified == data and f.closed


def get_partition(dev):
    os.system('fdisk -l %s > output' % dev)
    f = file('output')
    data = f.read()
    print data
    f.close()
    return data.split('\n')[-2].split()[0].strip()

def mount_partition(partition):
    os.system('mount %s /media/mymntpnt' % partition)


def unmount_partition():
    os.system('umount /media/mymntpnt')

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

从﹋此江山别 2024-12-03 16:10:40

错误在于写入函数,该函数应该在关闭文件之前同步文件,如下所示。读取总是会成功,因为数据已经在 RAM 中。您可能想尝试不同的方法来验证它,例如检查字节数。

def write_and_verify(f_n,data):
    f = file(f_n,'w')
    f.write(data)
    f.flush()
    os.fsync(f.fileno())
    f.close()
    f = file(f_n,'r')
    verified = f.read()
    f.close()
    return  verified == data and f.closed

如果您获得像 finfo = os.stat(f_n) 这样的文件信息,那么您可以将 finfo.st_size 与您期望写入的字节数进行比较。

The mistake is in the write function which should sync the file, before closing it, as seen below. The read will always succeed because the data is already in RAM. You might want to try something different to verify it such as checking the bytecount.

def write_and_verify(f_n,data):
    f = file(f_n,'w')
    f.write(data)
    f.flush()
    os.fsync(f.fileno())
    f.close()
    f = file(f_n,'r')
    verified = f.read()
    f.close()
    return  verified == data and f.closed

If you get the file info like this finfo = os.stat(f_n) then you can compare finfo.st_size with the number of bytes that you expected to write.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文