Failure syncing partitioned PostgreSQL tables to Doris with doris-flink-connector

I've recently been experimenting with the Flink ecosystem for data synchronization, aiming to sync data from a PostgreSQL database into Doris.

I built the jar from the doris-flink-connector-1.5 source.

Key build parameters:

        <revision>1.5.0-SNAPSHOT</revision>
        <flink.version>1.17.1</flink.version>
        <flink.minor.version>1.17</flink.minor.version>
        <flink.sql.cdc.version>2.4.1</flink.sql.cdc.version>
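
For the record, the build itself is a stock Maven package run from the connector module; a minimal sketch, assuming the standard layout of the apache/doris-flink-connector repo (the version properties above already live in the pom):

# Build the connector jar from source; run inside the flink-doris-connector module.
# -DskipTests only speeds up the build, it is not otherwise required.
mvn clean package -DskipTests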

Environment:
Doris → doris-2.0.1-rc04
Flink → 1.17.1
PostgreSQL → 15.3

In practice it does handle PostgreSQL → Doris sync, but only for tables that are not partitioned on the PostgreSQL side.

But suppose t_user is partitioned by record creation time, with child tables t_user_v202301, t_user_v202302, t_user_v202303, and so on. What do I need to do so that the data from the PostgreSQL partitions lands in a single Doris table t_user?

Here is how I submitted the Flink job:

bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=3 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-1.5.0-SNAPSHOT.jar \
postgres-sync-database \
--database ssdd \
--postgres-conf hostname=****** \
--postgres-conf port=*** \
--postgres-conf username=*** \
--postgres-conf password="***" \
--postgres-conf database-name=*** \
--postgres-conf schema-name=*** \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "t_user.*" \
--sink-conf fenodes=**** \
--sink-conf username=*** \
--sink-conf password=*** \
--sink-conf sink.enable-delete=false \
--sink-conf jdbc-url=jdbc:mysql://**** \
--sink-conf sink.label-prefix=label-multiple-16-T-1022 \
--table-conf replication_num=1

Submitted this way, the t_user partition child tables do get synced, but the child-table data does not go into the Doris t_user table; instead, tables like t_user_v202301 are created one-for-one in Doris.

Whole-database sync probably doesn't support loading a partitioned table into a single table.

You can refer to this PR: [feature] multiple tables to one by codeAntg · Pull Request #208 · apache/doris-flink-connector · GitHub
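
If I'm reading that PR right, it adds multi-to-one routing options to the CDC tool. A sketch of how the submission above might change; the --multi-to-one-origin / --multi-to-one-target flag names are taken from the PR and should be double-checked against the version you build:

# Sketch only: route all t_user_v* partition child tables into one Doris table,
# keeping the remaining --postgres-conf/--sink-conf flags from the job above.
bin/flink run \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-1.5.0-SNAPSHOT.jar \
postgres-sync-database \
--database ssdd \
--including-tables "t_user.*" \
--multi-to-one-origin "t_user_v.*" \
--multi-to-one-target "t_user"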


I tried out the latest build. For a somewhat smaller table (3 million rows), single-table sync completed successfully, so the new feature does work.

But when I widened the sync scope to 10 tables, the Flink web UI showed the following error for reasons unknown (the job was started with parallelism 3 and checkpointing.interval=3s, spread across 2 Flink worker nodes).

Bytes Received totaled roughly 3-40 GB (no writes to Doris had started yet), then the job would fail for no apparent reason; after a while the Bytes Received counter was cleared and everything started over from scratch, failing in a loop until I cancelled the job.

When I narrowed the scope to a single table with a larger row count, running with a job parallelism of just 1, the error was as follows:

java.io.IOException: Could not perform checkpoint 1 for operator {test table name}: Writer -> {test table name}: Committer (1/1)#0.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: tabel {} stream load error: {test database name}.{test table name}, see more in [INTERNAL_ERROR]cancelled: [INTERNAL_ERROR]too many filtered rows
	at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:205)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:196)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:166)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244)

That looks like a checkpoint failure. Check the error messages in the TaskManager logs.
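
If it's a standalone deployment, a grep along these lines should surface it (the log filename below is the default standalone pattern; adjust for YARN/K8s):

# Grep the TaskManager log for the stream load response; the JSON the
# connector logs includes the failure reason and an ErrorURL.
grep -A 5 -i "stream load" log/flink-*-taskexecutor-*.log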

Sure enough, there it was...

see more in [INTERNAL_ERROR]cancelled: [INTERNAL_ERROR]too many filtered rows

Reason: column(test column name) values is null while columns is not nullable. src line [2023-06-25 19:28:35.000000 NULL 123456 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL a 2023-06-25 19:28:35.000000 dd ee  NULL NULL NULL]; 

But I checked: in the several PG tables that contain this column, the values are all non-null. Could the column mapping be misaligned? And what happens if a table with the same name in PG and Doris has different columns?
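
One way to see exactly which rows Doris rejected and why: the stream load response logged by the TaskManager carries an ErrorURL, and fetching it returns the filtered rows. A sketch; host, port, and the file parameter are placeholders, so copy the real URL verbatim from the log:

# The ErrorURL points at a BE's HTTP port and is only retained for a limited time.
curl "http://<be_host>:<be_http_port>/api/_load_error_log?file=__shard_X/error_log_XXX"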

As for the alternative sync path, the Doris catalog: a SELECT INTO from the remote table actually works fine there. So how does this connector determine a table's column order? Because of partitioning and bucketing, certain columns in Doris have to serve as Key columns, so the column order in Doris also differs from PG.
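
For whoever digs into this, a quick way to compare the declared column order on both sides (hosts and credentials are placeholders):

# PostgreSQL: column order as recorded in the catalog.
psql -h <pg_host> -U <user> -d <db> -c "SELECT ordinal_position, column_name FROM information_schema.columns WHERE table_schema = '<schema>' AND table_name = 't_user' ORDER BY ordinal_position;"

# Doris (MySQL protocol; FE query port is 9030 by default): Key columns are declared first.
mysql -h <fe_host> -P 9030 -u <user> -p -e "DESC ssdd.t_user;"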