Hive SerDe 序列化与反序列化
SerDe 是 Serializer/Deserializer 的缩写,即序列化和反序列化。
- 序列化是对象转换成字节序列的过程。
- 反序列化是字节序列转换成对象的过程。
对象的序列化主要有两种用途:
- 对象的持久化,即把对象转换成字节序列后保存到文件。
- 对象数据的网络传输。
除了上面这两点,Hive 中序列化的作用还包括:Hive 的反序列化是对 key / value 反序列化成 Hive 表的每个列的值。Hive 可以方便的把数据加载到表而不需要对数据进行转换。这样在处理海量数据时可以节省大量的时间。
SerDe 可以让 Hive 从表中读取数据,然后以任意自定义格式把数据写回 HDFS。你可以根据具体的数据格式开发自定义的 SerDe 实现。
表数据行的序列化和反序列化大概过程如下:
- HDFS 文件 ——> InputFileFormate ——>
——> 反序列化 ——> 数据行对象(Row object) - 数据行对象(Row object)——> 序列化 ——>
——> OutputFileFormate ——> HDFS 文件
需要特别注意的是,在数据读取的时候 “key” 部分是会被忽略的,并且在数据写入的时候 “key” 部分总是一个常量。而数据行对象是被存储在 “value” 里的。
SerDe 的使用
数据的序列化方式与反序列号方式是在创建表的时候指定的。
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)]
INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
如上创建表语句, 使用 row format
参数来指定 SerDe
的类型。
Hive 的 SerDe 分类
Hive 最新的 SerDe 库是 org.apache.hadoop.hive.serde2
,而旧的库 org.apache.hadoop.hive.serde
已经摒弃不用了。看源码的话只需要看 serde2 这个库即可。
Hive 内置的 SerDe 类型
Hive 使用下面的 FileFormat 类型来读写 HDFS 文件:
TextInputFormat/HiveIgnoreKeyTextOutputFormat
该格式用于读写文本文件格式的数据。
SequenceFileInputFormat/SequenceFileOutputFormat
该格式用于读写 Hadoop 的序列文件格式。
Hive 序列化和反序列化数据使用以下 SerDe 类:
MetadataTypedColumnsetSerDe
这个 SerDe 类型用于读写以某个分隔符分隔的记录。比如使用逗号分隔符的记录(CSV),tab 键分隔符的记录。
LazySimpleSerDe
这个是默认的 SerDe 类型。读取与 MetadataTypedColumnsetSerDe 和 TCTLSeparatedProtocol 相同的数据格式,可以用这个 Hive SerDe 类型。它是以惰性的方式创建对象的,因此具有更好的性能。在 Hive 0.14.0 版本以后,在读写数据时它支持指定字符编码。例如:
ALTER TABLE person SET SERDEPROPERTIES (‘serialization.encoding’=’GBK’)
如果把配置属性 hive.lazysimple.extended_boolean_literal
设置为 true
(Hive 0.14.0 以后版本),LazySimpleSerDe 可以把 ‘T’, ‘t’, ‘F’, ‘f’, ‘1’, and ‘0’ 视为合法的布尔字面量。而该配置默认是 false 的,因此它只会把 ‘True’ 和 ‘False’ 视为合法的布尔字面量。
Thrift SerDe
读写 Thrift 序列化对象,可以使用这种 Hive SerDe 类型。需要确定的是,对于 Thrift 对象,类文件必须先被加载。
动态 SerDe
为了读写 Thrift 序列化对象,我们可以使用这种 SerDe 类型。它可以理解 Thrift DDL 语句,所以对象的模式可以在运行时被提供。另外,它支持许多不同的协议,包括 TBinaryProtocol, TJSONProtocol, TCTLSeparatedProtocol。
下面汇集了一些其他的 SerDe 类型及其新增的对应 Hive 版本:
Hive 0.12.0 新增支持 Json 文件的 JsonSerDe。
Hive 0.9.1 新增支持 Avro 文件格式的 AvroSerDe,在建表时,如果文件格式指定的是 AVRO(STORED AS AVRO),那么默认 SerDe 就是 AvroSerDe。
Hive 0.11.0 新增支持 ORC 文件格式的 SerDe ,即 ORCSerDe
Hive 0.10 新增支持 Parquet 文件格式的 SerDe ,即 ParquetSerDe
自定义 SerDe 类型
如果 Hive 的自定义 Serde 类型不能满足你的需求,你可以自己定义自己的 SerDe 类型。
开发自定义 SerDe 类型
定义一个类, 继承抽象类 AbstractSerDe, 实现 initialize 和 deserialize 两个方法。示例代码如下:
package com.coder4.hive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
public class MySerDe extends AbstractSerDe {
// params
private List<String> columnNames = null;
private List<TypeInfo> columnTypes = null;
private ObjectInspector objectInspector = null;
// seperator
private String nullString = null;
private String lineSep = null;
private String kvSep = null;
@Override
public void initialize(Configuration conf, Properties tbl)
throws SerDeException {
// Read sep
lineSep = "\n";
kvSep = "=";
nullString = tbl.getProperty(Constants.SERIALIZATION_NULL_FORMAT, "");
// Read Column Names
String columnNameProp = tbl.getProperty(Constants.LIST_COLUMNS);
if (columnNameProp != null && columnNameProp.length() > 0) {
columnNames = Arrays.asList(columnNameProp.split(","));
} else {
columnNames = new ArrayList<String>();
}
// Read Column Types
String columnTypeProp = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
// default all string
if (columnTypeProp == null) {
String[] types = new String[columnNames.size()];
Arrays.fill(types, 0, types.length, Constants.STRING_TYPE_NAME);
columnTypeProp = StringUtils.join(types, ":");
}
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProp);
// Check column and types equals
if (columnTypes.size() != columnNames.size()) {
throw new SerDeException("len(columnNames) != len(columntTypes)");
}
// Create ObjectInspectors from the type information for each column
List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>();
ObjectInspector oi;
for (int c = 0; c < columnNames.size(); c++) {
oi = TypeInfoUtils
.getStandardJavaObjectInspectorFromTypeInfo(columnTypes
.get(c));
columnOIs.add(oi);
}
objectInspector = ObjectInspectorFactory
.getStandardStructObjectInspector(columnNames, columnOIs);
}
@Override
public Object deserialize(Writable wr) throws SerDeException {
// Split to kv pair
if (wr == null)
return null;
Map<String, String> kvMap = new HashMap<String, String>();
Text text = (Text) wr;
for (String kv : text.toString().split(lineSep)) {
String[] pair = kv.split(kvSep);
if (pair.length == 2) {
kvMap.put(pair[0], pair[1]);
}
}
// Set according to col_names and col_types
ArrayList<Object> row = new ArrayList<Object>();
String colName = null;
TypeInfo type_info = null;
Object obj = null;
for (int i = 0; i < columnNames.size(); i++) {
colName = columnNames.get(i);
type_info = columnTypes.get(i);
obj = null;
if (type_info.getCategory() == ObjectInspector.Category.PRIMITIVE) {
PrimitiveTypeInfo p_type_info = (PrimitiveTypeInfo) type_info;
switch (p_type_info.getPrimitiveCategory()) {
case STRING:
obj = StringUtils.defaultString(kvMap.get(colName), "");
break;
case LONG:
case INT:
try {
obj = Long.parseLong(kvMap.get(colName));
} catch (Exception e) {
}
}
}
row.add(obj);
}
return row;
}
@Override
public ObjectInspector getObjectInspector() throws SerDeException {
return objectInspector;
}
@Override
public SerDeStats getSerDeStats() {
return null;
}
@Override
public Class<? extends Writable> getSerializedClass() {
return Text.class;
}
@Override
public Writable serialize(Object arg0, ObjectInspector arg1)
throws SerDeException {
return null;
}
}
使用自定义 Serde 类型
hive > add jar MySerDe.jar
创建表格时属性 row fromat 指定自定义的 SerDe 类。
CREATE EXTERNAL TABLE IF NOT EXISTS teacher (
id BIGINT,
name STRING,
age INT)
ROW FORMAT SERDE 'com.coder4.hive.MySerDe'
STORED AS TEXTFILE
LOCATION '/usr/hive/text/'
ObjectInspector
Hive 使用 ObjectInspector 对象分析行对象的内部结构以及列的结构。
具体来说,ObjectInspector 给访问复杂的对象提供了一种统一的方式。对象可能以多种格式存储在内存中:
- Java 类实例,Thrift 或者 原生 Java
- 标准 Java 对象,比如 Map 字段,我们使用 java.util.List 表示 Struct 和 Array ,以及使用 java.util.Map。
- 惰性初始化对象。
此外,可以通过 (ObjectInspector, java 对象) 这种结构表示一个复杂的对象。它为我们提供了访问对象内部字段的方法,而不涉及对象结构相关的信息。出了序列化的目的,Hive 建议为自定义 SerDes 创建自定义的 objectinspector,SerDe 有两个构造器,一个无参构造器,一个常规构造器。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论