
Delete files and expression caching for Iceberg read #9

@yingsu00

Description


We know that a unique HiveDataSource object is created for a unique TableScan operator, and the splits received by a HiveDataSource instance belong to the same query and the same table. Additionally, Iceberg splits must all read the same snapshot of an Iceberg table. When the HiveDataSource receives a new Iceberg split that carries equality delete files, it creates a new IcebergSplitReader, which opens the delete files. If an equality delete file can be interpreted into domain filters or filter functions, the scanSpec_ and remainingFilterExprSet_ in HiveDataSource may need to be updated.
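For illustration, here is a minimal C++ sketch of what interpreting an equality delete file into a domain filter could look like. NotInFilter, ScanSpecStub, and applyEqualityDelete are hypothetical stand-ins for the real Velox ScanSpec/Filter machinery; the sketch assumes a single-column equality delete file whose deleted keys fit in memory.

```cpp
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Stand-in for a "value NOT IN (deleted keys)" domain filter derived from an
// equality delete file.
struct NotInFilter {
  std::unordered_set<int64_t> deletedKeys;
  bool testInt64(int64_t value) const {
    return deletedKeys.count(value) == 0;
  }
};

// Stand-in for the per-column filter map that scanSpec_ would hold.
struct ScanSpecStub {
  std::unordered_map<std::string, std::shared_ptr<NotInFilter>> filters;
};

// Hypothetical helper: installs a negated domain filter so that rows whose
// key appears in the equality delete file are dropped during the scan.
void applyEqualityDelete(
    ScanSpecStub& scanSpec,
    const std::string& column,
    const std::vector<int64_t>& deletedKeysFromDeleteFile) {
  auto filter = std::make_shared<NotInFilter>();
  filter->deletedKeys.insert(
      deletedKeysFromDeleteFile.begin(), deletedKeysFromDeleteFile.end());
  scanSpec.filters[column] = std::move(filter);
}
```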

Currently, the Iceberg library selects the qualified data and delete files based on partitions and snapshot Ids or transaction sequence numbers. For a single transaction the snapshot is fixed, and all delete files from the same partition accompany the base data files when the splits are enumerated. So we can assume for now that all splits received from the same partition are the same for a single HiveDataSource. However, the delete files for different partitions can differ, and the splits from multiple partitions can arrive out of order. If we updated scanSpec_ and remainingFilterExprSet_ for a previous partition, we need to restore them to their original state before applying the current set of delete files. As a first implementation, we will make a copy of these objects in the IcebergSplitReader and restore them when the IcebergSplitReader is destructed, as sketched below.
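A minimal sketch of that copy-and-restore approach, assuming the affected scan state can be cheaply copied; ScanState and IcebergSplitReaderSketch are illustrative stand-ins, not the actual HiveDataSource or IcebergSplitReader members.

```cpp
#include <string>
#include <unordered_map>

// Stand-in for the mutable scan state in HiveDataSource (scanSpec_ plus
// remainingFilterExprSet_), reduced to a copyable value type for this sketch.
struct ScanState {
  std::unordered_map<std::string, std::string> columnFilters;
  std::string remainingFilterExpr;
};

// Copy the scan state when the split reader is created and restore it on
// destruction, so filters derived from one partition's delete files never
// leak into splits of another partition.
class IcebergSplitReaderSketch {
 public:
  explicit IcebergSplitReaderSketch(ScanState& scanState)
      : scanState_(scanState), original_(scanState) {}

  ~IcebergSplitReaderSketch() {
    scanState_ = original_;  // undo any delete-file-driven updates
  }

 private:
  ScanState& scanState_;
  ScanState original_;  // untouched copy taken at construction time
};
```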

In some users' workloads, deletions are quite frequent, and a split for a subsequent SELECT query can come with many delete files. For all splits in a partition, the delete files may be the same. We don't want to repeatedly read such equality delete files for every split a HiveDataSource handles. One way of overcoming this is to build an expression cache. There are two levels of caching ideas:

1. A hash table in HiveDataSource.
2. A process-wide cache for all Iceberg scans.

In 1, the key of the hash table is <partition, snapshotId> and the values are the compiled filters and expressions.
In 2, the key of the cache is <table, partition, snapshotId> and the values are the compiled filters and expressions. To avoid excessive contention, we can divide the cache into multiple shards. The implementation will be adjusted with more experiments and observations of customer workloads in the future. A sketch of both cache shapes follows.
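A minimal sketch of the two cache shapes under those keys, assuming the compiled filters and expressions can be shared through shared_ptr; all type and member names here are illustrative, not an actual implementation.

```cpp
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

struct CompiledDeleteExprs {};  // placeholder for compiled filters/expressions

// Level 1: per-HiveDataSource hash table keyed by <partition, snapshotId>.
struct DataSourceCacheKey {
  std::string partition;
  int64_t snapshotId;
  bool operator==(const DataSourceCacheKey& o) const {
    return partition == o.partition && snapshotId == o.snapshotId;
  }
};

struct DataSourceCacheKeyHash {
  size_t operator()(const DataSourceCacheKey& k) const {
    return std::hash<std::string>()(k.partition) ^
        (std::hash<int64_t>()(k.snapshotId) << 1);
  }
};

using DataSourceDeleteExprCache = std::unordered_map<
    DataSourceCacheKey,
    std::shared_ptr<CompiledDeleteExprs>,
    DataSourceCacheKeyHash>;

// Level 2: process-wide cache keyed by <table, partition, snapshotId>,
// sharded so concurrent scans of different tables rarely contend on one lock.
class ProcessWideDeleteExprCache {
 public:
  std::shared_ptr<CompiledDeleteExprs> getOrCreate(
      const std::string& table,
      const std::string& partition,
      int64_t snapshotId,
      const std::function<std::shared_ptr<CompiledDeleteExprs>()>& compile) {
    auto key = table + "|" + partition + "|" + std::to_string(snapshotId);
    auto& shard = shards_[std::hash<std::string>()(key) % kNumShards];
    std::lock_guard<std::mutex> lock(shard.mutex);
    auto it = shard.entries.find(key);
    if (it != shard.entries.end()) {
      return it->second;  // reuse filters compiled by an earlier split
    }
    // Read and compile the delete files once; the shard lock is held across
    // compile() only to keep this sketch simple.
    auto compiled = compile();
    shard.entries.emplace(key, compiled);
    return compiled;
  }

 private:
  static constexpr size_t kNumShards = 16;
  struct Shard {
    std::mutex mutex;
    std::unordered_map<std::string, std::shared_ptr<CompiledDeleteExprs>>
        entries;
  };
  Shard shards_[kNumShards];
};
```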

If the Iceberg library changes its TableScan or FileScan in the future and can additionally prune the delete files based on each individual base data file, we will need to change the cache keys to include information about the base file.

We will work on caching in the future when we understand the workloads better.
