Problems hit in production with Flink CDC + Doris (continuously updated)
Problem:
The table's primary key column is a string type; during the CDC read the connector does its own chunk splitting, which left the checkpoint waiting to execute for a long time, with the logs showing nothing beyond INFO-level output.

The checkpoint stays stuck.

The program just keeps waiting:

Cause: the ChunkSplitter takes too long to split the full data set. (Posted to the community to ask for an answer or workaround.)
Knowledge backup:
Alibaba Cloud Flink CDC documentation:
MySQL CDC source table - Realtime Compute for Apache Flink - Alibaba Cloud
CDC parameters:
WITH parameters
| Parameter | Description | Required | Data type | Notes |
|---|---|---|---|---|
| connector | Source table type. | Yes | STRING | Either mysql-cdc or mysql; the two are equivalent. |
| hostname | IP address or hostname of the MySQL server. | Yes | STRING | None. |
| username | Username for the MySQL database service. | Yes | STRING | None. |
| password | Password for the MySQL database service. | Yes | STRING | None. |
| database-name | MySQL database name. | Yes | STRING | Supports regular expressions, so one source can read multiple databases. |
| table-name | MySQL table name. | Yes | STRING | Supports regular expressions, so one source can read multiple tables. |
| port | Port of the MySQL database service. | No | INTEGER | Defaults to 3306. |
| server-id | A numeric ID for the database client. | No | STRING | Must be globally unique within the MySQL cluster; give every job on the same database a different ID. By default a random value between 5400 and 6400 is generated. An ID range such as 5400-5408 is also accepted: with incremental snapshot reading enabled the source can read in parallel, and a range is recommended so each parallel reader uses a distinct ID. |
| scan.incremental.snapshot.enabled | Whether to enable incremental snapshot reading. | No | BOOLEAN | Enabled by default. Incremental snapshot is a newer mechanism for reading the full snapshot quickly, with several advantages over the legacy approach: the source reads the full data in parallel; checkpoints work at chunk granularity; and the full read keeps the data consistent without holding a global read lock. If you want the source to read in parallel, each parallel reader needs a unique server ID, so server-id must be a range such as 5400-6400, and the range must be at least as large as the parallelism. |
| scan.incremental.snapshot.chunk.size | Chunk size (in rows) when splitting a table. | No | INTEGER | Defaults to 8096. With incremental snapshot reading enabled, a table is split into multiple chunks for reading. A chunk is buffered in memory until it has been read completely, so an oversized chunk can cause an OOM; a smaller chunk gives finer-grained failure recovery but lowers throughput. |
| scan.startup.mode | Startup mode when consuming data. | No | STRING | Valid values: initial (default) - on first startup, scan the full historical data, then switch to reading the latest binlog; latest-offset - on first startup, read from the end of the current binlog file, i.e. only the changes made after the connector started. |
| server-time-zone | Session time zone used by the database. | No | STRING | For example Asia/Shanghai. Controls how MySQL TIMESTAMP values are converted to STRING. See the Debezium documentation on temporal types. |
| debezium.min.row.count.to.stream.results | Row-count threshold above which a table's snapshot is read in batches. | No | INTEGER | Defaults to 1000. To read MySQL data efficiently, Flink uses one of two strategies: a full read loads all the data into memory at once, which is fast but can OOM on a very large table; a batched (streaming) read loads the data over multiple rounds, which avoids OOM at the cost of slower reads. |
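For comparison with the WITH parameters above, the same knobs exist on the Stream API builder used in this scenario. A minimal sketch for Flink CDC 2.1.x; the host, credentials, and demo_db names are placeholders:

```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcDemo {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("192.168.xx.xx")                 // placeholder host
                .port(3306)                                // default port
                .databaseList("demo_db")                   // regex allowed, like database-name
                .tableList("demo_db\\..*")                 // regex allowed, like table-name
                .username("root")
                .password("******")
                .serverId("5400-5408")                     // range >= source parallelism
                .splitSize(8096)                           // scan.incremental.snapshot.chunk.size
                .serverTimeZone("Asia/Shanghai")           // server-time-zone
                .startupOptions(StartupOptions.initial())  // full snapshot first, then binlog
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
           .print();
        env.execute("mysql-cdc-demo");
    }
}
```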
Versions:
Flink 1.13
Flink CDC 2.1.1
Scenario:
Use the Flink CDC Stream API to read an entire MySQL database and write it straight into Doris.
About 100 GB of data spread across a few dozen tables of varying sizes, with anywhere from a handful to a large number of columns and a wide mix of column types.
Symptom:
The job dies after running for a while, with the following error:
Problem analysis:
The logs and monitoring showed no code-level error, and the job's automatic restart/retry failed as well.
- First, I tested recovering the job from a checkpoint; after running for a while it hit the same error again.
- Next, I checked whether one particular table's data was the culprit: after clearing that table and recovering from a checkpoint again, the same error reappeared after a while. The pattern was that the larger tables triggered the failure, while the smaller tables were read by CDC table by table without issue.
- Third, the problem might simply be insufficient resources, so try tuning the Flink job's resource configuration. (Resources in the test environment were limited for a 100 GB input.)
Problem search:
Searched the Flink CDC community for the keyword:
A fatal error occurred, shutting down the coordinator executor thread.
Link:


So we increased the JobManager and TaskManager memory by 4x (jobmanager.memory.process.size / taskmanager.memory.process.size); the job is currently running.
Problem 2:
After the full (historical) read finished, the YARN job died on its own.
2022-02-14 12:34:53,207 INFO com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - Assign split MySqlBinlogSplit{splitId='binlog-split', offset={ts_sec=0, file=mysql-bin.006360, pos=93487577, gtids=bcd981b2-d261-11e9-9c67-00163e068674:1-18300905, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, row=0, event=0}} to subtask 0
2022-02-14 12:34:58,680 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (1/3) (e6807bd2ac2a982054dd3bb62006a462) switched from RUNNING to FAILED on container_e05_1641803753156_0111_01_000002 @ prod-qd-ct7-cdh-data-node02 (dataPort=1818).
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist_2.11-1.13.0.jar:1.13.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.lang.RuntimeException: SplitFetcher thread 6349 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
... 1 more
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1644813298581,db=,server_id=0,file=mysql-bin.006360,pos=93487577,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:113) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:147) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:69) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101) ~[flink-cdc-1.0-SNAPSHOT-jar-with-dependencies-all.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
... 1 more
2022-02-14 12:34:58,715 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,715 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 4eb3790e2b522ba9fc475405b3a70da8_0.
2022-02-14 12:34:58,716 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 6 tasks should be restarted to recover the failed task 4eb3790e2b522ba9fc475405b3a70da8_0.
2022-02-14 12:34:58,718 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job dev-SingleInstanceData2doris (8082a3ead0f36284e54f4bf28b8a695e) switched from state RUNNING to RESTARTING.
2022-02-14 12:34:58,719 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 1 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,719 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (2/3) (492cacb3c7d5d7cddc270a19a213bad1) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 2 of source Source: dataSourceStream -> processStream.
2022-02-14 12:34:58,721 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (3/3) (8567979d8c8ac7597ce59e60fc40e519) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (3/3) (32d1f168eaa11108c8f723a79654a115) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (2/3) (d121c9c53de708a1c1382192501a1229) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,721 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (1/3) (dd5257ab18fd539e0f05928e5ee64c28) switched from RUNNING to CANCELING.
2022-02-14 12:34:58,726 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (2/3) (492cacb3c7d5d7cddc270a19a213bad1) switched from CANCELING to CANCELED.
2022-02-14 12:34:58,727 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (3/3) (8567979d8c8ac7597ce59e60fc40e519) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,365 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (3/3) (32d1f168eaa11108c8f723a79654a115) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,367 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 8082a3ead0f36284e54f4bf28b8a695e: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=2}]
2022-02-14 12:34:59,426 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (1/3) (dd5257ab18fd539e0f05928e5ee64c28) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,426 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 8082a3ead0f36284e54f4bf28b8a695e: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2022-02-14 12:34:59,569 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (2/3) (d121c9c53de708a1c1382192501a1229) switched from CANCELING to CANCELED.
2022-02-14 12:34:59,570 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 8082a3ead0f36284e54f4bf28b8a695e
2022-02-14 12:35:18,727 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job dev-SingleInstanceData2doris (8082a3ead0f36284e54f4bf28b8a695e) switched from state RESTARTING to RUNNING.
2022-02-14 12:35:18,730 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 8082a3ead0f36284e54f4bf28b8a695e from Checkpoint 93 @ 1644813292042 for 8082a3ead0f36284e54f4bf28b8a695e located at hdfs://nameservice1/checkpoints/flink-1.13.0/cdc/rocksDBStateBackend/8082a3ead0f36284e54f4bf28b8a695e/chk-93.
2022-02-14 12:35:18,730 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2022-02-14 12:35:18,732 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 2 to checkpoint 93 for source Source: dataSourceStream -> processStream to checkpoint.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (1/3) (9975f5c8e492cda22add5c1abd34ba64) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (2/3) (097d045473dfbc8e980daf2ed5095b96) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: dataSourceStream -> processStream (3/3) (bfeda1f01099ea1fe1656a6f431a56f7) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (1/3) (41433c56861b77cb145c5e2eabda66ce) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (2/3) (d88b153f6c0db78c8b461b768d90cb0f) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - sinkDoris (3/3) (eee0927845d4dcff02ecf8a7af2810f8) switched from CREATED to SCHEDULED.
2022-02-14 12:35:18,733 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Recovering subtask 1 to checkpoint 93 for source Source: dataSourceStream -> processStream to checkpoint.
Analysis:
The binlog file had most likely already been purged on the MySQL server, and later attempts to recover from the checkpoint/savepoint failed for the same reason: the binlog file the stored offset points at no longer exists. Per the error's own hint ("Reconfigure the connector to use a snapshot when needed"), the options are to retain binlogs longer on the server, or restart the job with a fresh full snapshot instead of the stale offset.
Problem:
Analysis:
Hit a binary column: it has to be special-cased, because putting the raw binary value into the JSON document throws an error (a workaround sketch follows).
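The code that hit this was elided above. A minimal sketch of the workaround, assuming binary columns (blob/varbinary/bit) arrive from Debezium as byte[] or ByteBuffer: encode them to Base64 before they go into the JSON document.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public class BinaryFieldFix {
    // Binary column values cannot be dropped into a JSON document as-is.
    // Encode them to a Base64 string; everything else passes through.
    public static Object normalizeForJson(Object value) {
        if (value instanceof byte[]) {
            return Base64.getEncoder().encodeToString((byte[]) value);
        }
        if (value instanceof ByteBuffer) {
            ByteBuffer buf = (ByteBuffer) value;
            byte[] bytes = new byte[buf.remaining()];
            buf.get(bytes);
            return Base64.getEncoder().encodeToString(bytes);
        }
        return value;
    }
}
```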
Problem:
Column-type mapping when CDC reads MySQL and writes into Doris; going straight to the test code.
Update:
MySQL column: timestmp_test2 timestamp(3)
Converting this directly in code throws an error.

So use the hutool utility class for the conversion.
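The failing conversion code itself was elided. As a sketch of the fix, assuming the timestamp(3) value arrives as a UTC ISO-8601 string (Debezium's usual encoding for MySQL TIMESTAMP columns), shift it into the UTC+8 session zone explicitly; hutool's DateUtil.parse plus DateUtil.offsetHour reach the same result:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimestampFix {
    // MySQL TIMESTAMP(3) values arrive from the CDC source as a UTC
    // ISO-8601 string, e.g. "2022-04-24T01:14:51.998Z", which is
    // 8 hours behind Beijing time.
    public static String toDorisDatetime(String utcIso) {
        return Instant.parse(utcIso)
                .atZone(ZoneId.of("Asia/Shanghai"))  // shift UTC -> UTC+8
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
    }
}
```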
Problem:
MySQL's char type and the Doris type sizes are not one-to-one.
We map MySQL char to a Doris varchar with the declared size multiplied by 3.
Reason: English and Chinese characters take different amounts of space. MySQL CHAR(n) counts characters, Doris lengths count bytes, and a Chinese character is 3 bytes in UTF-8.
Notes:
| MySQL | Doris equivalent | How to handle? | Test structure | Doris | Remarks (for column E) |
|---|---|---|---|---|---|
| bigint | bigint | bigint | | | |
| bit | #N/A | ? | int | bitmap | |
| blob | #N/A | varchar | | boolean | |
| char | char | char | | | |
| date | date | date | | | |
| datetime | datetime | datetime | | | |
| decimal | decimal | | | description | |
| double | double | | | keyword | |
| enum | #N/A | ? | varchar | decimal | |
| float | float | | | double | |
| int | int | | | float | |
| json | #N/A | ? | string | hll | STRING is TEXT inside Doris |
| longblob | #N/A | varchar | | int | |
| longtext | #N/A | varchar | | largeint | |
| mediumblob | #N/A | varchar | | smallint | |
| mediumint | #N/A | bigint | | string | |
| mediumtext | #N/A | string | | tinyint | |
| set | #N/A | ? | varchar | varchar | |
| smallint | smallint | | | | |
| text | #N/A | string | | | |
| time | #N/A | varchar | | | |
- timestamp: holds the actual MySQL time. CDC delivers it as a UTC-based Unix timestamp (with timezone info) that is 8 hours off, so it must be shifted by 8 hours.
- tinyint: not handled the same way as varchar.
- varbinary: stored much like varchar.
- year: records just the year.
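Reading columns A through C of the table as "MySQL type, and the Doris type we actually write", the mapping as code. A sketch only: the tinyint and timestamp cases are my reading of the garbled rows above, and unmapped types fall back to VARCHAR.

```java
public class DorisTypeMapper {
    // MySQL type -> Doris type per the table above. Types with no native
    // Doris equivalent are stored as VARCHAR/STRING.
    public static String mysqlToDorisType(String mysqlType) {
        switch (mysqlType.toLowerCase()) {
            case "bigint":    return "BIGINT";
            case "char":      return "CHAR";      // declared 3x longer, see above
            case "date":      return "DATE";
            case "datetime":  return "DATETIME";
            case "decimal":   return "DECIMAL";
            case "double":    return "DOUBLE";
            case "float":     return "FLOAT";
            case "int":       return "INT";
            case "smallint":  return "SMALLINT";
            case "tinyint":   return "TINYINT";   // assumption: maps 1:1
            case "mediumint": return "BIGINT";
            case "timestamp": return "DATETIME";  // plus the +8h shift noted above
            case "json":
            case "text":
            case "mediumtext": return "STRING";   // STRING is TEXT in Doris
            default:          return "VARCHAR";   // bit/blob/enum/set/time/varbinary/year...
        }
    }
}
```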
Problem:
Monitoring DDL statements with CDC (a sketch follows).
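Flink CDC can emit schema-change (DDL) events alongside row changes. A sketch for 2.1.x; filtering on historyRecord is an assumption about how JsonDebeziumDeserializationSchema serializes DDL events, so verify it against real output:

```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DdlMonitorDemo {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("192.168.xx.xx").port(3306)
                .databaseList("demo_db").tableList("demo_db\\..*")
                .username("root").password("******")
                .includeSchemaChanges(true)   // emit DDL events alongside row changes
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
           // assumption: serialized schema-change records carry a
           // historyRecord field containing the raw DDL text
           .filter(json -> json.contains("historyRecord"))
           .print();
        env.execute("ddl-monitor-demo");
    }
}
```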
Problem:
When writing data into Doris through the Flink Stream API, the expected row count never showed up, and no task-level error was raised.
Analysis:
The problem turned out to be inside Doris's load path. Investigation showed the missing rows contained fields that were too large, exceeding Doris's maximum column size, so Doris filtered those rows out.

Fix:
Add a length check (a sketch follows).
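The actual check was elided above. A minimal sketch of the idea, assuming a UTF-8 VARCHAR column with a byte-length cap (Doris's VARCHAR maximum is 65533 bytes): truncate oversized values, or divert them, rather than letting Doris silently filter the row.

```java
import java.nio.charset.StandardCharsets;

public class DorisLengthGuard {
    // Doris silently filters rows whose values exceed the target column's
    // byte length. Guard before writing: keep values that fit, truncate
    // the rest on a character boundary (or route them to a side output).
    public static String fitVarchar(String value, int maxBytes) {
        if (value == null) {
            return null;
        }
        if (value.getBytes(StandardCharsets.UTF_8).length <= maxBytes) {
            return value;
        }
        int end = value.length();
        while (end > 0
                && value.substring(0, end).getBytes(StandardCharsets.UTF_8).length > maxBytes) {
            end--;
        }
        return value.substring(0, end);
    }
}
```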

Update:
DolphinScheduler runs the Doris SQL script on a schedule, connecting through the MySQL client along these lines:
mysql -h192.168.xx.xx -P9030 -uroot -Dexample_db < sqlFile/dwd_finance_stable合并_20220304114348.sql
Update:
A CDC bug:
1. A normal table schema, MySQL 5.x:
CREATE TABLE `order_sign_received` (
`order_number` varchar(32) NOT NULL COMMENT '运单编号',
`sign_people_mobile` varchar(12) DEFAULT NULL COMMENT '签收人电话',
`function_code` varchar(32) DEFAULT NULL COMMENT '方法编码',
PRIMARY KEY (`order_number`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
2. Execute this statement:
ALTER TABLE order_sign_received ADD(
received_pay_method_changed TINYINT DEFAULT 0 NOT NULL COMMENT '签收修改了收款方式,0:否 1:是'
);
The following error appears:
18:51:38,237 ERROR io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin.000119/315091120
18:51:38,238 ERROR io.debezium.pipeline.ErrorHandler - Producer failure
io.debezium.DebeziumException: Error processing binlog event
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
at java.lang.Iterable.forEach(Iterable.java:75)
at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)
... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 27 more
18:51:38,307 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
... 1 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
... 7 more
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
at java.lang.Iterable.forEach(Iterable.java:75)
at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)
... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 27 more
18:51:38,315 WARN org.apache.flink.runtime.taskmanager.Task - Source: config Source -> Sink: Print to Std. Out (1/1)#0 (d7461d16ef50d2d32d78e64e3718734f) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
... 1 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
... 7 more
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
at java.lang.Iterable.forEach(Iterable.java:75)
at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)
... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 27 more
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
at akka.actor.Actor$class.aroundReceive(Actor.scala)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:94)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
... 1 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
... 7 more
Caused by: io.debezium.DebeziumException: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:588)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:140)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:583)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
... 6 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
at java.lang.Iterable.forEach(Iterable.java:75)
at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
at com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:204)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$1(MySqlStreamingChangeEventSource.java:585)
... 9 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT16: class java.lang.String for field: "null"
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 27 more
In a local test, the debugger doesn't even step into this spot. Judging by the root cause in the trace (Invalid Java object for schema type INT16: class java.lang.String), Debezium appears to parse the TINYINT column's numeric default value as a String while rebuilding the table schema.

-- Doris error
What we actually hit:
[taskAppId=TASK-4882385308064_1-5934-10038]:[61] - -> ERROR 1105 (HY000) at line 21: errCode = 2, detailMessage = Failed to create partition[rep_op_com_reconciliation_stat]. Timeout. Unfinished mark: 10004=257590, 10004=257594, 10004=257598
It happened at roughly:
[INFO] 2022-04-24 01:14:51.998
We narrowed the search window to:
[2022-04-24 01:10:51.998, 2022-04-24 01:15:51.998]
Then we located node 10004 (visible in the Doris web UI).

We went to node 10004 to check.
In be.INFO, search for the tablet-id-related log entries:
tablet id = 257590
tablet id = 257594
tablet id = 257598


Reference article:
Doris creates a table partition by partition, so when one partition fails to create, this error can be reported. Even without explicit partitions, a failed table creation still reports "Failed to create partition", because, as described earlier, Doris creates an immutable default partition for any table that does not declare one.
When you hit this error, it usually means a BE had trouble creating the data tablets. Troubleshoot as follows:
- In fe.log, find the "Failed to create partition" entry at the corresponding time. It contains number pairs that look like {10001-10010}; the first number is a Backend ID and the second a Tablet ID, so this pair means that creating tablet 10010 on backend 10001 failed.
- Go to that backend's be.INFO log and search the corresponding time window for the tablet id to find the error message.
Some common tablet-creation failures, including but not limited to:
- The BE never received the task, in which case be.INFO has no tablet-id entries at all; or the BE created the tablet but failed to report it. Either way, check FE/BE connectivity per the deployment and upgrade documentation.
- Memory pre-allocation failed, possibly because a single row in the table exceeds 100 KB.
- Too many open files: the number of open file handles exceeded the Linux limit; raise the system handle limit.
If tablet creation times out, the timeouts can also be extended in fe.conf via tablet_create_timeout_second=xxx and max_create_table_timeout_second=xxx. tablet_create_timeout_second defaults to 1 second and max_create_table_timeout_second to 60 seconds; the overall timeout is min(tablet_create_timeout_second * replication_num, max_create_table_timeout_second), e.g. min(1 * 3, 60) = 3 seconds with the defaults and 3 replicas.
If the CREATE TABLE command does not return for a long time: Doris's CREATE TABLE is a synchronous command whose timeout is currently computed rather simply, as (tablet num * replication num) seconds. If many tablets are being created and some fail, the command may sit out a long timeout before returning the error.
Normally a CREATE TABLE statement returns within seconds or tens of seconds. If it runs past a minute, cancel it and check the FE and BE logs for the underlying error.
————————————————
Copyright notice: the quoted article is the original work of blogger 墨卿风竹, licensed under CC 4.0 BY-SA; include the original source link and this notice when reposting.
Original link:
