# 前言
最近有个需求是把 PostgreSQL(百万行数据规模)的数据准实时同步到 Doris。问了一圈 ChatGPT,ChatGPT 有提到 Flink 和 Apache SeaTunnel。Flink 是鼎鼎有名的 ETL 工具,而 SeaTunnel 之前没听说过。多问了 ChatGPT 几轮,了解到 SeaTunnel 是一个轻量化、上手快的工具。体验了一下,果然非常省事,开发效率很高,对于这个量级的同步需求真香。
Apache SeaTunnel 官网 (opens new window)。这个名字也很有意思,SeaTunnel 让人想到海底隧道,就像是在两个数据库之间的建立的海底光缆一样。
体验下来,它的优点有(部分特性需要目标数据库支持,不同数据库支持可能有差异):
- 配置简单:60 行配置一个 Postgres 库的 8 张表同步到 Doris
- 运行简单:可以独立服务模式运行,无需依赖计算集群
- 超高性能
- 支持通配符指定目标表(写 Doris):无需为每张数据源表指定目标表的名字和配置
- 自动建表(写 Doris):无需手动建表,但后续表新增列需要手动同步到 Doris
- 基于主键自动 upsert(写 Doris):保证写目标库的幂等性
- 自动识别列名:源数据
SELECT *
会自动识别列名,自动 upsert 到对应列,无需手动配置字段映射 - 支持使用 WHERE 条件实现增量同步
- 支持使用 CDC 增量同步
体验下来一天就能上手,真的非常简单。
# 目前缺点
目前体验下来,唯一的缺点是报错信息不太友好。
使用 ChatGPT 给的配置文件跑,报的错很模糊,只知道是解析配置的时候报错了:Caused by: org.apache.seatunnel.shade.com.typesafe.config.ConfigException$NotResolved: called unwrapped() on value with unresolved substitutions, need to Config#resolve() first, see API docs
。对比官方文档才发现是因为 ChatGPT 给的是旧版的配置。
又比如 Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
这个报错也没有提示报错的具体位置,丢给 ChatGPT 才知道配置里的数组不能有 trailing comma。
如果报错了,需要自己对照配置文件和官方文档,一点一点地 debug,我的大部分时间也是花在了这里。
另外 2.3.9 版本似乎不支持直接读取环境变量,需要显式地从命令行传参 --variable PG_url=jdbc:postgresql://localhost:5432/postgres
,这个也是需要注意的。
# 在 Docker 中运行
由于是单体服务,也没有依赖什么存储,一个 Docker 就能跑起来。只需要把配置文件挂进去即可。
docker run -it --rm \
-v $PWD/seatunnel/config.conf:/config/config.conf:ro \
apache/seatunnel:2.3.9 \
./bin/seatunnel.sh -m local -c /config/config.conf
# 配置文件
配置文件是 SeaTunnel 使用的核心。它分为了三个部分:env
指定全局配置,source
指定源数据库,sink
指定目标数据库。
env {
job.mode = "BATCH"
parallelism = 4
}
source {
Jdbc {
url="jdbc:postgresql://localhost:5432/postgres"
user="iDm82k6Q0Tq+wUprWnPsLQ=="
driver="org.postgresql.Driver"
password="iDm82k6Q0Tq+wUprWnPsLQ=="
"table_list"=[ # 只需要指定表名,不需要指定 schema
{
"table_path"="demo.public.AllDataType_1"
},
{
"table_path"="demo.public.alldatatype"
}
]
# 如果需要增量同步,可以在这里指定条件,具体条件需要在外部维护,通过变量传进来
# where_condition= "where id > 100"
split.size = 10000
# split.even-distribution.factor.upper-bound = 100
# split.even-distribution.factor.lower-bound = 0.05
# split.sample-sharding.threshold = 1000
# split.inverse-sampling.rate = 1000
}
}
sink {
Doris {
fenodes = "doris_cdc_e2e:8030"
username = root
password = ""
database = "${schema_name}"
table = "${table_name}" # 使用通配符指定表名
sink.label-prefix = "sync_task"
sink.enable-2pc = "true"
sink.enable-delete = "true"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}
SeaTunnel 也支持 transform(毕竟是 ETL 工具),支持使用内置的 transform,也可以用 Java 开发 transform 插件来自定义 transform 逻辑。不过单纯的数据同步不用涉及到这一块。
每个部分对着官方文档配置即可。Source - Postgres (opens new window) 和 Sink - Doris (opens new window)。
# 性能测试
目前生产环境下有两张大表,都是 80w 行,还有六张小表。目前配置的是全量同步,在只有少量数据更新的场景下,每次 SeaTunnel 的运行时间在 20s,相当快。这个速度也不用配置增量同步了,直接五分钟跑一次同步就完事了,简单粗暴。
千万级别的性能测试、百万条插入、百万条更新操作 TBD