# 前言

最近有个需求是把 PostgreSQL(百万行数据规模)的数据准实时同步到 Doris。问了一圈 ChatGPT,ChatGPT 有提到 Flink 和 Apache SeaTunnel。Flink 是鼎鼎有名的 ETL 工具,而 SeaTunnel 之前没听说过。多问了 ChatGPT 几轮,了解到 SeaTunnel 是一个轻量化、上手快的工具。体验了一下,果然非常省事,开发效率很高,对于这个量级的同步需求真香。

Apache SeaTunnel 官网 (opens new window)。这个名字也很有意思,SeaTunnel 让人想到海底隧道,就像是在两个数据库之间的建立的海底光缆一样。

体验下来,它的优点有(部分特性需要目标数据库支持,不同数据库支持可能有差异):

  • 配置简单:60 行配置一个 Postgres 库的 8 张表同步到 Doris
  • 运行简单:可以独立服务模式运行,无需依赖计算集群
  • 超高性能
  • 支持通配符指定目标表(写 Doris):无需为每张数据源表指定目标表的名字和配置
  • 自动建表(写 Doris):无需手动建表,但后续表新增列需要手动同步到 Doris
  • 基于主键自动 upsert(写 Doris):保证写目标库的幂等性
  • 自动识别列名:源数据 SELECT * 会自动识别列名,自动 upsert 到对应列,无需手动配置字段映射
  • 支持使用 WHERE 条件实现增量同步
  • 支持使用 CDC 增量同步

体验下来一天就能上手,真的非常简单。

# 目前缺点

目前体验下来,唯一的缺点是报错信息不太友好。

使用 ChatGPT 给的配置文件跑,报的错很模糊,只知道是解析配置的时候报错了:Caused by: org.apache.seatunnel.shade.com.typesafe.config.ConfigException$NotResolved: called unwrapped() on value with unresolved substitutions, need to Config#resolve() first, see API docs。对比官方文档才发现是因为 ChatGPT 给的是旧版的配置。

又比如 Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map 这个报错也没有提示报错的具体位置,丢给 ChatGPT 才知道配置里的数组不能有 trailing comma。

如果报错了,需要自己对照配置文件和官方文档,一点一点地 debug,我的大部分时间也是花在了这里。


另外 2.3.9 版本似乎不支持直接读取环境变量,需要显式地从命令行传参 --variable PG_url=jdbc:postgresql://localhost:5432/postgres,这个也是需要注意的。

# 在 Docker 中运行

由于是单体服务,也没有依赖什么存储,一个 Docker 就能跑起来。只需要把配置文件挂进去即可。

docker run -it --rm \
    -v $PWD/seatunnel/config.conf:/config/config.conf:ro \
    apache/seatunnel:2.3.9 \
    ./bin/seatunnel.sh -m local -c /config/config.conf

# 配置文件

配置文件是 SeaTunnel 使用的核心。它分为了三个部分:env 指定全局配置,source 指定源数据库,sink 指定目标数据库。

env {
  job.mode = "BATCH"
  parallelism = 4
}

source {
  Jdbc {
    url="jdbc:postgresql://localhost:5432/postgres"
    user="iDm82k6Q0Tq+wUprWnPsLQ=="
    driver="org.postgresql.Driver"
    password="iDm82k6Q0Tq+wUprWnPsLQ=="
    "table_list"=[  # 只需要指定表名,不需要指定 schema
      {
        "table_path"="demo.public.AllDataType_1"
      },
      {
        "table_path"="demo.public.alldatatype"
      }
    ]
    # 如果需要增量同步,可以在这里指定条件,具体条件需要在外部维护,通过变量传进来
    # where_condition= "where id > 100"
    split.size = 10000
    # split.even-distribution.factor.upper-bound = 100
    # split.even-distribution.factor.lower-bound = 0.05
    # split.sample-sharding.threshold = 1000
    # split.inverse-sampling.rate = 1000
  }
}

sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    database = "${schema_name}"
    table = "${table_name}"     # 使用通配符指定表名
    sink.label-prefix = "sync_task"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

SeaTunnel 也支持 transform(毕竟是 ETL 工具),支持使用内置的 transform,也可以用 Java 开发 transform 插件来自定义 transform 逻辑。不过单纯的数据同步不用涉及到这一块。

每个部分对着官方文档配置即可。Source - Postgres (opens new window)Sink - Doris (opens new window)

# 性能测试

目前生产环境下有两张大表,都是 80w 行,还有六张小表。目前配置的是全量同步,在只有少量数据更新的场景下,每次 SeaTunnel 的运行时间在 20s,相当快。这个速度也不用配置增量同步了,直接五分钟跑一次同步就完事了,简单粗暴。

千万级别的性能测试、百万条插入、百万条更新操作 TBD