[SUPPORT]Downstream Fails to Read Hudi MOR Table and Write to Kudu Table When Using Bucketed Table, with Warning: reader has fallen behind too much from the writer #12585

Open
pengxianzi opened this issue Jan 7, 2025 · 15 comments

Comments

@pengxianzi

We are using Apache Hudi to build a data lake and writing data to a Kudu table downstream. The following two scenarios exhibit different behaviors:
Scenario 1: The upstream writes data using a regular MOR (Merge On Read) Hudi table, and the downstream reads the Hudi table and writes to the Kudu table without any issues.
Scenario 2: The upstream writes data using a bucketed table, and when the downstream reads the Hudi table and attempts to write to the Kudu table, the task fails with the following warning:
caution: the reader has fallen behind too much from the writer, tweak 'read.tasks' option to add parallelism of read tasks

We have tried setting the read.tasks parameter to 10, but the issue persists. Below are our configuration and environment details:

Hudi version : 0.14.0

Spark version : 2.4.7

Hive version : 3.1.3

Hadoop version : 3.1.1

Storage Format: HDFS

Downstream Storage: Apache Kudu

Bucketed Table Configuration: Number of buckets is 10

Configuration Information

Below is our Hudi table configuration:

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), basePath + tableName);
    options.put(FlinkOptions.TABLE_TYPE.key(), name);
    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), precombing);
    options.put(FlinkOptions.READ_START_COMMIT.key(), "20210316134557");
    options.put("read.streaming.skip_clustering", "true");
    options.put("read.streaming.skip_compaction", "true");

Steps to Reproduce

  1. The upstream writes data to a Hudi MOR table using a bucketed table.
  2. The downstream reads the Hudi table and attempts to write the data to the Kudu table.
  3. The task fails with the warning: reader has fallen behind too much from the writer.

Attempted Solutions

  1. Set the read.tasks parameter to 10, but the issue persists.
  2. Checked the data distribution of the bucketed table to ensure there is no data skew.
  3. Checked the file layout of the Hudi table to ensure there are no excessive small files.

Expected Behavior
The downstream should be able to read the Hudi MOR table written by the bucketed table and write the data to the Kudu table normally.

Actual Behavior
The downstream read task fails with the warning: reader has fallen behind too much from the writer.

Log Snippet
Below is a snippet of the log when the task fails:

Caused by: org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e38_1734494154374_0718_01_000002
Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold
Caused by: java.util.concurrent.TimeoutException
caution: the reader has fall behind too much from the writer, tweak 'read.tasks' option to add parallelism of read tasks

ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://node1:7791/user/rpc/resourcemanager_0 has been rejected: Rejected TaskExecutor registration at the ResourceManager because: The ResourceManager does not recognize this TaskExecutor

Summary of Questions
We would like to know:

  1. Why does the read task fall behind when using a bucketed table?
  2. Are there any other configurations besides read.tasks that can optimize read performance?
  3. Are there any known issues or limitations related to the combination of bucketed tables and MOR tables?
@danny0405
Contributor

caution: the reader has fallen behind too much from the writer, tweak 'read.tasks' option to add parallelism of read tasks

This log indicates that the streaming reader is lagging too far behind. There are probably two reasons:

  1. The write to Kudu itself is slow, which causes input splits to stack up in the operator state and leads to delay.
  2. I see an explicit read start commit is declared. Is it a commit from far back in history? A start commit earlier than the first active commit on the active timeline triggers a full table scan plus filtering.
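
To make the second point concrete, here is a minimal sketch of the comparison involved. It assumes Hudi instant times are timestamp strings that order lexicographically. The class name, the method name, and the "first active instant" value are hypothetical; the real value has to be read from the table's timeline.

```java
// Sketch only: shows why an old read start commit can fall outside the active timeline.
class StartCommitCheck {

    /**
     * Returns true when the configured start commit sorts before the earliest
     * instant that is still on the active timeline (assumed lexicographic order).
     */
    static boolean isBeforeActiveTimeline(String startCommit, String firstActiveInstant) {
        return startCommit.compareTo(firstActiveInstant) < 0;
    }

    public static void main(String[] args) {
        String startCommit = "20210316134557";          // value from the issue's configuration
        String firstActiveInstant = "20250101000000000"; // hypothetical, for illustration only
        System.out.println(isBeforeActiveTimeline(startCommit, firstActiveInstant));
    }
}
```

If the check returns true for the real first active instant, the start commit has most likely been archived already, which matches the full-scan behavior described above.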

@pengxianzi
Author

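caution: the reader has fallen behind too much from the writer, tweak 'read.tasks' option to add parallelism of read tasks

This log indicates that the streaming reader is lagging too far behind. There are probably two reasons:

  1. The write to Kudu itself is slow, which causes input splits to stack up in the operator state and leads to delay.
  2. I see an explicit read start commit is declared. Is it a commit from far back in history? A start commit earlier than the first active commit on the active timeline triggers a full table scan plus filtering.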

Thank you for your response! We understand the potential reasons for the streaming read task lag:

Slow Kudu Write Speed:
Our testing shows that regular MOR tables work fine with Kudu, but bucketed tables cause read task backlogs. We suspect the issue is related to the characteristics of the bucketed table.

Full Table Scan Due to Early Read Start Commit:
We specified an early read start commit to read historical data. If we don’t specify it or set it later than the first active commit, how can we ensure reading from historical data? Are there ways to avoid a full table scan?

Further Questions
Impact of Bucketed Table:
Does the bucketed table have a specific impact on read performance? Any optimization suggestions?

Historical Data Reading:
How can we read historical data without triggering a full table scan? Are there incremental read configurations?

Kudu Write Performance:
Does the data distribution of bucketed tables affect Kudu write performance?

Summary
We would like to know:

How to optimize read performance for bucketed tables.

How to read historical data without a full table scan.

Are there other configurations or tools to help troubleshoot?

Thank you for your help!

@danny0405
Contributor

By bucketed table, are you referring to the bucket index of a MOR table? One fact to know is that the writer writes pure Avro logs at first, so the streaming reader also reads these logs.

For streaming read, the read.start-commit option accepts the value "earliest", which is more straightforward.

It looks like the warning log is normal given the explicitly specified read start commit: this log appears when the commit to read has already been archived.
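
As a rough illustration of that suggestion, the sketch below builds the source options with plain string keys. The helper class, the method name, and the table path are hypothetical; the option keys are the string forms of the FlinkOptions constants used elsewhere in this thread, and the table type value assumes a MOR table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: collects Hudi Flink source options for a streaming read from the earliest commit.
class StreamingReadOptions {

    /** Builds the options map; tablePath and readTasks are supplied by the caller. */
    static Map<String, String> build(String tablePath, int readTasks) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("path", tablePath);
        options.put("table.type", "MERGE_ON_READ");
        options.put("read.streaming.enabled", "true");
        // "earliest" replaces an explicit timestamp that may already be archived.
        options.put("read.start-commit", "earliest");
        options.put("read.tasks", String.valueOf(readTasks));
        return options;
    }

    public static void main(String[] args) {
        // The path below is a placeholder, not the path from the issue.
        build("hdfs:///tmp/hudi/example_table", 10)
            .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```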

@pengxianzi
Author

By bucketed table, are you referring to the bucket index of a MOR table? One fact to know is that the writer writes pure Avro logs at first, so the streaming reader also reads these logs.

For streaming read, the read.start-commit option accepts the value "earliest", which is more straightforward.

It looks like the warning log is normal given the explicitly specified read start commit: this log appears when the commit to read has already been archived.

Thank you for your help! We followed your suggestion and used the following configuration:

options.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");

This configuration indeed resolved the read task lag issue, and we were able to read the Hudi table and write to the Kudu table normally. However, the task stopped after running for a while and threw the following error:

org.apache.flink.runtime.executiongraph.ExecutionGraph [] - split_reader -> Sink:Unnamed(1/1) switched from INITIALIZING to FAILED on container_e30_xxx

org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job switched from state RUNNING to FAILED

org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMs=60000)

Caused by: org.apache.hudi.exception.HoodieException: Get reader error for path: hdfs://nameservice1:xxx.parquet

We tried to skip files by using the following configurations:

options.put("read.streaming.skip_clustering", "true");
options.put("read.streaming.skip_compaction", "true");

Clean Policy:
We used the following clean policy:

options.put("hoodie.clean.automatic", "true");
options.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
options.put("hoodie.cleaner.commits.retained", "5");
options.put("hoodie.clean.async", "true");

But the issue persists.

@pengxianzi changed the title from "Downstream Fails to Read Hudi MOR Table and Write to Kudu Table When Using Bucketed Table, with Warning: reader has fallen behind too much from the writer" to "[SUPPORT]Downstream Fails to Read Hudi MOR Table and Write to Kudu Table When Using Bucketed Table, with Warning: reader has fallen behind too much from the writer" on Jan 8, 2025
@pengxianzi
Author

@danny0405 Is there any way to skip the Parquet files that are currently being merged?

Looking forward to your response.

@danny0405
Contributor

Data files that are still being written can never be seen by the reader; this is the snapshot isolation contract of a table format.

Caused by: org.apache.hudi.exception.HoodieException: Get reader error for path: hdfs://nameservice1:xxx.parquet

Do you have more detailed exceptions? I cannot get any clues from these errors; they all look like Flink-related errors, which may be caused by resource starvation.

@pengxianzi
Author

@danny0405 The error is as follows:

org.apache.flink.runtime.executiongraph.ExecutionGraph [] - split_reader -> Sink:Unnamed(1/1) switched from INITIALIZING to FAILED on container_e30_xxx
with failure cause: org.apache.hudi.exception.HoodieException: Get reader error for path:
hdfs://node1/XXXXX.parquet
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIteratorWithMetadata(MergeOnReadInputFormat.java:315)
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.initIterator(MergeOnReadInputFormat.java:230)
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:189)
at org.apache.hudi.source.StreamReadOperator.processSplit(StreamReadOperator.java:169)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailbox(MailboxProcessor.java:359)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:323)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeUserCode(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:587)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)

org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job switched from state RUNNING to FAILED
with failure cause: org.apache.hudi.exception.HoodieException: Get reader error for path:
hdfs://node1/XXXXX.parquet
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIteratorWithMetadata(MergeOnReadInputFormat.java:315)
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.initIterator(MergeOnReadInputFormat.java:230)
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:159)
at org.apache.hudi.source.StreamReadOperator.processSplit(StreamReadOperator.java:169)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:244)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:456)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:575)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)

org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMs=60000)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:492)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala)
at scala.PartialFunction.applyOrElse(PartialFunction.scala)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala)
Caused by: org.apache.hudi.exception.HoodieException: Get reader error for path:
hdfs://node1/XXXX.parquet
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.getBaseFileIteratorWithMetadata(MergeOnReadInputFormat.java:315)
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.initIterator(MergeOnReadInputFormat.java:230)
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.open(MergeOnReadInputFormat.java:189)
at org.apache.hudi.source.StreamReadOperator.processSplit(StreamReadOperator.java:169)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailbox(MailboxProcessor.java:359)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:323)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeUserCode(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:587)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)

@pengxianzi
Author

@danny0405
These are the resource parameters in the command I used to start the Flink job:
flink run -m yarn-cluster -d -yim 4096m -ytm 8144m -yD taskmanager.numberOfTaskSlots=10

options.put("read.streaming.skip_clustering", "true");
options.put("read.streaming.skip_compaction", "true");
It seems these two parameters haven't taken effect. Do you have any good solutions?

@danny0405
Contributor

I still cannot get any clues from the error you pasted. Why do you think the skipping does not take effect? If you read from the earliest, the reader triggers a full table scan first, which includes the Parquet files.

I see you set up 8 slots in one TM. Can you set up 1 slot per TM and add more resources to the reader?

@pengxianzi
Author

@danny0405
Thank you for your suggestion! We followed your advice and set the slot per TaskManager to 1, and set read.tasks to 10. However, when the Flink task starts, a full table scan reads upstream files, which may include Parquet files that are being merged or Log files that have just been written, causing errors. The actual size of these files is 0 MB, leading to task failures. Below are some error summaries:

  1. Parquet File Error:
    org.apache.flink.runtime.taskmanager.task [] - split_reader switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: hdfs://node1/XXX.parquet is not a Parquet file (length is too low: 0)
  2. Log File Error:
    ERROR org.apache.hudi.exception.HoodieIOException: IOException when reading logfile HoodieLogFile{pathStr='hdfs://node1/XXXX.log.1_0-1-0', fileLen=-1}

Problem Analysis
The full table scan triggered at task startup reads all files, including Parquet files being merged and Log files that have just been written.

These files may not have completed writing or merging, resulting in a file size of 0 MB or invalid, causing read failures.
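
For troubleshooting, a small diagnostic like the following could help identify which files are incomplete. This is only a sketch under stated assumptions: it runs against a local copy of a file through java.nio rather than HDFS (an HDFS check would need the Hadoop FileSystem API), it only covers unencrypted Parquet files, and the class and method names are hypothetical. It relies on the Parquet layout, in which a file starts and ends with the 4-byte magic "PAR1" and carries a 4-byte footer length, so anything shorter than 12 bytes cannot be complete.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Sketch only: flags files that cannot be complete Parquet files (e.g. zero-length files).
class ParquetSanityCheck {
    private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);
    // Leading magic (4 bytes) + footer length (4 bytes) + trailing magic (4 bytes).
    private static final int MIN_LENGTH = 12;

    /** Returns true when the file is long enough and carries the magic bytes at both ends. */
    static boolean looksLikeCompleteParquet(Path file) {
        try {
            long length = Files.size(file);
            if (length < MIN_LENGTH) {
                return false;
            }
            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
                byte[] head = new byte[MAGIC.length];
                byte[] tail = new byte[MAGIC.length];
                raf.readFully(head);
                raf.seek(length - MAGIC.length);
                raf.readFully(tail);
                return Arrays.equals(head, MAGIC) && Arrays.equals(tail, MAGIC);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Test helper: writes the given bytes to a temporary file and returns its path. */
    static Path writeTemp(byte[] content) {
        try {
            Path file = Files.createTempFile("sketch", ".parquet");
            file.toFile().deleteOnExit();
            return Files.write(file, content);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path empty = writeTemp(new byte[0]);
        Path framed = writeTemp(new byte[] {'P', 'A', 'R', '1', 0, 0, 0, 0, 'P', 'A', 'R', '1'});
        System.out.println("empty file:  " + looksLikeCompleteParquet(empty));
        System.out.println("framed file: " + looksLikeCompleteParquet(framed));
    }
}
```

The check is deliberately shallow: it does not parse the footer, so it can only rule files out, not prove that they are valid.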

We would like to know:

  1. How to avoid reading incomplete files (e.g., Parquet or Log files with a size of 0 MB).

  2. Whether there is a mechanism to ensure that files are read only after they have completed writing or merging.

  3. Whether there is a way to optimize the full table scan at task startup to avoid reading invalid files.

Looking forward to your reply!

@danny0405
Contributor

We already have a validity check for the files in incremental reads. Are all the Parquet files empty, or just some of them?

@pengxianzi
Author

@danny0405
In general, when one or two parquet files are empty, an error will occur.
I understand that you have already done a good job of checking the incremental reading of files. After carefully reviewing the configuration file, I found that the following two parameters may be causing the conflict:

options.put("read.streaming.skip_clustering", "true");
options.put("read.streaming.skip_compaction", "true");

After commenting these out, the process runs normally for a period of time. However, if the upstream completely overwrites the parquet files, it still throws an error saying that the parquet files cannot be found.

Here are my current configuration parameters:

Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath + tableName);
options.put(FlinkOptions.TABLE_TYPE.key(), name);
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), precombing);
options.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");
options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "600000");
options.put("read.tasks", "5");
options.put("hoodie.datasource.merge.type", "skip_merge");
options.put("hoodie.filesystem.operation.retry.enable", "true");

The specific error message is as follows:

error.txt

I look forward to your reply. Let me know if you need further adjustments!

@danny0405
Contributor

However, if the upstream completely overwrites the parquet file

@pengxianzi Are you referring to the INSERT_OVERWRITE operation?

@pengxianzi
Author

@danny0405 Upstream, I write using options.put("write.operation", "upsert");

@danny0405
Contributor

All Parquet files from regular writes should be readable once they are committed successfully.
