Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SUPPORT]Unable to read new data in streaming mode with specific timestamp #12661

Open
xiearthur opened this issue Jan 17, 2025 · 6 comments
Open

Comments

@xiearthur
Copy link

xiearthur commented Jan 17, 2025

Describe the problem you faced
When using Flink to read a Hudi COW table in streaming mode, specific timestamp can read new data written after the Flink job starts. The streaming job only reads data up to its start time.

To Reproduce

Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath + tableName);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");


// specific timestamp - same behavior
options.put(FlinkOptions.READ_START_COMMIT.key(), "20240116000000");

HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName)
    .options(options);
DataStream<RowData> rowDataDS = builder.source(env);

Expected behavior
The streaming job should continuously read new data written after job starts, regardless of using specific timestamp.

Environment Description

  • Hudi version: 0.14.0
  • Flink version: 1.16.0
  • Hadoop version: 3.1.0
  • Storage: HDFS
@xiearthur xiearthur changed the title [SUPPORT]Unable to read new data in streaming mode with both earliest and specific timestamp [SUPPORT]Unable to read new data in streaming mode with specific timestamp Jan 17, 2025
@cshuo
Copy link
Contributor

cshuo commented Jan 17, 2025

The streaming job only reads data up to its start time.

Sounds like a batch reading behavior, does the job finished after reading, or become an idle running?
By the way, it would be better if you can provide a more complete reproducing code.

@xiearthur
Copy link
Author

I can confirm this issue and have traced the source code to understand why.

Root Cause Analysis:
In IncrementalInputSplits.java, the core logic for streaming reads is:

public List<HoodieInstant> filterInstantsWithRange(
      HoodieTimeline commitTimeline,
      @Nullable final String issuedInstant) {
    // For continuous streaming read
    if (issuedInstant != null) {
        return completedTimeline
            .getInstantsAsStream()
            .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
            .collect(Collectors.toList());
    }

    // For initial read
    Stream<HoodieInstant> instantStream = completedTimeline.getInstantsAsStream();
    if (OptionsResolver.hasNoSpecificReadCommits(this.conf)) {
        // snapshot read - only reads the latest commit
        return completedTimeline.lastInstant().map(Collections::singletonList).orElseGet(Collections::emptyList);
    }

    // With specific start commit time
    if (OptionsResolver.isSpecificStartCommit(this.conf)) {
        final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
        instantStream = instantStream
            .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, startCommit));
    }
}

Currently, this only works correctly with earliest setting because:

  1. When using snapshot mode, it only reads the latest commit without monitoring new data
  2. When using specific timestamp, it only reads data before the Flink job starts
  3. Only with earliest, it can:
    • Start reading from the earliest data
    • Correctly update issuedInstant
    • Continue monitoring new data

Question:
We have a large amount of historical data, and reading from the earliest commit every time we start the Flink job consumes significant resources. Is there any way to:

  1. Start reading from a specific timestamp
  2. Still be able to monitor and read new data changes
  3. Avoid processing all historical data

Looking forward to your suggestions on this.

@danny0405
Copy link
Contributor

Did you execute your job in streaming mode?

@xiearthur
Copy link
Author

Yes, we did execute the job in streaming mode by setting:

options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
options.put(FlinkOptions.READ_START_COMMIT.key(), "20240116000000"); // specific timestamp
options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "5");

We've found that:

  1. With earliest: can continuously read both historical and new data
  2. With specific timestamp: only reads data up to Flink job start time, missing new data written after that

We tried different configurations but still couldn't make it work with a specific timestamp. The job can only read new data when using READ_START_COMMIT=earliest. Is there any other configuration we should set to make it work with a specific timestamp?

@danny0405
Copy link
Contributor

I mean the flink streaming execution mode, not just the option options.put(FlinkOptions.READ_AS_STREAMING.key(), "true")

@xiearthur
Copy link
Author

Yes, I’m sure.
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

This is my current complete write code.


 public static DataStream<RowData> readHudiDataStream(StreamExecutionEnvironment env, ParameterTool params, Map<String, String> schema) throws Exception {

        String tableName = params.get("read_tablename");
        String basePath = params.get("read_basepath");
        String primaryKey = params.get("read_primarykey");
        String hoodieTableType = params.get("read_hoodie_table_type");
        String precombing = params.get("read_precombing");

        String name = HoodieTableType.COPY_ON_WRITE.name();
        if (hoodieTableType.equals("cow")) {
            name = HoodieTableType.COPY_ON_WRITE.name();
        }
        if (hoodieTableType.equals("mor")) {
            name = HoodieTableType.MERGE_ON_READ.name();
        }
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath+tableName);
        options.put(FlinkOptions.TABLE_TYPE.key(), name);
        options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(),precombing);
        options.put(FlinkOptions.READ_START_COMMIT.key(), "earliest");
        options.put("read.tasks","5");

        HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName);
        for (Map.Entry<String, String> entry : schema.entrySet()) {
            builder.column(entry.getKey() + " " + entry.getValue());
        }
        builder.pk(primaryKey)
                .options(options);

        // Get DataStream of RowData
        DataStream<RowData> rowDataDS = builder.source(env);

        return rowDataDS;
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants