%pip install getdaft
CI = False
# Skip this notebook execution in CI because it hits data in a relative path
if CI:
    import sys
    sys.exit()
000 - Data Access#
This Feature-of-the-Week tutorial shows the canonical way of accessing data with Daft.
Daft reads from 3 main data sources:
Files (local and remote)
SQL Databases
Data Catalogs
Let’s dive into each type of data access in more detail 🪂
import daft
Files#
You can read many different file types with Daft.
The most common file formats are:
CSV
JSON
Parquet
You can read these file types from local and remote filesystems.
CSV#
Use the read_csv method to read CSV files from your local filesystem.
Here, we’ll read in some synthetic US Census data:
# Read a single CSV file from your local filesystem
df = daft.read_csv("data/census001.csv")
df.show()
Int64 | age Int64 | workclass Utf8 | education Utf8 | education_num Int64 | marital_status Utf8 | occupation Utf8 | relationship Utf8 | race Utf8 | sex Utf8 | capital_gain Int64 | capital_loss Int64 | hours_per_week Int64 | native_country Utf8 | income Utf8 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 48 | Self-emp-not-inc | Some-college | 10 | Married-civ-spouse | Craft-repair | Husband | White | Male | 0 | 0 | 40 | United-States | <=50K |
1 | 17 | ? | 11th | 7 | Never-married | ? | Own-child | White | Female | 0 | 0 | 16 | United-States | <=50K |
2 | 21 | Private | Some-college | 10 | Never-married | Handlers-cleaners | Not-in-family | Black | Male | 0 | 0 | 56 | United-States | <=50K |
3 | 31 | Private | Bachelors | 13 | Married-civ-spouse | Adm-clerical | Husband | Asian-Pac-Islander | Male | 0 | 0 | 40 | OTHER | >50K |
4 | 21 | Private | Some-college | 10 | Never-married | Sales | Not-in-family | White | Female | 0 | 0 | 40 | United-States | <=50K |
5 | 43 | Local-gov | Bachelors | 13 | Married-civ-spouse | Adm-clerical | Wife | White | Female | 0 | 0 | 36 | United-States | <=50K |
6 | 61 | Self-emp-not-inc | Prof-school | 15 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 1958 | 40 | United-States | >50K |
7 | 33 | Self-emp-not-inc | HS-grad | 9 | Married-civ-spouse | Farming-fishing | Husband | White | Male | 0 | 0 | 40 | United-States | <=50K |
df.count_rows()
25
You can also read folders of CSV files or include wildcards to select for patterns of file paths.
These files will have to follow the same schema.
Here, we’ll read in multiple CSV files containing synthetic US Census data:
# Read multiple CSV files into one DataFrame
df = daft.read_csv("data/census*.csv")
df.show()
Int64 | age Int64 | workclass Utf8 | education Utf8 | education_num Int64 | marital_status Utf8 | occupation Utf8 | relationship Utf8 | race Utf8 | sex Utf8 | capital_gain Int64 | capital_loss Int64 | hours_per_week Int64 | native_country Utf8 | income Utf8 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 28 | ? | HS-grad | 9 | Never-married | ? | Other-relative | Asian-Pac-Islander | Male | 0 | 0 | 45 | OTHER | <=50K |
1 | 35 | Private | 7th-8th | 4 | Never-married | Other-service | Unmarried | White | Female | 0 | 0 | 25 | United-States | <=50K |
2 | 37 | Federal-gov | HS-grad | 9 | Married-civ-spouse | Transport-moving | Husband | White | Male | 0 | 0 | 40 | OTHER | <=50K |
3 | 41 | Self-emp-not-inc | Bachelors | 13 | Never-married | Prof-specialty | Not-in-family | Black | Male | 0 | 0 | 40 | United-States | <=50K |
4 | 54 | Private | Some-college | 10 | Divorced | Adm-clerical | Not-in-family | White | Male | 0 | 0 | 19 | United-States | <=50K |
5 | 32 | Private | 11th | 5 | Never-married | Craft-repair | Unmarried | White | Male | 0 | 0 | 40 | United-States | <=50K |
6 | 35 | Self-emp-not-inc | Bachelors | 13 | Divorced | Tech-support | Not-in-family | White | Female | 0 | 0 | 40 | United-States | <=50K |
7 | 31 | Self-emp-inc | Some-college | 10 | Married-civ-spouse | Other-service | Husband | White | Male | 0 | 0 | 25 | United-States | <=50K |
df.count_rows()
75
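You can also pass an explicit list of file paths instead of a wildcard (the second file name below is illustrative):
# Read an explicit list of CSV files into one DataFrame
df = daft.read_csv(["data/census001.csv", "data/census002.csv"])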
JSON#
You can read line-delimited JSON using the read_json method:
# Read a JSON file from your local filesystem
df = daft.read_json("data/sampled-tpch.jsonl")
df.show()
L_COMMENT Utf8 | L_COMMITDATE Date | L_DISCOUNT Float64 | L_EXTENDEDPRICE Float64 | L_LINENUMBER Int64 | L_LINESTATUS Utf8 | L_ORDERKEY Int64 | L_PARTKEY Int64 | L_QUANTITY Int64 | L_RECEIPTDATE Date | L_RETURNFLAG Utf8 | L_SHIPDATE Date | L_SHIPINSTRUCT Utf8 | L_SHIPMODE Utf8 | L_SUPPKEY Int64 | L_TAX Float64 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
egular courts above the | 1996-02-12 | 0.04 | 33203.72 | 1 | O | 1 | 15518935 | 17 | 1996-03-22 | N | 1996-03-13 | DELIVER IN PERSON | TRUCK | 768951 | 0.02 |
ly final dependencies: slyly bold | 1996-02-28 | 0.09 | 69788.52 | 2 | O | 1 | 6730908 | 36 | 1996-04-20 | N | 1996-04-12 | TAKE BACK RETURN | MAIL | 730909 | 0.06 |
riously. regular, express dep | 1996-03-05 | 0.1 | 16381.28 | 3 | O | 1 | 6369978 | 8 | 1996-01-31 | N | 1996-01-29 | TAKE BACK RETURN | REG AIR | 369979 | 0.02 |
lites. fluffily even de | 1996-03-30 | 0.09 | 29767.92 | 4 | O | 1 | 213150 | 28 | 1996-05-16 | N | 1996-04-21 | NONE | AIR | 463151 | 0.06 |
pending foxes. slyly re | 1996-03-14 | 0.1 | 37596.96 | 5 | O | 1 | 2402664 | 24 | 1996-04-01 | N | 1996-03-30 | NONE | FOB | 152671 | 0.04 |
arefully slyly ex | 1996-02-07 | 0.07 | 48267.84 | 6 | O | 1 | 1563445 | 32 | 1996-02-03 | N | 1996-01-30 | DELIVER IN PERSON | MAIL | 63448 | 0.02 |
ven requests. deposits breach a | 1997-01-14 | 0 | 71798.72 | 1 | O | 2 | 10616973 | 38 | 1997-02-02 | N | 1997-01-28 | TAKE BACK RETURN | RAIL | 116994 | 0.05 |
ongside of the furiously brave acco | 1994-01-04 | 0.06 | 73200.15 | 1 | F | 3 | 429697 | 45 | 1994-02-23 | R | 1994-02-02 | NONE | AIR | 179698 | 0 |
Parquet#
Use the read_parquet method to read Parquet files.
# Read a Parquet file from your local filesystem
df = daft.read_parquet("data/sample_taxi.parquet")
df.show()
VendorID Int32 | tpep_pickup_datetime Timestamp(Microseconds, None) | tpep_dropoff_datetime Timestamp(Microseconds, None) | passenger_count Float64 | trip_distance Float64 | RatecodeID Float64 | store_and_fwd_flag Utf8 | PULocationID Int32 | DOLocationID Int32 | payment_type Int64 | fare_amount Float64 | extra Float64 | mta_tax Float64 | tip_amount Float64 | tolls_amount Float64 | improvement_surcharge Float64 | total_amount Float64 | congestion_surcharge Float64 | Airport_fee Float64 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | 2023-12-01 00:06:06 | 2023-12-01 00:15:47 | 0 | 1.1 | 1 | N | 230 | 48 | 1 | 10 | 3.5 | 0.5 | 1.5 | 0 | 1 | 16.5 | 2.5 | 0 |
1 | 2023-12-01 00:22:26 | 2023-12-01 00:28:53 | 0 | 1.5 | 1 | N | 142 | 238 | 1 | 9.3 | 3.5 | 0.5 | 2.85 | 0 | 1 | 17.15 | 2.5 | 0 |
1 | 2023-12-01 00:59:44 | 2023-12-01 01:13:22 | 2 | 2.2 | 1 | N | 114 | 186 | 1 | 13.5 | 3.5 | 0.5 | 3 | 0 | 1 | 21.5 | 2.5 | 0 |
2 | 2023-12-01 00:22:17 | 2023-12-01 00:30:59 | 1 | 0.66 | 1 | N | 79 | 79 | 2 | 7.2 | 1 | 0.5 | 0 | 0 | 1 | 12.2 | 2.5 | 0 |
2 | 2023-12-01 00:18:16 | 2023-12-01 00:25:32 | 2 | 2.2 | 1 | N | 229 | 263 | 1 | 11.4 | 1 | 0.5 | 2 | 0 | 1 | 18.4 | 2.5 | 0 |
1 | 2023-12-01 00:13:17 | 2023-12-01 00:23:53 | 0 | 5.7 | 1 | N | 88 | 141 | 1 | 23.3 | 3.5 | 0.5 | 0 | 0 | 1 | 28.3 | 2.5 | 0 |
2 | 2023-12-01 00:17:09 | 2023-12-01 00:33:31 | 1 | 5.33 | 1 | N | 45 | 162 | 1 | 24.7 | 1 | 0.5 | 3 | 0 | 1 | 32.7 | 2.5 | 0 |
2 | 2023-12-01 00:40:49 | 2023-12-01 00:44:10 | 1 | 0.76 | 1 | N | 170 | 107 | 1 | 5.8 | 1 | 0.5 | 1 | 0 | 1 | 11.8 | 2.5 | 0 |
Remote Reads, e.g. S3#
You can read files from remote filesystems such as AWS S3:
# Read multiple Parquet files from s3
df = daft.read_parquet("s3://mybucket/path/to/*.parquet")
Remote reads use the protocol prefix of the corresponding storage service, for example s3:// for AWS S3, gs:// for Google Cloud Storage, and az:// for Azure Blob Storage.
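For example, here are sketches of equivalent reads from Google Cloud Storage and Azure Blob Storage (the bucket and container names are placeholders):
# Read multiple Parquet files from Google Cloud Storage (placeholder bucket)
df = daft.read_parquet("gs://mybucket/path/to/*.parquet")

# Read multiple Parquet files from Azure Blob Storage (placeholder container)
df = daft.read_parquet("az://mycontainer/path/to/*.parquet")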
Reading from Public Buckets#
You can read from public buckets using an “anonymous” IOConfig.
An anonymous IOConfig will access storage without credentials, and can only access fully public data.
# create anonymous config
MY_ANONYMOUS_IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))
# Read this file using `MY_ANONYMOUS_IO_CONFIG`
df = daft.read_csv("s3://daft-public-data/melbourne-airbnb/melbourne_airbnb.csv", io_config=MY_ANONYMOUS_IO_CONFIG)
df.select("id", "text", "price", "review_scores_rating").show()
id Int64 | text Utf8 | price Int64 | review_scores_rating Float64 |
---|---|---|---|
25586695 | Beach side, art deco flat in heart of St Kilda | 90 | None |
1057401 | Modern Bayside Studio Apartment | 75 | 95 |
24949385 | Spacious Saint kilda home with a View | 47 | None |
20075093 | Rewarding Richmond Location-Outstanding Apartment! | 220 | 96 |
16275657 | Close to the centre of Melbourne. | 71 | 99 |
15136111 | Warm Apartment with Gym, Pool, Sauna near MCG | 79 | 96 |
4122488 | St Kilda's Loft Beachside Hideaway | 89 | 93 |
13721853 | Central Melbourne Private Room 3 | 50 | 85 |
Remote IO Configuration#
Use the IOConfig object to configure access to remote data.
Daft will automatically detect S3 credentials from your local environment. If your current session is authenticated with access credentials to your private bucket, then you can access the bucket without explicitly passing credentials.
Substitute the path below with a path to a private bucket you have access to with your credentials.
bucket = "s3://avriiil/yellow_tripdata_2023-12.parquet"
Now configure your IOConfig object:
from daft.io import IOConfig, S3Config
io_config = IOConfig(
    s3=S3Config(
        region_name="eu-north-1",
    )
)
df = daft.read_parquet(bucket, io_config=io_config)
df.show()
VendorID Int32 | tpep_pickup_datetime Timestamp(Microseconds, None) | tpep_dropoff_datetime Timestamp(Microseconds, None) | passenger_count Int64 | trip_distance Float64 | RatecodeID Int64 | store_and_fwd_flag Utf8 | PULocationID Int32 | DOLocationID Int32 | payment_type Int64 | fare_amount Float64 | extra Float64 | mta_tax Float64 | tip_amount Float64 | tolls_amount Float64 | improvement_surcharge Float64 | total_amount Float64 | congestion_surcharge Float64 | Airport_fee Float64 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | 2023-12-01 00:06:06 | 2023-12-01 00:15:47 | 0 | 1.1 | 1 | N | 230 | 48 | 1 | 10 | 3.5 | 0.5 | 1.5 | 0 | 1 | 16.5 | 2.5 | 0 |
1 | 2023-12-01 00:22:26 | 2023-12-01 00:28:53 | 0 | 1.5 | 1 | N | 142 | 238 | 1 | 9.3 | 3.5 | 0.5 | 2.85 | 0 | 1 | 17.15 | 2.5 | 0 |
1 | 2023-12-01 00:59:44 | 2023-12-01 01:13:22 | 2 | 2.2 | 1 | N | 114 | 186 | 1 | 13.5 | 3.5 | 0.5 | 3 | 0 | 1 | 21.5 | 2.5 | 0 |
2 | 2023-12-01 00:22:17 | 2023-12-01 00:30:59 | 1 | 0.66 | 1 | N | 79 | 79 | 2 | 7.2 | 1 | 0.5 | 0 | 0 | 1 | 12.2 | 2.5 | 0 |
2 | 2023-12-01 00:18:16 | 2023-12-01 00:25:32 | 2 | 2.2 | 1 | N | 229 | 263 | 1 | 11.4 | 1 | 0.5 | 2 | 0 | 1 | 18.4 | 2.5 | 0 |
1 | 2023-12-01 00:13:17 | 2023-12-01 00:23:53 | 0 | 5.7 | 1 | N | 88 | 141 | 1 | 23.3 | 3.5 | 0.5 | 0 | 0 | 1 | 28.3 | 2.5 | 0 |
2 | 2023-12-01 00:17:09 | 2023-12-01 00:33:31 | 1 | 5.33 | 1 | N | 45 | 162 | 1 | 24.7 | 1 | 0.5 | 3 | 0 | 1 | 32.7 | 2.5 | 0 |
2 | 2023-12-01 00:40:49 | 2023-12-01 00:44:10 | 1 | 0.76 | 1 | N | 170 | 107 | 1 | 5.8 | 1 | 0.5 | 1 | 0 | 1 | 11.8 | 2.5 | 0 |
The IOConfig object has many more configuration options. You can use this object to configure access to:
Amazon S3 (S3Config)
Google Cloud Storage (GCSConfig)
Azure Blob Storage (AzureConfig)
All the cloud-specific config options follow the standard conventions of the respective cloud platforms. See the Daft documentation for more information.
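As a rough sketch, configuring GCS or Azure access might look like this (the project, storage account, and bucket names are placeholders):
from daft.io import IOConfig, GCSConfig, AzureConfig

# Placeholder project and storage account names -- substitute your own
io_config = IOConfig(
    gcs=GCSConfig(project_id="my-gcp-project"),
    azure=AzureConfig(storage_account="mystorageaccount"),
)
df = daft.read_parquet("gs://mybucket/path/to/*.parquet", io_config=io_config)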
There is a dedicated section for the IOConfig object at the end of this tutorial.
SQL Databases#
You can use Daft to read the results of SQL queries from databases, data warehouses, and query engines into a Daft DataFrame via the daft.read_sql() function.
# Read from a PostgreSQL database
uri = "postgresql://user:password@host:port/database"
df = daft.read_sql("SELECT * FROM my_table", uri)
In order to partition the data, you can specify a partition column:
# Read with a partition column
df = daft.read_sql("SELECT * FROM my_table", partition_col="date", uri)
Partitioning your data will allow Daft to read the data in parallel. This will make your queries faster.
ConnectorX vs SQLAlchemy#
Daft uses ConnectorX under the hood to read SQL data. ConnectorX is a fast, Rust-based SQL connector that reads directly into Arrow Tables, enabling zero-copy transfer into Daft DataFrames. If the database is not supported by ConnectorX, Daft will fall back to using SQLAlchemy.
You can also directly provide a SQLAlchemy connection via a connection factory. This gives you the flexibility to pass additional parameters to the engine.
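A minimal sketch of such a connection factory, assuming a hypothetical local SQLite database and table:
from sqlalchemy import create_engine

def create_conn():
    # Hypothetical database file; pass any extra engine parameters here
    engine = create_engine("sqlite:///my_database.db")
    return engine.connect()

df = daft.read_sql("SELECT * FROM my_table", create_conn)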
Example#
Let’s look at an example with a simple local database.
You will need some extra dependencies installed. The easiest way to install them is using the [sql] extras:
#!pip install "getdaft[sql]"
Create a local SQLite database#
Let’s start by creating a local SQLite database using Python’s built-in sqlite3 module:
import sqlite3
connection = sqlite3.connect("example.db")
connection.execute("CREATE TABLE IF NOT EXISTS books (title TEXT, author TEXT, year INTEGER)")
connection.execute(
    """
    INSERT INTO books (title, author, year)
    VALUES
        ('The Great Gatsby', 'F. Scott Fitzgerald', 1925),
        ('To Kill a Mockingbird', 'Harper Lee', 1960),
        ('1984', 'George Orwell', 1949),
        ('The Catcher in the Rye', 'J.D. Salinger', 1951)
    """
)
connection.commit()
connection.close()
# Read SQL query into Daft DataFrame
df = daft.read_sql(
    "SELECT * FROM books",
    "sqlite://example.db",
)
df.show()
title Utf8 | author Utf8 | year Int64 |
---|---|---|
The Great Gatsby | F. Scott Fitzgerald | 1925 |
To Kill a Mockingbird | Harper Lee | 1960 |
1984 | George Orwell | 1949 |
The Catcher in the Rye | J.D. Salinger | 1951 |
Let’s use a SQLAlchemy engine to load a CSV file into a local SQLite database and then query it with Daft:
from sqlalchemy import create_engine
# substitute the uri below with the engine path on your local machine
engine_uri = "sqlite:////Users/rpelgrim/daft_sql"
engine = create_engine(engine_uri, echo=True)
import pandas as pd
csv_file_path = "data/census-01.csv"
df = pd.read_csv(csv_file_path)
df.to_sql(name="censustable", con=engine, index=False, if_exists="replace")
Access SQL Database with Daft#
Great, now let’s see how we can access this data with Daft.
# Read from local SQLite database
uri = "sqlite:////Users/rpelgrim/daft_sql" # replace with your local uri
df = daft.read_sql("SELECT * FROM censustable", uri)
df.show()
age Int64 | workclass Utf8 | education Utf8 | education_num Int64 | marital_status Utf8 | occupation Utf8 | relationship Utf8 | race Utf8 | sex Utf8 | capital_gain Int64 | capital_loss Int64 | hours_per_week Int64 | native_country Utf8 | income Utf8 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
48 | Self-emp-not-inc | Some-college | 10 | Married-civ-spouse | Craft-repair | Husband | White | Male | 0 | 0 | 40 | United-States | <=50K |
17 | ? | 11th | 7 | Never-married | ? | Own-child | White | Female | 0 | 0 | 16 | United-States | <=50K |
21 | Private | Some-college | 10 | Never-married | Handlers-cleaners | Not-in-family | Black | Male | 0 | 0 | 56 | United-States | <=50K |
31 | Private | Bachelors | 13 | Married-civ-spouse | Adm-clerical | Husband | Asian-Pac-Islander | Male | 0 | 0 | 40 | OTHER | >50K |
21 | Private | Some-college | 10 | Never-married | Sales | Not-in-family | White | Female | 0 | 0 | 40 | United-States | <=50K |
43 | Local-gov | Bachelors | 13 | Married-civ-spouse | Adm-clerical | Wife | White | Female | 0 | 0 | 36 | United-States | <=50K |
61 | Self-emp-not-inc | Prof-school | 15 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 1958 | 40 | United-States | >50K |
33 | Self-emp-not-inc | HS-grad | 9 | Married-civ-spouse | Farming-fishing | Husband | White | Male | 0 | 0 | 40 | United-States | <=50K |
Parallel and Distributed Reads#
Supply a partition column and optionally the number of partitions to enable parallel reads:
df = daft.read_sql(
    "SELECT * FROM censustable",
    uri,
    partition_col="education",
    # num_partitions=12
)
df.show()
age Int64 | workclass Utf8 | education Utf8 | education_num Int64 | marital_status Utf8 | occupation Utf8 | relationship Utf8 | race Utf8 | sex Utf8 | capital_gain Int64 | capital_loss Int64 | hours_per_week Int64 | native_country Utf8 | income Utf8 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
48 | Self-emp-not-inc | Some-college | 10 | Married-civ-spouse | Craft-repair | Husband | White | Male | 0 | 0 | 40 | United-States | <=50K |
17 | ? | 11th | 7 | Never-married | ? | Own-child | White | Female | 0 | 0 | 16 | United-States | <=50K |
21 | Private | Some-college | 10 | Never-married | Handlers-cleaners | Not-in-family | Black | Male | 0 | 0 | 56 | United-States | <=50K |
31 | Private | Bachelors | 13 | Married-civ-spouse | Adm-clerical | Husband | Asian-Pac-Islander | Male | 0 | 0 | 40 | OTHER | >50K |
21 | Private | Some-college | 10 | Never-married | Sales | Not-in-family | White | Female | 0 | 0 | 40 | United-States | <=50K |
43 | Local-gov | Bachelors | 13 | Married-civ-spouse | Adm-clerical | Wife | White | Female | 0 | 0 | 36 | United-States | <=50K |
61 | Self-emp-not-inc | Prof-school | 15 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 1958 | 40 | United-States | >50K |
33 | Self-emp-not-inc | HS-grad | 9 | Married-civ-spouse | Farming-fishing | Husband | White | Male | 0 | 0 | 40 | United-States | <=50K |
Data Skipping Optimizations#
Filter, projection, and limit pushdown optimizations can be used to reduce the amount of data read from the database.
In the example below, Daft reads the top-ranked terms from the BigQuery Google Trends dataset. The where and select expressions in this example will be pushed down into the SQL query itself; we can see this by calling the df.explain() method:
import daft, sqlalchemy, datetime
def create_conn():
    engine = sqlalchemy.create_engine(
        "bigquery://", credentials_path="path/to/service_account_credentials.json"
    )
    return engine.connect()
df = daft.read_sql("SELECT * FROM `bigquery-public-data.google_trends.top_terms`", create_conn)
df = df.where((df["refresh_date"] >= datetime.date(2024, 4, 1)) & (df["refresh_date"] < datetime.date(2024, 4, 8)))
df = df.where(df["rank"] == 1)
df = df.select(df["refresh_date"].alias("Day"), df["term"].alias("Top Search Term"), df["rank"])
df = df.distinct()
df = df.sort(df["Day"], desc=True)
df.explain(show_all=True)
# Output
# ..
# == Physical Plan ==
# ..
# | SQL Query = SELECT refresh_date, term, rank FROM
# (SELECT * FROM `bigquery-public-data.google_trends.top_terms`)
# AS subquery WHERE rank = 1 AND refresh_date >= CAST('2024-04-01' AS DATE)
# AND refresh_date < CAST('2024-04-08' AS DATE)
You could code the SQL query to add the filters and projections yourself, but this may become lengthy and error-prone, particularly with many expressions. Daft automatically handles these performance optimizations for you.
Data Catalogs#
Daft is built for efficient data access from Data Catalogs using open table formats like Delta Lake, Iceberg and Hudi.
Delta Lake#
You can easily read Delta Lake tables using the read_deltalake() method.
df = daft.read_deltalake("data/delta_table")
df.show()
VendorID Int32 | tpep_pickup_datetime Timestamp(Microseconds, None) | tpep_dropoff_datetime Timestamp(Microseconds, None) | passenger_count Float64 | trip_distance Float64 | RatecodeID Float64 | store_and_fwd_flag Utf8 | PULocationID Int32 | DOLocationID Int32 | payment_type Int64 | fare_amount Float64 | extra Float64 | mta_tax Float64 | tip_amount Float64 | tolls_amount Float64 | improvement_surcharge Float64 | total_amount Float64 | congestion_surcharge Float64 | Airport_fee Float64 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | 2023-12-01 00:06:06 | 2023-12-01 00:15:47 | 0 | 1.1 | 1 | N | 230 | 48 | 1 | 10 | 3.5 | 0.5 | 1.5 | 0 | 1 | 16.5 | 2.5 | 0 |
1 | 2023-12-01 00:22:26 | 2023-12-01 00:28:53 | 0 | 1.5 | 1 | N | 142 | 238 | 1 | 9.3 | 3.5 | 0.5 | 2.85 | 0 | 1 | 17.15 | 2.5 | 0 |
1 | 2023-12-01 00:59:44 | 2023-12-01 01:13:22 | 2 | 2.2 | 1 | N | 114 | 186 | 1 | 13.5 | 3.5 | 0.5 | 3 | 0 | 1 | 21.5 | 2.5 | 0 |
2 | 2023-12-01 00:22:17 | 2023-12-01 00:30:59 | 1 | 0.66 | 1 | N | 79 | 79 | 2 | 7.2 | 1 | 0.5 | 0 | 0 | 1 | 12.2 | 2.5 | 0 |
2 | 2023-12-01 00:18:16 | 2023-12-01 00:25:32 | 2 | 2.2 | 1 | N | 229 | 263 | 1 | 11.4 | 1 | 0.5 | 2 | 0 | 1 | 18.4 | 2.5 | 0 |
1 | 2023-12-01 00:13:17 | 2023-12-01 00:23:53 | 0 | 5.7 | 1 | N | 88 | 141 | 1 | 23.3 | 3.5 | 0.5 | 0 | 0 | 1 | 28.3 | 2.5 | 0 |
2 | 2023-12-01 00:17:09 | 2023-12-01 00:33:31 | 1 | 5.33 | 1 | N | 45 | 162 | 1 | 24.7 | 1 | 0.5 | 3 | 0 | 1 | 32.7 | 2.5 | 0 |
2 | 2023-12-01 00:40:49 | 2023-12-01 00:44:10 | 1 | 0.76 | 1 | N | 170 | 107 | 1 | 5.8 | 1 | 0.5 | 1 | 0 | 1 | 11.8 | 2.5 | 0 |
To access Delta Lake tables on S3, you will need to pass some additional config options:
the AWS Region name
your access credentials
import boto3
session = boto3.session.Session()
creds = session.get_credentials()
# set io configs
io_config = daft.io.IOConfig(
    s3=daft.io.S3Config(
        region_name="eu-north-1",
        key_id=creds.access_key,
        access_key=creds.secret_key,
    )
)
# Read Delta Lake table in S3 into a Daft DataFrame.
table_uri = "s3://avriiil/delta-test-daft/"
df = daft.read_deltalake(table_uri, io_config=io_config)
df.show()
x Int64 |
---|
1 |
2 |
3 |
Iceberg#
Daft is integrated with PyIceberg, the official Python implementation for Apache Iceberg.
This means you can easily read Iceberg tables into Daft DataFrames in 2 steps:
Load your Iceberg table from your Iceberg catalog using PyIceberg
Read your Iceberg table into Daft
We’ll use a simple local SQLite Catalog implementation for this toy example.
# initialize your catalog
from pyiceberg.catalog.sql import SqlCatalog
warehouse_path = "data/iceberg-warehouse/"
catalog = SqlCatalog(
    "default",
    **{
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    },
)
# load your table
table = catalog.load_table("default.taxi_dataset")
# Read into Daft
df = daft.read_iceberg(table)
df.show()
VendorID Int64 | tpep_pickup_datetime Timestamp(Microseconds, None) | tpep_dropoff_datetime Timestamp(Microseconds, None) | passenger_count Float64 | trip_distance Float64 | RatecodeID Float64 | store_and_fwd_flag Utf8 | PULocationID Int64 | DOLocationID Int64 | payment_type Int64 | fare_amount Float64 | extra Float64 | mta_tax Float64 | tip_amount Float64 | tolls_amount Float64 | improvement_surcharge Float64 | total_amount Float64 | congestion_surcharge Float64 | airport_fee Float64 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2 | 2023-01-01 00:32:10 | 2023-01-01 00:40:36 | 1 | 0.97 | 1 | N | 161 | 141 | 2 | 9.3 | 1 | 0.5 | 0 | 0 | 1 | 14.3 | 2.5 | 0 |
2 | 2023-01-01 00:55:08 | 2023-01-01 01:01:27 | 1 | 1.1 | 1 | N | 43 | 237 | 1 | 7.9 | 1 | 0.5 | 4 | 0 | 1 | 16.9 | 2.5 | 0 |
2 | 2023-01-01 00:25:04 | 2023-01-01 00:37:49 | 1 | 2.51 | 1 | N | 48 | 238 | 1 | 14.9 | 1 | 0.5 | 15 | 0 | 1 | 34.9 | 2.5 | 0 |
1 | 2023-01-01 00:03:48 | 2023-01-01 00:13:25 | 0 | 1.9 | 1 | N | 138 | 7 | 1 | 12.1 | 7.25 | 0.5 | 0 | 0 | 1 | 20.85 | 0 | 1.25 |
2 | 2023-01-01 00:10:29 | 2023-01-01 00:21:19 | 1 | 1.43 | 1 | N | 107 | 79 | 1 | 11.4 | 1 | 0.5 | 3.28 | 0 | 1 | 19.68 | 2.5 | 0 |
2 | 2023-01-01 00:50:34 | 2023-01-01 01:02:52 | 1 | 1.84 | 1 | N | 161 | 137 | 1 | 12.8 | 1 | 0.5 | 10 | 0 | 1 | 27.8 | 2.5 | 0 |
2 | 2023-01-01 00:09:22 | 2023-01-01 00:19:49 | 1 | 1.66 | 1 | N | 239 | 143 | 1 | 12.1 | 1 | 0.5 | 3.42 | 0 | 1 | 20.52 | 2.5 | 0 |
2 | 2023-01-01 00:27:12 | 2023-01-01 00:49:56 | 1 | 11.7 | 1 | N | 142 | 200 | 1 | 45.7 | 1 | 0.5 | 10.74 | 3 | 1 | 64.44 | 2.5 | 0 |
Any subsequent filter operations on the Daft df DataFrame object will be correctly optimized to take advantage of Iceberg features such as hidden partitioning and file-level statistics for efficient reads.
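For example, a filter on a column covered by partitioning or file statistics lets Daft skip files that cannot contain matching rows (the threshold below is arbitrary):
# Files whose statistics rule out any matches are skipped entirely
long_trips = df.where(df["trip_distance"] > 10.0)
long_trips.explain(show_all=True)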
Hudi#
To read from an Apache Hudi table, use the daft.read_hudi() function. The following snippet loads an example table:
# Read Apache Hudi table into a Daft DataFrame.
import daft
df = daft.read_hudi("data/hudi-data")
df.show()
_hoodie_commit_time Utf8 | _hoodie_commit_seqno Utf8 | _hoodie_record_key Utf8 | _hoodie_partition_path Utf8 | _hoodie_file_name Utf8 | id Int32 | name Utf8 | isActive Boolean | intField Int32 | longField Int64 | floatField Float32 | doubleField Float64 | decimalField Decimal128(10, 5) | dateField Date | timestampField Timestamp(Microseconds, Some("UTC")) | binaryField Binary | arrayField List[Struct[arr_struct_f1: Utf8, arr_struct_f2: Int32]] | mapField Map[Struct[key: Utf8, value: Struct[map_field_value_struct_f1: Float64, map_field_value_struct_f2: Boolean]]] | structField Struct[field1: Utf8, field2: Int32, child_struct: Struct[child_field1: Float64, child_field2: Boolean]] | byteField Int32 | shortField Int32 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
20240418173235694 | 20240418173235694_0_0 | id:1,name:Alice | byteField=10/shortField=300 | a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet | 1 | Alice | false | 15000 | 1234567890 | 1 | 3.14159 | 12345.67890 | 2023-04-01 | 2023-04-01 17:01:00 +00:00 | b"binary data" | [{arr_struct_f1: red, arr_struct_f2: 100, }, {arr_struct_f1: blue, arr_struct_f2: 200, }, {arr_struct_f1: green, arr_struct_f2: 300, }] | [{key: key1, value: {map_field_value_struct_f1: 123.456, map_field_value_struct_f2: true, }, }, {key: key2, value: {map_field_value_struct_f1: 789.012, map_field_value_struct_f2: false, }, }] | {field1: Alice, field2: 30, child_struct: {child_field1: 123.456, child_field2: true, }, } | 10 | 300 |
20240418173213674 | 20240418173213674_1_1 | id:3,name:Carol | byteField=10/shortField=300 | a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet | 3 | Carol | true | 35000 | 1928374650 | 3 | 1.41421 | 11111.22222 | 2023-04-03 | 2023-04-03 19:03:00 +00:00 | b"even more binary data" | [{arr_struct_f1: black, arr_struct_f2: 600, }, {arr_struct_f1: white, arr_struct_f2: 700, }, {arr_struct_f1: pink, arr_struct_f2: 800, }] | [{key: key5, value: {map_field_value_struct_f1: 345.678, map_field_value_struct_f2: true, }, }, {key: key6, value: {map_field_value_struct_f1: 654.321, map_field_value_struct_f2: false, }, }] | {field1: Carol, field2: 25, child_struct: {child_field1: 456.789, child_field2: true, }, } | 10 | 300 |
20240418173213674 | 20240418173213674_0_0 | id:2,name:Bob | byteField=20/shortField=100 | bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet | 2 | Bob | false | 25000 | 9876543210 | 2 | 2.71828 | 67890.12345 | 2023-04-02 | 2023-04-02 18:02:00 +00:00 | b"more binary data" | [{arr_struct_f1: yellow, arr_struct_f2: 400, }, {arr_struct_f1: purple, arr_struct_f2: 500, }] | [{key: key3, value: {map_field_value_struct_f1: 234.567, map_field_value_struct_f2: true, }, }, {key: key4, value: {map_field_value_struct_f1: 567.89, map_field_value_struct_f2: false, }, }] | {field1: Bob, field2: 40, child_struct: {child_field1: 789.012, child_field2: false, }, } | 20 | 100 |
20240418173235694 | 20240418173235694_1_0 | id:4,name:Diana | byteField=30/shortField=100 | 4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet | 4 | Diana | true | 45000 | 987654321 | 4 | 2.468 | 65432.12345 | 2023-04-04 | 2023-04-04 20:04:00 +00:00 | b"new binary data" | [{arr_struct_f1: orange, arr_struct_f2: 900, }, {arr_struct_f1: gray, arr_struct_f2: 1000, }] | [{key: key7, value: {map_field_value_struct_f1: 456.789, map_field_value_struct_f2: true, }, }, {key: key8, value: {map_field_value_struct_f1: 123.456, map_field_value_struct_f2: false, }, }] | {field1: Diana, field2: 50, child_struct: {child_field1: 987.654, child_field2: true, }, } | 30 | 100 |
Currently, reading Hudi tables has the following limitations:
Only snapshot reads of Copy-on-Write tables are supported
Only table versions 5 and 6 are supported (tables created using releases 0.12.x - 0.15.x)
The table must not have hoodie.datasource.write.drop.partition.columns=true
IOConfig Deep Dive#
Let’s dive a little deeper into the IOConfig options for tweaking your remote data access.
IOConfig is Daft’s mechanism for controlling the behavior of data input/output from storage. It is useful for:
Providing credentials for authenticating with cloud storage services
Tuning performance or reducing load on storage services
Default IOConfig Behavior#
The default behavior for IOConfig is to automatically detect credentials on your machine.
import daft
# By default, calls to AWS S3 will use credentials retrieved from the machine(s) that they are called from
#
# For AWS S3 services, the default mechanism is to look through a chain of possible "providers":
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials
df = daft.read_csv("s3://daft-public-data/file.csv")
df.collect()
Overriding the IOConfig#
Setting a Global Override#
Often you may want Daft to use a certain configuration by default whenever it accesses storage such as S3, GCS, or Azure Blob Storage.
Example:
An extremely common use case is to create a set of temporary credentials once and share them across all data access calls in Daft. The example below demonstrates this with AWS S3’s boto3 Python SDK.
# Use the boto3 library to generate temporary credentials which can be used for S3 access
import boto3
session = boto3.session.Session()
creds = session.get_credentials()
# Attach temporary credentials to a Daft IOConfig object
MY_IO_CONFIG = daft.io.IOConfig(
    s3=daft.io.S3Config(
        key_id=creds.access_key,
        access_key=creds.secret_key,
        session_token=creds.token,
    )
)
# Set the default config to `MY_IO_CONFIG` so that it is used in the absence of any overrides
daft.set_planning_config(default_io_config=MY_IO_CONFIG)
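With the global default set, subsequent reads pick up MY_IO_CONFIG without an explicit io_config argument (the bucket path below is a placeholder):
# This read uses MY_IO_CONFIG implicitly (placeholder bucket path)
df = daft.read_parquet("s3://my-private-bucket/path/to/*.parquet")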
Overriding IOConfigs per-API call#
Daft also allows for more granular per-call overrides through the use of keyword arguments.
This is extremely flexible, allowing you to use a different set of credentials to read from two different locations!
Here we use daft.read_csv as an example, but the same io_config=... keyword arg also exists for other I/O related functionality such as:
daft.read_parquet
daft.read_json
Expression.url.download()
# An "Anonymous" IOConfig will access storage **without credentials**, and can only access fully public data
MY_ANONYMOUS_IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))
# Read this file using `MY_ANONYMOUS_IO_CONFIG` instead of the overridden global config `MY_IO_CONFIG`
df1 = daft.read_csv("s3://daft-public-data/melbourne-airbnb/melbourne_airbnb.csv", io_config=MY_ANONYMOUS_IO_CONFIG)
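The same keyword also works for Expression.url.download(); for example, downloading the files referenced by a column of URLs (the column name below is hypothetical):
# Download the contents behind a hypothetical column of URLs, using the anonymous config
df1 = df1.with_column("image_bytes", df1["picture_url"].url.download(io_config=MY_ANONYMOUS_IO_CONFIG))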
For more see: IOConfig Documentation
Data Access with Daft#
In this tutorial you have seen the canonical ways of accessing data with Daft.
We’ve seen how to access:
files (local and remote), including CSV, JSON and Parquet
data in SQL databases
data in Data Catalogs, including Delta Lake, Iceberg and Hudi
Take a look at our hands-on Use Case tutorials if you feel ready to start building workflows with Daft.