Hint
✨✨✨ Run this notebook on Google Colab ✨✨✨
!pip install getdaft --pre --extra-index-url https://pypi.anaconda.org/daft-nightly/simple
10 minutes Quickstart#
This is a short introduction to all the main functionality in Daft, geared towards new users.
We import from daft as follows:
from daft import DataFrame
DataFrame creation#
See also: API Reference: DataFrame Construction
We can create a DataFrame from a dictionary of columns - this is a dictionary where the keys are strings representing the columns’ names and the values are equal-length lists representing the columns’ values.
import datetime
df = DataFrame.from_pydict({
"A": [1, 2, 3, 4],
"B": [1.5, 2.5, 3.5, 4.5],
"C": [True, True, False, False],
"D": ["a", "b", "c", "d"],
"E": [b"a", b"b", b"c", b"d"],
"F": [datetime.date(1994, 1, 1), datetime.date(1994, 1, 2), datetime.date(1994, 1, 3), datetime.date(1994, 1, 4)],
"G": [[1, 1, 1], [2, 2, 2], [3, 3, 3], [4, 4, 4]],
})
2023-01-21 14:36:57.855 | INFO | daft.context:runner:85 - Using PyRunner
You can also load DataFrames from other sources, such as:
CSV files:
DataFrame.read_csv("s3://bucket/*.csv")
Parquet files:
DataFrame.read_parquet("/path/*.parquet")
JSON line-delimited files:
DataFrame.read_json("/path/*.jsonl")
Files on disk:
DataFrame.from_glob_path("/path/*.jpeg")
Daft automatically supports local paths as well as paths to object storage such as AWS S3.
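For example, the same read call accepts both a local path and an object-store URL (the paths below are made up):
local_df = DataFrame.read_csv("my-data/*.csv")
s3_df = DataFrame.read_csv("s3://my-bucket/my-data/*.csv")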
Inspect your dataframe by printing the df variable:
df
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] |
---|---|---|---|---|---|---|
(no data to display: Dataframe not materialized)
Executing your DataFrame and Displaying Data#
Notice that instead of the contents of the dataframe, the message (no data to display: Dataframe not materialized) was displayed when we printed our dataframe in the previous section.
This is because Daft is lazy and only executes computations when explicitly told to do so. When you call DataFrame methods such as .select, .where, and .read_csv, Daft only enqueues these operations in a Logical Plan. You can examine this logical plan using DataFrame.explain():
df.explain()
┌─InMemoryScan
output=[col(A#0: INTEGER), col(B#1: FLOAT), col(C#2: LOGICAL), col(D#3: STRING),
col(E#4: BYTES), col(F#5: DATE), col(G#6: PY[list])]
cache_id='6eea47a9faa44b2bb50c495246920c7f'
partitioning=PartitionSpec(scheme=UNKNOWN, num_partitions=1, by=None)
Our current plan shows that there is only one operation to be executed: an InMemoryScan operation that reads from a set of in-memory data.
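As we chain more operations onto a dataframe, they are appended to this plan. For instance, a quick sketch (the exact printout will differ from the one above):
df.where(df["A"] > 1).select(df["A"]).explain()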
To execute all operations on all data in the current DataFrame's plan, use the DataFrame.collect() method.
df.collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] |
---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] |
DataFrame.collect() is useful because it executes computations on all your data and shows a small preview of the materialized results. These results are then kept in memory so that subsequent operations can avoid recomputation.
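For example, a minimal sketch of this caching behavior:
df.collect()  # materializes the plan and caches the results in memory
df.where(df["C"]).collect()  # builds on the cached results instead of recomputing df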
However, if you only wish to “peek” at your data instead of materializing the entire dataframe (e.g. your dataframe has a million rows and you only want to view the first 10 without materializing the entire result set in memory), you can use DataFrame.show(N) instead to view the first N rows of your dataframe. This is especially useful when developing interactively on small samples of data.
df.show(2)
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] |
---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
Sorting Data#
You can sort a dataframe with DataFrame.sort, which we do here in descending order:
df.sort(df["A"], desc=True).collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] |
---|---|---|---|---|---|---|
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
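By default the sort is ascending; a minimal sketch, assuming desc defaults to False:
df.sort(df["A"]).collect()  # ascending order (desc=False assumed as the default)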
Data Selection#
You can limit the number of rows in a dataframe by calling DataFrame.limit.
df_limited = df.limit(1)
df_limited.collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] |
---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
To select just a few columns, you can use DataFrame.select:
df_selected = df.select(df["A"], df["B"])
df_selected.collect()
A INTEGER | B FLOAT |
---|---|
1 | 1.5 |
2 | 2.5 |
3 | 3.5 |
4 | 4.5 |
Column selection also allows you to rename columns using Expression.alias:
df_renamed = df.select(df["A"].alias("A2"), df["B"])
df_renamed.collect()
A2 INTEGER | B FLOAT |
---|---|
1 | 1.5 |
2 | 2.5 |
3 | 3.5 |
4 | 4.5 |
To drop columns from the dataframe, call DataFrame.exclude:
df_excluded = df.exclude("A")
df_excluded.collect()
B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] |
---|---|---|---|---|---|
1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] |
4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] |
Expressions#
See: Expressions
Expressions are an API for defining computation that needs to happen over your columns.
For example, to create a new column that is just the column A incremented by 1:
df_A_plus1 = df.with_column("A_plus1", df["A"] + 1) # does not run any computation
df_A_plus1.collect() # materializes the new DataFrame, which includes the new column "A_plus1"
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] | A_plus1 INTEGER |
---|---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] | 2 |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] | 3 |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] | 4 |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] | 5 |
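Expressions can also combine multiple columns. A minimal sketch multiplying two columns (the column name "A_times_B" is ours):
df.with_column("A_times_B", df["A"] * df["B"]).collect()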
Method Accessors#
Some Expression methods are only allowed on certain types and are accessible through “method accessors” such as the Expression.str accessor (see: Expression Accessor Properties).
For example, the .str.length() expression is only valid when run on a STRING column:
df_D_length = df.with_column("D_length", df["D"].str.length())
df_D_length.collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] | D_length INTEGER |
---|---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] | 1 |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] | 1 |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] | 1 |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] | 1 |
Another example of a useful method accessor is the .url accessor. You can use .url.download() to download data from a column of URLs like so:
image_url_df = DataFrame.from_pydict({
"urls": [
"http://farm9.staticflickr.com/8186/8119368305_4e622c8349_z.jpg",
"http://farm1.staticflickr.com/1/127244861_ab0c0381e7_z.jpg",
"http://farm3.staticflickr.com/2169/2118578392_1193aa04a0_z.jpg",
],
})
image_downloaded_df = image_url_df.with_column("image_bytes", image_url_df["urls"].url.download())
image_downloaded_df.collect()
urls STRING | image_bytes BYTES |
---|---|
http://farm9.staticflickr.com/8186/8119368305_4e622c8349_... | b'\xff\xd8\xff\xe1\x00TExif\x00\x00MM\x00*\x00\x00\x00\x0... |
http://farm1.staticflickr.com/1/127244861_ab0c0381e7_z.jpg | b'\xff\xd8\xff\xe1\x00(Exif\x00\x00MM\x00*\x00\x00\x00\x0... |
http://farm3.staticflickr.com/2169/2118578392_1193aa04a0_... | b'\xff\xd8\xff\xe1\x00\x16Exif\x00\x00MM\x00*\x00\x00\x00... |
For a full list of all Expression methods and operators, see: Expressions API Docs
Operations on PY columns#
PY columns contain Python objects, and operations called on these columns are mapped over each object. To work with such columns, Daft provides a few useful Expression methods.
For example, to repeat each list in column G 3 times, we can use the Python list's native * operator:
df_G_extend_0 = df.with_column("G_repeat", df["G"] * 3)
df_G_extend_0.collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] | G_repeat PY[object] |
---|---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] | [1, 1, 1, 1, 1, 1, 1, 1, 1] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] | [2, 2, 2, 2, 2, 2, 2, 2, 2] |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] | [3, 3, 3, 3, 3, 3, 3, 3, 3] |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] | [4, 4, 4, 4, 4, 4, 4, 4, 4] |
To call a method on each list in column G, we can use the .as_py method. For example, here we use the Python list's .count() method to count the number of occurrences of the integer in column A:
df_G_count_A = df.with_column("G_count_A", df["G"].as_py(list).count(df["A"]))
df_G_count_A.collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] | G_count_A PY[object] |
---|---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] | 3 |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] | 3 |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] | 3 |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] | 3 |
For more complicated functions, you can use .apply(f) to call a function f on every object in the column. For example, here we construct a NumPy array for every list in column G.
Note
It is good practice to supply Daft with the return type of your function. You can do this using the return_dtype= keyword argument in .apply, or by correctly type-annotating your function like the list_to_numpy function below.
Annotating the return type of your function correctly lets Daft effectively optimize your data and operations under the hood. For example, in this case we specify return_dtype=np.ndarray, which tells Daft that each row in this column contains a NumPy array object.
import numpy as np

def list_to_numpy(l: list) -> np.ndarray:
    return np.array(l)
df_G_to_numpy = df.with_column("G_to_numpy", df["G"].apply(list_to_numpy))
df_G_to_numpy.collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] | G_to_numpy PY[ndarray] |
---|---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] | <np.ndarray shape=(3,) dtype=int64> |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] | <np.ndarray shape=(3,) dtype=int64> |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] | <np.ndarray shape=(3,) dtype=int64> |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] | <np.ndarray shape=(3,) dtype=int64> |
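If you prefer not to type-annotate your function, a sketch of an equivalent call using the return_dtype= keyword from the note above:
df.with_column("G_to_numpy", df["G"].apply(lambda l: np.array(l), return_dtype=np.ndarray)).collect()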
Iterable types such as a PY[list] column can be exploded with DataFrame.explode, splitting each list into a row of its own and repeating the other columns:
df_G_exploded = df.explode(df["G"])
df_G_exploded.collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[object] |
---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | 1 |
1 | 1.5 | true | a | b'a' | 1994-01-01 | 1 |
1 | 1.5 | true | a | b'a' | 1994-01-01 | 1 |
2 | 2.5 | true | b | b'b' | 1994-01-02 | 2 |
2 | 2.5 | true | b | b'b' | 1994-01-02 | 2 |
2 | 2.5 | true | b | b'b' | 1994-01-02 | 2 |
3 | 3.5 | false | c | b'c' | 1994-01-03 | 3 |
3 | 3.5 | false | c | b'c' | 1994-01-03 | 3 |
3 | 3.5 | false | c | b'c' | 1994-01-03 | 3 |
4 | 4.5 | false | d | b'd' | 1994-01-04 | 4 |
User-Defined Functions#
.apply makes it really easy to map a function over a single column, but it is limited in two main ways:
Only runs on a single column: some algorithms require multiple columns as inputs
Only runs on a single row: some algorithms run much more efficiently when run on a batch of rows instead
To overcome these limitations, you can use User-Defined Functions (UDFs).
See Also: UDF User Guide
import datetime
from daft import udf
import polars as pl
@udf(return_dtype=datetime.date, input_columns={"f_date_data": pl.Series, "a_days_data": pl.Series})
def add_days(f_date_data: pl.Series, a_days_data: pl.Series):
    return f_date_data + pl.duration(days=a_days_data)
df.with_column("F_add_A_days", add_days(df["F"], df["A"])).collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] | F_add_A_days DATE |
---|---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] | 1994-01-02 |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] | 1994-01-04 |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] | 1994-01-06 |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] | 1994-01-08 |
The simple UDF demonstrated above is a “stateless UDF”: no state is maintained between invocations of the function. In certain use-cases it can be important to maintain some state with a “stateful UDF”, which you can write using a class instead of a function. For example, running machine learning models often requires downloading trained weights and initializing the model in memory/on a GPU, which is an expensive operation that should be cached between UDF invocations.
@udf(return_dtype=float, input_columns={"a_data": np.ndarray, "b_data": np.ndarray})
class RunExpensiveModel:
    def __init__(self):
        # Initialize and cache an "expensive" model between invocations of the UDF
        self.model = np.array([1.23, 4.56])

    def __call__(self, a_data: np.ndarray, b_data: np.ndarray):
        return np.matmul(self.model, np.array([a_data, b_data]))
df.with_column("expensive_model_results", RunExpensiveModel(df["A"], df["B"])).collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] | expensive_model_results FLOAT |
---|---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] | 8.07 |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] | 13.86 |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] | 19.65 |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] | 25.44 |
Filtering Data#
You can filter rows in a dataframe using DataFrame.where, which accepts a LOGICAL type Expression as an argument:
# Keep only rows where values in column "A" are less than 3
df_filtered = df.where(df["A"] < 3)
df_filtered.collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] |
---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
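Predicates can also be combined. A sketch, assuming Daft expressions support & as a logical AND like other dataframe libraries:
# Keep rows where "A" is less than 3 and "C" is true (the `&` combinator is an assumption)
df.where((df["A"] < 3) & df["C"]).collect()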
Missing Data#
All columns in Daft are “nullable” by default. Unlike other frameworks such as Pandas, Daft differentiates between “null” (a missing value) and “NaN” (“not a number”, a special float value indicating an invalid number).
missing_data_df = DataFrame.from_pydict({
"floats": [1.5, None, float("nan")],
})
missing_data_df = missing_data_df \
.with_column("floats_is_null", missing_data_df["floats"].is_null()) \
.with_column("floats_is_nan", missing_data_df["floats"].is_nan())
missing_data_df.collect()
floats FLOAT | floats_is_null LOGICAL | floats_is_nan LOGICAL |
---|---|---|
1.5 | false | false |
None | true | none |
nan | false | true |
To fill in missing values, a useful Expression is the .if_else expression, which can be used to fill in a value wherever the original value is null:
missing_data_df = missing_data_df.with_column("filled_in_floats", (missing_data_df["floats"].is_null()).if_else(0.0, missing_data_df["floats"]))
missing_data_df.collect()
floats FLOAT | floats_is_null LOGICAL | floats_is_nan LOGICAL | filled_in_floats FLOAT |
---|---|---|---|
1.5 | false | false | 1.5 |
None | true | none | 0 |
nan | false | true | nan |
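A similar sketch that also replaces NaNs, assuming LOGICAL expressions can be combined with |:
missing_data_df.with_column(
    "cleaned_floats",
    (missing_data_df["floats"].is_null() | missing_data_df["floats"].is_nan()).if_else(0.0, missing_data_df["floats"]),
).collect()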
Merging Dataframes#
DataFrames can be joined with .join. Here is a naive example of a self-join where we join df on itself with column “A” as the join key.
joined_df = df.join(df, on="A")
joined_df.collect()
A INTEGER | B FLOAT | C LOGICAL | D STRING | E BYTES | F DATE | G PY[list] |
---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] |
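A more typical join merges two different dataframes on a shared key. A minimal sketch, where the second dataframe and its column "H" are invented for illustration:
other_df = DataFrame.from_pydict({
    "A": [1, 2, 3, 4],
    "H": ["w", "x", "y", "z"],
})
df.join(other_df, on="A").collect()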
Grouping and Aggregations#
Groupby aggregation operations over a dataset happen in two phases:
Splitting the data into groups based on some criteria using DataFrame.groupby
Specifying how to aggregate the data for each group using GroupedDataFrame.agg
Let’s take a look at an example:
grouping_df = DataFrame.from_pydict(
{
"A": ["foo", "bar", "foo", "bar", "foo", "bar", "foo", "foo"],
"B": ["a", "a", "b", "c", "b", "b", "a", "c"],
"C": [i for i in range(8)],
"D": [i for i in range(8)],
}
)
grouping_df.collect()
A STRING | B STRING | C INTEGER | D INTEGER |
---|---|---|---|
foo | a | 0 | 0 |
bar | a | 1 | 1 |
foo | b | 2 | 2 |
bar | c | 3 | 3 |
foo | b | 4 | 4 |
bar | b | 5 | 5 |
foo | a | 6 | 6 |
foo | c | 7 | 7 |
First we group by “A”, so that we will evaluate rows with A=foo and A=bar separately in their respective groups.
grouped_df = grouping_df.groupby(grouping_df["A"])
grouped_df
GroupedDataFrame(df=+----------+----------+-----------+-----------+
| A | B | C | D |
| STRING | STRING | INTEGER | INTEGER |
+==========+==========+===========+===========+
| foo | a | 0 | 0 |
+----------+----------+-----------+-----------+
| bar | a | 1 | 1 |
+----------+----------+-----------+-----------+
| foo | b | 2 | 2 |
+----------+----------+-----------+-----------+
| bar | c | 3 | 3 |
+----------+----------+-----------+-----------+
| foo | b | 4 | 4 |
+----------+----------+-----------+-----------+
| bar | b | 5 | 5 |
+----------+----------+-----------+-----------+
| foo | a | 6 | 6 |
+----------+----------+-----------+-----------+
| foo | c | 7 | 7 |
+----------+----------+-----------+-----------+
(Showing first 8 of 8 rows), group_by=[col(A#23: STRING)])
Now we can specify the aggregations we want to compute over columns C and D. Here we compute the sum over column C, and the mean over column D for each group:
aggregated_df = grouped_df.agg([
(grouped_df["C"].alias("C_sum"), "sum"),
(grouped_df["D"].alias("D_mean"), "mean"),
])
aggregated_df.collect()
A STRING | C_sum INTEGER | D_mean FLOAT |
---|---|---|
bar | 9 | 3 |
foo | 19 | 3.8 |
These operations work as well when run over multiple groupby columns, which will produce one row for each combination of columns that occur in the DataFrame:
grouping_df \
.groupby(grouping_df["A"], grouping_df["B"]) \
.agg([
(grouping_df["C"].alias("C_sum"), "sum"),
(grouping_df["D"].alias("D_mean"), "mean"),
]) \
.collect()
A STRING | B STRING | C_sum INTEGER | D_mean FLOAT |
---|---|---|---|
foo | a | 6 | 3 |
bar | b | 5 | 5 |
foo | c | 7 | 7 |
bar | a | 1 | 1 |
bar | c | 3 | 3 |
foo | b | 6 | 3 |
Writing Data#
See: Writing Data
Writing data will execute your DataFrame and write the results out to the specified backend. For example, to write data out to CSV:
# NOTE: Daft does not write PY columns at the moment.
# This is a feature that is on the roadmap as various options for implementation are being designed.
write_df = df.exclude("G")
written_df = write_df.write_csv("my-dataframe.csv")
Note that writing your dataframe is a blocking operation that executes your DataFrame. It will return a new DataFrame that contains the filepaths to the written data:
written_df.collect()
file_path STRING |
---|
my-dataframe.csv/c9da510b-9bef-4d4b-a3db-d40462100b52-0.csv |
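Daft can write Parquet in the same way, assuming DataFrame.write_parquet mirrors write_csv (see the Writing Data docs for the full list of formats):
written_parquet_df = write_df.write_parquet("my-dataframe.pq")
written_parquet_df.collect()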