
[SUPPORT] Remote filesystem view only works on the first micro-batch in Spark Structured Streaming #12652

Open · psendyk opened this issue Jan 16, 2025 · 3 comments
Labels: spark-streaming (spark structured streaming related)

psendyk commented Jan 16, 2025

Describe the problem you faced

After enabling the embedded timeline server and using the REMOTE_ONLY file system view, Spark Structured Streaming ingestion into Hudi fails on the second micro-batch with org.apache.hudi.exception.HoodieRemoteException: Connect to localhost:26754 [localhost/127.0.0.1] failed: Connection refused

To Reproduce

Steps to reproduce the behavior:

  1. Start Spark Structured Streaming ingestion into Hudi with the following filesystem view config (a minimal streaming setup is sketched below):
        HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key() -> "true",
        FileSystemViewStorageConfig.VIEW_TYPE.key() -> FileSystemViewStorageType.REMOTE_ONLY.name(),
        FileSystemViewStorageConfig.REMOTE_BACKUP_VIEW_ENABLE.key() -> "false"
  2. Wait for the second micro-batch; the job should fail immediately during the "Getting small files from partitions" stage
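
A minimal sketch of a streaming write with these options, assuming a source DataFrame sourceDf; the table name, key/partition fields, checkpoint location, and base path are placeholders, and only the three filesystem view options mirror this report:

    // Hypothetical reproduction sketch; table, path, and field names are placeholders.
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, FileSystemViewStorageType}

    val query = sourceDf.writeStream
      .format("hudi")
      .option(HoodieWriteConfig.TBL_NAME.key(), "my_table")                       // placeholder table name
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")                 // placeholder record key
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "dt")             // placeholder partition field
      // The three options from this report:
      .option(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(), "true")
      .option(FileSystemViewStorageConfig.VIEW_TYPE.key(), FileSystemViewStorageType.REMOTE_ONLY.name())
      .option(FileSystemViewStorageConfig.REMOTE_BACKUP_VIEW_ENABLE.key(), "false")
      .option("checkpointLocation", "s3://bucket/checkpoints/my_table")           // placeholder
      .start("s3://bucket/tables/my_table")                                       // placeholder
    query.awaitTermination()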

Expected behavior

The application should continue without failure

Environment Description

  • Hudi version : 0.15.0

  • Spark version : 3.3.0

  • Hive version :

  • Hadoop version : 3.2.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

This issue occurs because the timeline service is stopped after the first write and is never restarted. Specifically, the write client is closed, which then closes the timeline server in the finally block in HoodieSparkSqlWriterInternal.writeInternal. Here's a stack trace showing how this part of the code is reached:

        at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:508)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
	at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$3(HoodieStreamingSink.scala:141)
	at org.apache.hudi.HoodieStreamingSink$$Lambda$3032/943647142.apply(Unknown Source)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:133)
	at org.apache.hudi.HoodieStreamingSink$$Lambda$3031/1476625006.apply(Unknown Source)
	at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:237)
	at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:132)
       - locked <0x0000ffefe71a1808> (a org.apache.hudi.HoodieStreamingSink)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660)

The timeline server is only started when the write client is instantiated (BaseHoodieClient.startEmbeddedServerView), but the write client is reused across batches within HoodieStreamingSink.addBatch. Therefore, the second batch uses a client that has already closed its timeline server. The error may at first look like a configuration issue, but that is only because the view storage config is updated with the timeline service's address when the service starts and then reset to the client-provided config when the service stops. We can see the correct config, pointing at the timeline service, being used for the first write:

25/01/16 15:18:13 INFO FileSystemViewManager: Creating remote view for basePath <REDACTED>. Server=ip-<REDACTED>:37393, Timeout=300

but not for the second write:

25/01/16 15:20:15 INFO FileSystemViewManager: Creating remote view for basePath <REDACTED>. Server=localhost:26754, Timeout=300

I was able to work around this failure mode by modifying HoodieSparkSqlWriterInternal.writeInternal to restart the timeline server at the beginning of each write, but as you can imagine this only gives us a partial improvement (the timeline server is reused within a batch but not across batches). I'm wondering if there's a reason why the write client, and consequently the timeline server, is stopped after each batch?
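
To make the lifecycle concrete, here is a deliberately simplified, hypothetical model of the pattern described above (not the actual Hudi classes): the embedded server starts with the client, the write path closes the client in a finally block, and the sink then reuses the closed client for the next micro-batch:

    // Hypothetical, simplified model of the lifecycle described above, not the actual
    // Hudi classes. It only illustrates why the second micro-batch hits a dead timeline
    // server when the client is closed in a finally block but reused by the sink.
    object TimelineLifecycleSketch extends App {
      class EmbeddedTimelineServer {
        @volatile var running = false
        def start(): Unit = { running = true }
        def stop(): Unit = { running = false }
      }

      class WriteClient {
        val server = new EmbeddedTimelineServer
        server.start()                          // the server only starts when a client is built
        def write(batch: Seq[String]): Unit =
          require(server.running, "Connect to localhost failed: Connection refused")
        def close(): Unit = server.stop()
      }

      // Roughly what writeInternal does per the stack trace: close the client in a finally block.
      def writeInternal(client: WriteClient, batch: Seq[String]): Unit =
        try client.write(batch)
        finally client.close()                  // the embedded timeline server stops here

      // Roughly what HoodieStreamingSink.addBatch does: reuse the same client across batches.
      val client = new WriteClient
      writeInternal(client, Seq("batch-1"))     // succeeds, the server is still running
      writeInternal(client, Seq("batch-2"))     // fails: the server was stopped after batch-1
    }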

Stacktrace

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 47 in stage 11.0 failed 4 times, most recent failure: Lost task 47.3 in stage 11.0 (TID 12380) (ip-10-18-138-81.heap executor 6): org.apache.hudi.exception.HoodieRemoteException: Connect to localhost:26754 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
	at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.getLatestFileSlicesStreamFromParams(RemoteHoodieTableFileSystemView.java:313)
	at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.getLatestFileSlicesBeforeOrOn(RemoteHoodieTableFileSystemView.java:347)
	at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFileCandidates(SparkUpsertDeltaCommitPartitioner.java:106)
	at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFiles(SparkUpsertDeltaCommitPartitioner.java:66)
	at org.apache.hudi.table.action.commit.UpsertPartitioner.lambda$getSmallFilesForPartitions$f1d92f9e$1(UpsertPartitioner.java:285)
	at org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2269)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:138)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

psendyk commented Jan 16, 2025

To add some more context, we tried using the REMOTE_ONLY view because we've been deleting lots of partitions, which in effect creates replacecommits on the timeline. Over a few weeks, our ingestion throughput decreased by ~75%, with the largest degradation in the Getting small files from partitions stage. We temporarily disabled small file expansion to alleviate the ingestion latency, but without async clustering this is not a sustainable solution as it creates lots of small files (see the config sketch below).

I profiled one of the Spark executors while it was gathering small files and noticed that ~99% of the time is spent creating the in-memory FS view. Specifically, most of the work happens in AbstractTableFileSystemView.resetFileGroupsReplaced when the FS view object is instantiated. This happens for every partition/Spark task and creates a lot of repetitive work when we ingest data into ~10-15k partitions; given our deletion rate and archiver retention, each Spark task takes ~45s. Using the remote FS view alleviates this issue because the in-memory FS view is instantiated just once, inside the timeline server. For our use case, restarting the timeline server for each batch is good enough, since it only requires processing all the replacecommits once per batch.

I was hoping to understand why the timeline server is closed in the first place, to make sure it's safe to restart it for each batch. From #3967 (cc @nsivabalan @yihua) this seems like an artifact of the existing code structure rather than an intentional design, but I wanted to ask anyway. Also welcome any suggestions if there are other ways we can address this performance regression. Thank you!
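
For reference, small file expansion is typically disabled in Hudi through the hoodie.parquet.small.file.limit option; the snippet below is only a sketch of that knob, and whether it matches the exact settings used in this job is an assumption:

    // Hedged sketch: setting the small file limit to 0 disables small file handling,
    // so the "Getting small files from partitions" stage has nothing to look up.
    // Add to the same writeStream options as in the reproduction sketch above.
    val disableSmallFileExpansion = Map(
      "hoodie.parquet.small.file.limit" -> "0"
    )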

danny0405 (Contributor) commented:

Maybe @the-other-tim-brown can take a look.

danny0405 added the spark-streaming (spark structured streaming related) label on Jan 17, 2025
The github-project-automation bot moved this to ⏳ Awaiting Triage in Hudi Issue Support on Jan 17, 2025

the-other-tim-brown (Contributor) commented:

I am not familiar with the Spark streaming code, but these are the spots I find suspicious, since @psendyk is seeing the configs reset to the defaults for the remote FS view, which happens when the write client is closed:

  1. The writer takes in a client.
  2. If there is no async compaction or clustering, the write client is closed at the end of the write. This (now closed) write client is returned to the HoodieStreamingSink and can potentially be passed back in to step (1).

It may be better to return null for the WriteClient from step (2) if it has been closed, so that the code simply creates a new client on each iteration (sketched below).
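
A hypothetical sketch of that suggestion, not the actual Hudi code: if the write path closed the client, hand back None instead of the closed instance, so the sink builds a fresh client (and therefore a fresh embedded timeline server) on the next micro-batch:

    // Hypothetical sketch of the suggestion above; class and method names are placeholders.
    object ClientHandoffSketch {
      class WriteClient {                                 // stand-in for the real write client
        def write(batch: Seq[String]): Unit = ()
        def close(): Unit = ()
      }

      def writeAndReturnClient(existing: Option[WriteClient],
                               batch: Seq[String],
                               asyncTableServicesEnabled: Boolean): Option[WriteClient] = {
        val client = existing.getOrElse(new WriteClient)  // a new client restarts the server
        client.write(batch)
        if (asyncTableServicesEnabled) {
          Some(client)                                    // keep the live client for the next batch
        } else {
          client.close()                                  // the embedded timeline server stops here
          None                                            // do not hand the closed client back to the sink
        }
      }
    }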
