# Apache Iceberg
Apache Iceberg is an open-source table format for large-scale analytical datasets, originally developed at Netflix.
Daft currently natively supports:

- Distributed Reads: Daft fully distributes the I/O of reads over your compute resources (whether on Ray or via local multithreading)
- Skipping Filtered Data: Daft uses `df.where(...)` filter calls to read only the data that matches your predicates
- All Catalogs From PyIceberg: Daft is natively integrated with PyIceberg and supports all the catalogs that PyIceberg does
## Reading a Table
To read from the Apache Iceberg table format, use the `daft.read_iceberg` function.
We integrate closely with PyIceberg (the official Python implementation of Apache Iceberg), which makes it easy to read Daft DataFrames from PyIceberg's Table objects. The following snippet loads an example table; for more information, consult the PyIceberg Table loading documentation.
```python
# Access a PyIceberg table as per normal
from pyiceberg.catalog import load_catalog

catalog = load_catalog("my_iceberg_catalog")
table = catalog.load_table("my_namespace.my_table")
```
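Because table loading goes through PyIceberg, any catalog that PyIceberg supports can be configured the same way. As a minimal sketch, a REST catalog might be loaded like this (the catalog name, URI, and token below are placeholder values, not defaults from this guide):

```python
from pyiceberg.catalog import load_catalog

# Hypothetical REST catalog configuration; swap in your own
# endpoint and credentials
catalog = load_catalog(
    "my_rest_catalog",
    **{
        "type": "rest",
        "uri": "https://my-catalog.example.com",
        "token": "my-token",
    },
)
```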
After the table is loaded as the `table` object, reading it into a DataFrame is extremely easy.
```python
# Create a Daft DataFrame
import daft

df = daft.read_iceberg(table)
```
Any subsequent filter operations on the Daft `df` DataFrame object will be correctly optimized to take advantage of Iceberg features such as hidden partitioning and file-level statistics for efficient reads.
```python
# Filter which takes advantage of partition pruning capabilities of Iceberg
df = df.where(df["partition_key"] < 1000)
df.show()
```
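To check that a predicate is actually pushed down into the Iceberg scan, you can inspect the query plan. A minimal sketch, assuming Daft's `DataFrame.explain` (the exact plan output varies by Daft version):

```python
# Print the unoptimized and optimized plans; the optimized plan should
# show the partition_key predicate applied at the Iceberg scan
df.explain(show_all=True)
```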
## Writing to a Table
To write to an Apache Iceberg table, use the `daft.DataFrame.write_iceberg` method.
The following is an example of appending data to an Iceberg table:
```python
written_df = df.write_iceberg(table, mode="append")
written_df.show()
```
This call returns a DataFrame describing the operations that were performed on the Iceberg table, like so:
```
╭───────────┬───────┬───────────┬────────────────────────────────╮
│ operation │ rows  │ file_size │ file_name                      │
│ ---       │ ---   │ ---       │ ---                            │
│ Utf8      │ Int64 │ Int64     │ Utf8                           │
╞═══════════╪═══════╪═══════════╪════════════════════════════════╡
│ ADD       │ 5     │ 707       │ 2f1a2bb1-3e64-49da-accd-1074e… │
╰───────────┴───────┴───────────┴────────────────────────────────╯
```
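Besides appending, a table's contents can be replaced by changing the write mode. A minimal sketch, assuming your Daft version supports `mode="overwrite"`:

```python
# Replace the table's current contents rather than appending to them
written_df = df.write_iceberg(table, mode="overwrite")
written_df.show()
```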
## Type System
Daft and Iceberg have compatible type systems. Here is how types are converted across the two systems.
When reading from an Iceberg table into Daft:
| Iceberg | Daft |
| --- | --- |
| **Primitive Types** | |
| `boolean` | `daft.DataType.bool()` |
| `int` | `daft.DataType.int32()` |
| `long` | `daft.DataType.int64()` |
| `float` | `daft.DataType.float32()` |
| `double` | `daft.DataType.float64()` |
| `decimal(precision, scale)` | `daft.DataType.decimal128(precision, scale)` |
| `date` | `daft.DataType.date()` |
| `time` | `daft.DataType.int64()` |
| `timestamp` | `daft.DataType.timestamp(timeunit="us", timezone=None)` |
| `timestamptz` | `daft.DataType.timestamp(timeunit="us", timezone="UTC")` |
| `string` | `daft.DataType.string()` |
| `uuid` | `daft.DataType.binary()` |
| `fixed(L)` | `daft.DataType.binary()` |
| `binary` | `daft.DataType.binary()` |
| **Nested Types** | |
| `struct(fields)` | `daft.DataType.struct(fields)` |
| `list(child_type)` | `daft.DataType.list(child_type)` |
| `map(K, V)` | `daft.DataType.struct({"key": K, "value": V})` |
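To see these conversions in practice, you can print the schema of a DataFrame read from Iceberg. A minimal sketch, reusing the `table` object loaded earlier:

```python
import daft

df = daft.read_iceberg(table)

# Each Iceberg column appears with its converted Daft type,
# e.g. an Iceberg `long` column shows up as Int64
print(df.schema())
```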
## Roadmap
Here are some Iceberg features that are still works in progress:
- Reading Iceberg V2 equality deletes
- More extensive usage of Iceberg-provided statistics to further optimize queries
- Copy-on-write and merge-on-read writes
A more detailed Iceberg roadmap for Daft can be found on our GitHub Issues page.