Core Concepts#
Learn about the core concepts that Daft is built on!
DataFrame#
If you are coming from other DataFrame libraries such as Pandas or Polars, here are some key differences about Daft DataFrames:
- Distributed: When running in a distributed cluster, Daft splits your data into smaller "chunks" called Partitions. This allows Daft to process your data in parallel across multiple machines, leveraging more resources to work with large datasets.
- Lazy: When you write operations on a DataFrame, Daft doesn't execute them immediately. Instead, it creates a plan (called a query plan) of what needs to be done. This plan is optimized and only executed when you specifically request the results, which can lead to more efficient computations.
- Multimodal: Unlike traditional tables that usually contain simple data types like numbers and text, Daft DataFrames can handle complex data types in their columns. This includes things like images, audio files, or even custom Python objects.
For a full comparison between Daft and other DataFrame Libraries, see DataFrame Comparison.
Common data operations that you would perform on DataFrames are:
- Filtering rows: Use `df.where(...)` to keep only the rows that meet certain conditions.
- Creating new columns: Use `df.with_column(...)` to add a new column based on calculations from existing ones.
- Joining DataFrames: Use `df.join(other_df, ...)` to combine two DataFrames based on common columns.
- Sorting: Use `df.sort(...)` to arrange your data based on values in one or more columns.
- Grouping and aggregating: Use `df.groupby(...)` and `df.agg(...)` to summarize your data by groups.
Creating a DataFrame#
See Also
Reading Data and Writing Data - a more in-depth guide on the various options for reading and writing data to and from Daft DataFrames, covering in-memory data (Python, Arrow), files (Parquet, CSV, JSON), SQL databases, and data catalogs
Let's create our first DataFrame from a Python dictionary of columns.
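A minimal sketch of what this looks like; the column names "A" through "D" match the DataFrame described below, while the values are purely illustrative:

```python
import daft

# Create a DataFrame from a dictionary mapping column names to lists of values.
df = daft.from_pydict({
    "A": [1, 2, 3, 4],
    "B": [1.5, 2.5, 3.5, 4.5],
    "C": [True, True, False, False],
    "D": [None, None, None, None],
})
```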
Examine your DataFrame by printing it:
Congratulations - you just created your first DataFrame! It has 4 columns, "A", "B", "C", and "D". Let's try to select only the "A", "B", and "C" columns:
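A sketch of that selection, continuing with the illustrative `df` from above:

```python
# Select a subset of columns; nothing is computed yet.
df = df.select("A", "B", "C")
print(df)
```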
But wait - why is it printing the message (No data to display: Dataframe not materialized)
and where are the rows of each column?
Executing DataFrame and Viewing Data#
The reason that our DataFrame currently does not display its rows is that Daft DataFrames are lazy. This just means that Daft DataFrames will defer all their work until you tell them to execute.
In this case, Daft is just deferring the work required to read the data and select columns; in practice, however, this laziness can be very useful for helping Daft optimize your queries before execution!
Info
When you call methods on a Daft Dataframe, it defers the work by adding to an internal "plan". You can examine the current plan of a DataFrame by calling df.explain()
!
Passing the show_all=True
argument will show you the plan after Daft applies its query optimizations and the physical (lower-level) plan.
We can tell Daft to execute our DataFrame and store the results in-memory using `df.collect()`:
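A sketch, continuing with `df` from above:

```python
# Execute the query plan and materialize the results in memory.
df.collect()

# The preview now shows actual rows rather than "not materialized".
df.show()
```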
Now your DataFrame object df
is materialized - Daft has executed all the steps required to compute the results, and has cached the results in memory so that it can display this preview.
Any subsequent operations on df
will avoid recomputations, and just use this materialized result!
When should I materialize my DataFrame?#
If you "eagerly" call df.collect()
immediately on every DataFrame, you may run into issues:
- If data is too large at any step, materializing all of it may cause memory issues
- Optimizations are not possible since we cannot "predict future operations"
However, data science is all about experimentation and trying different things on the same data. This means that materialization is crucial when working interactively with DataFrames, since it speeds up all subsequent experimentation on that DataFrame.
We suggest materializing DataFrames using df.collect()
when they contain expensive operations (e.g. sorts or expensive function calls) and have to be called multiple times by downstream code:
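A sketch of this pattern, using a hypothetical DataFrame with a single column "A" and an expensive sort:

```python
import daft

df = daft.from_pydict({"A": [3, 1, 4, 2]})

# Materialize the expensive sort once...
sorted_df = df.sort("A").collect()

# ...then reuse the materialized result in multiple downstream operations
# without recomputing the sort each time.
sorted_df.where(sorted_df["A"] > 1).show()
sorted_df.with_column("A_doubled", sorted_df["A"] * 2).show()
```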
In many other cases however, there are better options than materializing your entire DataFrame with df.collect()
:
- Peeking with `df.show(N)`: If you only want to "peek" at the first few rows of your data for visualization purposes, you can use `df.show(N)`, which processes and shows only the first `N` rows.
- Writing to disk: The `df.write_*` methods will process and write your data to disk per-partition, avoiding materializing it all in memory at once.
- Pruning data: You can materialize your DataFrame after performing a `df.limit()`, `df.where()` or `df.select()` operation, which prunes your data down to a smaller size.
Schemas and Types#
Notice also that when we printed our DataFrame, Daft displayed its schema. Each column of your DataFrame has a name and a type, and all data in that column will adhere to that type!
Daft can display your DataFrame's schema without materializing it. Under the hood, it performs intelligent sampling of your data to determine the appropriate schema, and if you make any modifications to your DataFrame it can infer the resulting types based on the operation.
Note
Under the hood, Daft represents data in the Apache Arrow format, which allows it to efficiently represent and work on data using high-performance kernels which are written in Rust.
Running Computation with Expressions#
To run computations on data in our DataFrame, we use Expressions.
The following statement will df.show()
a DataFrame that has only one column - the column A
from our original DataFrame but with every row incremented by 1.
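A sketch, continuing with the illustrative `df`:

```python
# A new one-column DataFrame where every value of "A" has 1 added to it.
df.select(df["A"] + 1).show()
```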
Info
A common pattern is to create a new column using `DataFrame.with_column`:
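A sketch of that pattern:

```python
# Add a new column computed from an existing one.
df = df.with_column("A_add_one", df["A"] + 1)
df.show()
```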
Congratulations, you have just written your first Expression: df["A"] + 1
! Expressions are a powerful way of describing computation on columns. For more details, check out the next section on Expressions.
Selecting Rows#
We can limit the rows to the first N
rows using df.limit(N)
:
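For example, keeping only the first two rows of the illustrative `df`:

```python
df.limit(2).show()
```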
We can also filter rows using `df.where()`, which takes a Logical Expression predicate as input:
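A sketch:

```python
# Keep only the rows where the value of "A" is greater than 2.
df.where(df["A"] > 2).show()
```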
Selecting Columns#
Select specific columns in a DataFrame using df.select()
, which also takes Expressions as an input.
A useful alias for df.select()
is indexing a DataFrame with a list of column names or Expressions:
Sometimes, it may be useful to exclude certain columns from a DataFrame. This can be done with df.exclude()
:
Adding a new column can be achieved with df.with_column()
:
Selecting Columns Using Wildcards#
We can select multiple columns at once using wildcards. The expression col("*")
selects every column in a DataFrame, and you can operate on this expression in the same way as a single column:
We can also select multiple columns within structs using col("struct")["*"]
:
Under the hood, wildcards work by finding all of the columns that match, then copying the expression several times and replacing the wildcard. This means that there are some caveats:
- Only one wildcard is allowed per expression tree. This means that `col("*") + col("*")` and similar expressions do not work.
- Be conscious about duplicated column names. Any code like `df.select(col("*"), col("*") + 3)` will not work because the wildcards expand into the same column names.

For the same reason, `col("A") + col("*")` will not work because the name on the left-hand side is inherited, meaning all the output columns are named `A`, causing an error if there is more than one. However, `col("*") + col("A")` will work fine.
Combining DataFrames#
Two DataFrames can be column-wise joined using df.join()
.
This requires a "join key", which can be supplied as the on
argument if both DataFrames have the same name for their key columns, or the left_on
and right_on
argument if the key column has different names in each DataFrame.
Daft also supports multi-column joins if you have a join key comprising multiple columns!
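A sketch with two small hypothetical DataFrames that share a key column "A":

```python
import daft

df1 = daft.from_pydict({"A": [1, 2, 3], "B": ["a", "b", "c"]})
df2 = daft.from_pydict({"A": [1, 2, 3], "C": [True, False, True]})

# Join the two DataFrames on their shared key column.
df1.join(df2, on="A").show()
```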
Reordering Rows#
Rows in a DataFrame can be reordered based on some column using df.sort()
. Daft also supports multi-column sorts for sorting on multiple columns at once.
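A sketch, sorting the illustrative `df` by column "A" in descending order (the `desc` argument is an assumption worth checking against the sort docs):

```python
df.sort("A", desc=True).show()
```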
Numbering Rows#
Daft provides `monotonically_increasing_id()`, which assigns unique, increasing IDs to rows in a DataFrame. This is especially useful in distributed settings, and works by:
- Using the upper 28 bits for the partition number
- Using the lower 36 bits for the row number within each partition
This allows for up to 268 million partitions and 68 billion rows per partition. It's useful for creating unique IDs in distributed DataFrames, tracking row order after operations like sorting, and ensuring uniqueness across large datasets.
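A sketch that roughly matches the description below; `into_partitions(2)` is used only to force two partitions, and the `daft.functions` import path is an assumption worth verifying:

```python
import daft
from daft.functions import monotonically_increasing_id  # assumed import path

# Four rows, explicitly split into two partitions.
df = daft.from_pydict({"A": [1, 2, 3, 4]}).into_partitions(2)

df = df.with_column("id", monotonically_increasing_id())
df.show()
```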
In this example, rows in the first partition get IDs 0
and 1
, while rows in the second partition start at 2^36
(68719476736
).
Exploding Columns#
The `df.explode()` method can be used to explode a column containing a list of values into multiple rows. All other columns will be duplicated.
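A sketch with a hypothetical list column "B":

```python
import daft

df = daft.from_pydict({
    "A": [1, 2],
    "B": [["x", "y"], ["z"]],
})

# Each list element in "B" becomes its own row; values in "A" are repeated.
df.explode("B").show()
```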
Expressions#
Expressions are how you can express computations that should be run over columns of data.
Creating Expressions#
Referring to a column in a DataFrame#
Most commonly you will be creating expressions by using the daft.col()
function.
The above code creates an Expression that refers to a column named "A"
.
Using SQL#
Daft can also parse valid SQL as expressions.
The above code will create an expression representing "the column named 'x' incremented by 1". For many APIs, sql_expr
will actually be applied for you as syntactic sugar!
Literals#
You may often find yourself needing to hardcode a "single value" as an expression. Daft provides a `lit()` helper to do so:
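A sketch:

```python
from daft import lit

# An Expression that always evaluates to the value 42.
forty_two = lit(42)
```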
The `lit()` expression we just created always evaluates to the value 42.
Wildcard Expressions#
You can create expressions on multiple columns at once using a wildcard. The expression col("*")
selects every column in a DataFrame, and you can operate on this expression in the same way as a single column:
Wildcards also work very well for accessing all members of a struct column:
In this example, we use the wildcard *
to access all fields of the person
struct column. This is equivalent to selecting each field individually (person.name
, person.age
), but is more concise and flexible, especially when dealing with structs that have many fields.
Composing Expressions#
Numeric Expressions#
Since column "A" is an integer, we can run numeric computation such as addition, division and checking its value. Here are some examples where we create new columns using the results of such computations:
Notice that the returned types of these operations are also well-typed according to their input types. For example, calling df["A"] > 1
returns a column of type Boolean
.
Both the Float
and Int
types are numeric types, and inherit many of the same arithmetic Expression operations. You may find the full list of numeric operations in the Expressions API Reference.
String Expressions#
Daft also lets you have columns of strings in a DataFrame. Let's take a look!
Unlike the numeric types, the string type does not support arithmetic operations such as *
and /
. The one exception to this is the +
operator, which is overridden to concatenate two string expressions as is commonly done in Python. Let's try that!
There are also many string operators that are accessed through a separate .str.*
"method namespace".
For example, to check if each element in column "B" contains the substring "a", we can use the .str.contains()
method:
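A sketch with an illustrative string column "B":

```python
import daft

df = daft.from_pydict({"B": ["foo", "bar", "baz"]})

# True for rows whose string in "B" contains the substring "a".
df = df.with_column("B_contains_a", df["B"].str.contains("a"))
df.show()
```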
You may find a full list of string operations in the Expressions API Reference.
URL Expressions#
One special case of a String column you may find yourself working with is a column of URL strings.
Daft provides the .url.*
method namespace with functionality for working with URL strings. For example, to download data from URLs:
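A sketch; the URLs below are placeholders, and any HTTP URL, local filepath, or object-store path could be used:

```python
import daft

df = daft.from_pydict({
    "urls": [
        "https://www.example.com",
        "https://www.example.com/index.html",
    ],
})

# Download the contents behind each URL into a new column of bytes.
df = df.with_column("data", df["urls"].url.download())
df.show()
```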
This works well for URLs which are HTTP paths to non-HTML files (e.g. jpeg), local filepaths, or even paths to a file in an object store such as AWS S3!
JSON Expressions#
If you have a column of JSON strings, Daft provides the .json.*
method namespace to run JQ-style filters on them. For example, to extract a value from a JSON object:
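A sketch with a hypothetical column of JSON strings (assuming the filter method is `.json.query()`):

```python
import daft

df = daft.from_pydict({
    "json_strings": ['{"a": 1, "b": 2}', '{"a": 3, "b": 4}'],
})

# Extract the value under key "a" from each JSON object with a jq-style filter.
df = df.with_column("a", df["json_strings"].json.query(".a"))
df.show()
```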
Daft uses jaq as the underlying executor, so you can find the full list of supported filters in the jaq documentation.
Logical Expressions#
Logical Expressions are expressions that refer to a column of type `Boolean`, and can only take on the values True or False.
Daft supports logical operations such as &
(and) and |
(or) between logical expressions.
Comparisons#
Many of the types in Daft support comparisons between expressions that return a Logical Expression.
For example, here we can compare if each element in column "A" is equal to elements in column "B":
Other useful comparisons can be found in the Expressions API Reference.
If Else Pattern#
The .if_else()
method is a useful expression to have up your sleeve for choosing values between two other expressions based on a logical expression:
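A sketch with two hypothetical numeric columns:

```python
import daft

df = daft.from_pydict({"A": [1, 2, 3], "B": [0, 2, 4]})

# Take the value from "A" when A > B, otherwise take the value from "B".
df = df.with_column(
    "A_if_bigger_else_B",
    (df["A"] > df["B"]).if_else(df["A"], df["B"]),
)
df.show()
```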
This is a useful expression for cleaning your data!
Temporal Expressions#
Daft provides rich support for working with temporal data types like Timestamp and Duration. Let's explore some common temporal operations:
Basic Temporal Operations#
You can perform arithmetic operations with timestamps and durations, such as adding a duration to a timestamp or calculating the duration between two timestamps:
Temporal Component Extraction#
The .dt.*
method namespace provides extraction methods for the components of a timestamp, such as year, month, day, hour, minute, and second:
Time Zone Operations#
You can parse strings as timestamps with time zones and convert between different time zones:
Temporal Truncation#
The .dt.truncate()
method allows you to truncate timestamps to specific time units. This can be useful for grouping data by time periods. For example, to truncate timestamps to the nearest hour:
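A sketch with hypothetical timestamps (the `"1 hour"` interval string is an assumption about the accepted format):

```python
import daft
from datetime import datetime

df = daft.from_pydict({
    "timestamp": [
        datetime(2021, 1, 1, 0, 1, 1),
        datetime(2021, 1, 1, 0, 31, 59),
        datetime(2021, 1, 1, 1, 59, 59),
    ],
})

# Truncate each timestamp down to the start of its hour.
df = df.with_column("hour_bucket", df["timestamp"].dt.truncate("1 hour"))
df.show()
```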
Daft can read data from a variety of sources, and write data to many destinations.
Reading Data#
From Files#
DataFrames can be loaded from file(s) on some filesystem, commonly your local filesystem or a remote cloud object store such as AWS S3.
Additionally, Daft can read data from a variety of container file formats, including CSV, line-delimited JSON and Parquet.
Daft supports file paths to a single file, a directory of files, and wildcards. It also supports paths to remote object storage such as AWS S3.
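A sketch of the common readers; all paths below are placeholders:

```python
import daft

# A single CSV file on the local filesystem.
df_csv = daft.read_csv("data/my_file.csv")

# A folder of line-delimited JSON files, matched with a wildcard.
df_json = daft.read_json("data/*.jsonl")

# Parquet files in a remote object store such as AWS S3.
df_parquet = daft.read_parquet("s3://my-bucket/path/to/*.parquet")
```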
To learn more about each of these constructors, as well as the options that they support, consult the API documentation on creating DataFrames from files
.
From Data Catalogs#
If you use catalogs such as Apache Iceberg or Apache Hudi, you can check out their dedicated integration pages.
From File Paths#
Daft also provides an easy utility to create a DataFrame from globbing a path. You can use the daft.from_glob_path()
method which will read a DataFrame of globbed filepaths.
This is especially useful for reading things such as a folder of images or documents into Daft. A common pattern is to then download data from these files into your DataFrame as bytes, using the .url.download()
method.
From Memory#
For testing, or small datasets that fit in memory, you may also create DataFrames using Python lists and dictionaries.
To learn more, consult the API documentation on creating DataFrames from in-memory data structures
.
From Databases#
Daft can also read data from a variety of databases, including PostgreSQL, MySQL, Trino, and SQLite, using the `daft.read_sql()` method. In order to partition the data, you can specify a partition column, which will allow Daft to read the data in parallel.
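A sketch; the query, connection string, and column name are placeholders:

```python
import daft

df = daft.read_sql(
    "SELECT * FROM my_table",
    "postgresql://user:password@localhost:5432/my_database",
    partition_col="id",  # lets Daft split the read into parallel partitions
)
```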
To learn more, consult the SQL Integration Page
or the API documentation on daft.read_sql()
.
Reading a column of URLs#
Daft provides a convenient way to read data from a column of URLs using the .url.download()
method. This is particularly useful when you have a DataFrame with a column containing URLs pointing to external resources that you want to fetch and incorporate into your DataFrame.
Here's an example of how to use this feature:
This approach allows you to efficiently download and process data from a large number of URLs in parallel, leveraging Daft's distributed computing capabilities.
Writing Data#
Writing data will execute your DataFrame and write the results out to the specified backend. The df.write_*(...)
methods, such as df.write_csv()
, df.write_iceberg()
, and df.write_deltalake()
to name a few, are used to write DataFrames to files or other destinations.
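A sketch of a Parquet write; the destination path is a placeholder:

```python
# Executes the DataFrame and writes one file per partition to the destination.
written_files = df.write_parquet("my-results/")

# The returned DataFrame lists the paths of the files that were written.
written_files.show()
```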
Note
Because Daft is a distributed DataFrame library, by default it will produce multiple files (one per partition) at your specified destination. Writing your dataframe is a blocking operation that executes your DataFrame. It will return a new DataFrame
that contains the filepaths to the written data.
DataTypes#
All columns in a Daft DataFrame have a DataType (also often abbreviated as dtype
).
All elements of a column are of the same dtype, or they can be the special Null value (indicating a missing value).
Daft provides simple DataTypes that are ubiquitous in many DataFrames, such as numbers, strings and dates - all the way up to more complex types like tensors and images.
Tip
For a full overview on all the DataTypes that Daft supports, see the DataType API Reference.
Numeric DataTypes#
Numeric DataTypes allow Daft to represent numbers. These numbers can differ in terms of the number of bits used to represent them (8, 16, 32 or 64 bits) and the semantic meaning of those bits (float vs integer vs unsigned integers).
Examples:
- `DataType.int8()`: represents an 8-bit signed integer (-128 to 127)
- `DataType.float32()`: represents a 32-bit float (a float number with about 7 decimal digits of precision)
Columns/expressions with these datatypes can be operated on with many numeric expressions such as +
and *
.
See also: Numeric Expressions
Logical DataTypes#
The Boolean
DataType represents values which are boolean values: True
, False
or Null
.
Columns/expressions with this dtype can be operated on using logical expressions such as &
and .if_else()
.
See also: Logical Expressions
String Types#
Daft has string types, which represent a variable-length string of characters.
As a convenience method, string types also support the +
Expression, which has been overloaded to support concatenation of elements between two DataType.string()
columns.
- `DataType.string()`: represents a string of UTF-8 characters
- `DataType.binary()`: represents a string of bytes
See also: String Expressions
Temporal DataTypes#
Temporal DataTypes represent data that have to do with time.
Examples:
- `DataType.date()`: represents a Date (year, month and day)
- `DataType.timestamp()`: represents a Timestamp (a particular instant in time)
See also: Temporal Expressions
Nested DataTypes#
Nested DataTypes wrap other DataTypes, allowing you to compose types into complex data structures.
Examples:
- `DataType.list(child_dtype)`: represents a list where each element is of the child `dtype`
- `DataType.struct({"field_name": child_dtype})`: represents a structure that has children `dtype`s, each mapped to a field name
Python DataType#
The `DataType.python()` DataType represents items that are Python objects.
Warning
Daft does not impose any invariants about what Python types these objects are. To Daft, these are just generic Python objects!
Python is AWESOME because it's so flexible, but it's also slow and memory inefficient! Thus we recommend:
- Cast early!: Casting your Python data into native Daft DataTypes if possible - this results in much more efficient downstream data serialization and computation.
- Use Python UDFs: If there is no suitable Daft representation for your Python objects, use Python UDFs to process your Python data and extract the relevant data to be returned as native Daft DataTypes!
Note
If you work with Python classes for a generalizable use-case (e.g. documents, protobufs), it may be that these types are good candidates for "promotion" into a native Daft type! Please get in touch with the Daft team and we would love to work together on building your type into canonical Daft types.
Complex DataTypes#
Daft supports many more interesting complex DataTypes, for example:
- `DataType.tensor()`: Multi-dimensional (potentially uniformly-shaped) tensors of data
- `DataType.embedding()`: Lower-dimensional vector representation of data (e.g. words)
- `DataType.image()`: NHWC images
Daft abstracts away the in-memory representation of your data and provides kernels for many common operations on top of these data types. For supported image operations see the image expressions API reference. For more complex algorithms, you can also drop into a Python UDF to process this data using your custom Python libraries.
Please add suggestions for new DataTypes to our Github Discussions page!
SQL#
Daft supports Structured Query Language (SQL) as a way of constructing query plans (represented in Python as a daft.DataFrame
) and expressions (daft.Expression
).
SQL is a human-readable way of constructing these query plans, and can often be more ergonomic than using DataFrames for writing queries.
Daft's SQL support is new and is constantly being improved on!
Please give us feedback or submit an issue and we'd love to hear more about what you would like.
Head to our SQL Overview page for examples on using SQL with DataFrames, SQL Expressions, and SQL Functions.
Aggregations and Grouping#
Some operations such as the sum or the average of a column are called aggregations. Aggregations are operations that reduce the number of rows in a column.
Global Aggregations#
An aggregation can be applied on an entire DataFrame, for example to get the mean on a specific column:
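A sketch with a small hypothetical DataFrame of classes and scores (also used for the grouped examples below):

```python
import daft

df = daft.from_pydict({
    "class": ["a", "a", "b", "b"],
    "score": [10.0, 20.0, 30.0, 40.0],
})

# Reduce the whole "score" column to a single mean value.
df.mean("score").show()
```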
Aggregations can also be mixed and matched across columns, via the .agg()
method:
Grouped Aggregations#
Aggregations can also be called on a "Grouped DataFrame". For the above example, perhaps we want to get the mean "score" not for the entire DataFrame, but for each "class".
Let's run the mean of column "score" again, but this time grouped by "class":
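A sketch, continuing with the hypothetical class/score DataFrame:

```python
# One mean score per distinct value of "class".
df.groupby("class").mean("score").show()
```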
To run multiple aggregations on a Grouped DataFrame, you can use the agg
method:
Cross Column Aggregations#
While standard aggregations like sum
or mean
work vertically on a single column, Daft also provides functions to operate horizontally across multiple columns for each row. These functions are part of the daft.functions
module and include:
- `columns_min`: Find the minimum value across specified columns for each row
- `columns_max`: Find the maximum value across specified columns for each row
- `columns_mean`: Calculate the mean across specified columns for each row
- `columns_sum`: Calculate the sum across specified columns for each row
- `columns_avg`: Alias for `columns_mean`
Here's a simple example showing these functions in action:
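A sketch with three hypothetical numeric columns (the exact signatures of these functions are worth double-checking against the API reference):

```python
import daft
from daft.functions import columns_max, columns_mean, columns_min

df = daft.from_pydict({
    "a": [1, 4, 7],
    "b": [2, 5, 8],
    "c": [3, 6, 9],
})

# Horizontal aggregations: one result per row, computed across columns.
df = df.with_column("row_min", columns_min("a", "b", "c"))
df = df.with_column("row_max", columns_max("a", "b", "c"))
df = df.with_column("row_mean", columns_mean("a", "b", "c"))
df.show()
```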
These functions are especially useful when you need to calculate statistics across related columns or find extreme values from multiple fields in your data.
User-Defined Functions (UDF)#
A key piece of functionality in Daft is the ability to flexibly define custom functions that can run computations on any data in your dataframe. This section walks you through the different types of UDFs that Daft allows you to run.
Let's first create a dataframe that will be used as a running example throughout this tutorial!
Per-column per-row functions using `.apply()`#
You can use .apply()
to run a Python function on every row in a column.
For example, the following creates a new `flattened_image` column by calling `.flatten()` on every object in the `image` column.
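A sketch; the `image` column here is a hypothetical column of small numpy arrays standing in for real images:

```python
import daft
import numpy as np
from daft import DataType

df = daft.from_pydict({
    "image": [np.ones((2, 2)), np.zeros((3, 3))],
})

df = df.with_column(
    "flattened_image",
    df["image"].apply(lambda img: img.flatten(), return_dtype=DataType.python()),
)
df.show()
```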
Note here that we use the return_dtype
keyword argument to specify that our returned column type is a Python column!
Multi-column per-partition functions using `@udf`#
.apply()
is great for convenience, but has two main limitations:
- It can only run on single columns
- It can only run on single items at a time
Daft provides the @udf
decorator for defining your own UDFs that process multiple columns or multiple rows at a time.
For example, let's try writing a function that will crop all our images in the image
column by its corresponding value in the crop
column:
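A sketch of such a UDF; the images are stand-in numpy arrays and the crop boxes are `[x0, y0, x1, y1]` lists, so treat this as illustrative rather than the exact original example:

```python
import daft
import numpy as np
from daft import DataType, udf

df = daft.from_pydict({
    "image": [np.ones((4, 4)), np.zeros((8, 8))],
    "crop": [[0, 0, 2, 2], [1, 1, 3, 3]],
})

@udf(return_dtype=DataType.python())
def crop_images(images, crops, padding=0):
    cropped = []
    for img, box in zip(images.to_pylist(), crops.to_pylist()):
        x0, y0, x1, y1 = box
        cropped.append(img[x0 : x1 + padding, y0 : y1 + padding])
    return cropped

df = df.with_column("cropped", crop_images(df["image"], df["crop"], padding=1))
df.show()
```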
There are a few things happening here, so let's break it down:
- `crop_images` is a normal Python function. It takes as input:

    a. A list of images: `images`

    b. A list of cropping boxes: `crops`

    c. An integer indicating how much padding to apply to the right and bottom of the cropping: `padding`

- To allow Daft to pass column data into the `images` and `crops` arguments, we decorate the function with `@udf`.

    a. `return_dtype` defines the returned data type. In this case, we return a column containing Python objects of numpy arrays.

    b. At runtime, because we call the UDF on the `image` and `crop` columns, the UDF will receive a `daft.Series` object for each argument.

- We can create a new column in our DataFrame by applying our UDF on the `"image"` and `"crop"` columns inside of a `df.with_column()` call.
UDF Inputs#
When you specify an Expression as an input to a UDF, Daft will calculate the result of that Expression and pass it into your function as a daft.Series
object.
The Daft daft.Series
is just an abstraction on a "column" of data! You can obtain several different data representations from a daft.Series
:
- PyArrow Arrays (`pa.Array`): `s.to_arrow()`
- Python lists (`list`): `s.to_pylist()`
Depending on your application, you may choose a different data representation that is more performant or more convenient!
Info
Certain array formats have some restrictions around the type of data that they can handle:
- Null Handling: In Pandas and Numpy, nulls are represented as NaNs for numeric types, and Nones for non-numeric types. Additionally, the existence of nulls will trigger a type casting from integer to float arrays. If null handling is important to your use-case, we recommend using one of the other available options.
- Python Objects: PyArrow array formats cannot support Python columns.
We recommend using Python lists if performance is not a major consideration, and using the arrow-native formats such as PyArrow arrays and numpy arrays if performance is important.
Return Types#
The return_dtype
argument specifies what type of column your UDF will return. Types can be specified using the daft.DataType
class.
Your UDF function itself needs to return a batch of columnar data, and can do so as any one of the following array types:
- Numpy Arrays (`np.ndarray`)
- PyArrow Arrays (`pa.Array`)
- Python lists (`list`)
Note that if the data you have returned is not castable to the return_dtype that you specify (e.g. if you return a list of floats when you've specified a return_dtype=DataType.bool()
), Daft will throw a runtime error!
Class UDFs#
UDFs can also be created on classes, which allows for initialization of some expensive state that can be shared between invocations of the class, for example downloading data or creating a model.
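A schematic sketch of the shape of a class UDF; the string-uppercasing "model" is just a stand-in for expensive state such as a real ML model:

```python
import daft
from daft import DataType, udf

@udf(return_dtype=DataType.string())
class RunExpensiveModel:
    def __init__(self):
        # Expensive setup (e.g. downloading weights, loading a model) runs once
        # per UDF instance and is shared across invocations.
        self.model = lambda text: text.upper()

    def __call__(self, text_col):
        return [self.model(t) for t in text_col.to_pylist()]
```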
Running Class UDFs is exactly the same as running their functional cousins.
Resource Requests#
Sometimes, you may want to request specific resources for your UDF. For example, some UDFs need one GPU to run as they will load a model onto the GPU.
To do so, you can create your UDF and assign it a resource request:
In the above example, if Daft ran on a Ray cluster consisting of 8 GPUs and 64 CPUs, Daft would be able to run 8 replicas of your UDF in parallel, thus massively increasing the throughput of your UDF!
UDFs can also be parametrized with new resource requests after being initialized.
Example: UDFs in ML Workloads#
We'll define a function that uses a pre-trained PyTorch model, ResNet50, to classify the dog pictures. We'll pass the contents of the image `urls` column and send the classification predictions to a new column `classify_breed`.
Working with PyTorch adds some complexity but you can just run the cells below to perform the classification.
First, make sure to install and import some extra dependencies:
Define your ClassifyImages
UDF. Models are expensive to initialize and load, so we want to do this as few times as possible, and share a model across multiple invocations.
Now you're ready to call this function on the urls
column and store the outputs in a new column we'll call classify_breed
:
Multimodal Data#
Daft is built to work comfortably with multimodal data types, including URLs and images. You can use the url.download()
expression to download the bytes from a URL. Let's store them in a new column using the df.with_column()
method:
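A sketch; the URLs are placeholders:

```python
import daft

df = daft.from_pydict({
    "urls": [
        "https://www.example.com/image1.jpg",
        "https://www.example.com/image2.jpg",
    ],
})

# Download the raw bytes behind each URL into a new column.
df = df.with_column("image_bytes", df["urls"].url.download())
df.show()
```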
Let's turn the bytes into human-readable images using image.decode()
:
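Continuing the sketch, assuming the downloaded bytes are valid encoded images:

```python
# Decode the raw bytes into an image column.
df = df.with_column("image", df["image_bytes"].image.decode())
df.show()
```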