%pip install getdaft
CI = False
# Skip this notebook execution in CI because it hits data in a relative path
if CI:
    import sys

    sys.exit()

000 - Data Access#

This Feature-of-the-Week tutorial shows the canonical way of accessing data with Daft.

Daft reads from 3 main data sources:

  1. Files (local and remote)

  2. SQL Databases

  3. Data Catalogs

Let’s dive into each type of data access in more detail 🪂

import daft

Files#

You can read many different file types with Daft.

The most common file formats are:

  • CSV

  • JSON

  • Parquet

You can read these file types from local and remote filesystems.
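For example, here is a minimal sketch of each reader (the file paths are illustrative placeholders):

# Each reader returns a lazy Daft DataFrame
csv_df = daft.read_csv("data/my_file.csv")
json_df = daft.read_json("data/my_file.jsonl")
parquet_df = daft.read_parquet("data/my_file.parquet")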

CSV#

Use the read_csv method to read CSV files from your local filesystem.

Here, we’ll read in some synthetic US Census data:

# Read a single CSV file from your local filesystem
df = daft.read_csv("data/census001.csv")
df.show()

[Output: the first 8 rows of the DataFrame, with schema (unnamed index) Int64, age Int64, workclass Utf8, education Utf8, education_num Int64, marital_status Utf8, occupation Utf8, relationship Utf8, race Utf8, sex Utf8, capital_gain Int64, capital_loss Int64, hours_per_week Int64, native_country Utf8, income Utf8]
(Showing first 8 rows)
df.count_rows()
25

You can also read entire folders of CSV files, or use wildcards to match patterns of file paths.

All of the matched files must share the same schema.

Here, we’ll read in multiple CSV files containing synthetic US Census data:

# Read multiple CSV files into one DataFrame
df = daft.read_csv("data/census*.csv")
df.show()

[Output: the first 8 rows of the combined DataFrame, with the same census schema as above]
(Showing first 8 rows)
df.count_rows()
75
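Besides glob patterns, the read functions also accept an explicit list of paths. A quick sketch (the file names are illustrative):

# Read a list of CSV files into one DataFrame
df = daft.read_csv(["data/census001.csv", "data/census002.csv"])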

JSON#

You can read line-delimited JSON using the read_json method:

# Read a JSON file from your local filesystem
df = daft.read_json("data/sampled-tpch.jsonl")
df.show()
[Output: the first 8 rows of the DataFrame, with schema L_COMMENT Utf8, L_COMMITDATE Date, L_DISCOUNT Float64, L_EXTENDEDPRICE Float64, L_LINENUMBER Int64, L_LINESTATUS Utf8, L_ORDERKEY Int64, L_PARTKEY Int64, L_QUANTITY Int64, L_RECEIPTDATE Date, L_RETURNFLAG Utf8, L_SHIPDATE Date, L_SHIPINSTRUCT Utf8, L_SHIPMODE Utf8, L_SUPPKEY Int64, L_TAX Float64]
(Showing first 8 rows)

Parquet#

Use the read_parquet method to read Parquet files.

# Read a Parquet file from your local filesystem
df = daft.read_parquet("data/sample_taxi.parquet")
df.show()
[Output: the first 8 rows of the DataFrame, with schema VendorID Int32, tpep_pickup_datetime Timestamp(Microseconds, None), tpep_dropoff_datetime Timestamp(Microseconds, None), passenger_count Float64, trip_distance Float64, RatecodeID Float64, store_and_fwd_flag Utf8, PULocationID Int32, DOLocationID Int32, payment_type Int64, fare_amount Float64, extra Float64, mta_tax Float64, tip_amount Float64, tolls_amount Float64, improvement_surcharge Float64, total_amount Float64, congestion_surcharge Float64, Airport_fee Float64]
(Showing first 8 rows)

Remote Reads, e.g. S3#

You can read files from remote filesystems such as AWS S3:

# Read multiple Parquet files from s3
df = daft.read_parquet("s3://mybucket/path/to/*.parquet")

Reads from other object stores work the same way: specify the path with the protocol prefix of the corresponding storage service.
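For example, here is a hedged sketch of reading from Google Cloud Storage and Azure Blob Storage (the bucket and container names are placeholders, and the snippet assumes your environment is already authenticated with each service):

# Read Parquet files from Google Cloud Storage
df_gcs = daft.read_parquet("gs://my-bucket/path/to/*.parquet")

# Read Parquet files from Azure Blob Storage
df_az = daft.read_parquet("az://my-container/path/to/*.parquet")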

Reading from Public Buckets#

You can read from public buckets using an “anonymous” IOConfig.

An anonymous IOConfig will access storage without credentials, and can only access fully public data.

# create anonymous config
MY_ANONYMOUS_IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))

# Read this file using `MY_ANONYMOUS_IO_CONFIG`
df = daft.read_csv("s3://daft-public-data/melbourne-airbnb/melbourne_airbnb.csv", io_config=MY_ANONYMOUS_IO_CONFIG)
df.select("id", "text", "price", "review_scores_rating").show()
id (Int64) | text (Utf8)                                        | price (Int64) | review_scores_rating (Float64)
25586695   | Beach side, art deco flat in heart of St Kilda     | 90            | None
1057401    | Modern Bayside Studio Apartment                    | 75            | 95
24949385   | Spacious Saint kilda home with a View              | 47            | None
20075093   | Rewarding Richmond Location-Outstanding Apartment! | 220           | 96
16275657   | Close to the centre of Melbourne.                  | 71            | 99
15136111   | Warm Apartment with Gym, Pool, Sauna near MCG      | 79            | 96
4122488    | St Kilda's Loft Beachside Hideaway                 | 89            | 93
13721853   | Central Melbourne Private Room 3                   | 50            | 85
(Showing first 8 rows)

Remote IO Configuration#

Use the IOConfig module to configure access to remote data.

Daft will automatically detect S3 credentials from your local environment. If your current session is authenticated with access credentials to your private bucket, then you can access the bucket without explicitly passing credentials.

Substitute the path below with a path to a private bucket you have access to with your credentials.

bucket = "s3://avriiil/yellow_tripdata_2023-12.parquet"

Now configure your IOConfig object:

from daft.io import IOConfig, S3Config

io_config = IOConfig(
    s3=S3Config(
        region_name="eu-north-1",
    )
)
df = daft.read_parquet(bucket, io_config=io_config)
df.show()
[Output: the first 8 rows of the DataFrame, with the same taxi schema as the local Parquet example above, except that passenger_count and RatecodeID are read as Int64]
(Showing first 8 rows)

The IOConfig object has many more configuration options. You can use this object to configure access to:

  • AWS S3, via S3Config

  • Google Cloud Storage, via GCSConfig

  • Azure Blob Storage, via AzureConfig

All the cloud-specific Config options follow the standard protocols of the respective cloud platforms. See the IOConfig documentation for more information.

There is a dedicated section for the IOConfig object at the end of this tutorial.
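As a rough sketch, configuring the other cloud stores looks similar to the S3 examples above (the parameter names here are assumptions, so check the IOConfig documentation for the exact options):

# Assumed parameter names; consult the IOConfig docs for your Daft version
gcs_io_config = daft.io.IOConfig(gcs=daft.io.GCSConfig(project_id="my-gcp-project"))
azure_io_config = daft.io.IOConfig(azure=daft.io.AzureConfig(storage_account="myaccount", access_key="..."))

# df = daft.read_parquet("gs://my-bucket/*.parquet", io_config=gcs_io_config)
# df = daft.read_parquet("az://my-container/*.parquet", io_config=azure_io_config)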

SQL Databases#

You can use Daft to read the results of SQL queries from databases, data warehouses, and query engines into a Daft DataFrame via the daft.read_sql() function.

# Read from a PostgreSQL database
uri = "postgresql://user:password@host:port/database"
df = daft.read_sql("SELECT * FROM my_table", uri)

To partition the data, you can specify a partition column:

# Read with a partition column
df = daft.read_sql("SELECT * FROM my_table", uri, partition_col="date")

Partitioning allows Daft to read the data in parallel, which makes your queries faster.

ConnectorX vs SQLAlchemy#

Daft uses ConnectorX under the hood to read SQL data. ConnectorX is a fast, Rust-based SQL connector that reads directly into Arrow tables, enabling zero-copy transfer into Daft DataFrames. If the database is not supported by ConnectorX, Daft will fall back to using SQLAlchemy.

You can also provide a SQLAlchemy connection directly via a connection factory, which gives you the flexibility to pass additional parameters to the engine, as shown in the sketch below.
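Here is a minimal sketch of a connection factory, assuming the example.db SQLite database created in the example that follows:

from sqlalchemy import create_engine

def create_sqlite_conn():
    # Additional engine parameters (e.g. echo, connect_args) can be supplied here
    engine = create_engine("sqlite:///example.db")
    return engine.connect()

# Pass the connection factory instead of a URI string
df = daft.read_sql("SELECT * FROM books", create_sqlite_conn)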

Example#

Let’s look at an example with a simple local database.

You will need some extra dependencies installed. The easiest way to do so is using the [sql] extras:

#!pip install "getdaft[sql]"

Create a Local SQL Database#

Let's start by creating a local SQLite database with a small books table, which Daft can then read directly via its sqlite:// URI:

import sqlite3

connection = sqlite3.connect("example.db")
connection.execute("CREATE TABLE IF NOT EXISTS books (title TEXT, author TEXT, year INTEGER)")
connection.execute(
    """
INSERT INTO books (title, author, year)
VALUES
    ('The Great Gatsby', 'F. Scott Fitzgerald', 1925),
    ('To Kill a Mockingbird', 'Harper Lee', 1960),
    ('1984', 'George Orwell', 1949),
    ('The Catcher in the Rye', 'J.D. Salinger', 1951)
"""
)
connection.commit()
connection.close()
# Read SQL query into Daft DataFrame
df = daft.read_sql(
    "SELECT * FROM books",
    "sqlite://example.db",
)

df.show()
title (Utf8)           | author (Utf8)       | year (Int64)
The Great Gatsby       | F. Scott Fitzgerald | 1925
To Kill a Mockingbird  | Harper Lee          | 1960
1984                   | George Orwell       | 1949
The Catcher in the Rye | J.D. Salinger       | 1951
(Showing first 4 of 4 rows)

As mentioned above, you can also provide a SQLAlchemy connection via a connection factory, which lets you pass additional parameters to the engine.

Let's use a SQLAlchemy engine to load a CSV file into a local SQLite database and then query it with Daft:

from sqlalchemy import create_engine
import pandas as pd

# Substitute the URI below with the engine path on your local machine
engine_uri = "sqlite:////Users/rpelgrim/daft_sql"
engine = create_engine(engine_uri, echo=True)

# Load the CSV with pandas and write it to a SQL table
csv_file_path = "data/census-01.csv"
census_df = pd.read_csv(csv_file_path)
census_df.to_sql(name="censustable", con=engine, index=False, if_exists="replace")

Access SQL Database with Daft#

Great, now let’s see how we can access this data with Daft.

# Read from local SQLite database
uri = "sqlite:////Users/rpelgrim/daft_sql"  # replace with your local uri

df = daft.read_sql("SELECT * FROM censustable", uri)
df.show()
[Output: the first 8 rows of the census table, with schema age Int64, workclass Utf8, education Utf8, education_num Int64, marital_status Utf8, occupation Utf8, relationship Utf8, race Utf8, sex Utf8, capital_gain Int64, capital_loss Int64, hours_per_week Int64, native_country Utf8, income Utf8]
(Showing first 8 rows)

Parallel and Distributed Reads#

Supply a partition column and optionally the number of partitions to enable parallel reads:

df = daft.read_sql(
    "SELECT * FROM censustable",
    uri,
    partition_col="education",
    #    num_partitions=12
)

df.show()
[Output: the first 8 rows of the census table, with the same schema as above]
(Showing first 8 rows)

Data Skipping Optimizations#

Filter, projection, and limit pushdown optimizations can be used to reduce the amount of data read from the database.

In the example below, Daft reads the top-ranked terms from the BigQuery Google Trends dataset. The where and select expressions in this example will be pushed down into the SQL query itself; we can see this by calling the df.explain() method:

import daft, sqlalchemy, datetime

def create_conn():
    engine = sqlalchemy.create_engine(
        "bigquery://", credentials_path="path/to/service_account_credentials.json"
    )
    return engine.connect()


df = daft.read_sql("SELECT * FROM `bigquery-public-data.google_trends.top_terms`", create_conn)

df = df.where((df["refresh_date"] >= datetime.date(2024, 4, 1)) & (df["refresh_date"] < datetime.date(2024, 4, 8)))
df = df.where(df["rank"] == 1)
df = df.select(df["refresh_date"].alias("Day"), df["term"].alias("Top Search Term"), df["rank"])
df = df.distinct()
df = df.sort(df["Day"], desc=True)

df.explain(show_all=True)

# Output
# ..
# == Physical Plan ==
# ..
# |   SQL Query = SELECT refresh_date, term, rank FROM
#  (SELECT * FROM `bigquery-public-data.google_trends.top_terms`)
#  AS subquery WHERE rank = 1 AND refresh_date >= CAST('2024-04-01' AS DATE)
#  AND refresh_date < CAST('2024-04-08' AS DATE)

You could code the SQL query to add the filters and projections yourself, but this may become lengthy and error-prone, particularly with many expressions. Daft automatically handles these performance optimizations for you.
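For comparison, here is a rough sketch of hand-writing the same filters and projection into the query, mirroring the generated SQL shown above:

# Equivalent handwritten query (roughly what Daft generates for you)
manual_df = daft.read_sql(
    """
    SELECT refresh_date, term, rank
    FROM `bigquery-public-data.google_trends.top_terms`
    WHERE rank = 1
      AND refresh_date >= '2024-04-01'
      AND refresh_date < '2024-04-08'
    """,
    create_conn,
)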

Data Catalogs#

Daft is built for efficient data access from Data Catalogs using open table formats like Delta Lake, Iceberg and Hudi.

Delta Lake#

You can easily read Delta Lake tables using the read_deltalake() method.

df = daft.read_deltalake("data/delta_table")
df.show()
[Output: the first 8 rows of the Delta table, with the same taxi schema as the local Parquet example above]
(Showing first 8 rows)

To access Delta tables on S3, you will need to pass a few more config options:

  • the AWS Region name

  • your access credentials

import boto3

session = boto3.session.Session()
creds = session.get_credentials()

# set io configs
io_config = daft.io.IOConfig(
    s3=daft.io.S3Config(
        region_name="eu-north-1",
        key_id=creds.access_key,
        access_key=creds.secret_key,
    )
)

# Read Delta Lake table in S3 into a Daft DataFrame.
table_uri = "s3://avriiil/delta-test-daft/"

df = daft.read_deltalake(table_uri, io_config=io_config)
df.show()
x
Int64
1
2
3
(Showing first 3 of 3 rows)

Iceberg#

Daft is integrated with PyIceberg, the official Python implementation for Apache Iceberg.

This means you can easily read Iceberg tables into Daft DataFrames in 2 steps:

  1. Load your Iceberg table from your Iceberg catalog using PyIceberg

  2. Read your Iceberg table into Daft

We’ll use a simple local SQLite Catalog implementation for this toy example.

# initialize your catalog

from pyiceberg.catalog.sql import SqlCatalog

warehouse_path = "data/iceberg-warehouse/"
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)
# load your table
table = catalog.load_table("default.taxi_dataset")
# Read into Daft
df = daft.read_iceberg(table)
df.show()
[Output: the first 8 rows of the Iceberg table, with schema VendorID Int64, tpep_pickup_datetime Timestamp(Microseconds, None), tpep_dropoff_datetime Timestamp(Microseconds, None), passenger_count Float64, trip_distance Float64, RatecodeID Float64, store_and_fwd_flag Utf8, PULocationID Int64, DOLocationID Int64, payment_type Int64, fare_amount Float64, extra Float64, mta_tax Float64, tip_amount Float64, tolls_amount Float64, improvement_surcharge Float64, total_amount Float64, congestion_surcharge Float64, airport_fee Float64]
(Showing first 8 rows)

Any subsequent filter operations on the Daft df DataFrame object will be correctly optimized to take advantage of Iceberg features such as hidden partitioning and file-level statistics for efficient reads.
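For example, here is a quick sketch of a filtered read using the trip_distance column shown above:

# The filter is pushed down so only the relevant Iceberg data files are scanned
long_trips = df.where(df["trip_distance"] > 10.0)
long_trips.explain(show_all=True)
long_trips.show()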

Hudi#

To read from an Apache Hudi table, use the daft.read_hudi() function.

The following snippet loads an example table:

# Read Apache Hudi table into a Daft DataFrame.
import daft

df = daft.read_hudi("data/hudi-data")
df.show()
[Output: all 4 rows of the Hudi table, with schema _hoodie_commit_time Utf8, _hoodie_commit_seqno Utf8, _hoodie_record_key Utf8, _hoodie_partition_path Utf8, _hoodie_file_name Utf8, id Int32, name Utf8, isActive Boolean, intField Int32, longField Int64, floatField Float32, doubleField Float64, decimalField Decimal128(10, 5), dateField Date, timestampField Timestamp(Microseconds, Some("UTC")), binaryField Binary, arrayField List[Struct[arr_struct_f1: Utf8, arr_struct_f2: Int32]], mapField Map[Struct[key: Utf8, value: Struct[map_field_value_struct_f1: Float64, map_field_value_struct_f2: Boolean]]], structField Struct[field1: Utf8, field2: Int32, child_struct: Struct[child_field1: Float64, child_field2: Boolean]], byteField Int32, shortField Int32]
(Showing first 4 of 4 rows)

Currently, there are some limitations when reading Hudi tables:

  • Only snapshot reads of Copy-on-Write tables are supported

  • Only table versions 5 and 6 are supported (tables created using releases 0.12.x - 0.15.x)

  • The table must not have hoodie.datasource.write.drop.partition.columns=true

IOConfig Deep Dive#

Let’s dive a little deeper into the IOConfig options for tweaking your remote data access.

IOConfig is Daft’s mechanism for controlling the behavior of data input/output from storage. It is useful for:

  1. Providing credentials for authenticating with cloud storage services

  2. Tuning performance or reducing load on storage services

Default IOConfig Behavior#

By default, IOConfig automatically detects credentials from the machine(s) running your code.

import daft

# By default, calls to AWS S3 will use credentials retrieved from the machine(s) that they are called from
#
# For AWS S3 services, the default mechanism is to look through a chain of possible "providers":
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials
df = daft.read_csv("s3://daft-public-data/file.csv")
df.collect()

Overriding the IOConfig#

Setting a Global Override#

Often you will want Daft to use a certain configuration by default whenever it accesses storage such as S3, GCS, or Azure Blob Storage.

Example:

An extremely common use case is to create a set of temporary credentials once and share them across all data access calls in Daft.

The example below demonstrates this with AWS S3’s boto3 Python SDK.

# Use the boto3 library to generate temporary credentials which can be used for S3 access
import boto3

session = boto3.session.Session()
creds = session.get_credentials()

# Attach temporary credentials to a Daft IOConfig object
MY_IO_CONFIG = daft.io.IOConfig(
    s3=daft.io.S3Config(
        key_id=creds.access_key,
        access_key=creds.secret_key,
        session_token=creds.token,
    )
)

# Set the default config to `MY_IO_CONFIG` so that it is used in the absence of any overrides
daft.set_planning_config(default_io_config=MY_IO_CONFIG)

Overriding IOConfigs per-API call#

Daft also allows for more granular per-call overrides through the use of keyword arguments.

This is extremely flexible, allowing you to use a different set of credentials to read from two different locations!

Here we use daft.read_csv as an example, but the same io_config=... keyword arg also exists for other I/O related functionality such as:

  1. daft.read_parquet

  2. daft.read_json

  3. Expression.url.download()

# An "Anonymous" IOConfig will access storage **without credentials**, and can only access fully public data
MY_ANONYMOUS_IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))

# Read this file using `MY_ANONYMOUS_IO_CONFIG` instead of the overridden global config `MY_IO_CONFIG`
df1 = daft.read_csv("s3://daft-public-data/melbourne-airbnb/melbourne_airbnb.csv", io_config=MY_ANONYMOUS_IO_CONFIG)
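Similarly, here is a hedged sketch of passing an io_config to Expression.url.download() (the S3 paths are placeholders):

# Download the contents of each URL into a new column, using `MY_IO_CONFIG` for this call only
df2 = daft.from_pydict({"urls": ["s3://mybucket/path/to/image1.jpg", "s3://mybucket/path/to/image2.jpg"]})
df2 = df2.with_column("contents", df2["urls"].url.download(io_config=MY_IO_CONFIG))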

For more, see the IOConfig documentation.

Data Access with Daft#

In this tutorial you have seen the canonical ways of accessing data with Daft.

We’ve seen how to access:

  • local and remote files, including CSV, JSON, and Parquet

  • data in SQL databases

  • data in Data Catalogs, including Delta Lake, Iceberg, and Hudi

Take a look at our hands-on Use Case tutorials if you feel ready to start building workflows with Daft.