Azure Blob Storage .uploadStream() hangs on larger files
I'm currently working on the capability to upload larger files to Azure Blob Storage using Express.js and the @azure/storage-blob package.
What I have works fine on files under 5 MB, but anything larger will "fail" at some point on the upload stream. I say "fail" because no error (that I can see) is thrown; it just hangs mid-upload. Because there is no error, I am struggling to figure out what could be causing the hang.
index.js
const express = require('express');
const bodyParser = require('body-parser');
const volleyball = require('volleyball');
const cors = require('cors');
const busboy = require('connect-busboy')
require('dotenv').config()
const fileRoutes = require('./routes/files.routes')
const app = express();
// Enable Logging
app.use(volleyball);
// parse application/json
app.use(bodyParser.json());
// run cors middleware
app.use(cors());
//process body contents for file uploads
app.use(busboy({
  highWaterMark: (1 * 1024 * 1024),
  fileHwm: (1 * 1024 * 1024) / 2,
}))
app.use('/files', fileRoutes);
// Setting the port and publishing to that port
const port = process.env.PORT || 3000;
app.listen(port, () => {
  console.log('Listening on port', port);
});
Code for the /files/t2 route:
const storageblob = require('@azure/storage-blob')
const {v4: uuidv4} = require('uuid')
const busboy = require('connect-busboy')
const rand = require('randomstring') // todo: remove from final
let sharedClient = null
const createClient = async () => {
  const connectionString = process.env.STORAGE_CS
  const containerName = '3ac53750-882a-11ec-8579-db09c3889ffa'
  // Create the BlobServiceClient object which will be used to create a container client
  const blobServiceClient = await storageblob.BlobServiceClient.fromConnectionString(connectionString);
  const containerClient = blobServiceClient.getContainerClient(containerName);
  sharedClient = containerClient
}
const saveFile = async (req, res) => {
  if (sharedClient) {
    console.log('already set')
  } else {
    console.log('attempting to set')
    await createClient()
  }
  if (req.busboy) {
    req.busboy.on('file', async (fieldname, file, filename) => {
      console.log('Starting....')
      const blobClient = sharedClient.getBlockBlobClient(filename.filename)
      const bufferSize = (1 * 1024 * 1024) / 2
      const response = await blobClient.uploadStream(
        file,
        bufferSize,
        5,
        {
          onProgress: (ev) => {
            console.log(ev)
            // console.log({file})
          },
          blobHTTPHeaders: {blobContentType: filename.mimeType}
        })
      response._response.status
      console.log({response})
      try {
      } catch (e) {
        console.log({e})
      }
      file.on('data', () => {
        console.log('we have an data call')
      })
      file.on('error', () => {
        console.log('we have an error - inside')
      })
      file.on('limit', () => {
        console.log('we have a limit reached - inside')
      })
      console.log('ending...')
    })
    req.busboy.on('error', () => {
      console.log('we have an error')
    })
    req.busboy.on('finish', () => {
      console.log('we finished')
    })
    req.busboy.on('close', () => {
      console.log('we closed')
    })
    req.on('aborted', () => {
      console.log('aborted')
    })
    console.log('starting pipe')
    req.pipe(req.busboy)
    res.send('finished...')
  }
}
module.exports = {
  saveFile,
}
Here is an excerpt of what the logs look like when attempting to upload an 11MB file:
MVPJ <—— 200 OK 6 B text/html; charset=utf-8 (<—> 2.8 ms)
{ loadedBytes: 524288 }
IeGN ——> POST /files/t2 11.49 MB multipart/form-data; boundary=--------------------------824775949779732965782851
already set
starting pipe
Starting....
IeGN <—— 200 OK 6 B text/html; charset=utf-8 (<—> 3.9 ms)
{ loadedBytes: 524288 }
{ loadedBytes: 1048576 }
E7Uh ——> POST /files/t2 11.49 MB multipart/form-data; boundary=--------------------------307975744074310648853410
already set
starting pipe
Starting....
E7Uh <—— 200 OK 6 B text/html; charset=utf-8 (<—> 3.1 ms)
{ loadedBytes: 524288 }
gIIV ——> POST /files/t2 11.49 MB multipart/form-data; boundary=--------------------------174965235814957167411128
already set
starting pipe
Starting....
gIIV <—— 200 OK 6 B text/html; charset=utf-8 (<—> 3.2 ms)
{ loadedBytes: 524288 }
{ loadedBytes: 1048576 }
OGOc ——> POST /files/t2 11.49 MB multipart/form-data; boundary=--------------------------880875904437719845837223
already set
starting pipe
Starting....
OGOc <—— 200 OK 6 B text/html; charset=utf-8 (<—> 3.1 ms)
{ loadedBytes: 524288 }
loMo ——> POST /files/t2 11.49 MB multipart/form-data; boundary=--------------------------121962168533176166251418
attempting to set
starting pipe
Starting....
loMo <—— 200 OK 6 B text/html; charset=utf-8 (<—> 31.2 ms)
{ loadedBytes: 524288 }
{ loadedBytes: 1048576 }
{ loadedBytes: 1572864 }
{ loadedBytes: 2097152 }
{ loadedBytes: 2621440 }
{ loadedBytes: 3145728 }
During this round of testing, it was failing pretty fast, right around 500KB to 1MB. Other attempts have gotten as high as 6MB before failing. This leads me to believe it's not some hard limit but possibly a throughput issue? There doesn't seem to be much correlation between how frequently I restart the container/Express server and where it fails.
Update 1
I've started to average higher upload amounts before hanging by increasing maxConcurrency to 400, which seems ridiculously high given that the examples I've seen make 5 to 20 seem like the norm. Not sure what this shows, but it is interesting.
5s5m ——> POST /files/t2 25.6 MB multipart/form-data; boundary=--------------------------075957127342913853541742
attempting to set
starting pipe
Starting....
5s5m <—— 200 OK 6 B text/html; charset=utf-8 (<—> 36.0 ms)
{ loadedBytes: 524288 }
{ loadedBytes: 1048576 }
{ loadedBytes: 1572864 }
{ loadedBytes: 2097152 }
{ loadedBytes: 2621440 }
{ loadedBytes: 3145728 }
{ loadedBytes: 3670016 }
{ loadedBytes: 4194304 }
{ loadedBytes: 4718592 }
{ loadedBytes: 5242880 }
{ loadedBytes: 5767168 }
{ loadedBytes: 6291456 }
{ loadedBytes: 6815744 }
{ loadedBytes: 7340032 }
{ loadedBytes: 7864320 }
{ loadedBytes: 8388608 }
{ loadedBytes: 8912896 }
{ loadedBytes: 9437184 }
{ loadedBytes: 9961472 }
{ loadedBytes: 10485760 }
Problem theory
I ran into a similar issue and this is how I understand it. (Don't pin me on this though, as I am no expert.)
1. When you call req.pipe(req.busboy), the data from the req stream is piped into busboy, and in turn into the file stream(s), and starts flowing (this is important).
2. The event req.busboy.on('file', async (fieldname, file, filename) => { .. }) is triggered, in which you call const response = await blobClient.uploadStream(...).
3. The blobClient hasn't completed uploading everything yet. However, the busboy stream ends and closes. At that point the blobClient thinks the stream upload has completed and stops processing (hence stopping "in the middle"). So basically 2 processes are reading the same stream but at different speeds, and one of them decides when it is finished.
4. Because you register the file.on('data', ..); listeners after await blobClient.uploadStream(...), which only completes after the busboy stream is already closed, you are too late to register the listeners and see the log messages.
Solution
Again, apologies for any inaccuracies as this is how I understand it. Feel free to improve my answer!
We need to pipe the file stream into a separate Readable stream that only blobClient reads from. For this we use a PassThrough stream:
1. Call uploadStream asynchronously. Nothing happens yet as we haven't piped the file stream yet.
2. Pipe the file stream into the passthrough stream that is used for the upload.
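The steps above can be sketched roughly as follows. This is a minimal sketch under assumptions, not necessarily the code the answer originally had in mind: the helper name uploadViaPassThrough is hypothetical, blobClient is assumed to be a BlockBlobClient such as the one returned by sharedClient.getBlockBlobClient(...) in the question, and the buffer size and concurrency are simply the values the question already uses.

```javascript
const { PassThrough } = require('stream');

// Hypothetical helper (name and shape are assumptions, not from the answer).
// `file` is the readable stream busboy hands to the 'file' event handler and
// `blobClient` is anything with an uploadStream(stream, bufferSize,
// maxConcurrency, options) method, e.g. a BlockBlobClient.
function uploadViaPassThrough(file, blobClient, options = {}) {
  const bufferSize = (1 * 1024 * 1024) / 2; // 512 KiB blocks, as in the question
  const maxConcurrency = 5;                 // as in the question

  // A dedicated stream that only the blob client reads from.
  const passThrough = new PassThrough();

  // Start the upload but do not await it here. Nothing is uploaded yet
  // because no data has been piped into the passthrough stream.
  const uploadPromise = blobClient.uploadStream(
    passThrough,
    bufferSize,
    maxConcurrency,
    options
  );

  // Forward source errors so the reader sees a failure instead of a stream
  // that never ends (pipe() does not propagate errors on its own).
  file.on('error', (err) => passThrough.destroy(err));

  // Only now let the file data flow into the stream used for the upload.
  file.pipe(passThrough);

  // The caller awaits this to know when the upload has really finished.
  return uploadPromise;
}
```

Inside the 'file' handler this would be used as `const response = await uploadViaPassThrough(file, blobClient, { blobHTTPHeaders: { blobContentType: filename.mimeType } })`. Any file.on(...) listeners should be registered before that await, and it is probably also worth sending the HTTP response only after the promise resolves, since the logs in the question show the 200 OK going out a few milliseconds after the request starts.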