Django: thread causing InMemoryUploadedFile to close prematurely?

Posted 2025-01-10 10:11:46

Good day,

I have a problem where a CSV file, sent to a View and later passed into a new thread for processing, sometimes closes prematurely and I can't figure out why. The behaviour is intermittent and only started happening after I switched to using a new thread to process the file.

This is the original way I was processing the file and it worked, but for large files it caused time-out issues on the client:

import csv
import io

from rest_framework import status
from rest_framework.parsers import FormParser, MultiPartParser
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView


class LoadCSVFile(APIView):
    permission_classes = (IsAuthenticated,)
    parser_classes = [FormParser, MultiPartParser]

    def post(self, request):
        file = request.FILES['file']
        data_set = file.read().decode('utf-8')
        io_string = io.StringIO(data_set)

        for row_data in csv.reader(io_string, delimiter=',', quotechar='"'):
            print('row_data:', row_data)

        return Response({ 'message': 'File received and is currently processing.', 'status_code': status.HTTP_200_OK }, status.HTTP_200_OK)

So I now process the file in a new thread like so:

import csv
import io
import threading

from rest_framework import status
from rest_framework.parsers import FormParser, MultiPartParser
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView


class LoadCSVFile(APIView):
    permission_classes = (IsAuthenticated,)
    parser_classes = [FormParser, MultiPartParser]

    def post(self, request):
        request_handler = RequestHandler(request)
        csv_handler = CSVHandler(request.FILES['file'])

        # Fire and forget the file processing.
        t = threading.Thread(target=request_handler.resolve, kwargs={ 'csv_handler': csv_handler })
        t.start()
        return Response({ 'message': 'File received and is currently processing.', 'status_code': status.HTTP_200_OK }, status.HTTP_200_OK)


class RequestHandler(object):
    def __init__(self, request: Request):
        self.request = request

    def resolve(self, **kwargs):
        csv_handler = kwargs['csv_handler']

        try:
            print('Processing file...')
            csv_handler.process_file()
        except Exception as e:
            print('process_file level error:', e)


class CSVHandler(object):
    def __init__(self, file):
        self.file = file

    def get_reader(self):
        # Error is raised at the following line: "I/O operation on closed file."
        data_set = self.file.read().decode('utf-8')
        io_string = io.StringIO(data_set)
        return csv.reader(io_string, delimiter=',', quotechar='"')

    def process_file(self, **kwargs):
        for row_data in self.get_reader():
            print('row_data:', row_data)

For a while it was great, but then I started to notice occasional I/O errors.

  • This happens with both large (5000-line) and small (2-line) files.
  • I can go 50 uploads without seeing the error, then it will happen 2 or 3 times in a row. Or anywhere in between.
  • The request is saved in the RequestHandler and the file in the CSVHandler before the thread is started, and I don't know how else to keep the InMemoryUploadedFile alive until I need it (in csv_handler.get_reader()).

Any suggestions?

Thank you for your time.

Comments (1)

甜心 2025-01-17 10:11:46

The issue was caused by the main thread returning from the POST request before the worker thread had read the CSV file in csv_handler.get_reader(). I'm still not sure how the file gets lost while the RequestHandler and CSVHandler hold references to the request and file objects. Maybe it's a Django thing.

I fixed it by moving the reader logic up into the CSVHandler constructor and using a lock to prevent the race condition.

import csv
import io
import threading


class CSVHandler(object):
    def __init__(self, file):
        self.lock = threading.Lock()
        with self.lock:
            self.file = file
            # Reading the upload here, in the constructor, means it happens
            # on the request thread, before the view returns and the upload
            # can be closed out from under the worker.
            data_set = self.file.read().decode('utf-8')
            io_string = io.StringIO(data_set)
            self.reader = csv.reader(io_string, delimiter=',', quotechar='"')

    def process_file(self, **kwargs):
        for row_data in self.reader:
            print('row_data:', row_data)
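The eager read is the part of this fix that matters: the most likely explanation for the intermittent error is that Django closes a request's uploaded files as part of request/response cleanup once the response has gone out, so any read attempted in the background thread races against that teardown. (The lock itself never actually contends, since the constructor runs before the worker thread exists.) A minimal sketch of the same idea that avoids holding the upload object at all: decode the rows on the request thread and hand plain Python data to the worker. This is an illustration, not the original poster's code, and the process_rows helper is hypothetical.

import csv
import io
import threading

from rest_framework import status
from rest_framework.parsers import FormParser, MultiPartParser
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView


def process_rows(rows):
    # Hypothetical worker: it receives plain lists of strings, so nothing
    # here depends on the request or the uploaded file staying open.
    for row_data in rows:
        print('row_data:', row_data)


class LoadCSVFile(APIView):
    permission_classes = (IsAuthenticated,)
    parser_classes = [FormParser, MultiPartParser]

    def post(self, request):
        # Read and decode the upload while still on the request thread,
        # before the response is returned and the file can be closed.
        data_set = request.FILES['file'].read().decode('utf-8')
        rows = list(csv.reader(io.StringIO(data_set), delimiter=',', quotechar='"'))

        # The worker thread now owns an independent copy of the data, so
        # nothing it touches is tied to the request lifecycle.
        t = threading.Thread(target=process_rows, args=(rows,))
        t.start()
        return Response({ 'message': 'File received and is currently processing.', 'status_code': status.HTTP_200_OK }, status.HTTP_200_OK)

Beyond a prototype, a task queue (Celery, Django-Q, and the like) is the usual replacement for fire-and-forget threads, since threads die with the worker process and their errors are easy to lose.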