4.3. 数据类型和序列化#

Note

本教程已出版为《Flink 原理与实践》,感兴趣的读者请在各大电商平台购买!

几乎所有的大数据框架都要面临分布式计算、数据传输和持久化问题。数据传输过程前后要进行数据的序列化和反序列化:序列化就是将一个内存对象转换成二进制串,形成可网络传输或者可持久化的数据流。反序列化将二进制串转换为内存对象,这样就可以直接在编程语言中读写和操作这个对象。一种最简单的序列化方法就是将复杂数据结构转化成 JSON 格式。序列化和反序列化是很多大数据框架必须考虑的问题,在 Java 和大数据生态圈中,已有不少序列化工具,比如 Java 自带的序列化工具、Kryo 等。一些 RPC 框架也提供序列化功能,比如最初用于 Hadoop 的 Apache Avro、Facebook 开发的 Apache Thrift 和 Google 开发的 Protobuf,这些工具在速度和压缩比等方面比 JSON 有明显的优势。

但是 Flink 依然选择了重新开发了自己的序列化框架,因为序列化和反序列化将关乎整个流处理框架各方面的性能,对数据类型了解越多,可以更早地完成数据类型检查,节省数据存储空间。

泛型和其他类型#

当以上任何一个类型均不满足时,Flink 认为该数据结构是一种泛型(GenericType),使用 Kryo 来进行序列化和反序列化。但 Kryo 在有些流处理场景效率非常低,有可能造成流数据的积压。我们可以使用 senv.getConfig.disableGenericTypes() 来禁用 Kryo,禁用后,Flink 遇到无法处理的数据类型将抛出异常,这种方法对于调试非常有效。

TypeInformation#

以上如此多的类型,在 Flink 中,统一使用 TypeInformation 类表示。比如,POJO 在 Flink 内部使用 PojoTypeInfo 来表示,PojoTypeInfo 继承自 CompositeTypeCompositeType 继承自 TypeInformation。下图展示了 TypeInformation 的继承关系,可以看到,前面提到的诸多数据类型,在 Flink 中都有对应的类型。TypeInformation 的一个重要的功能就是创建 TypeSerializer 序列化器,为该类型的数据做序列化。每种类型都有一个对应的序列化器来进行序列化。

../_images/typeinformation.png

图 4.15 TypeInformation 继承关系#

使用前面介绍的各类数据类型时,Flink 会自动探测传入的数据类型,生成对应的 TypeInformation,调用对应的序列化器,因此用户其实无需关心类型推测。比如,Flink 的 map 函数 Scala 签名为:def map[R: TypeInformation](fun: T => R): DataStream[R],传入 map 的数据类型是 T,生成的数据类型是 R,Flink 会推测 T 和 R 的数据类型,并使用对应的序列化器进行序列化。

../_images/type-inference-process.svg

图 4.16 Flink 数据类型推断和序列化#

图 4.16 展示了 Flink 的类型推断和序列化过程,以一个字符串 String 类型为例,Flink 首先推断出该类型,并生成对应的 TypeInformation,然后在序列化时调用对应的序列化器,将一个内存对象写入内存块。

注册类#

如果传递给 Flink 算子的数据类型是父类,实际运行过程中使用的是子类,子类中有一些父类没有的数据结构和特性,将子类注册可以提高性能。在执行环境上调用 env.registerType(clazz) 来注册类。registerType 方法的源码如下所示,其中 TypeExtractor 对数据类型进行推断,如果传入的类型是 POJO,则可以被 Flink 识别和注册,否则将使用 Kryo。

// Flink registerType java 源码
public void registerType(Class<?> type) {
  if (type == null) {
    throw new NullPointerException("Cannot register null type class.");
  }

  TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(type);

  if (typeInfo instanceof PojoTypeInfo) {
    config.registerPojoType(type);
  } else {
    config.registerKryoType(type);
  }
}

注册序列化器#

如果你的数据类型不是 Flink 支持的上述类型,这时 Flink 会使用 Kryo 序列化。我们需要对数据类型和序列化器进行注册,以便 Flink 对该数据类型进行序列化。

// 使用对 TestClassSerializer 对 TestClass 进行序列化
env.registerTypeWithKryoSerializer(TestClass.class, new TestClassSerializer());

其中 TestClassSerializer 要继承 com.esotericsoftware.kryo.Serializer。下面的代码是一个序列化示意案例。

static class TestClassSerializer extends Serializer<TestClass> implements Serializable {

  private static final long serialVersionUID = -3585880741695717533L;

  @Override
  public void write(Kryo kryo, Output output, TestClass testClass) {
    ...
  }

  @Override
  public TestClass read(Kryo kryo, Input input, Class<TestClass> aClass) {
    ...
  }
}

相应的包需要添加到 pom 中:

<dependency>
  <groupId>com.esotericsoftware.kryo</groupId>
  <artifactId>kryo</artifactId>
  <version>2.24.0</version>
</dependency>

对于 Apache Thrift 和 Protobuf 的用户,已经有人将序列化器编写好,我们可以直接拿来使用:

// Google Protobuf
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);

// Apache Thrift
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

Google Protobuf 的 pom:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-protobuf</artifactId>
	<version>0.7.6</version>
	<exclusions>
		<exclusion>
			<groupId>com.esotericsoftware.kryo</groupId>
			<artifactId>kryo</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>3.7.0</version>
</dependency>

Apache Thrift 的 pom:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-thrift</artifactId>
	<version>0.7.6</version>
	<exclusions>
		<exclusion>
			<groupId>com.esotericsoftware.kryo</groupId>
			<artifactId>kryo</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>org.apache.thrift</groupId>
	<artifactId>libthrift</artifactId>
	<version>0.11.0</version>
	<exclusions>
		<exclusion>
			<groupId>javax.servlet</groupId>
			<artifactId>servlet-api</artifactId>
		</exclusion>
		<exclusion>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
		</exclusion>
	</exclusions>
</dependency>