
Apache Hudi

Apache Hudi is an open-source transactional data lake platform that brings database and data warehouse capabilities to data lakes. Hudi supports transactions, efficient upserts and deletes, advanced indexes, streaming ingestion services, data clustering and compaction optimizations, and concurrency control, all while keeping your data in open-source file formats.

Daft currently supports:

  1. Parallel + Distributed Reads: Daft parallelizes Hudi table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the distributed Ray runner.

  2. Skipping Filtered Data: Daft ensures that only data that matches your df.where(...) filter will be read, often skipping entire files/partitions.

  3. Multi-cloud Support: Daft supports reading Hudi tables from AWS S3, Azure Blob Storage, and GCS, as well as local files.
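
The three capabilities above can be combined in one call path. The following is a minimal sketch, not a reference implementation: the helper name `read_hudi_from_s3`, its parameters, and the column `foo` are hypothetical, and it assumes that `daft.context.set_runner_ray()`, `daft.io.IOConfig`, and `daft.io.S3Config` are available in your installed Daft version.

```python
def read_hudi_from_s3(table_uri: str, region_name: str, min_foo: int, use_ray: bool = False):
    """Return a lazy DataFrame over a Hudi table on S3, keeping rows where foo > min_foo."""
    # Imported inside the function so this module still loads where Daft is absent.
    import daft
    from daft.io import IOConfig, S3Config

    if use_ray:
        # Assumption: must be called before any DataFrame is executed in this process.
        daft.context.set_runner_ray()

    # Cloud credentials and region are passed through an IOConfig.
    io_config = IOConfig(s3=S3Config(region_name=region_name))
    df = daft.read_hudi(table_uri, io_config=io_config)

    # The filter is declared before any data is read, which is what lets
    # Daft skip files or partitions that cannot match.
    return df.where(df["foo"] > min_foo)
```

Nothing is read until the returned DataFrame is executed, for example with `.show()` or `.collect()`.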

Installing Daft with Apache Hudi Support

Hudi support is available as an optional dependency of Daft:

pip install -U "daft[hudi]"
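
After installing, a quick check that the package is importable can save a confusing error later. This is a small sketch using only the standard library; the helper name `is_installed` is hypothetical, and it only confirms that the `daft` module can be found, not that every optional extra is present.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be located without importing it."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    print("daft importable:", is_installed("daft"))
```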

Reading a Table

To read from an Apache Hudi table, use the daft.read_hudi function. The following snippet loads an example table and filters it:

# Read Apache Hudi table into a Daft DataFrame.
import daft

df = daft.read_hudi("some-table-uri")
df = df.where(df["foo"] > 5)
df.show()

Type System

Daft and Hudi have compatible type systems. Here is how types are converted between the two systems.

When reading from a Hudi table into Daft:

| Apache Hudi | Daft |
| --- | --- |
| Primitive Types | |
| boolean | daft.DataType.bool() |
| byte | daft.DataType.int8() |
| short | daft.DataType.int16() |
| int | daft.DataType.int32() |
| long | daft.DataType.int64() |
| float | daft.DataType.float32() |
| double | daft.DataType.float64() |
| decimal(precision, scale) | daft.DataType.decimal128(precision, scale) |
| date | daft.DataType.date() |
| timestamp | daft.DataType.timestamp(timeunit="us", timezone=None) |
| timestampz | daft.DataType.timestamp(timeunit="us", timezone="UTC") |
| string | daft.DataType.string() |
| binary | daft.DataType.binary() |
| Nested Types | |
| struct(fields) | daft.DataType.struct(fields) |
| list(child_type) | daft.DataType.list(child_type) |
| map(K, V) | daft.DataType.struct({"key": K, "value": V}) |
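
One way to confirm these conversions on a real table is to compare the types Daft reports against the types you expect. The sketch below is deliberately independent of Daft so it runs anywhere: the helper `schema_mismatches` is hypothetical, and the column names and type labels in the example are made up for illustration.

```python
def schema_mismatches(actual: dict, expected: dict) -> dict:
    """Map column name -> (actual type, expected type) for every column that differs.

    A column missing from `actual` is reported with None as its actual type.
    """
    return {
        name: (actual.get(name), want)
        for name, want in expected.items()
        if actual.get(name) != want
    }


if __name__ == "__main__":
    # Illustrative type labels only; real code would compare daft.DataType objects.
    actual = {"id": "int64", "price": "decimal128(10, 2)"}
    expected = {"id": "int64", "price": "float64"}
    print(schema_mismatches(actual, expected))
```

With Daft installed, `actual` could plausibly be built as `{f.name: f.dtype for f in df.schema()}` and `expected` from constructors such as `daft.DataType.int64()`, assuming your version exposes schema fields and type equality that way.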

Roadmap

Reading Hudi tables currently has the following limitations:

  • Only snapshot reads of Copy-on-Write tables are supported.
  • Only table versions 5 and 6 (tables created with releases 0.12.x to 0.15.x) can be read.
  • The table must not have hoodie.datasource.write.drop.partition.columns=true.
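
These limitations can be checked up front from a table's properties. The following is a hedged sketch: it assumes the table stores its settings as key=value lines in `.hoodie/hoodie.properties` under the keys `hoodie.table.type` and `hoodie.table.version`, and both helper names are hypothetical. Confirm the key names against your own table before relying on it.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def hudi_read_blockers(props: dict[str, str]) -> list[str]:
    """Return the reasons a table falls outside the limitations listed above."""
    blockers = []
    # Assumed key names; a missing key is treated as a blocker, not a pass.
    if props.get("hoodie.table.type") != "COPY_ON_WRITE":
        blockers.append("table is not (or is not declared as) Copy-on-Write")
    if props.get("hoodie.table.version") not in {"5", "6"}:
        blockers.append("table version is not 5 or 6")
    dropped = props.get("hoodie.datasource.write.drop.partition.columns", "false")
    if dropped.lower() == "true":
        blockers.append("partition columns were dropped on write")
    return blockers


if __name__ == "__main__":
    sample = "hoodie.table.type=MERGE_ON_READ\nhoodie.table.version=6\n"
    for reason in hudi_read_blockers(parse_properties(sample)):
        print("cannot read:", reason)
```

For a local table, the input text could come from reading the properties file directly; for cloud storage, fetch the file with whichever client you already use.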

Support for more Hudi features is tracked below:

  1. Incremental query support for Copy-on-Write tables (issue).
  2. Read support for 1.0 table format (issue).
  3. Read support (snapshot) for Merge-on-Read tables (issue).
  4. Write support (issue).