!pip install getdaft

Hint

✨✨✨ You can run this notebook yourself on Google Colab! ✨✨✨

10-minute Quickstart#

This is a short introduction to all the main functionality in Daft, geared towards new users.

We import from daft as follows:

import daft
from daft import DataType

DataFrame creation#

See also: API Reference: DataFrame Construction

We can create a DataFrame from a dictionary of columns - this is a dictionary where the keys are strings representing the columns’ names and the values are equal-length lists representing the columns’ values.

import datetime

df = daft.from_pydict({
    "integers": [1, 2, 3, 4],
    "floats": [1.5, 2.5, 3.5, 4.5],
    "bools": [True, True, False, False],
    "strings": ["a", "b", "c", "d"],
    "bytes": [b"a", b"b", b"c", b"d"],
    "dates": [datetime.date(1994, 1, 1), datetime.date(1994, 1, 2), datetime.date(1994, 1, 3), datetime.date(1994, 1, 4)],
    "lists": [[1, 1, 1], [2, 2, 2], [3, 3, 3], [4, 4, 4]],
    "nulls": [None, None, None, None],
})
df
integers  floats   bools    strings  bytes  dates       lists        nulls
Int64     Float64  Boolean  Utf8     Binary Date        List[Int64]  Null
1         1.5      true     a        b"a"   1994-01-01  [1, 1, 1]    None
2         2.5      true     b        b"b"   1994-01-02  [2, 2, 2]    None
3         3.5      false    c        b"c"   1994-01-03  [3, 3, 3]    None
4         4.5      false    d        b"d"   1994-01-04  [4, 4, 4]    None
(Showing first 4 of 4 rows)

You can also load DataFrames from other sources, such as:

  1. CSV files: daft.read_csv("s3://bucket/*.csv")

  2. Parquet files: daft.read_parquet("/path/*.parquet")

  3. JSON line-delimited files: daft.read_json("/path/*.jsonl")

  4. Files on disk: daft.from_glob_path("/path/*.jpeg")

Daft automatically supports local paths as well as paths to object storage such as AWS S3.
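
As a quick, hedged sketch, reading a local CSV works the same way as the examples above; the "iris.csv" path below is just a hypothetical placeholder and is not shipped with this notebook:

csv_df = daft.read_csv("iris.csv")  # hypothetical local file used only for illustration
csv_df.show(5)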

Let’s select every column from our DataFrame except the all-null nulls column:

df = df.select("integers", "floats", "bools", "strings", "bytes", "dates", "lists")
df
integers  floats   bools    strings  bytes   dates  lists
Int64     Float64  Boolean  Utf8     Binary  Date   List[Int64]
(No data to display: Dataframe not materialized)

Executing your DataFrame and Displaying Data#

Notice that instead of the contents of the DataFrame, the message (No data to display: Dataframe not materialized) was displayed when we printed our DataFrame in the previous section.

This is because Daft is lazy and only executes computations when explicitly told to do so. When you call methods on DataFrames such as df.select() and df.where(), as well as reads from files such as daft.read_csv(), Daft actually only enqueues these operations in a Logical Plan. You can examine this logical plan using df.explain():

df.explain()
* Project: col(integers), col(floats), col(bools), col(strings), col(bytes), col(dates), col(lists)
|
* Source:
|   Number of partitions = 1
|   Output schema = integers (Int64), floats (Float64), bools (Boolean), strings (Utf8), bytes (Binary), dates (Date), lists (List(Int64)), nulls (Null)

Our current plan says that there are two operations to be executed: a Source operation that reads from a set of in-memory data, and a Project operation that selects columns from that data.

To execute all operations on all data in the current DataFrame’s plan, you can use the df.collect() method.

df.collect()
integers  floats   bools    strings  bytes  dates       lists
Int64     Float64  Boolean  Utf8     Binary Date        List[Int64]
1         1.5      true     a        b"a"   1994-01-01  [1, 1, 1]
2         2.5      true     b        b"b"   1994-01-02  [2, 2, 2]
3         3.5      false    c        b"c"   1994-01-03  [3, 3, 3]
4         4.5      false    d        b"d"   1994-01-04  [4, 4, 4]
(Showing first 4 of 4 rows)

df.collect() is useful because it executes computations on all your data, and shows you a little preview of the materialized results. These results are then kept in memory so that subsequent operations will avoid recomputations.
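
For instance, once df has been materialized by the call above, a follow-up operation such as the filter in this small illustrative sketch starts from the in-memory results rather than re-running the whole plan:

# df is already materialized, so this filter reuses the cached results
df.where(df["integers"] > 2).collect()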

However, if you only wish to “peek” at your data instead of materializing the entire dataframe (e.g. your dataframe has a million rows, and you only want to view the first 10 without materializing the entire result set in memory), you can use df.show(N) instead to view the first N rows of your dataframe. This is especially useful when developing interactively on small samples of data.

df.show(2)
integers  floats   bools    strings  bytes  dates       lists
Int64     Float64  Boolean  Utf8     Binary Date        List[Int64]
1         1.5      true     a        b"a"   1994-01-01  [1, 1, 1]
2         2.5      true     b        b"b"   1994-01-02  [2, 2, 2]
(Showing first 2 rows)

Sorting Data#

You can sort a DataFrame with df.sort(), which we do here in descending order:

df.sort(df["integers"], desc=True).collect()
integers  floats   bools    strings  bytes  dates       lists
Int64     Float64  Boolean  Utf8     Binary Date        List[Int64]
4         4.5      false    d        b"d"   1994-01-04  [4, 4, 4]
3         3.5      false    c        b"c"   1994-01-03  [3, 3, 3]
2         2.5      true     b        b"b"   1994-01-02  [2, 2, 2]
1         1.5      true     a        b"a"   1994-01-01  [1, 1, 1]
(Showing first 4 of 4 rows)

Data Selection#

You can limit the number of rows in a dataframe by calling df.limit().

df_limited = df.limit(1)
df_limited.collect()
integers  floats   bools    strings  bytes  dates       lists
Int64     Float64  Boolean  Utf8     Binary Date        List[Int64]
1         1.5      true     a        b"a"   1994-01-01  [1, 1, 1]
(Showing first 1 of 1 rows)

To select just a few columns, you can use df.select():

df_selected = df.select(df["integers"], df["floats"])
df_selected.collect()
integers  floats
Int64     Float64
1         1.5
2         2.5
3         3.5
4         4.5
(Showing first 4 of 4 rows)

Column selection also allows you to rename columns using .alias():

df_renamed = df.select(df["integers"].alias("ints"), df["floats"])
df_renamed.collect()
ints   floats
Int64  Float64
1      1.5
2      2.5
3      3.5
4      4.5
(Showing first 4 of 4 rows)

To drop columns from the dataframe, call df.exclude():

df_excluded = df.exclude("integers")
df_excluded.collect()
floats   bools    strings  bytes  dates       lists
Float64  Boolean  Utf8     Binary Date        List[Int64]
1.5      true     a        b"a"   1994-01-01  [1, 1, 1]
2.5      true     b        b"b"   1994-01-02  [2, 2, 2]
3.5      false    c        b"c"   1994-01-03  [3, 3, 3]
4.5      false    d        b"d"   1994-01-04  [4, 4, 4]
(Showing first 4 of 4 rows)

Expressions#

See: Expressions

Expressions are an API for defining computation that needs to happen over your columns.

For example, to create a new column that is just the integers column incremented by 1:

df_plus1 = df.with_column("integers_plus_1", df["integers"] + 1)  # does not run any computation
df_plus1.collect()  # materializes the new DataFrame, which includes the new column "integers_plus_1"
integers  floats   bools    strings  bytes  dates       lists        integers_plus_1
Int64     Float64  Boolean  Utf8     Binary Date        List[Int64]  Int64
1         1.5      true     a        b"a"   1994-01-01  [1, 1, 1]    2
2         2.5      true     b        b"b"   1994-01-02  [2, 2, 2]    3
3         3.5      false    c        b"c"   1994-01-03  [3, 3, 3]    4
4         4.5      false    d        b"d"   1994-01-04  [4, 4, 4]    5
(Showing first 4 of 4 rows)
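
Expressions can also be written without referencing a specific DataFrame by using daft.col(), which refers to a column by name. The following sketch should be equivalent to the df["integers"] form used above:

from daft import col

# col("integers") builds the same expression as df["integers"]
df.with_column("integers_plus_1", col("integers") + 1).collect()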

Method Accessors#

Some Expression methods are only allowed on certain types and are accessible through “method accessors” such as the .str accessor (see: Expression Accessor Properties).

For example, the .str.length() expression is only valid when run on a String column:

df_strings_length = df.with_column("strings_length", df["strings"].str.length())
df_strings_length.collect()
integers  floats   bools    strings  bytes  dates       lists        strings_length
Int64     Float64  Boolean  Utf8     Binary Date        List[Int64]  UInt64
1         1.5      true     a        b"a"   1994-01-01  [1, 1, 1]    1
2         2.5      true     b        b"b"   1994-01-02  [2, 2, 2]    1
3         3.5      false    c        b"c"   1994-01-03  [3, 3, 3]    1
4         4.5      false    d        b"d"   1994-01-04  [4, 4, 4]    1
(Showing first 4 of 4 rows)

Another example of a useful method accessor is the .url accessor. You can use .url.download() to download data from a column of URLs like so:

image_url_df = daft.from_pydict({
    "urls": [
        "http://farm9.staticflickr.com/8186/8119368305_4e622c8349_z.jpg",
        "http://farm1.staticflickr.com/1/127244861_ab0c0381e7_z.jpg",
        "http://farm3.staticflickr.com/2169/2118578392_1193aa04a0_z.jpg",
    ],
})
image_downloaded_df = image_url_df.with_column("image_bytes", image_url_df["urls"].url.download())
image_downloaded_df.collect()
urls                                                             image_bytes
Utf8                                                             Binary
http://farm9.staticflickr.com/8186/8119368305_4e622c8349_...    b'\xff\xd8\xff\xe1\x00TExif\x00\x00MM\x00*\x00\x00\x00\x0...
http://farm1.staticflickr.com/1/127244861_ab0c0381e7_z.jpg      b'\xff\xd8\xff\xe1\x00(Exif\x00\x00MM\x00*\x00\x00\x00\x0...
http://farm3.staticflickr.com/2169/2118578392_1193aa04a0_...    b'\xff\xd8\xff\xe1\x00\x16Exif\x00\x00MM\x00*\x00\x00\x00...
(Showing first 3 of 3 rows)

For a full list of all Expression methods and operators, see: Expressions API Docs

Python object columns#

Daft Dataframes can also contain Python objects. Here is an example of how to create a dataframe with Python objects.

# Let's define a toy example of a Python class!
class Dog:
    def __init__(self, name):
        self.name = name
        
    def bark(self):
        return f"{self.name}!"

py_df = daft.from_pydict({
    "dogs": [Dog("ruffles"), Dog("waffles"), Dog("doofus")],
    "owner": ["russell", "william", "david"],
})

Now, when we print our dataframe we can see that it contains our Dog Python objects! Also note that the type of the column is Python.

py_df.collect()
dogs                                   owner
Python                                 Utf8
<__main__.Dog object at 0x11ef78ac0>   russell
<__main__.Dog object at 0x11ef78430>   william
<__main__.Dog object at 0x11ef78040>   david
(Showing first 3 of 3 rows)

To work with Python type columns, Daft provides a few useful Expression methods.

.apply is useful to work on each Dog individually and apply a function.

Here’s an example where we extract a string from each Dog by calling .bark on each Dog object and returning a new Utf8 column.

py_df.with_column(
    "dogs_bark_name",
    py_df["dogs"].apply(lambda dog: dog.bark(), return_dtype=DataType.string()),
).collect()
dogs                                   owner    dogs_bark_name
Python                                 Utf8     Utf8
<__main__.Dog object at 0x11ef78ac0>   russell  ruffles!
<__main__.Dog object at 0x11ef78430>   william  waffles!
<__main__.Dog object at 0x11ef78040>   david    doofus!
(Showing first 3 of 3 rows)

User-Defined Functions#

.apply makes it easy to map a function over a single column, but it is limited in two main ways:

  1. Only runs on a single column: some algorithms require multiple columns as inputs

  2. Only runs on a single row: some algorithms run much more efficiently when run on a batch of rows instead

To overcome these limitations, you can use User-Defined Functions (UDFs).

See Also: UDF User Guide

from daft import udf

@udf(return_dtype=DataType.string())
def custom_bark(dog_series, owner_series):
    return [
        f"{dog.name} loves {owner_name}!"
        for dog, owner_name
        in zip(dog_series.to_pylist(), owner_series.to_pylist())
    ]

py_df.with_column("custom_bark", custom_bark(py_df["dogs"], py_df["owner"])).collect()
dogs                                   owner    custom_bark
Python                                 Utf8     Utf8
<__main__.Dog object at 0x11ef78ac0>   russell  ruffles loves russell!
<__main__.Dog object at 0x11ef78430>   william  waffles loves william!
<__main__.Dog object at 0x11ef78040>   david    doofus loves david!
(Showing first 3 of 3 rows)

Filtering Data#

You can filter rows in a DataFrame using df.where(), which accepts a Boolean-type Expression as an argument:

# Keep only rows where values in the "integers" column are less than 3
df_filtered = df.where(df["integers"] < 3)
df_filtered.collect()
integers  floats   bools    strings  bytes  dates       lists
Int64     Float64  Boolean  Utf8     Binary Date        List[Int64]
1         1.5      true     a        b"a"   1994-01-01  [1, 1, 1]
2         2.5      true     b        b"b"   1994-01-02  [2, 2, 2]
(Showing first 2 of 2 rows)
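
Boolean Expressions can also be combined. As a hedged sketch, the filter below uses the & operator to keep only rows matching both predicates:

# Keep rows where integers < 3 AND bools is true
df.where((df["integers"] < 3) & df["bools"]).collect()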

Missing Data#

All columns in Daft are “nullable” by default. Unlike other frameworks such as Pandas, Daft differentiates between “null” (a missing value) and “NaN” (“not a number”, a special floating-point value indicating an invalid result).

missing_data_df = daft.from_pydict({
    "floats": [1.5, None, float("nan")],
})
missing_data_df = missing_data_df \
    .with_column("floats_is_null", missing_data_df["floats"].is_null()) \
    .with_column("floats_is_nan", missing_data_df["floats"].float.is_nan())

missing_data_df.collect()
floats   floats_is_null  floats_is_nan
Float64  Boolean         Boolean
1.5      false           false
None     true            None
nan      false           true
(Showing first 3 of 3 rows)

To fill in missing values, a useful Expression is .if_else(), which can substitute a default value wherever the original value is null:

missing_data_df = missing_data_df.with_column("filled_in_floats", (missing_data_df["floats"].is_null()).if_else(0.0, missing_data_df["floats"]))
missing_data_df.collect()
floats   floats_is_null  floats_is_nan  filled_in_floats
Float64  Boolean         Boolean        Float64
1.5      false           false          1.5
None     true            None           0
nan      false           true           nan
(Showing first 3 of 3 rows)

Merging Dataframes#

DataFrames can be joined with df.join(). Here is a naive example of a self-join where we join df on itself, with the "integers" column as the join key.

joined_df = df.join(df, on="integers")
joined_df.collect()
integers  floats   bools    strings  bytes  dates       lists        right.floats  right.bools  right.strings  right.bytes  right.dates  right.lists
Int64     Float64  Boolean  Utf8     Binary Date        List[Int64]  Float64       Boolean      Utf8           Binary       Date         List[Int64]
1         1.5      true     a        b"a"   1994-01-01  [1, 1, 1]    1.5           true         a              b"a"         1994-01-01   [1, 1, 1]
2         2.5      true     b        b"b"   1994-01-02  [2, 2, 2]    2.5           true         b              b"b"         1994-01-02   [2, 2, 2]
3         3.5      false    c        b"c"   1994-01-03  [3, 3, 3]    3.5           false        c              b"c"         1994-01-03   [3, 3, 3]
4         4.5      false    d        b"d"   1994-01-04  [4, 4, 4]    4.5           false        d              b"d"         1994-01-04   [4, 4, 4]
(Showing first 4 of 4 rows)

Grouping and Aggregations#

Groupby aggregation over a dataset happens in two phases:

  1. Splitting the data into groups based on some criteria using df.groupby()

  2. Specifying how to aggregate the data for each group using GroupedDataFrame.agg()

Let’s take a look at an example:

grouping_df = daft.from_pydict(
    {
        "A": ["foo", "bar", "foo", "bar", "foo", "bar", "foo", "foo"],
        "B": ["a", "a", "b", "c", "b", "b", "a", "c"],
        "C": [i for i in range(8)],
        "D": [i for i in range(8)],
    }
)
grouping_df.collect()
A     B     C      D
Utf8  Utf8  Int64  Int64
foo   a     0      0
bar   a     1      1
foo   b     2      2
bar   c     3      3
foo   b     4      4
bar   b     5      5
foo   a     6      6
foo   c     7      7
(Showing first 8 of 8 rows)

First we group by “A”, so that we will evaluate rows with A=foo and A=bar separately in their respective groups.

grouped_df = grouping_df.groupby(grouping_df["A"])
grouped_df
GroupedDataFrame(df=+--------+--------+---------+---------+
| A      | B      |       C |       D |
| Utf8   | Utf8   |   Int64 |   Int64 |
+========+========+=========+=========+
| foo    | a      |       0 |       0 |
+--------+--------+---------+---------+
| bar    | a      |       1 |       1 |
+--------+--------+---------+---------+
| foo    | b      |       2 |       2 |
+--------+--------+---------+---------+
| bar    | c      |       3 |       3 |
+--------+--------+---------+---------+
| foo    | b      |       4 |       4 |
+--------+--------+---------+---------+
| bar    | b      |       5 |       5 |
+--------+--------+---------+---------+
| foo    | a      |       6 |       6 |
+--------+--------+---------+---------+
| foo    | c      |       7 |       7 |
+--------+--------+---------+---------+
(Showing first 8 of 8 rows), group_by=<daft.expressions.expressions.ExpressionsProjection object at 0x11f58ab90>)

Now we can specify the aggregations we want to compute over columns C and D. Here we compute the sum over column C, and the mean over column D for each group:

aggregated_df = grouped_df.agg([
    (grouped_df["C"].alias("C_sum"), "sum"),
    (grouped_df["D"].alias("D_mean"), "mean"),
])
aggregated_df.collect()
A     C_sum  D_mean
Utf8  Int64  Float64
bar   9      3
foo   19     3.8
(Showing first 2 of 2 rows)

These operations work as well when run over multiple groupby columns, which will produce one row for each combination of columns that occur in the DataFrame:

grouping_df \
    .groupby(grouping_df["A"], grouping_df["B"]) \
    .agg([
        (grouping_df["C"].alias("C_sum"), "sum"),
        (grouping_df["D"].alias("D_mean"), "mean"),
    ]) \
    .collect()
A     B     C_sum  D_mean
Utf8  Utf8  Int64  Float64
bar   a     1      1
foo   b     6      3
foo   a     6      3
bar   b     5      5
foo   c     7      7
bar   c     3      3
(Showing first 6 of 6 rows)

Writing Data#

See: Writing Data

Writing data will execute your DataFrame and write the results out to the specified backend. For example, to write data out to Parquet with df.write_parquet():

written_df = df.write_parquet("my-dataframe.parquet")

Note that writing your dataframe is a blocking operation that executes your DataFrame. It will return a new DataFrame that contains the filepaths to the written data:

written_df
path
Utf8
my-dataframe.parquet/d796131c-0c31-4688-a5ee-48ca500498e3-0.parquet
(Showing first 1 of 1 rows)
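
As a quick sanity check (a hedged sketch, assuming the glob below matches the files listed in written_df), the written Parquet files can be read back with daft.read_parquet():

# Read the freshly written Parquet files back into a new DataFrame
roundtrip_df = daft.read_parquet("my-dataframe.parquet/*.parquet")
roundtrip_df.collect()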