自研RPC框架之序列化应用

自研RPC框架之序列化应用

1.序列化抽象

我们定义了序列化器的统一接口Serializer,其中包含序列化和反序列化两个方法。

1
2
3
4
5
public interface Serializer {
byte[] serialize(Object object);

<T> T deserialize(byte[] bytes, Class<T> clazz);
}

2.JDK序列化器

JDK序列化机制简单易用,只需实现Serializable接口,能够自动处理包含多个对象引用、循环引用等复杂对象图的序列化和反序列化,无需开发人员进行特殊处理,同时保留了对象类型信息,反序列化时能够准确地恢复对象的类型。

但是,JDK序列化产生的字节流相对较大,这不仅影响性能,还增加了存储和网络传输的负担。更重要的是,JDK序列化存在安全风险,恶意的序列化数据可能导致安全漏洞,例如执行任意代码。因此我们绝对不能使用JDK序列化方式!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class JdkSerializer implements Serializer {
private static final Logger log = LoggerFactory.getLogger(JdkSerializer.class);

@Override
public byte[] serialize(Object object) {
if (object == null) {
log.error("序列化对象不能为空");
throw new SerializeException("序列化对象不能为空");
}
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(object);
byte[] result = baos.toByteArray();
log.info("JDK序列化操作完成,字节数:{}", result.length);
return result;
} catch (IOException e) {
log.error("序列化过程发生异常:{}", object);
throw new SerializeException(e);
}
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
if (bytes == null || clazz == null) {
log.error("反序列化参数不能为空");
throw new SerializeException("反序列化参数不能为空");
}
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bais)) {
log.info("JDK反序列化操作完成");
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
log.error("反序列化过程发生异常:{}", clazz);
throw new SerializeException(e);
}
}
}

我们可以看到,JDK 自带的序列化机制对使用者而言是非常简单的。序列化具体的实现是由 ObjectOutputStream 完成的,而反序列化的具体实现是由 ObjectInputStream 完成的。那么 JDK 的序列化过程是怎样完成的呢?我们看下下面这张图:

img

序列化过程就是在读取对象数据的时候,不断加入一些特殊分隔符,这些特殊分隔符用于在反序列化过程中截断用。

  • 头部数据用来声明序列化协议序列化版本,用于高低版本向后兼容
  • 对象数据主要包括类名、签名、属性名、属性类型及属性值,当然还有开头结尾等数据,除了属性值属于真正的对象值,其他都是为了反序列化用的元数据
  • 存在对象引用、继承的情况下,就是递归遍历“写对象”逻辑

实际上任何一种序列化框架,核心思想就是设计一种序列化协议,将对象的类型、属性类型、属性值一一按照固定的格式写到二进制字节流中来完成序列化,再按照固定的格式一一读出对象的类型、属性类型、属性值,通过这些信息重新创建出一个新的对象,来完成反序列化。

3.Hessian序列化器

Hessian使用二进制数据格式进行序列化和反序列化,相较于基于文本的序列化协议(如XML、JSON),它更加紧凑,减少了传输数据的大小,提高了网络传输效率。Hessian设计之初就考虑到了跨语言的支持,使得不同语言编写的系统之间可以轻松地进行数据交换和通信。Hessian的API简单易用,开发人员可以快速地实现数据的序列化和反序列化,无需复杂的配置。

