LakeSoul 实习

Post on:2024-7-20|Last edited: 2025-1-15|
type
status
date
slug
summary
tags
category
icon
password

Substrait

Substrait 是一种用于描述结构化数据计算操作的格式
避免每个系统都需要在所有其他系统之间创建通信方法——每个系统都只支持摄取和产生 Substrait,它就会立即成为更大生态系统的一部分。
用到的库: substrait-java 可以很方便的的构建 substrait proto

使用substriait 替换了 Flink 中的 parquet filter

由于 parquet filterapi 没有提供一个统一的表达式类型, 所以很多地方都得手写.

实现思路

flink 中 expression 采用了 visitor 设计模式, 只要实现 ExpresssionVistor<Expression> 就可以将 ResolvedExpression 转换为 SubstraitExpression, 由于 substrait是基于逻辑计划, 所以在发送给 native-io 时还需要构造一个 dummy plan, native-io 侧将 substrait plan中的 Expression 抽取出来并且转换为 datafusion expr
ex:

plain

select * from type_info where id = 2;
Plain text

plain

Plan { version: None, extension_uris: [ SimpleExtensionUri { extension_uri_anchor: 1, uri: "/functions_comparison.yaml", }, ], extensions: [ SimpleExtensionDeclaration { mapping_type: Some( ExtensionFunction( ExtensionFunction { extension_uri_reference: 1, function_anchor: 0, name: "equal:any_any", }, ), ), }, ], relations: [ PlanRel { rel_type: Some( Root( RelRoot { input: Some( Rel { rel_type: Some( Read( ReadRel { common: Some( RelCommon { hint: None, advanced_extension: None, emit_kind: Some( Direct( Direct, ), ), }, ), base_schema: Some( NamedStruct { names: [], r#struct: Some( Struct { types: [], type_variation_reference: 0, nullability: Required, }, ), }, ), filter: Some( Expression { rex_type: Some( ScalarFunction( ScalarFunction { function_reference: 0, arguments: [ FunctionArgument { arg_type: Some( Value( Expression { rex_type: Some( Selection( FieldReference { reference_type: Some( DirectReference( ReferenceSegment { reference_type: Some( MapKey( MapKey { map_key: Some( Literal { nullable: true, type_variation_reference: 0, literal_type: Some( String( "id", ), ), }, ), child: None, }, ), ), }, ), ), root_type: Some( RootReference( RootReference, ), ), }, ), ), }, ), ), }, FunctionArgument { arg_type: Some( Value( Expression { rex_type: Some( Literal( Literal { nullable: false, type_variation_reference: 0, literal_type: Some( I32( 2, ), ), }, ), ), }, ), ), }, ], options: [], output_type: Some( Type { kind: Some( Bool( Boolean { type_variation_reference: 0, nullability: Nullable, }, ), ), }, ), args: [], }, ), ), }, ), best_effort_filter: None, projection: None, advanced_extension: None, read_type: Some( NamedTable( NamedTable { names: [ "type_info", ], advanced_extension: None, }, ), ), }, ), ), }, ), names: [], }, ), ), }, ], advanced_extensions: None, expected_type_urls: [], }
Plain text

plain

BinaryExpr( BinaryExpr { left: Column( Column { relation: Some( Bare { table: "?table?", }, ), name: "id", }, ), op: Eq, right: Literal( Int32(2), ), }, ), ]
Plain text

遇到的困难

  1. 对于 fieldRef substrait 默认是使用field index进行序列化, 而 lakesoul schema中由于会有 row-kind 的存在和未来的扩展性没有选择将 filed index + 1, 使用 substrait mapkey 序列化 filed name , native-io 中通过 schema 和 name 拿到具体的 field
  1. type coerion, 存在类型转换时flink 自己本身在优化阶段时会处理,而暴露给谓词下推中的表达式是原始的, 所以开始 datafusion 会报错, debug 之后发现 lakesoul-reader中默认关闭了类型转换的 optimize rule 打开即可
  1. debug 由于跨越了语言, ide的 debugger 不能打断点, 尝试过 gdb remote 但是失败了, 只能靠原始的 print,但是 print 在提交的时候还需要删除,所以使用了 log-rs,并且在 lakesoul-io 里默认初始化了 logger, 但是目前并没有具体定义日志格式和日志实现上选择了 env_logger, 可能有更好的选择

目前不支持的类型

bytes : 在 flink sql 中 sql 不会写
varbinary : 在 flink sql 中 sql 不会写
timestampltz: LocalTimeZone 获取不到

和 parquetfilter 不一致的地方

  1. select * from type_info where class=CAST(5 as TINYINT) parquetfilter 会 panic 由于 byte cannot be cast to Integer
  1. smallint
  1. select * from type_info where createTime =TO_TIMESTAMP('1995-10-10 13:10:20') parquetfilter 会是 null
  1. select * from type_info where male=true parquetfilter 会是 null
Spark刷题思路
Loading...