自研RPC框架之序列化应用
1.序列化抽象
我们定义了序列化器的统一接口Serializer
,其中包含序列化和反序列化两个方法。
1 2 3 4 5
| public interface Serializer { byte[] serialize(Object object);
<T> T deserialize(byte[] bytes, Class<T> clazz); }
|
2.JDK序列化器
JDK序列化机制简单易用,只需实现Serializable
接口,能够自动处理包含多个对象引用、循环引用等复杂对象图的序列化和反序列化,无需开发人员进行特殊处理,同时保留了对象类型信息,反序列化时能够准确地恢复对象的类型。
但是,JDK序列化产生的字节流相对较大
,这不仅影响性能,还增加了存储和网络传输的负担。更重要的是,JDK序列化存在安全风险
,恶意的序列化数据可能导致安全漏洞,例如执行任意代码。因此我们绝对不能使用JDK序列化方式!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class JdkSerializer implements Serializer { private static final Logger log = LoggerFactory.getLogger(JdkSerializer.class);
@Override public byte[] serialize(Object object) { if (object == null) { log.error("序列化对象不能为空"); throw new SerializeException("序列化对象不能为空"); } try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos)) { oos.writeObject(object); byte[] result = baos.toByteArray(); log.info("JDK序列化操作完成,字节数:{}", result.length); return result; } catch (IOException e) { log.error("序列化过程发生异常:{}", object); throw new SerializeException(e); } }
@Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { if (bytes == null || clazz == null) { log.error("反序列化参数不能为空"); throw new SerializeException("反序列化参数不能为空"); } try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bais)) { log.info("JDK反序列化操作完成"); return (T) ois.readObject(); } catch (IOException | ClassNotFoundException e) { log.error("反序列化过程发生异常:{}", clazz); throw new SerializeException(e); } } }
|
我们可以看到,JDK 自带的序列化机制对使用者而言是非常简单的。序列化具体的实现是由 ObjectOutputStream
完成的,而反序列化的具体实现是由 ObjectInputStream
完成的。那么 JDK 的序列化过程是怎样完成的呢?我们看下下面这张图:
序列化过程就是在读取对象数据的时候,不断加入一些特殊分隔符
,这些特殊分隔符用于在反序列化过程中截断用。
头部数据
用来声明序列化协议
、序列化版本
,用于高低版本向后兼容
对象数据
主要包括类名、签名、属性名、属性类型及属性值,当然还有开头结尾等数据,除了属性值属于真正的对象值,其他都是为了反序列化用的元数据
- 存在对象引用、继承的情况下,就是递归遍历“写对象”逻辑
实际上任何一种序列化框架,核心思想就是设计一种序列化协议,将对象的类型、属性类型、属性值一一按照固定的格式写到二进制字节流中来完成序列化,再按照固定的格式一一读出对象的类型、属性类型、属性值,通过这些信息重新创建出一个新的对象,来完成反序列化。
3.Hessian序列化器
Hessian使用二进制数据格式
进行序列化和反序列化,相较于基于文本的序列化协议(如XML、JSON),它更加紧凑
,减少了传输数据的大小,提高了网络传输效率。Hessian设计之初就考虑到了跨语言的支持
,使得不同语言编写的系统之间可以轻松地进行数据交换和通信。Hessian的API简单易用,开发人员可以快速地实现数据的序列化和反序列化,无需复杂的配置。
相对于JDK、JSON,由于Hessian更加高效,生成的字节数更小,有非常好的兼容性和稳定性,所以Hessian更加适合作为RPC框架远程通信的序列化协议。但Hessian本身也有问题,官方版本对Java里面一些常见对象的类型不支持,比如:
- Linked系列,LinkedHashMap、LinkedHashSet等,但是可以通过扩展CollectionDeserializer类修复;
- Locale类,可以通过扩展ContextSerializerFactory类修复;
- Byte/Short反序列化的时候变成Integer。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public class HessianSerializer implements Serializer { private static final Logger log = LoggerFactory.getLogger(HessianSerializer.class);
@Override public byte[] serialize(Object object) { if (object == null) { log.error("序列化对象不能为空"); throw new SerializeException("序列化对象不能为空"); } Hessian2Output ho = null; try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { ho = new Hessian2Output(baos); ho.writeObject(object); ho.flush(); byte[] result = baos.toByteArray(); log.info("Hessian序列化操作完成,字节数:{}", result.length); return result; } catch (IOException e) { log.error("Hessian序列化过程发生异常:{}", object); throw new SerializeException(e); } finally { try { if (ho != null) { ho.close(); } } catch (IOException e) { } } }
@Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { if (bytes == null || clazz == null) { log.error("反序列化参数不能为空"); throw new SerializeException("反序列化参数不能为空"); } Hessian2Input hi = null; try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) { hi = new Hessian2Input(bais); log.info("Hessian反序列化操作完成"); return (T) hi.readObject(); } catch (IOException e) { log.error("Hessian反序列化过程发生异常:{}", clazz); throw new SerializeException(e); } finally { try { if (hi != null) { hi.close(); } } catch (IOException e) { } } } }
|
4.Protostuff序列化器
Protostuff提供了与ProtoBuf
相当的性能和效率,特别是在序列化和反序列化大量数据时,不过不像ProtoBuf那样需要预先定义.proto
文件,Protostuff可以直接对Java对象
进行序列化和反序列化,简化了使用流程。相比ProtoBuf,Protostuff更容易在现有Java项目中集成和使用,减少了学习和开发的成本。
总体而言,Protostuff是一个高效、灵活的序列化框架,适合那些希望利用ProtoBuf性能但又不想被.proto文件约束的Java项目。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class ProtostuffSerializer implements Serializer { private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
@Override public byte[] serialize(Object object) { Class<?> clazz = object.getClass(); Schema schema = RuntimeSchema.getSchema(clazz); byte[] bytes; try { bytes = ProtostuffIOUtil.toByteArray(object, schema, BUFFER); } finally { BUFFER.clear(); } return bytes; }
@Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { Schema<T> schema = RuntimeSchema.getSchema(clazz); T obj = schema.newMessage(); ProtostuffIOUtil.mergeFrom(bytes, obj, schema); return obj; } }
|
5.Kryo序列化器
Kryo是一个高性能
的序列化/反序列化工具,由于其变长存储特性并使用了字节码生成机制,拥有较高的运行速度和较小的字节码体积
。另外,Kryo已经是一种非常成熟的序列化实现了,已经在Twitter、Groupon、Yahoo以及多个著名开源项目(如 Hive、Storm)中广泛的使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class KryoSerializer implements Serializer { private static final Logger log = LoggerFactory.getLogger(KryoSerializer.class);
private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> { Kryo kryo = new Kryo(); kryo.register(OpenrpcRequest.class); kryo.register(OpenrpcResponse.class); kryo.setReferences(true); kryo.setRegistrationRequired(false); return kryo; });
@Override public byte[] serialize(Object object) { try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); Output output = new Output(baos)) { Kryo kryo = kryoThreadLocal.get(); kryo.writeObject(output, object); kryoThreadLocal.remove(); return output.toBytes(); } catch (IOException e) { log.error("Kryo序列化过程发生异常:{}", object); throw new SerializeException(e); } }
@Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { try(ByteArrayInputStream bais = new ByteArrayInputStream(bytes); Input input = new Input(bais)) { Kryo kryo = kryoThreadLocal.get(); T t = kryo.readObject(input, clazz); kryoThreadLocal.remove(); return t; } catch (IOException e) { log.error("Kryo反序列化过程发生异常:{}", clazz); throw new SerializeException(e); } } }
|
6.序列化简单工厂
我们在XML配置文件中可以通过<serializeType type="hessian"/>
自定义配置序列化方案,其实压缩跟序列化类似,我们就不单独说了,不过需要注意一点:
压缩可以自定义配置压缩阈值,因为对于较小的字节数组往往压缩后的体积反而会变大,有些得不偿失,因此我们可以配置压缩阈值如1K,只有当序列化后的字节数组大小超过1K字节后才会压缩,同时压缩其实是牺牲一定的CPU时间以减少网络通信时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class SerializerFactory { private static final Map<String, ObjectWrapper<Serializer>> SERIALIZER_PROTOCOL_CACHE = new ConcurrentHashMap<>(8); private static final Map<Byte, ObjectWrapper<Serializer>> SERIALIZER_CODE_CACHE = new ConcurrentHashMap<>(8);
static { ObjectWrapper<Serializer> jdkSerializer = new ObjectWrapper<>((byte) 1, "jdk", new JdkSerializer()); ObjectWrapper<Serializer> hessianSerializer = new ObjectWrapper<>((byte) 2, "hessian", new HessianSerializer()); ObjectWrapper<Serializer> kryoSerializer = new ObjectWrapper<>((byte) 3, "kryo", new KryoSerializer()); ObjectWrapper<Serializer> protostuffSerializer = new ObjectWrapper<>((byte) 4, "protostuff", new ProtostuffSerializer()); SERIALIZER_PROTOCOL_CACHE.put("jdk", jdkSerializer); SERIALIZER_PROTOCOL_CACHE.put("hessian", hessianSerializer); SERIALIZER_PROTOCOL_CACHE.put("kryo", kryoSerializer); SERIALIZER_PROTOCOL_CACHE.put("protostuff", protostuffSerializer);
SERIALIZER_CODE_CACHE.put((byte) 1, jdkSerializer); SERIALIZER_CODE_CACHE.put((byte) 2, hessianSerializer); SERIALIZER_CODE_CACHE.put((byte) 3, kryoSerializer); SERIALIZER_CODE_CACHE.put((byte) 4, protostuffSerializer); }
public static ObjectWrapper<Serializer> getSerializer(String protocol) { ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_PROTOCOL_CACHE.get(protocol); if (serializerWrapper == null) { throw new IllegalArgumentException("序列化类型不支持"); } return serializerWrapper; }
public static ObjectWrapper<Serializer> getSerializer(byte code) { ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CODE_CACHE.get(code); if (serializerWrapper == null) { throw new IllegalArgumentException("序列化类型不支持"); } return serializerWrapper; }
public static void addSerializer(ObjectWrapper<Serializer> serializerObjectWrapper) { SERIALIZER_PROTOCOL_CACHE.put(serializerObjectWrapper.getName(), serializerObjectWrapper); SERIALIZER_CODE_CACHE.put(serializerObjectWrapper.getCode(), serializerObjectWrapper); } }
|
7.选择序列化协议
在序列化的选择上,与序列化协议的效率、性能、序列化协议后的体积相比,其通用性
和兼容性
的优先级会更高,因为他是会直接关系到服务调用的稳定性和可用率的,对于服务的性能来说,服务的可靠性显然更加重要。我们更加看重这种序列化协议在版本升级后的兼容性是否很好,是否支持更多的对象类型,是否是跨平台、跨语言的,是否有很多人已经用过并且踩过了很多的坑,其次我们才会去考虑性能、效率和空间开销。
还有一点我要特别强调。除了序列化协议的通用性和兼容性,序列化协议的安全性也是非常重要的一个参考因素,甚至应该放在第一位去考虑。以JDK原生序列化为例,它就存在漏洞。如果序列化存在安全漏洞,那么线上的服务就很可能被入侵。
综合上面几个参考因素,现在我们再来总结一下这几个序列化协议。我们首选的还是Hessian
与Protobuf
,因为他们在性能、时间开销、空间开销、通用性、兼容性和安全性上,都满足了我们的要求。其中Hessian
在使用上更加方便,在对象的兼容性上更好;Protobuf则更加高效,通用性上更有优势。