Quarkus: sending/receiving "huge" streams through a reactive REST API



I'm struggling with my proof of concept.
Here is the main idea:

  • On one side, the REST API server shall allow getting and sending a "stream" (as an InputStream, not a File)
  • On the other side, the client shall do the same, in the reverse direction, again as an InputStream (possibly into/from a File, but having an InputStream would cover all cases)

First, I know that the Quarkus reactive framework allows sending an AsyncFile, but that's not an option here:

  • The content (request/response) is forwarded to / fetched from another service (object storage)
  • No File either: as this is a microservice, storing huge files even temporarily within the service goes against the architecture
  • And of course not in memory, for obvious reasons (the microservice will not be allowed 1 TB of RAM)

I am trying to find out how to do this, of course taking back-pressure into account so that nothing (or almost nothing) ends up in memory, in either direction (client <-> server).
My main issue is currently on the server side, when sending an InputStream back to the client in the response.

So my main question is: how can I send a huge stream from the server side of a REST HTTP service with Quarkus, without putting everything in memory or into a File?
The next questions would then be:

  • The same but on the client side (probably through the WebClient and a pipe, but it is hard to see how, since I want to consume the response as an InputStream, not feed a WriteStream; see the sketch just after this list)?
  • And finally the reverse way (receiving an InputStream on the server side, sent by the client)?
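
To make the client-side question more concrete, here is the kind of adapter I have in mind: feed a plain java.io pipe from the Multi<Buffer> that the reactive client returns, and hand the reading end to whatever code expects an InputStream. This is only a rough, untested sketch (pipeIn, pipeOut and body are illustrative names); the writes block when the reader lags behind, so they must not run on the event loop, hence the emitOn onto Mutiny's default worker pool (io.smallrye.mutiny.infrastructure.Infrastructure):

PipedInputStream pipeIn = new PipedInputStream(64 * 1024);
PipedOutputStream pipeOut = new PipedOutputStream(pipeIn);

Multi<Buffer> body = ...; // the streamed response from the REST client
body.emitOn(Infrastructure.getDefaultWorkerPool())    // blocking writes off the event loop
    .subscribe().with(
        buffer -> {
            try {
                pipeOut.write(buffer.getBytes());      // blocks when the reader is slower
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        },
        failure -> { try { pipeOut.close(); } catch (IOException ignored) { } },
        () -> { try { pipeOut.close(); } catch (IOException ignored) { } }
    );
// pipeIn can now be consumed as a regular InputStream on another thread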

So far I have tried several ways:

  • Direct sending/receiving of an InputStream, but it leads to Out of Memory
  • Reactive sending/receiving of an InputStream (using Uni), but same issue
  • Reactive sending/receiving of Multi<Buffer> or Multi<byte[]>, but same issue
  • Trying to implement back-pressure through request(n), but it did not work out (because this is HTTP, not HTTP/2, I believe)
  • Trying to figure out how to use overflow handling with Mutiny, but I did not find how to do it (without dropping buffers, of course)
  • Trying to create an AsyncInputStream, modelled on AsyncFile from Vert.x and Mutiny, but I cannot get it working
  • Having on the server and client definitions something like the following (so using the RESTEasy/Mutiny way, both for server and client; note that on the client I also tried the Vert.x WebClient, but the main issue is on the server side). The client-side interface is sketched just after the resource methods below.
@Path("/objects/{name}"}
@POST
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
public Uni<MyResponse> createObject(@RestPath String name, InputStream content) {
   // For the moment not the main issue, but I believed the InpuStream rely in memory, which is very bad
}

@Path("/objects/{name}"}
@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Uni<InputStream> getObject1(@RestPath String name) {
  // OOME
}

@Path("/objects2/{name}"}
@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Multi<Buffer> getObject2(@RestPath String name) {
  // OOME or Blocked
}

@Path("/objects2/{name}"}
@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Multi<byte[]> getObject3(@RestPath String name) {
  // OOME or Blocked
}
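
And on the client side, the corresponding RESTEasy Reactive client interface looks roughly like this (a sketch only; ObjectClient and the object-api configKey are illustrative names, and I am not sure the client really streams a Multi<Buffer> return type without buffering it):

@Path("/objects2")
@RegisterRestClient(configKey = "object-api")
public interface ObjectClient {

  @GET
  @Path("/{name}")
  @Produces(MediaType.APPLICATION_OCTET_STREAM)
  Multi<Buffer> getObject(@PathParam("name") String name);
}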

So, focusing on the server side, I have tried for instance (pseudo-code):

  • Returning the InputStream directly: limited to 10 MB (the Quarkus default) and held in memory; above that limit it is faulty (the InputStream is null on the client side, and I have no clue whether the server sends everything or not; probably not, judging by the response time).
  • Without back-pressure: Out of Memory (the default emitter strategy buffers everything that is emitted, I believe), with something like:
Multi.createFrom().<Buffer>emitter(em -> {
  // Functional interface acting as "sendAsRequested":
  // read the InputStream eagerly and emit every chunk, ignoring the subscriber's demand
  byte[] bytes = new byte[8192];
  int read;
  try {
    while ((read = inputStream.read(bytes)) >= 0) {
      Buffer buffer = Buffer.buffer().appendBytes(bytes, 0, read);
      em.emit(buffer);
    }
    em.complete();
  } catch (IOException e) {
    em.fail(e);
  }
}).onCompletion().invoke(() -> {
    // Close the InputStream
    try { inputStream.close(); } catch (IOException ignored) { }
});
  • With back-pressure: but the requests never come back to the server, even though on the client I issue a request(1) for each item:
Multi.createFrom().<Buffer>emitter(em -> {
  // Functional interface acting as "sendAsRequested":
  // emit at most em.requested() chunks, then wait for more demand
  long nb = em.requested();
  byte[] bytes = new byte[8192];
  try {
    for (long i = 0; i < nb; i++) {
      int read = inputStream.read(bytes);
      if (read < 0) {
        em.complete();
        break;
      }
      Buffer buffer = Buffer.buffer().appendBytes(bytes, 0, read);
      em.emit(buffer);
    }
  } catch (IOException e) {
    em.fail(e);
  }
}).onRequest().invoke(() -> {
    // will relaunch the functional interface "sendAsRequested"
}).onCompletion().invoke(() -> {
    // Close the InputStream
    try { inputStream.close(); } catch (IOException ignored) { }
});

And on the client side (it seems the requests never reach the server):

Multi<Buffer> received; // the streamed response obtained from the client call
received.subscribe().withSubscriber(new MultiSubscriber<Buffer>() {
  Subscription s;

  @Override
  public void onSubscribe(Subscription s) {
    this.s = s;
    s.request(1);          // ask for the first buffer only
  }

  @Override
  public void onItem(Buffer buffer) {
    // consume buffer, then ask for the next one
    s.request(1);
  }

  @Override
  public void onFailure(Throwable failure) {
    failure.printStackTrace();
  }

  @Override
  public void onCompletion() {
    // end of stream
  }
});
  • Looking at the AsyncFile implementation to try to write some "AsyncInputStream", but:
    • It seems the current Mutiny implementation does not allow adding a new "type" (the various types are hard-coded)
    • I tried to extend AsyncFile to "mimic" its behaviour with an InputStream backing it, but I am not sure I am taking the right direction.
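
For reference, the kind of pull-based reading I am after might also be approximated without a hand-written AsyncInputStream, by using Mutiny's repetition operator: each blocking read is wrapped in a Uni that runs on a worker thread (io.smallrye.mutiny.infrastructure.Infrastructure), and the repetition is, in theory, driven by downstream demand. This is only an untested sketch (inputStream stands for the stream to expose, the 8 KB chunk size is arbitrary):

Multi<Buffer> body = Multi.createBy().repeating()
    .uni(() -> Uni.createFrom().item(() -> {
          // one blocking read per repetition, pushed onto a worker thread below
          try {
            byte[] bytes = new byte[8192];
            int read = inputStream.read(bytes);
            return read < 0 ? null : Buffer.buffer().appendBytes(bytes, 0, read);
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        }).runSubscriptionOn(Infrastructure.getDefaultWorkerPool()))
    .until(buffer -> buffer == null)   // stop at end-of-stream; the null sentinel is not emitted
    .onTermination().invoke(() -> {
        // Close the InputStream whatever the outcome
        try { inputStream.close(); } catch (IOException ignored) { }
    });

I have not verified whether RESTEasy Reactive actually requests item by item on such a Multi, which is precisely my question above.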

I am a bit curious that Quarkus sets 10 MB as the default body limit (the quarkus.http.limits.max-body-size setting; a chunked body is not limited). It seems that all the effort went into small items or small streams (the JSON case), but none into streams such as InputStream (with the exception of File, which is not possible in a traditional container microservice where memory and disk are too small for that approach). I understand the reason (it covers 80% of the needs), but it seems that InputStream is currently lacking support in Quarkus (except when smaller than 10 MB).

Does anyone have any ideas or suggestions?
