如果任何精通 Apache Avro 的 Java 实现的人正在阅读本文,那么这有点盲目。
我的高级目标是有某种方式通过网络传输一系列 avro 数据(例如 HTTP,但特定协议对于此目的并不那么重要)。在我的上下文中,我有一个 HttpServletResponse 我需要以某种方式写入这些数据。
我最初尝试将数据写入相当于 avro 容器文件的虚拟版本(假设“响应”是 HttpServletResponse 类型):
response.setContentType("application/octet-stream");
response.setHeader("Content-transfer-encoding", "binary");
ServletOutputStream outStream = response.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(outStream);
Schema someSchema = Schema.parse(".....some valid avro schema....");
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("somefield", someData);
...
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
fileWriter.create(someSchema, bos);
fileWriter.append(someRecord);
fileWriter.close();
bos.flush();
这一切都很好,但事实证明 Avro 并没有真正提供除了实际文件之外读取容器文件的方法:DataFileReader 只有两个构造函数:
public DataFileReader(File file, DatumReader<D> reader);
其中
public DataFileReader(SeekableInput sin, DatumReader<D> reader);
SeekableInput 是某种 avro 特定的自定义表单,其创建最终也会从文件中读取。现在考虑到这一点,除非有某种方法以某种方式将 InputStream 强制转换为文件(http://stackoverflow.com/questions/578305/create-a-java-file-object-or-equivalent-using-a-byte- array-in-memory-without-a 表明没有,并且我也尝试查看 Java 文档),如果 OutputStream 另一端的阅读器收到该 avro 容器文件(我是不知道为什么他们允许将 avro 二进制容器文件输出到任意 OutputStream,而不提供一种从另一端相应的 InputStream 读取它们的方法,但这不是重点)。看来容器文件读取器的实现需要具体文件提供的“可查找”功能。
好吧,看来这种方法不会达到我想要的效果。创建一个模仿 avro 容器文件的 JSON 响应怎么样?
public static Schema WRAPPER_SCHEMA = Schema.parse(
"{\"type\": \"record\", " +
"\"name\": \"AvroContainer\", " +
"\"doc\": \"a JSON avro container file\", " +
"\"namespace\": \"org.bar.foo\", " +
"\"fields\": [" +
"{\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\"}, " +
"{\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"}]}"
);
考虑到上述限制,我不确定这是否是解决此问题的最佳方法,但看起来这可能会奏效。我会将模式(例如上面的“Schema someSchema”)作为字符串放入“schema”字段中,然后放入适合该模式的记录的 avro 二进制序列化形式(即“GenericRecord”) someRecord”)位于“数据”字段内。
我实际上想了解下面描述的具体细节,但我认为也值得提供更大的背景,以便如果有更好的高级方法我可以采取(这种方法有效)但只是感觉不太理想)请告诉我。
我的问题是,假设我采用这种基于 JSON 的方法,如何将记录的 avro 二进制表示形式写入 AvroContainer 架构的“数据”字段?例如,我到达这里:
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
Encoder e = new BinaryEncoder(baos);
datumWriter.write(resultsRecord, e);
e.flush();
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("schema", someSchema.toString());
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray()));
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA);
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8);
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator);
datumWriter.write(someRecord, e);
e.flush();
PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse
response.setContentType("text/plain");
response.setCharacterEncoding("UTF-8");
printWriter.print(baos.toString("UTF-8"));
我最初尝试省略 ByteBuffer.wrap 子句,但随后该行
datumWriter.write(someRecord, e);
引发了一个异常,即我无法将字节数组转换为 ByteBuffer。公平地说,看起来当调用 Encoder 类(JsonEncoder 是其子类)来编写 avro Bytes 对象时,它需要提供 ByteBuffer 作为参数。因此,我尝试用 java.nio.ByteBuffer.wrap 封装 byte[],但是当打印出数据时,它被打印为一系列直接的字节,而没有通过 avro 十六进制表示形式传递:
"data": {"bytes": ".....some gibberish other than the expected format...}
这似乎不是正确的。根据avro文档,他们给出的示例字节对象说我需要放入一个json对象,其示例看起来像“\u00FF”,而我在那里放入的内容显然不是这种格式。我现在想知道的是:
- avro 字节格式的示例是什么?它看起来像“\uDEADBEEFDEADBEEF...”吗?
- 如何将二进制 avro 数据(由 BinaryEncoder 输出到 byte[] 数组)强制转换为可以粘贴到 GenericRecord 对象中并以 JSON 格式正确打印的格式?例如,我想要一个对象数据,我可以为其调用一些 GenericRecord“someRecord.put(“data”, DATA);”里面有我的 avro 序列化数据吗?
- 当给定文本 JSON 表示形式并想要重新创建由 AvroContainer 格式 JSON 表示的 GenericRecord 时,我如何将该数据读回到另一端(消费者)的字节数组中?
- (重申之前的问题)有没有更好的方法可以完成这一切?
This is somewhat of a shot in the dark in case anyone savvy with the Java implementation of Apache Avro is reading this.
My high-level objective is to have some way to transmit some series of avro data over the network (let's just say HTTP for example, but the particular protocol is not that important for this purpose). In my context I have a HttpServletResponse I need to write this data to somehow.
I initially attempted to write the data as what amounted to a virtual version of an avro container file (suppose that "response" is of type HttpServletResponse):
response.setContentType("application/octet-stream");
response.setHeader("Content-transfer-encoding", "binary");
ServletOutputStream outStream = response.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(outStream);
Schema someSchema = Schema.parse(".....some valid avro schema....");
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("somefield", someData);
...
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
fileWriter.create(someSchema, bos);
fileWriter.append(someRecord);
fileWriter.close();
bos.flush();
This was all fine and dandy, except that it turns out Avro doesn't really provide a way to read a container file apart from an actual file: the DataFileReader only has two constructors:
public DataFileReader(File file, DatumReader<D> reader);
and
public DataFileReader(SeekableInput sin, DatumReader<D> reader);
where SeekableInput is some avro-specific customized form whose creation also ends up reading from a file. Now given that, unless there is some way to somehow coerce an InputStream into a File (http://stackoverflow.com/questions/578305/create-a-java-file-object-or-equivalent-using-a-byte-array-in-memory-without-a suggests that there is not, and I have tried looking around the Java documentation as well), this approach won't work if the reader on the other end of the OutputStream receives that avro container file (I'm not sure why they allowed one to output avro binary container files to an arbitrary OutputStream without providing a way to read them from the corresponding InputStream on the other end, but that's beside the point). It seems that the implementation of the container file reader requires the "seekable" functionality that a concrete File provides.
Okay, so it doesn't look like that approach will do what I want. How about creating a JSON response that mimics the avro container file?
public static Schema WRAPPER_SCHEMA = Schema.parse(
"{\"type\": \"record\", " +
"\"name\": \"AvroContainer\", " +
"\"doc\": \"a JSON avro container file\", " +
"\"namespace\": \"org.bar.foo\", " +
"\"fields\": [" +
"{\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\"}, " +
"{\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"}]}"
);
I'm not sure if this is the best way to approach this given the above constraints, but it looks like this might do the trick. I'll put the schema (of "Schema someSchema" from above, for instance) as a String inside the "schema" field, and then put in the avro-binary-serialized form of a record fitting that schema (ie. "GenericRecord someRecord") inside the "data" field.
I actually wanted to know about a specific detail of that which is described below, but I thought it would be worthwhile to give a bigger context as well, so that if there is a better high-level approach I could be taking (this approach works but just doesn't feel optimal) please do let me know.
My question is, assuming I go with this JSON-based approach, how do I write the avro binary representation of my Record into the "data" field of the AvroContainer schema? For example, I got up to here:
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
Encoder e = new BinaryEncoder(baos);
datumWriter.write(resultsRecord, e);
e.flush();
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("schema", someSchema.toString());
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray()));
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA);
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8);
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator);
datumWriter.write(someRecord, e);
e.flush();
PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse
response.setContentType("text/plain");
response.setCharacterEncoding("UTF-8");
printWriter.print(baos.toString("UTF-8"));
I initially tried omitting the ByteBuffer.wrap clause, but then then the line
datumWriter.write(someRecord, e);
threw an exception that I couldn't cast a byte array into ByteBuffer. Fair enough, it looks like when the Encoder class (of which JsonEncoder is a subclass) is called to write an avro Bytes object, it requires a ByteBuffer to be given as an argument. Thus, I tried encapsulating the byte[] with java.nio.ByteBuffer.wrap, but when the data was printed out, it was printed as a straight series of bytes, without being passed through the avro hexadecimal representation:
"data": {"bytes": ".....some gibberish other than the expected format...}
That doesn't seem right. According to the avro documentation, the example bytes object they give says that I need to put in a json object, an example of which looks like "\u00FF", and what I have put in there is clearly not of that format. What I now want to know is the following:
- What is an example of an avro bytes format? Does it look something like "\uDEADBEEFDEADBEEF..."?
- How do I coerce my binary avro data (as output by the BinaryEncoder into a byte[] array) into a format that I can stick into the GenericRecord object and have it print correctly in JSON? For example, I want an Object DATA for which I can call on some GenericRecord "someRecord.put("data", DATA);" with my avro serialized data inside?
- How would I then read that data back into a byte array on the other (consumer) end, when it is given the text JSON representation and wants to recreate the GenericRecord as represented by the AvroContainer-format JSON?
- (reiterating the question from before) Is there a better way I could be doing all this?
发布评论
评论(3)
正如 Knut 所说,如果您想使用文件以外的其他内容,您可以:
这些就是你的答案。
As Knut said, if you want to use something other than a file, you can either:
Those are your answers.
在 Java 和 Scala 下,我们尝试通过使用 Scala 硝基代码生成器生成的代码来使用 inception。 Inception 是 Javascript mtth/avsc 库解决这个 问题。然而,我们在使用 Java 库时遇到了几个序列化问题,其中错误的字节始终被注入到字节流中,而且我们无法弄清楚这些字节来自哪里。
当然,这意味着使用 ZigZag 编码构建我们自己的 Varint 实现。嗯。
这里是:
Under Java and Scala, we tried using inception via code generated using the Scala nitro codegen. Inception is how the Javascript mtth/avsc library solved this problem. However, we ran into several serialization problems using the Java library where there were erroneous bytes being injected into the byte stream, consistently - and we could not figure out where those bytes were coming from.
Of course that meant building our own implementation of Varint with ZigZag encoding. Meh.
Here it is:
我解决这个问题的方法是将模式与数据分开传送。我设置了一个连接握手,从服务器向下传输架构,然后来回发送编码数据。您必须创建一个如下所示的外部包装对象:
首先将记录数组逐一编码为编码字节数组的数组。一个数组中的所有内容都应该具有相同的架构。然后,使用上述模式对包装器对象进行编码 - 将“schemaName”设置为用于对数组进行编码的模式的名称。
在服务器上,您将首先解码包装器对象。一旦解码了包装器对象,您就知道了 schemaName,并且您拥有了一个知道如何解码的对象数组——您可以随意使用!
请注意,如果您使用像
WebSockets
这样的协议和像Socket.IO
(对于Node.js
)套接字。 io 为您提供了浏览器和服务器之间基于通道的通信层。在这种情况下,只需为每个通道使用特定的模式,在发送每条消息之前对其进行编码。当连接启动时,您仍然必须共享架构 - 但如果您使用 WebSockets,这很容易实现。完成后,您将在客户端和服务器之间拥有任意数量的强类型双向流。The way I solved this was to ship the schemas separately from the data. I set up a connection handshake that transmits the schemas down from the server, then I send encoded data back and forth. You have to create an outside wrapper object like this:
Where you first encode your array of records, one by one, into an array of encoded byte arrays. Everything in one array should have the same schema. Then you encode the wrapper object with the above schema -- set "schemaName" to be the name of the schema you used to encode the array.
On the server, you will decode the wrapper object first. Once you decode the wrapper object, you know the schemaName, and you have an array of objects you know how to decode -- use as you will!
Note that you can get away without using the wrapper object if you use a protocol like
WebSockets
and an engine likeSocket.IO
(forNode.js
) Socket.io gives you a channel-based communication layer between browser and server. In that case, just use a specific schema for each channel, encode each message before you send it. You still have to share the schemas when the connection initiates -- but if you are usingWebSockets
this is easy to implement. And when you are done you have an arbitrary number of strongly-typed, bidirectional streams between client and server.