Using the Struct type as the primary key in equalDelete operation will cause data reading errors. #11611

Leven2023 opened this issue Nov 21, 2024 · 0 comments
Labels
bug Something isn't working

Leven2023 commented Nov 21, 2024

Apache Iceberg version

1.5.2

Query engine

Other

Please describe the bug 🐞

1. Steps to reproduce the bug:

1) Create a new table, specify the storage format as Parquet, and define the table structure as Schema { Struct{ int, String }, int, int, int };

2) Use org.apache.iceberg.io.TaskWriter to write 10,000 rows of data; then use Struct{ int, String } as the primary key to delete the 10,000 rows just written (this should be an equality delete);

3) Use org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles to compact the files;

4) Read the data again. We expect to read 0 rows, but 9,999 rows are read. A sketch of these steps is shown below.
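
For reference, a rough sketch of steps 1), 3) and 4) (the TaskWriter and equality-delete writing of step 2) are omitted; spark, table, tableName, the class name, and the column names are placeholders of mine, not the exact demo code):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;

public class StructKeyEqDeleteRepro {
  // Schema { Struct{ int, String }, int, int, int }; the struct column is the equality-delete key
  static final Schema SCHEMA =
      new Schema(
          Types.NestedField.required(1, "pk", Types.StructType.of(
              Types.NestedField.required(5, "id", Types.IntegerType.get()),
              Types.NestedField.required(6, "name", Types.StringType.get()))),
          Types.NestedField.optional(2, "c1", Types.IntegerType.get()),
          Types.NestedField.optional(3, "c2", Types.IntegerType.get()),
          Types.NestedField.optional(4, "c3", Types.IntegerType.get()));

  static void rewriteAndCheck(SparkSession spark, Table table, String tableName) {
    // step 3): compact the data files; the rewrite re-applies the equality deletes
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .execute();

    // step 4): 0 rows are expected, but 9,999 rows come back on 1.5.2
    long rows = spark.read().format("iceberg").load(tableName).count();
    System.out.println("rows after rewrite: " + rows);
  }
}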

2. Reason

1) In Iceberg 1.5.2, SparkActions rewrites data files by reading the dataFiles and deleteFiles of the current snapshot and rewriting the data into Parquet. This process involves equality-delete behavior.

2) Spark's equality-delete processing needs to read all delete information through DeleteFilter. In Iceberg 1.5.2, DeleteFilter reads the delete records of the DeleteFiles through BaseDeleteLoader, materializes the records into memory, and caches them in a set for subsequent use. When BaseDeleteLoader reads a DeleteFile, it uses org.apache.iceberg.data.parquet.GenericParquetReaders.RecordReader, and this reader reuses a GenericRecord template while reading. Therefore, Record::copy is executed before each record is materialized into memory.

public class BaseDeleteLoader implements DeleteLoader {
  ...
  protected Iterable<StructLike> readEqDeletes(DeleteFile deleteFile, Schema projection) {
    CloseableIterable<Record> deletes = openDeletes(deleteFile, projection);
    CloseableIterable<Record> copiedDeletes = CloseableIterable.transform(deletes, Record::copy);
    CloseableIterable<StructLike> copiedDeletesAsStructs = toStructs(copiedDeletes, projection);
    return materialize(copiedDeletesAsStructs);
  }
  ...
  // materializes the iterable and releases resources so that the result can be cached
  private <T> Iterable<T> materialize(CloseableIterable<T> iterable) {
    try (CloseableIterable<T> closeableIterable = iterable) {
      return ImmutableList.copyOf(closeableIterable);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close iterable", e);
    }
  }
  ...
}
public class GenericParquetReaders extends BaseParquetReaders<Record> {
   ...
    private static class RecordReader extends ParquetValueReaders.StructReader<Record, Record> {
        private final GenericRecord template;

        RecordReader(List<Type> types, List<ParquetValueReader<?>> readers, Types.StructType struct) {
            super(types, readers);
            this.template = struct != null ? GenericRecord.create(struct) : null;
        }

        protected Record newStructData(Record reuse) {
            return (Record)(reuse != null ? reuse : this.template.copy());
        }

        protected Object getField(Record intermediate, int pos) {
            return intermediate.get(pos);
        }

        protected Record buildStruct(Record struct) {
            return struct;
        }

        protected void set(Record struct, int pos, Object value) {
            struct.set(pos, value);
        }
    }
}

3) GenericRecord#copy performs only a shallow copy of the internal Object[] values array.

public class GenericRecord implements Record, StructLike {
  ...
  public static GenericRecord create(Schema schema) {
    return new GenericRecord(schema.asStruct());
  }

  public static GenericRecord create(StructType struct) {
    return new GenericRecord(struct);
  }

  private final StructType struct;
  private final int size;
  private final Object[] values;
  private final Map<String, Integer> nameToPos;

  ...
  private GenericRecord(GenericRecord toCopy) {
    this.struct = toCopy.struct;
    this.size = toCopy.size;
    this.values = Arrays.copyOf(toCopy.values, toCopy.values.length);
    this.nameToPos = toCopy.nameToPos;
  }
  ...

  @Override
  public GenericRecord copy() {
    return new GenericRecord(this);
  }
  ...
}

4) After analyzing this, you may find that, since the copy performed by GenericRecord is shallow, when the copied GenericRecords read from the DeleteFile are put into the list used for the equality delete, the values held by each copied element are still references to the objects that the reader keeps reusing, not deep copies. As the reading of the DeleteFile continues, RecordReader keeps loading new data into the reused GenericRecord template, so the values referenced by the GenericRecord elements already in the list are modified as well. In other words, after the 10,000 rows of delete information have been put into the list, those 10,000 rows are actually duplicates (all equivalent to the last record read), so after Spark completes the equality-delete process, the number of rows read is 10,000 - 1 = 9,999. The aliasing is illustrated by the snippet below.
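
To make the aliasing concrete, here is a small standalone snippet (not part of my original demo; the struct layout and names are made up) showing that a shallow copy still shares the nested struct object with the reused template:

import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class ShallowCopyAliasingDemo {
  public static void main(String[] args) {
    // made-up struct layout standing in for the Struct{ int, String } primary key
    Types.StructType keyType = Types.StructType.of(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.required(2, "name", Types.StringType.get()));
    Types.StructType rowType = Types.StructType.of(
        Types.NestedField.required(3, "pk", keyType));

    GenericRecord nestedTemplate = GenericRecord.create(keyType);
    GenericRecord rowTemplate = GenericRecord.create(rowType);
    rowTemplate.set(0, nestedTemplate);

    // "read" the first delete row and cache a shallow copy of it
    nestedTemplate.set(0, 1);
    nestedTemplate.set(1, "row-1");
    Record cached = rowTemplate.copy(); // values[0] still points at nestedTemplate

    // the reader loads the next row into the same reused nested template
    nestedTemplate.set(0, 2);
    nestedTemplate.set(1, "row-2");

    // the cached copy now reflects "row-2" too, so all cached delete keys collapse into one
    System.out.println(((Record) cached.get(0)).get(1)); // prints row-2
  }
}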

5) Since DeleteLoader is also used in the equality-delete path when reading a Parquet table, this data-correctness bug also appears at read time whenever a client uses a Struct type as the primary key of an equality delete.

3. Solution

From my perspective, changing GenericRecord's copy from a shallow copy to a deep copy solves this bug. This requires adjusting the code of org.apache.iceberg.data.GenericRecord#GenericRecord(GenericRecord toCopy).
I have verified the feasibility and correctness of this solution locally; a rough sketch follows.
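
A rough sketch of the kind of change I mean (not the exact patch; it only deep-copies nested Record values, and list/map values containing structs would need similar handling):

public class GenericRecord implements Record, StructLike {
  ...
  private GenericRecord(GenericRecord toCopy) {
    this.struct = toCopy.struct;
    this.size = toCopy.size;
    this.values = new Object[toCopy.values.length];
    for (int i = 0; i < toCopy.values.length; i++) {
      Object value = toCopy.values[i];
      // deep-copy nested struct values so copies no longer alias the reused nested template
      this.values[i] = value instanceof Record ? ((Record) value).copy() : value;
    }
    this.nameToPos = toCopy.nameToPos;
  }
  ...
}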

Negative impact:
1) When the schema contains only primitive field types, the deep copy may cost more than the previous shallow copy. The extent of this impact has not yet been fully measured.

4. Other notes

If this bug is caused by a shortcoming of my demo case, or if the problem has already been fixed in a newer Iceberg version, I hope readers or community members will let me know. Thank you very much.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time