Azure Blob Storage .uploadStream() hangs on larger files

Posted 2025-01-10 14:34:55

I'm currently working on the capability to upload larger files to Azure Blob Storage using Express.js and the @azure/storage-blob package.

What I have works fine for files under 5 MB, but anything larger "fails" at some point during the upload stream. I say "fails" because no error (that I can see) is thrown; it just hangs mid-upload. Because there is no error, I am struggling to figure out what could be causing it.

index.js

const express = require('express');
const bodyParser = require('body-parser');
const volleyball = require('volleyball');
const cors = require('cors');
const busboy = require('connect-busboy')

require('dotenv').config()
const fileRoutes = require('./routes/files.routes')

const app = express();

// Enable Logging
app.use(volleyball);

// parse application/json
app.use(bodyParser.json());

// run cors middleware
app.use(cors());

//process body contents for file uploads
app.use(busboy({
    highWaterMark: (1 * 1024 * 1024), 
    fileHwm: (1 * 1024 * 1024) / 2,
}))

app.use('/files', fileRoutes);

// Setting the port and publishing to that port
const port = process.env.PORT || 3000;
app.listen(port, () => {
    console.log('Listening on port', port);
});

Code for the /files/t2 route:

const storageblob = require('@azure/storage-blob')
const {v4: uuidv4} = require('uuid')

const busboy = require('connect-busboy')

const rand = require('randomstring') // todo: remove from final

let sharedClient = null

const createClient = async () => {
    const connectionString = process.env.STORAGE_CS
    const containerName = '3ac53750-882a-11ec-8579-db09c3889ffa'

    // Create the BlobServiceClient object which will be used to create a container client
    const blobServiceClient = await storageblob.BlobServiceClient.fromConnectionString(connectionString);
    const containerClient = blobServiceClient.getContainerClient(containerName);

    sharedClient = containerClient
}

const saveFile = async (req, res) => {
    if (sharedClient) {
        console.log('already set')
    } else {
        console.log('attempting to set')
        await createClient()
    }

    if (req.busboy) {
        req.busboy.on('file', async (fieldname, file, filename) => {
            console.log('Starting....')
            const blobClient = sharedClient.getBlockBlobClient(filename.filename)

            const bufferSize = (1 * 1024 * 1024) / 2

            const response = await blobClient.uploadStream(
                file,
                bufferSize,
                5,
                {
                    onProgress: (ev) => {
                        console.log(ev)
                        // console.log({file})
                    },
                    blobHTTPHeaders: {blobContentType: filename.mimeType}
                })
            console.log(response._response.status)
            console.log({response})

            file.on('data', () => {
                console.log('we have an data call')
            })

            file.on('error', () => {
                console.log('we have an error - inside')
            })

            file.on('limit', () => {
                console.log('we have a limit reached - inside')
            })

            console.log('ending...')
        })

        req.busboy.on('error', () => {
            console.log('we have an error')
        })

        req.busboy.on('finish', () => {
            console.log('we finished')
        })

        req.busboy.on('close', () => {
            console.log('we closed')
        })

        req.on('aborted', () => {
            console.log('aborted')
        })

        console.log('starting pipe')
        req.pipe(req.busboy)

        res.send('finished...')
    }
}

module.exports = {
    saveFile,
}

Here is an excerpt of what the logs look like when attempting to upload an 11MB file:

MVPJ <—— 200 OK 6 B text/html; charset=utf-8 (<—> 2.8 ms)
{ loadedBytes: 524288 }
IeGN ——> POST /files/t2 11.49 MB multipart/form-data; boundary=--------------------------824775949779732965782851
already set
starting pipe
Starting....
IeGN <—— 200 OK 6 B text/html; charset=utf-8 (<—> 3.9 ms)
{ loadedBytes: 524288 }
{ loadedBytes: 1048576 }
E7Uh ——> POST /files/t2 11.49 MB multipart/form-data; boundary=--------------------------307975744074310648853410
already set
starting pipe
Starting....
E7Uh <—— 200 OK 6 B text/html; charset=utf-8 (<—> 3.1 ms)
{ loadedBytes: 524288 }
gIIV ——> POST /files/t2 11.49 MB multipart/form-data; boundary=--------------------------174965235814957167411128
already set
starting pipe
Starting....
gIIV <—— 200 OK 6 B text/html; charset=utf-8 (<—> 3.2 ms)
{ loadedBytes: 524288 }
{ loadedBytes: 1048576 }
OGOc ——> POST /files/t2 11.49 MB multipart/form-data; boundary=--------------------------880875904437719845837223
already set
starting pipe
Starting....
OGOc <—— 200 OK 6 B text/html; charset=utf-8 (<—> 3.1 ms)
{ loadedBytes: 524288 }
loMo ——> POST /files/t2 11.49 MB multipart/form-data; boundary=--------------------------121962168533176166251418
attempting to set
starting pipe
Starting....
loMo <—— 200 OK 6 B text/html; charset=utf-8 (<—> 31.2 ms)
{ loadedBytes: 524288 }
{ loadedBytes: 1048576 }
{ loadedBytes: 1572864 }
{ loadedBytes: 2097152 }
{ loadedBytes: 2621440 }
{ loadedBytes: 3145728 }

