
[SUPPORT] Hoodie Custom Merge Payload results in UnsupportedOperationException #12571

Open
dataproblems opened this issue Jan 3, 2025 · 3 comments


dataproblems commented Jan 3, 2025

Problem Description

I created a custom payload for merging records on my Hudi table, which has the record level index enabled, and then tried to upsert values into the table. The upsert succeeded for a single record as well as for batches of fewer than 50k records, but with 50k records or more it fails with an UnsupportedOperationException (stack trace provided below).

How to reproduce the problem?

Step 1: Generate the data and hudi table

Case class that encapsulates my dataset

import java.util.UUID

final case class RandomData(
  id: String,
  field1: String,
  field2: String,
  field3: String,
  field4: Boolean,
  field5: Long,
  ts: Long,
  partition: String,
  fruits: String
) {
  def combine(that: RandomData): RandomData = {
    val updatedFruit: String = this.fruits + s",${that.fruits}"
    RandomData(
      id = this.id,
      field1 = this.field1,
      field2 = this.field2,
      field3 = this.field3,
      field4 = this.field4,
      field5 = this.field5,
      ts = Math.max(this.ts, that.ts),
      partition = that.partition,
      fruits = updatedFruit
    )
  }
}

object RandomData {
  def apply(id: Long, partition: String, fruits: String): RandomData = {
    RandomData(
      id = id.toString,
      field1 = UUID.randomUUID().toString,
      field2 = UUID.randomUUID().toString,
      field3 = UUID.randomUUID().toString,
      field4 = true,
      field5 = 1000L,
      ts = 2880000L,
      partition = partition,
      fruits = fruits
    )
  }
}
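
For clarity, here is what combine is expected to produce for two versions of the same key, using the defaults from the companion object above (a minimal illustration, not part of the reproduction):

// Not part of the job: illustrates the merge semantics of combine.
val stored   = RandomData(id = 1L, partition = "One", fruits = "apple")   // ts = 2880000
val incoming = stored.copy(ts = stored.ts + 100, fruits = "banana")

val merged = stored.combine(incoming)
// merged.ts        == 2880100          (max of the two timestamps)
// merged.fruits    == "apple,banana"   (concatenation)
// merged.partition comes from the incoming record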

Spark code to generate sample dataframe

import scala.util.Random

val partitions = List("One", "Two", "Three", "Four")
import spark.implicits._

val randomData = spark
    .range(1, 10 * 10000000L)
    .map(f => RandomData(id = f, partition = Random.shuffle(partitions).head, fruits = "apple"))

Step 2: Create hudi table from the random data


import scala.util.Random
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.keygen.SimpleKeyGenerator

val tableName = "randomDataWithFruits"

val insertOptions: Map[String, String] = Map(
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
  HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
  DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.partitionpath.field" -> "partition",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.name" -> tableName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName,
  "hoodie.write.markers.type" -> "DIRECT",
  "hoodie.embed.timeline.server" -> "false"
)

val basePath = // Some S3 path.

  randomData.repartition(100).write.format("hudi").options(insertOptions).mode(Overwrite).save(basePath)
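
As an optional sanity check (not part of the reproduction, assuming the standard Hudi Spark datasource read path), the table can be read back to confirm the insert landed:

// Read the freshly written table and confirm the row count and a few rows.
val written = spark.read.format("hudi").load(basePath)
written.count()
written.select("id", "partition", "fruits", "ts").show(5, truncate = false)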

Step 3: Create a custom hudi payload class

import java.io.IOException
import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
import org.apache.avro.reflect.ReflectData
import org.apache.hudi.common.model.{BaseAvroPayload, HoodieRecordPayload}
import org.apache.hudi.common.util.{Option => HudiOption}

class RandomDataPayload(record: GenericRecord, orderingVal: Comparable[_])
    extends BaseAvroPayload(record, orderingVal)
    with HoodieRecordPayload[RandomDataPayload] {

  def this(record: HudiOption[GenericRecord]) = this(record = record.orElse(null), orderingVal = 0)

  def toAvro: RandomData => GenericRecord = RandomDataPayload.toAvro

  def fromAvro: GenericRecord => RandomData = RandomDataPayload.fromAvro

  override def preCombine(oldValue: RandomDataPayload): RandomDataPayload = {
    (oldValue.recordBytes.isEmpty, oldValue.orderingVal.asInstanceOf[Comparable[Any]].compareTo(orderingVal)) match {
      case (true, _)            => this
      case (false, c) if c > 0  => oldValue
      case (false, c) if c <= 0 => this
    }
  }

  override def preCombine(oldValue: RandomDataPayload, schema: Schema, properties: Properties): RandomDataPayload = preCombine(oldValue = oldValue)

  @throws[IOException]
  override def combineAndGetUpdateValue(valueInStorage: IndexedRecord, schema: Schema): HudiOption[IndexedRecord] = {
    val currentValueObject = fromAvro(valueInStorage.asInstanceOf[GenericRecord])
    val otherObject = fromAvro(this.record)

    HudiOption.of(toAvro(currentValueObject.combine(otherObject)))
  }

  @throws[IOException]
  override def combineAndGetUpdateValue(valueInStorage: IndexedRecord, schema: Schema, properties: Properties): HudiOption[IndexedRecord] =
    combineAndGetUpdateValue(valueInStorage = valueInStorage, schema = schema)

  @throws[IOException]
  override def getInsertValue(schema: Schema): HudiOption[IndexedRecord] = {
    if (recordBytes.isEmpty || isDeletedRecord) {
      HudiOption.empty[IndexedRecord]
    } else {
      HudiOption.of(record)
    }
  }

  @throws[IOException]
  override def getInsertValue(schema: Schema, properties: Properties): HudiOption[IndexedRecord] = getInsertValue(schema)
}


object RandomDataPayload {
  val RandomDataSchema: Schema = {
    ReflectData.get.getSchema(classOf[RandomData])
  }

  def toAvro(payload: RandomData): GenericRecord = {
    val record = new GenericData.Record(RandomDataSchema)
    record.put("id", payload.id)
    record.put("field1", payload.field1)
    record.put("field2", payload.field2)
    record.put("field3", payload.field3)
    record.put("field4", payload.field4)
    record.put("field5", payload.field5)
    record.put("ts", payload.ts)
    record.put("partition", payload.partition)
    record.put("fruits", payload.fruits)
    record
  }

  def fromAvro(record: GenericRecord): RandomData = {
    RandomData(
      id = record.get("id").toString,
      field1 = record.get("field1").toString,
      field2 = record.get("field2").toString,
      field3 = record.get("field3").toString,
      field4 = record.get("field4").toString.toBoolean,
      field5 = record.get("field5").toString.toLong,
      partition = record.get("partition").toString,
      ts = record.get("ts").toString.toLong,
      fruits = record.get("fruits").toString
    )
  }

}
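
In the Kryo serialization trace below, the failure path runs through record (com.test.hudi.RandomDataPayload) into the embedded Avro Schema: because combineAndGetUpdateValue reads this.record, Scala retains the constructor parameter as a field, so the GenericRecord and its Schema get serialized with the payload whenever the merge spills to disk. For comparison, Hudi's built-in OverwriteWithLatestAvroPayload rebuilds the record from recordBytes via HoodieAvroUtils.bytesToAvro instead of holding onto the GenericRecord. Below is a minimal, untested sketch of that pattern (the class name is illustrative; it reuses the imports and RandomDataPayload helpers above):

import org.apache.hudi.avro.HoodieAvroUtils

// Hypothetical variant: never references the `record` constructor parameter in a
// method body, so Scala does not keep it (or its Schema) as a field; the Avro
// record is rebuilt from recordBytes on demand.
class RandomDataPayloadFromBytes(record: GenericRecord, orderingVal: Comparable[_])
    extends BaseAvroPayload(record, orderingVal)
    with HoodieRecordPayload[RandomDataPayloadFromBytes] {

  def this(record: HudiOption[GenericRecord]) = this(record.orElse(null), 0)

  override def preCombine(oldValue: RandomDataPayloadFromBytes): RandomDataPayloadFromBytes =
    if (oldValue.recordBytes.isEmpty) this
    else if (oldValue.orderingVal.asInstanceOf[Comparable[Any]].compareTo(orderingVal) > 0) oldValue
    else this

  @throws[IOException]
  override def getInsertValue(schema: Schema): HudiOption[IndexedRecord] =
    if (recordBytes.isEmpty || isDeletedRecord) HudiOption.empty[IndexedRecord]
    else HudiOption.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema))

  @throws[IOException]
  override def combineAndGetUpdateValue(valueInStorage: IndexedRecord, schema: Schema): HudiOption[IndexedRecord] = {
    if (recordBytes.isEmpty) {
      HudiOption.of(valueInStorage)
    } else {
      // Deserialize this payload's record from bytes instead of reading a field.
      val incoming = RandomDataPayload.fromAvro(HoodieAvroUtils.bytesToAvro(recordBytes, schema))
      val current  = RandomDataPayload.fromAvro(valueInStorage.asInstanceOf[GenericRecord])
      HudiOption.of(RandomDataPayload.toAvro(current.combine(incoming)))
    }
  }
}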

Step 4: Create a data parcel for the upsert

val updateParcel = randomData.map(f => f.copy(ts = f.ts + 100, fruits = "banana")).limit(50000)

Step 5: Perform the upsert operation

val randomDataUpsertOptions: Map[String, String] = Map(
    "hoodie.datasource.write.precombine.field" -> "ts",
    "hoodie.datasource.write.recordkey.field" -> "id",
    "hoodie.table.name" -> "randomDataWithFruits",
    "hoodie.datasource.write.partitionpath.field" -> "partition",
    "hoodie.datasource.write.payload.class" -> classOf[RandomDataPayload].getName,
    "hoodie.upsert.shuffle.parallelism" -> "2000"
  )

updateParcel.write.format("hudi").mode("append").options(randomDataUpsertOptions).save(basePath)

Expected Behavior

I expect the upsert operation to go through without any exceptions.
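
Beyond the write completing, the merged rows should reflect the payload's combine logic. A quick hedged check of one updated key (assumes the standard Hudi read path and that the chosen id was included in updateParcel):

// Verify the merge result for a key that was part of the update parcel.
spark.read.format("hudi").load(basePath)
  .filter($"id" === "1")
  .select("id", "ts", "fruits")
  .show(truncate = false)
// expected after the upsert: ts = 2880100 and fruits = "apple,banana"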

Environment Description

  • Hudi version : 0.15.0

  • Spark version : 3.4.1

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

Additional Context

The problem does not surface with 20k or 30k rows in the upsert, but it does once I go higher.
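
The failure in the stack trace below happens while reading spilled records back from the ExternalSpillableMap's on-disk store (BitCaskDiskMap), which only kicks in once the merge exceeds its in-memory budget; that would explain why smaller batches pass. As a diagnostic only (not a fix), the budget can be raised through Hudi's merge-memory setting, assuming the standard hoodie.memory.merge.max.size config (value in bytes):

// Diagnostic only: a larger in-memory budget delays the spill to BitCaskDiskMap,
// which is the code path where the Kryo deserialization fails.
val upsertOptionsWithMoreMergeMemory =
  randomDataUpsertOptions + ("hoodie.memory.merge.max.size" -> (2L * 1024 * 1024 * 1024).toString)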

Stacktrace

Caused by: org.apache.hudi.exception.HoodieException: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
record (com.test.hudi.RandomDataPayload)
  at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75)
  at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
  ... 33 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
record (com.test.hudi.RandomDataPayload)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
  at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:161)
  at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
  at org.apache.hudi.common.model.HoodieAvroRecord.readRecordPayload(HoodieAvroRecord.java:231)
  at org.apache.hudi.common.model.HoodieAvroRecord.readRecordPayload(HoodieAvroRecord.java:48)
  at org.apache.hudi.common.model.HoodieRecord.read(HoodieRecord.java:373)
  at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:520)
  at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:512)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
  at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:102)
  at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:76)
  at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:209)
  at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:202)
  at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:198)
  at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:67)
  at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:198)
  at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:56)
  at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:350)
  at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:54)
  at org.apache.hudi.table.action.commit.BaseMergeHelper$UpdateHandler.consume(BaseMergeHelper.java:44)
  at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:69)
  ... 34 more
Caused by: java.lang.UnsupportedOperationException
  at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
  ... 66 more
@rangareddy

Hi @dataproblems

I will attempt to reproduce the issue internally and follow up with the next steps.

@rangareddy

Hi @dataproblems

The Spark job completes successfully with the default payload. I suspect the issue lies with the custom payload. Could you please review your code to identify the problem?

@dataproblems (Author)

@rangareddy - I'm running into the issue with the custom payload that I have included in this ticket. Were you able to reproduce that on your end? Or are you saying that you generated the random data and then ran the Spark job with the default payload?

I'm asking for help because it is the custom payload that causes the exception, not the default one.
