10 minutes Quickstart#
Hint
✨✨✨ Run this notebook on Google Colab ✨✨✨
!pip install getdaft --pre --extra-index-url https://pypi.anaconda.org/daft-nightly/simple
This is a short introduction to all the main functionality in Daft, geared towards new users.
We import from daft as follows:
import daft
from daft import DataType
DataFrame creation#
See also: API Reference: DataFrame Construction
We can create a DataFrame from a dictionary of columns - this is a dictionary where the keys are strings representing the columns’ names and the values are equal-length lists representing the columns’ values.
import datetime
df = daft.from_pydict({
"integers": [1, 2, 3, 4],
"floats": [1.5, 2.5, 3.5, 4.5],
"bools": [True, True, False, False],
"strings": ["a", "b", "c", "d"],
"bytes": [b"a", b"b", b"c", b"d"],
"dates": [datetime.date(1994, 1, 1), datetime.date(1994, 1, 2), datetime.date(1994, 1, 3), datetime.date(1994, 1, 4)],
"lists": [[1, 1, 1], [2, 2, 2], [3, 3, 3], [4, 4, 4]],
})
2023-04-19 11:57:24.125 | INFO | daft.context:runner:88 - Using PyRunner
You can also load DataFrames from other sources, such as:
CSV files:
daft.read_csv("s3://bucket/*.csv")
Parquet files:
daft.read_parquet("/path/*.parquet")
JSON line-delimited files:
daft.read_json("/path/*.parquet")
Files on disk:
daft.from_glob_path("/path/*.jpeg")
Daft automatically supports local paths as well as paths to object storage such as AWS S3.
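For instance, the same read functions accept both local filesystem paths and object-store URLs. A minimal sketch, where the bucket and paths are hypothetical:
# Hypothetical paths: local files and S3 objects are read with the same API
local_df = daft.read_parquet("/data/events/*.parquet")
s3_df = daft.read_parquet("s3://my-bucket/events/*.parquet")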
Inspect your dataframe by printing the df variable:
df
integers Int64 | floats Float64 | bools Boolean | strings Utf8 | bytes Binary | dates Date | lists List[Int64] |
(No data to display: Dataframe not materialized)
Executing your DataFrame and Displaying Data#
Notice that when we printed our dataframe in the previous section, the message (No data to display: Dataframe not materialized) was shown instead of the dataframe's contents.
This is because Daft is lazy and only executes computations when explicitly told to do so. When you call DataFrame methods such as df.select() and df.where(), or creation functions such as daft.read_csv(), Daft only enqueues these operations in a Logical Plan. You can examine this logical plan using df.explain():
df.explain()
┌─InMemoryScan
output=[col(integers), col(floats), col(bools), col(strings), col(bytes), col(dates),
col(lists)]
cache_id='bfc052d3c7ef4606946671fceae7a44a'
partitioning={'by': None, 'num_partitions': 1, 'scheme': UNKNOWN}
Our current plan says that there is only one operation to be executed: an InMemoryScan operation that reads from a set of in-memory data.
To execute all operations on all data in the current DataFrame's plan, use the df.collect() method.
df.collect()
integers Int64 | floats Float64 | bools Boolean | strings Utf8 | bytes Binary | dates Date | lists List[Int64] |
---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] |
df.collect() is useful because it executes computations on all your data and shows you a small preview of the materialized results. These results are then kept in memory so that subsequent operations will avoid recomputations.
However, if you only wish to "peek" at your data instead of materializing the entire dataframe (e.g. your dataframe has a million rows and you only want to view the first 10 without materializing the entire result set in memory), you can use df.show(N) instead to view the first N rows of your dataframe. This is especially useful when developing interactively on small samples of data.
df.show(2)
integers Int64 | floats Float64 | bools Boolean | strings Utf8 | bytes Binary | dates Date | lists List[Int64] |
---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
Sorting Data#
You can sort a dataframe with df.sort(), which we do here in descending order:
df.sort(df["integers"], desc=True).collect()
integers Int64 | floats Float64 | bools Boolean | strings Utf8 | bytes Binary | dates Date | lists List[Int64] |
---|---|---|---|---|---|---|
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
Data Selection#
You can limit the number of rows in a dataframe by calling df.limit():
df_limited = df.limit(1)
df_limited.collect()
integers Int64 | floats Float64 | bools Boolean | strings Utf8 | bytes Binary | dates Date | lists List[Int64] |
---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
To select just a few columns, you can use df.select():
df_selected = df.select(df["integers"], df["floats"])
df_selected.collect()
integers Int64 | floats Float64 |
---|---|
1 | 1.5 |
2 | 2.5 |
3 | 3.5 |
4 | 4.5 |
Column selection also allows you to rename columns using .alias():
df_renamed = df.select(df["integers"].alias("ints"), df["floats"])
df_renamed.collect()
ints Int64 | floats Float64 |
---|---|
1 | 1.5 |
2 | 2.5 |
3 | 3.5 |
4 | 4.5 |
To drop columns from the dataframe, call df.exclude():
df_excluded = df.exclude("integers")
df_excluded.collect()
floats Float64 | bools Boolean | strings Utf8 | bytes Binary | dates Date | lists List[Int64] |
---|---|---|---|---|---|
1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] |
4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] |
Expressions#
See: Expressions
Expressions are an API for defining computation that needs to happen over your columns.
For example, to create a new column that is just the integers column incremented by 1:
df_A_plus1 = df.with_column("integers_plus_1", df["integers"] + 1) # does not run any computation
df_A_plus1.collect() # materializes the new DataFrame, which includes the new column "integers_plus_1"
integers Int64 | floats Float64 | bools Boolean | strings Utf8 | bytes Binary | dates Date | lists List[Int64] | integers_plus_1 Int64 |
---|---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] | 2 |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] | 3 |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] | 4 |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] | 5 |
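Expressions can also combine several columns and operators at once. A minimal sketch, assuming standard arithmetic between numeric columns:
# Multiply two existing numeric columns to derive a new one
df.with_column("ints_times_floats", df["integers"] * df["floats"]).collect()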
Method Accessors#
Some Expression methods are only allowed on certain types and are accessible through "method accessors" such as the .str accessor (see: Expression Accessor Properties).
For example, the .str.length() expression is only valid when run on a String column:
df_E_length = df.with_column("strings_length", df["strings"].str.length())
df_E_length.collect()
integers Int64 | floats Float64 | bools Boolean | strings Utf8 | bytes Binary | dates Date | lists List[Int64] | strings_length UInt64 |
---|---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] | 1 |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] | 1 |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] | 1 |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] | 1 |
Another example of a useful method accessor is the .url accessor. You can use .url.download() to download data from a column of URLs like so:
image_url_df = daft.from_pydict({
"urls": [
"http://farm9.staticflickr.com/8186/8119368305_4e622c8349_z.jpg",
"http://farm1.staticflickr.com/1/127244861_ab0c0381e7_z.jpg",
"http://farm3.staticflickr.com/2169/2118578392_1193aa04a0_z.jpg",
],
})
image_downloaded_df = image_url_df.with_column("image_bytes", image_url_df["urls"].url.download())
image_downloaded_df.collect()
urls Utf8 | image_bytes Binary |
---|---|
http://farm9.staticflickr.com/8186/8119368305_4e622c8349_... | b'\xff\xd8\xff\xe1\x00TExif\x00\x00MM\x00*\x00\x00\x00\x0... |
http://farm1.staticflickr.com/1/127244861_ab0c0381e7_z.jpg | b'\xff\xd8\xff\xe1\x00(Exif\x00\x00MM\x00*\x00\x00\x00\x0... |
http://farm3.staticflickr.com/2169/2118578392_1193aa04a0_... | b'\xff\xd8\xff\xe1\x00\x16Exif\x00\x00MM\x00*\x00\x00\x00... |
For a full list of all Expression methods and operators, see: Expressions API Docs
Python object columns#
Daft Dataframes can also contain Python objects. Here is an example of how to create a dataframe with Python objects.
# Let's define a toy example of a Python class!
class Dog:
def __init__(self, name):
self.name = name
def bark(self):
return f"{self.name}!"
py_df = daft.from_pydict({
"dogs": [Dog("ruffles"), Dog("waffles"), Dog("doofus")],
"owner": ["russell", "william", "david"],
})
Now, when we print our dataframe we can see that it contains our Dog Python objects! Also note that the type of the column is Python.
py_df.collect()
dogs Python | owner Utf8 |
---|---|
<__main__.Dog object at 0x11ef78ac0> | russell |
<__main__.Dog object at 0x11ef78430> | william |
<__main__.Dog object at 0x11ef78040> | david |
To work with Python type columns, Daft provides a few useful Expression methods.
.apply is useful for working on each element individually and applying a function. Here's an example where we extract a string from each Dog object by calling .bark() on it and returning a new Utf8 column.
py_df.with_column(
"dogs_bark_name",
py_df["dogs"].apply(lambda dog: dog.bark(), return_dtype=DataType.string()),
).collect()
dogs Python | owner Utf8 | dogs_bark_name Utf8 |
---|---|---|
<__main__.Dog object at 0x11ef78ac0> | russell | ruffles! |
<__main__.Dog object at 0x11ef78430> | william | waffles! |
<__main__.Dog object at 0x11ef78040> | david | doofus! |
User-Defined Functions#
.apply makes it really easy to map a function over a single column, but it is limited in two main ways:
Only runs on a single column: some algorithms require multiple columns as inputs
Only runs on a single row: some algorithms run much more efficiently when run on a batch of rows instead
To overcome these limitations, you can use User-Defined Functions (UDFs).
See Also: UDF User Guide
from daft import udf
@udf(return_dtype=DataType.string())
def custom_bark(dog_series, owner_series):
return [
f"{dog.name} loves {owner_name}!"
for dog, owner_name
in zip(dog_series.to_pylist(), owner_series.to_pylist())
]
py_df.with_column("custom_bark", custom_bark(py_df["dogs"], py_df["owner"])).collect()
dogs Python | owner Utf8 | custom_bark Utf8 |
---|---|---|
<__main__.Dog object at 0x11ef78ac0> | russell | ruffles loves russell! |
<__main__.Dog object at 0x11ef78430> | william | waffles loves william! |
<__main__.Dog object at 0x11ef78040> | david | doofus loves david! |
Filtering Data#
You can filter rows in a dataframe using df.where(), which accepts a Boolean-type Expression as an argument:
# Keep only rows where values in the "integers" column are less than 3
df_filtered = df.where(df["integers"] < 3)
df_filtered.collect()
integers Int64 | floats Float64 | bools Boolean | strings Utf8 | bytes Binary | dates Date | lists List[Int64] |
---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
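Multiple conditions can be combined in a single filter. A minimal sketch, assuming boolean Expressions compose with the & operator:
# Keep rows where "integers" is less than 3 AND "floats" is greater than 1.5
df.where((df["integers"] < 3) & (df["floats"] > 1.5)).collect()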
Missing Data#
All columns in Daft are "nullable" by default. Unlike other frameworks such as Pandas, Daft differentiates between "null" (a missing value) and "NaN" ("not a number", a special floating-point value indicating an invalid float result).
missing_data_df = daft.from_pydict({
"floats": [1.5, None, float("nan")],
})
missing_data_df = missing_data_df \
.with_column("floats_is_null", missing_data_df["floats"].is_null()) \
.with_column("floats_is_nan", missing_data_df["floats"].float.is_nan())
missing_data_df.collect()
floats Float64 | floats_is_null Boolean | floats_is_nan Boolean |
---|---|---|
1.5 | false | false |
None | true | none |
nan | false | true |
To fill in missing values, a useful Expression is .if_else, which can be used to substitute a default value when the original value is null:
missing_data_df = missing_data_df.with_column("filled_in_floats", (missing_data_df["floats"].is_null()).if_else(0.0, missing_data_df["floats"]))
missing_data_df.collect()
floats Float64 | floats_is_null Boolean | floats_is_nan Boolean | filled_in_floats Float64 |
---|---|---|---|
1.5 | false | false | 1.5 |
None | true | none | 0 |
nan | false | true | nan |
Merging Dataframes#
DataFrames can be joined with df.join(). Here is a naive example of a self-join where we join df on itself, with the "integers" column as the join key.
joined_df = df.join(df, on="integers")
joined_df.collect()
integers Int64 | floats Float64 | bools Boolean | strings Utf8 | bytes Binary | dates Date | lists List[Int64] | right.floats Float64 | right.bools Boolean | right.strings Utf8 | right.bytes Binary | right.dates Date | right.lists List[Int64] |
---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] | 1.5 | true | a | b'a' | 1994-01-01 | [1, 1, 1] |
2 | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] | 2.5 | true | b | b'b' | 1994-01-02 | [2, 2, 2] |
3 | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] | 3.5 | false | c | b'c' | 1994-01-03 | [3, 3, 3] |
4 | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] | 4.5 | false | d | b'd' | 1994-01-04 | [4, 4, 4] |
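Joins are not limited to self-joins. A minimal sketch joining df against a small, hypothetical lookup table that shares the "integers" column:
# A hypothetical lookup table keyed on the same "integers" values
lookup_df = daft.from_pydict({
    "integers": [1, 2, 3, 4],
    "labels": ["one", "two", "three", "four"],
})
df.join(lookup_df, on="integers").collect()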
Grouping and Aggregations#
Groupby aggregation operations over a dataset happen in two phases:
Splitting the data into groups based on some criteria using df.groupby()
Specifying how to aggregate the data for each group using GroupedDataFrame.agg()
Let’s take a look at an example:
grouping_df = daft.from_pydict(
{
"A": ["foo", "bar", "foo", "bar", "foo", "bar", "foo", "foo"],
"B": ["a", "a", "b", "c", "b", "b", "a", "c"],
"C": [i for i in range(8)],
"D": [i for i in range(8)],
}
)
grouping_df.collect()
A Utf8 | B Utf8 | C Int64 | D Int64 |
---|---|---|---|
foo | a | 0 | 0 |
bar | a | 1 | 1 |
foo | b | 2 | 2 |
bar | c | 3 | 3 |
foo | b | 4 | 4 |
bar | b | 5 | 5 |
foo | a | 6 | 6 |
foo | c | 7 | 7 |
First we group by "A", so that we will evaluate rows with A=foo and A=bar separately in their respective groups.
grouped_df = grouping_df.groupby(grouping_df["A"])
grouped_df
GroupedDataFrame(df=+--------+--------+---------+---------+
| A | B | C | D |
| Utf8 | Utf8 | Int64 | Int64 |
+========+========+=========+=========+
| foo | a | 0 | 0 |
+--------+--------+---------+---------+
| bar | a | 1 | 1 |
+--------+--------+---------+---------+
| foo | b | 2 | 2 |
+--------+--------+---------+---------+
| bar | c | 3 | 3 |
+--------+--------+---------+---------+
| foo | b | 4 | 4 |
+--------+--------+---------+---------+
| bar | b | 5 | 5 |
+--------+--------+---------+---------+
| foo | a | 6 | 6 |
+--------+--------+---------+---------+
| foo | c | 7 | 7 |
+--------+--------+---------+---------+
(Showing first 8 of 8 rows), group_by=<daft.expressions.expressions.ExpressionsProjection object at 0x11f58ab90>)
Now we can specify the aggregations we want to compute over columns C and D. Here we compute the sum over column C, and the mean over column D for each group:
aggregated_df = grouped_df.agg([
(grouped_df["C"].alias("C_sum"), "sum"),
(grouped_df["D"].alias("D_mean"), "mean"),
])
aggregated_df.collect()
A Utf8 | C_sum Int64 | D_mean Float64 |
---|---|---|
bar | 9 | 3 |
foo | 19 | 3.8 |
These operations work as well when run over multiple groupby columns, which will produce one row for each combination of columns that occur in the DataFrame:
grouping_df \
.groupby(grouping_df["A"], grouping_df["B"]) \
.agg([
(grouping_df["C"].alias("C_sum"), "sum"),
(grouping_df["D"].alias("D_mean"), "mean"),
]) \
.collect()
A Utf8 | B Utf8 | C_sum Int64 | D_mean Float64 |
---|---|---|---|
bar | a | 1 | 1 |
foo | b | 6 | 3 |
foo | a | 6 | 3 |
bar | b | 5 | 5 |
foo | c | 7 | 7 |
bar | c | 3 | 3 |
Writing Data#
See: Writing Data
Writing data will execute your DataFrame and write the results out to the specified backend. For example, to write data out to Parquet with df.write_parquet():
written_df = df.write_parquet("my-dataframe.parquet")
Note that writing your dataframe is a blocking operation that executes your DataFrame. It will return a new DataFrame that contains the filepaths to the written data:
written_df.collect()
file_path Utf8 |
---|
my-dataframe.parquet/6d29ca81-06bd-4a49-8fac-eb80656c69ca... |
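To verify the write, you could read the files back into a new DataFrame. A minimal sketch, assuming df.write_parquet() produced Parquet files under the my-dataframe.parquet directory as shown above:
# Read the freshly written Parquet files back in
roundtrip_df = daft.read_parquet("my-dataframe.parquet/*.parquet")
roundtrip_df.collect()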