Forcing stream chunks back into their original sequence

Published 2025-02-07 13:36:48


I use the Fetch API to retrieve my data from the backend as a stream.
I decrypt the data chunk by chunk and then concat the content back together into the original file.

I have found that the stream seems to provide the data differently each time, making the chunks different. How can I force the stream to deliver the chunks in the original sequence?

   fetch(myRequest, myInit).then(response => {
    var tmpResult = new Uint8Array();
    const reader = response.body.getReader();
    return new ReadableStream({
      start(controller) {
        return pump();
        function pump() {
          return reader.read().then(({ done, value }) => {
            // When no more data needs to be consumed, close the stream

            if (value) {
                //values here are different in order every time 
                //making my concatenated values different every time

                controller.enqueue(value);

                var decrypted = cryptor.decrypt(value);
                var arrayResponse = decrypted.toArrayBuffer();

                if (arrayResponse) {
                   tmpResult = arrayBufferConcat(tmpResult, arrayResponse);
                }
            }
            // Enqueue the next data chunk into our target stream

            if (done) {
                    if (counter == length) { 
                        callback(obj);        
                    }
              controller.close();
              return;
            }    
            return pump();
          });
        }
      }
    })
  })


3 Answers

我ぃ本無心為│何有愛 2025-02-14 13:36:48


The documentation tells us that:

Each chunk is read sequentially and output to the UI, until the stream
has finished being read, at which point we return out of the recursive
function and print the entire stream to another part of the UI.

I made a test program with node, using node-fetch:

import fetch from 'node-fetch';

const testStreamChunkOrder = async () => {
    return new Promise(async (resolve) => {
        let response = await fetch('https://jsonplaceholder.typicode.com/todos/');
        let stream = response.body;
        let data = '';

        stream.on('readable', () => {
            let chunk;

            while (null !== (chunk = stream.read())) {
                data += chunk;
            }
        })

        stream.on('end', () => {
            resolve(JSON.parse(data).splice(0, 5).map((x) => x.title));
        })
    });
}

(async () => {
    // Array.fill would reuse a single promise; map over the slots to launch 10 independent fetches
    let results = await Promise.all(Array.from({ length: 10 }, () => testStreamChunkOrder()))
    let joined = results.map((r) => r.join(''));
    console.log(`Is every result same: ${joined.every((j) => j.localeCompare(joined[0]) === 0)}`)
})()

This one fetches some random todo-list json and streams it chunk-by-chunk, accumulating the chunks into data. When the stream is done, we parse the full json and take the first 5 elements of the todo-list and keep only the titles, after which we then return the result asynchronously.

This whole process is done 10 times. When we have 10 streamed title-lists, we go through each title-list and join the title names together to form a string. Finally we use .every to see if each of the 10 strings are the same, which means that each json was fetched and streamed in the same order.

So I believe the problems lies somewhere else - the streaming itself is working correctly. While I did use node-fetch instead of the actual Fetch API, I think it is safe to say that the actual Fetch API works as it should.
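To see that the ordering is deterministic without any network involved, here is a minimal sketch (assuming a runtime with the WHATWG `ReadableStream` global, i.e. any modern browser or Node 18+):

```javascript
// Chunks read via getReader() arrive in exactly the order they were enqueued.
const chunks = [Uint8Array.of(1), Uint8Array.of(2), Uint8Array.of(3)];

const stream = new ReadableStream({
  start(controller) {
    for (const c of chunks) controller.enqueue(c);
    controller.close();
  }
});

const readAll = async () => {
  const reader = stream.getReader();
  const out = [];
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    out.push(...value);
  }
  return out;
};

const result = readAll(); // resolves to [1, 2, 3], matching the enqueue order
```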

Also I noticed that you are directly calling response.body.getReader(), but when I looked at the documentation, the body.getReader call is done inside another then statement:

fetch('./tortoise.png')
.then(response => response.body)
.then(body => {
  const reader = body.getReader();

This might not matter, but considering everything else in your code, such as the excessive wrapping and returning of functions, I think your problems could go away just by reading a couple of tutorials on streams and cleaning up the code a bit. And if not, you will still be in a better position to figure out if the problem is in one of your many functions you are unwilling to expose. Asynchronous code's behavior is inherently difficult to debug and lacking information around such code makes it even harder.

月依秋水 2025-02-14 13:36:48


I'm assuming you're using the cipher/decipher family of methods in node's crypto library. We can simplify this using streams by first piping the ReadableStream into a decipher TransformStream (a stream that is both readable and writable) via ReadableStream#pipe().

const { createDecipheriv } = require('crypto');
const { createWriteStream } = require('fs');

// change these to match your encryption scheme and key retrieval
const algo = 'aes-256-cbc';
const key = 'my5up3r53cr3t';

// put the initialization vector you've determined here;
// leave null if you are not using an iv (or the algo doesn't support one)
const iv = null;

// creates the decipher TransformStream
// (note: the function is createDecipheriv, with a lowercase "iv")
const decipher = createDecipheriv(algo, key, iv);

// write the plaintext file here
const destFile = createWriteStream('/path/to/destination.ext');

fetch(myRequest, myInit)
    .then(response => response.body)
    .then(body => body.pipe(decipher).pipe(destFile))
    // pipe() returns the destination WriteStream, which emits 'finish';
    // the callback must be a function, not an immediately invoked console.log
    .then(stream => stream.on('finish', () => console.log('done writing file')));

You may also pipe this to be read out in a buffer, pipe to the browser, etc, just be sure to match your algorithm, key, and iv wherever you're defining your cipher/decipher functions.

時窥 2025-02-14 13:36:48


If we take the pattern in that MDN example seriously, we should use the controller to enqueue the decrypted data (not the still encrypted value), and aggregate the results with the stream returned by the first promise. In other words...

return fetch(myRequest, myInit)
  // Retrieve its body as ReadableStream
  .then(response => {
    const reader = response.body.getReader();
    return new ReadableStream({
      start(controller) {
        return pump();
        function pump() {
          return reader.read().then(({ done, value }) => {
            // When no more data needs to be consumed, close the stream
            if (done) {
              controller.close();
              return;
            }

            // do the computational work on each chunk here and enqueue
            // *the result of that work* on the controller stream...

            const decrypted = cryptor.decrypt(value);
            controller.enqueue(decrypted);
            return pump();
          });
        }
      }
    })
  })
  // Create a new response out of the stream
  .then(stream => new Response(stream))
  // Read the full, correctly ordered body back as an ArrayBuffer
  // (Response#arrayBuffer() returns a promise; Blob has no synchronous toArrayBuffer())
  .then(response => response.arrayBuffer())
  .then(arrayResponse => {
    // arrayResponse is the properly sequenced result
    // if the caller wants a promise to resolve to this, just return it
    return arrayResponse;

    // OR... the OP code makes reference to a callback. if that's real, 
    // call the callback with this result
    // callback(arrayResponse);
  })
  .catch(err => console.error(err));
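The same pattern reads more directly with async/await. This is a sketch under the question's assumptions, with the undisclosed `cryptor` object passed in as a parameter (its `decrypt` is assumed to return a byte-array-like chunk):

```javascript
// Reads the response body chunk by chunk, decrypts each chunk in arrival
// order, and reassembles the plaintext as an ArrayBuffer.
async function fetchAndDecrypt(request, init, cryptor) {
  const response = await fetch(request, init);
  const reader = response.body.getReader();
  const decryptedChunks = [];
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    decryptedChunks.push(cryptor.decrypt(value));
  }
  // Blob preserves the order of its parts; arrayBuffer() yields the whole file
  return new Blob(decryptedChunks).arrayBuffer();
}
```

Because each `await reader.read()` resolves before the next read is issued, the decrypted chunks land in `decryptedChunks` in arrival order, so no extra sequencing logic is needed.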