相对于JDK、JSON,由于Hessian更加高效,生成的字节数更小,有非常好的兼容性和稳定性,所以Hessian更加适合作为RPC框架远程通信的序列化协议。但Hessian本身也有问题,官方版本对Java里面一些常见对象的类型不支持,比如:

  • Linked系列,LinkedHashMap、LinkedHashSet等,但是可以通过扩展CollectionDeserializer类修复;
  • Locale类,可以通过扩展ContextSerializerFactory类修复;
  • Byte/Short反序列化的时候变成Integer。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class HessianSerializer implements Serializer {
private static final Logger log = LoggerFactory.getLogger(HessianSerializer.class);

@Override
public byte[] serialize(Object object) {
if (object == null) {
log.error("序列化对象不能为空");
throw new SerializeException("序列化对象不能为空");
}
Hessian2Output ho = null;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
ho = new Hessian2Output(baos);
ho.writeObject(object);
ho.flush();
byte[] result = baos.toByteArray();
log.info("Hessian序列化操作完成,字节数:{}", result.length);
return result;
} catch (IOException e) {
log.error("Hessian序列化过程发生异常:{}", object);
throw new SerializeException(e);
} finally {
try {
if (ho != null) {
ho.close();
}
} catch (IOException e) {
// don't care
}
}
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
if (bytes == null || clazz == null) {
log.error("反序列化参数不能为空");
throw new SerializeException("反序列化参数不能为空");
}
Hessian2Input hi = null;
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
hi = new Hessian2Input(bais);
log.info("Hessian反序列化操作完成");
return (T) hi.readObject();
} catch (IOException e) {
log.error("Hessian反序列化过程发生异常:{}", clazz);
throw new SerializeException(e);
} finally {
try {
if (hi != null) {
hi.close();
}
} catch (IOException e) {
// don't care
}
}
}
}

4.Protostuff序列化器

Protostuff提供了与ProtoBuf相当的性能和效率,特别是在序列化和反序列化大量数据时,不过不像ProtoBuf那样需要预先定义.proto文件,Protostuff可以直接对Java对象进行序列化和反序列化,简化了使用流程。相比ProtoBuf,Protostuff更容易在现有Java项目中集成和使用,减少了学习和开发的成本。

总体而言,Protostuff是一个高效、灵活的序列化框架,适合那些希望利用ProtoBuf性能但又不想被.proto文件约束的Java项目。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ProtostuffSerializer implements Serializer {
// 避免每次序列化时重新应用缓冲区空间
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

@Override
public byte[] serialize(Object object) {
Class<?> clazz = object.getClass();
Schema schema = RuntimeSchema.getSchema(clazz);
byte[] bytes;
try {
bytes = ProtostuffIOUtil.toByteArray(object, schema, BUFFER);
} finally {
BUFFER.clear();
}
return bytes;
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
Schema<T> schema = RuntimeSchema.getSchema(clazz);
T obj = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
}

5.Kryo序列化器

Kryo是一个高性能的序列化/反序列化工具,由于其变长存储特性并使用了字节码生成机制,拥有较高的运行速度和较小的字节码体积。另外,Kryo已经是一种非常成熟的序列化实现了,已经在Twitter、Groupon、Yahoo以及多个著名开源项目(如 Hive、Storm)中广泛的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class KryoSerializer implements Serializer {
private static final Logger log = LoggerFactory.getLogger(KryoSerializer.class);

// Kryo不是线程安全的,每个线程都应该有自己的Kryo的Input和Output,因此使用ThreadLocal存放Kryo对象
private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(OpenrpcRequest.class);
kryo.register(OpenrpcResponse.class);
kryo.setReferences(true); // 默认值为true,是否关闭注册行为,关闭之后可能存在序列化问题,一般推荐设置为true
kryo.setRegistrationRequired(false); // 默认值为false,是否关闭循环引用,可以提高性能,但是一般不推荐设置为true
return kryo;
});

@Override
public byte[] serialize(Object object) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos)) {
Kryo kryo = kryoThreadLocal.get();
kryo.writeObject(output, object);
kryoThreadLocal.remove();
return output.toBytes();
} catch (IOException e) {
log.error("Kryo序列化过程发生异常:{}", object);
throw new SerializeException(e);
}
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try(ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
Input input = new Input(bais)) {
Kryo kryo = kryoThreadLocal.get();
T t = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return t;
} catch (IOException e) {
log.error("Kryo反序列化过程发生异常:{}", clazz);
throw new SerializeException(e);
}
}
}

6.序列化简单工厂

我们在XML配置文件中可以通过<serializeType type="hessian"/>自定义配置序列化方案,其实压缩跟序列化类似,我们就不单独说了,不过需要注意一点:

