✅ Doris 2.0.0-alpha1: parse errors when ingesting JSON object data from Kafka via Routine Load, help wanted

Version: 2.0.0-alpha1 (latest), 2023-05-06
Server: CentOS 7.6, 64 cores / 128 GB RAM, deployed as 1 FE + 1 BE
Kafka version: 2.8.0 (2 nodes)

Steps to reproduce:
1. Create the table:

CREATE TABLE IF NOT EXISTS mlink.table1 (
    id CHAR(50) COMMENT "id",
    dt DATETIMEV2 COMMENT "data time",
    ceid STRING,
    -- ... about 75 more STRING columns omitted ...
    imtm DATETIMEV2
)
UNIQUE KEY (`id`, `dt`)
PARTITION BY RANGE (`dt`) ()
DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
PROPERTIES (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.time_zone" = "Asia/Shanghai",
    "dynamic_partition.start" = "-365",
    "dynamic_partition.end" = "30",
    "dynamic_partition.buckets" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.create_history_partition" = "true",
    "replication_num" = "1",
    "dynamic_schema" = "true",
    "enable_unique_key_merge_on_write" = "true"
);
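
Not directly related to the error, but since the table relies entirely on dynamic partitioning, it is worth confirming the partitions actually exist before loading. The standard Doris checks (database and table names as above):

-- Confirm the dynamic-partition scheduler has picked up the table
SHOW DYNAMIC PARTITION TABLES FROM mlink;

-- List the partitions that were actually created for table1
SHOW PARTITIONS FROM mlink.table1;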

2. Routine Load job statement:

CREATE ROUTINE LOAD mlink.table1_job ON table1
COLUMNS (
    id,
    dt = ifnull(from_unixtime(catm), now()),
    -- ... 75 column mappings omitted ...
    ceid,
    imtm
)
PROPERTIES
(
  "desired_concurrent_number"="3",
  "max_batch_interval"="5",
  "max_batch_rows"="300000",
  "max_batch_size"="209715200",
  "strict_mode"="false",
  "format"="json"
)
FROM KAFKA
(
  "kafka_broker_list"="192.168.1.100:9092,192.168.1.101:9092",
  "kafka_topic"="doris_person_face",
  "property.kafka_default_offsets"="OFFSET_BEGINNING",
  "property.group.id"="doris_person_person",
  "property.client.id"="doris_person_person"
);
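
Once the job is running, its state, consumption progress, and the detailed per-row errors (via the ErrorLogUrls field in the output) can be inspected with the standard statements below:

-- Overall job state, progress, and ErrorLogUrls with per-row error details
SHOW ROUTINE LOAD FOR mlink.table1_job\G

-- Per-task view of the running subtasks
SHOW ROUTINE LOAD TASK WHERE JobName = "table1_job";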


3. Sample JSON object from Kafka:

{
  "ceid": "1123123123",
  "id": "123123123",
  "aaa": "... 75 fields omitted ...",
  "mipa_1": "http://10.30.30.43:8083/resource/image?repoName\\u003dbodypasserby\\u0026customId\\u003d05420347001F201712230220360208081616307130126207\\u0026imgFlag\\u003d2",
  "mipa_2": "http://10.30.30.43:8083/resource/image?repoName\\u003dbodypasserby\\u0026customId\\u003d05420347001F201712230220360208081616307130126207\\u0026imgFlag\\u003d2",
  "carryid": ""
}
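
Note the \u003d / \u0026 escape sequences inside the URL fields. Once some rows do load, a quick query can confirm those strings arrive intact (illustrative; mipa_1 is assumed to be one of the 75 omitted columns):

SELECT id, ceid, mipa_1 FROM mlink.table1 LIMIT 10;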

4. Error message:

Reason: Parse json data for JsonDoc failed. error info: [DATA_QUALITY_ERROR]Failed to parse object
{"ceid":"1", ...75 fields omitted..., "carryid":"". src line [{"ceid":"1", ...75 fields omitted..., "carryid":""];

Judging from the error message, my initial guess is that the JSON reader misjudges where a record ends: the object quoted in the error is cut off before its closing brace.
When the load job runs, the topic holds roughly a million messages across 6 partitions, and the failure appears after about 300k+ rows have been read.
I have recreated the table and the load job several times, and the error occurs at a different position each run (which makes it look like a problem in the concurrent handling of the message strings?). One way to test that guess is sketched below.
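
A minimal isolation test for the concurrency guess: pause the job, drop it to a single consuming task, and resume (standard Doris statements; a Routine Load job must be paused before it can be altered):

PAUSE ROUTINE LOAD FOR mlink.table1_job;

-- Run with a single consuming task to rule out concurrent string handling
ALTER ROUTINE LOAD FOR mlink.table1_job
PROPERTIES ("desired_concurrent_number" = "1");

RESUME ROUTINE LOAD FOR mlink.table1_job;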

I have re-checked the data in Kafka repeatedly and can confirm that every JSON object in the topic is well-formed.

Could anyone help me troubleshoot this? Much appreciated.

Suggestion: debug by mapping only a few of the Kafka fields, as in the screenshot below, to narrow the problem down quickly; a sketch of this approach follows the screenshot.

[screenshot omitted]
At a glance, the content inside the red circle in the screenshot looks wrong.
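
A minimal sketch of that suggestion, assuming the screenshot showed a stripped-down job that extracts only a couple of JSON fields via the "jsonpaths" property (the job name table1_debug_job, the group id doris_debug, and the dt = now() filler for the key column are illustrative, not from the screenshot):

CREATE ROUTINE LOAD mlink.table1_debug_job ON table1
COLUMNS (
    id,
    ceid,
    dt = now()  -- filler for the key column; illustrative only
)
PROPERTIES
(
  "desired_concurrent_number" = "1",
  "format" = "json",
  -- extract only two fields per message to isolate the parse failure
  "jsonpaths" = "[\"$.id\", \"$.ceid\"]"
)
FROM KAFKA
(
  "kafka_broker_list" = "192.168.1.100:9092,192.168.1.101:9092",
  "kafka_topic" = "doris_person_face",
  "property.kafka_default_offsets" = "OFFSET_BEGINNING",
  "property.group.id" = "doris_debug"
);

If this simplified job consumes the whole topic cleanly, the problem likely lies in the full column mapping or in one specific field; if it still fails mid-topic, the JSON parsing itself is the suspect.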