During this round of testing, it was failing fairly quickly, at around 500 KB to 1 MB. Other attempts have gotten as far as 6 MB before failing. This leads me to believe it's not a hard limit but possibly a throughput issue. There doesn't seem to be much correlation between restarting the container/Express server and where it fails.

Update 1

By increasing maxConcurrency to 400 I now get, on average, further into the upload before it hangs. That value seems ridiculously high compared to the examples I've seen, where 5 to 20 appears to be the norm. Not sure what this shows, but it is interesting.

5s5m ——> POST /files/t2 25.6 MB multipart/form-data; boundary=--------------------------075957127342913853541742
attempting to set
starting pipe
Starting....
5s5m <—— 200 OK 6 B text/html; charset=utf-8 (<—> 36.0 ms)
{ loadedBytes: 524288 }
{ loadedBytes: 1048576 }
{ loadedBytes: 1572864 }
{ loadedBytes: 2097152 }
{ loadedBytes: 2621440 }
{ loadedBytes: 3145728 }
{ loadedBytes: 3670016 }
{ loadedBytes: 4194304 }
{ loadedBytes: 4718592 }
{ loadedBytes: 5242880 }
{ loadedBytes: 5767168 }
{ loadedBytes: 6291456 }
{ loadedBytes: 6815744 }
{ loadedBytes: 7340032 }
{ loadedBytes: 7864320 }
{ loadedBytes: 8388608 }
{ loadedBytes: 8912896 }
{ loadedBytes: 9437184 }
{ loadedBytes: 9961472 }
{ loadedBytes: 10485760 }

Comments (1)

我三岁 2025-01-17 14:34:55

Problem theory

I ran into a similar issue and this is how I understand it. (Don't pin me on this though as I am no expert)

  1. When you call req.pipe(req.busboy) the data from the req stream is piped into busboy, and in turn into the file stream(s), and starts flowing (this is important)
  2. So the data is flowing in and the listener req.busboy.on('file', async (fieldname, file, filename) => { .. } is triggered, in which you call const response = await blobClient.uploadStream(...)
  3. My theory on where it goes wrong: busboy processes the last bytes of the stream, but blobClient hasn't finished uploading everything yet. However, the busboy stream ends and closes, at which point blobClient thinks the stream upload has completed and stops processing (hence it stops "in the middle"). So basically two consumers are reading the same stream at different speeds, and one of them decides when it is finished.
  4. Since you register your file.on('data', ..) listeners after await blobClient.uploadStream(...), which only completes after the busboy stream is already closed, you are too late to register the listeners and see the log messages.

Solution

Again, apologies for any inaccuracies as this is how I understand it. Feel free to improve my answer!

We need to pipe the file stream into a separate Readable stream that only blobClient reads from. For this we use a PassThrough:

req.busboy.on('file', async (fieldname, file, filename) => {

    const pt = new PassThrough(); // import {PassThrough} from "stream";

    const blobClient = sharedClient.getBlockBlobClient(filename.filename)

    const bufferSize = (1 * 1024 * 1024) / 2

    const uploadPromise = blobClient.uploadStream(pt, bufferSize, 5, {
        onProgress: (ev) => {
            console.log(ev);
        }
    })

    file.on('error', () => {
        console.log('we have an error - inside')
    })

    file.pipe(pt);

    const response = await uploadPromise;

    //....
});
  1. Create the passthrough which basically just passes the input bytes to the output stream.
  2. Call uploadStream asynchronously. Nothing happens yet as we haven't piped the file stream yet.
  3. Pipe the file stream into the passthrough stream that is used for the upload.
  4. await the promise in case you want to do anything afterwards.