压缩可以自定义配置压缩阈值,因为对于较小的字节数组往往压缩后的体积反而会变大,有些得不偿失,因此我们可以配置压缩阈值如1K,只有当序列化后的字节数组大小超过1K字节后才会压缩,同时压缩其实是牺牲一定的CPU时间以减少网络通信时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class SerializerFactory {
private static final Map<String, ObjectWrapper<Serializer>> SERIALIZER_PROTOCOL_CACHE = new ConcurrentHashMap<>(8);
private static final Map<Byte, ObjectWrapper<Serializer>> SERIALIZER_CODE_CACHE = new ConcurrentHashMap<>(8);

static {
ObjectWrapper<Serializer> jdkSerializer = new ObjectWrapper<>((byte) 1, "jdk", new JdkSerializer());
ObjectWrapper<Serializer> hessianSerializer = new ObjectWrapper<>((byte) 2, "hessian", new HessianSerializer());
ObjectWrapper<Serializer> kryoSerializer = new ObjectWrapper<>((byte) 3, "kryo", new KryoSerializer());
ObjectWrapper<Serializer> protostuffSerializer = new ObjectWrapper<>((byte) 4, "protostuff", new ProtostuffSerializer());
SERIALIZER_PROTOCOL_CACHE.put("jdk", jdkSerializer);
SERIALIZER_PROTOCOL_CACHE.put("hessian", hessianSerializer);
SERIALIZER_PROTOCOL_CACHE.put("kryo", kryoSerializer);
SERIALIZER_PROTOCOL_CACHE.put("protostuff", protostuffSerializer);

SERIALIZER_CODE_CACHE.put((byte) 1, jdkSerializer);
SERIALIZER_CODE_CACHE.put((byte) 2, hessianSerializer);
SERIALIZER_CODE_CACHE.put((byte) 3, kryoSerializer);
SERIALIZER_CODE_CACHE.put((byte) 4, protostuffSerializer);
}

public static ObjectWrapper<Serializer> getSerializer(String protocol) {
ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_PROTOCOL_CACHE.get(protocol);
if (serializerWrapper == null) {
throw new IllegalArgumentException("序列化类型不支持");
}
return serializerWrapper;
}

public static ObjectWrapper<Serializer> getSerializer(byte code) {
ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CODE_CACHE.get(code);
if (serializerWrapper == null) {
throw new IllegalArgumentException("序列化类型不支持");
}
return serializerWrapper;
}

public static void addSerializer(ObjectWrapper<Serializer> serializerObjectWrapper) {
SERIALIZER_PROTOCOL_CACHE.put(serializerObjectWrapper.getName(), serializerObjectWrapper);
SERIALIZER_CODE_CACHE.put(serializerObjectWrapper.getCode(), serializerObjectWrapper);
}
}

7.选择序列化协议

在序列化的选择上,与序列化协议的效率、性能、序列化协议后的体积相比,其通用性兼容性的优先级会更高,因为他是会直接关系到服务调用的稳定性和可用率的,对于服务的性能来说,服务的可靠性显然更加重要。我们更加看重这种序列化协议在版本升级后的兼容性是否很好,是否支持更多的对象类型,是否是跨平台、跨语言的,是否有很多人已经用过并且踩过了很多的坑,其次我们才会去考虑性能、效率和空间开销。

还有一点我要特别强调。除了序列化协议的通用性和兼容性,序列化协议的安全性也是非常重要的一个参考因素,甚至应该放在第一位去考虑。以JDK原生序列化为例,它就存在漏洞。如果序列化存在安全漏洞,那么线上的服务就很可能被入侵。

img

综合上面几个参考因素,现在我们再来总结一下这几个序列化协议。我们首选的还是HessianProtobuf,因为他们在性能、时间开销、空间开销、通用性、兼容性和安全性上,都满足了我们的要求。其中Hessian在使用上更加方便,在对象的兼容性上更好;Protobuf则更加高效,通用性上更有优势。