User Defined Functions (UDFs)#
User-Defined Functions (UDFs) are a mechanism to run Python code on the data that lives in a DataFrame.
A UDF can be used just like an Expression, allowing users to express computation that Daft should execute lazily.
To write a UDF, use the @udf decorator, which can decorate either a Python function or a Python class, producing a StatelessUDF or a StatefulUDF respectively.
For more details, please consult the UDF User Guide.
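For a quick side-by-side of the two flavors, here is a minimal sketch (the names add_one and Repeat are illustrative only, not part of the API): decorating a function yields a stateless UDF, while decorating a class with __init__ and __call__ yields a stateful one.
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.int64())
... def add_one(x: daft.Series):  # function -> StatelessUDF
...     return [v + 1 for v in x.to_pylist()]
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... class Repeat:  # class -> StatefulUDF
...     def __init__(self, times: int = 2):
...         self.times = times
...
...     def __call__(self, s: daft.Series):
...         return [v * self.times for v in s.to_pylist()]
>>>
>>> df = daft.from_pydict({"x": [1, 2], "s": ["a", "b"]})
>>> df = df.with_column("x_plus_one", add_one(df["x"]))
>>> df = df.with_column("s_repeated", Repeat(df["s"]))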
Creating UDFs#
- udf(*, return_dtype: DataType, num_cpus: Optional[float] = None, num_gpus: Optional[float] = None, memory_bytes: Optional[int] = None, batch_size: Optional[int] = None) → Callable[[Union[Callable[[...], Union[Series, ndarray, list]], type]], daft.udf.StatelessUDF | daft.udf.StatefulUDF] [source]#
@udf
Decorator to convert a Python function or class into a StatelessUDF or StatefulUDF respectively.
UDFs allow users to run arbitrary Python code on the outputs of Expressions.
Note
In most cases, UDFs will be slower than a native kernel/expression because of the required Rust and Python overheads. If your computation can be expressed using Daft expressions, you should do so instead of writing a UDF. If your UDF expresses a common use-case that isn’t already covered by Daft, you should file a ticket or contribute this functionality back to Daft as a kernel!
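For instance (a minimal illustration of the note above), adding a constant to a column needs no UDF at all, since native expressions already support arithmetic:
>>> import daft
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> # Native expression: no per-batch Python UDF overhead
>>> df = df.with_column("x_plus_10", df["x"] + 10)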
In the example below, we create a UDF that:
- Receives data under the argument name x
- Converts the x Daft Series into a Python list using x.to_pylist()
- Adds a Python constant value c to every element in x
- Returns a new list of Python values which will be coerced to the specified return type: return_dtype=DataType.int64()
We can call our UDF on a dataframe using any of the dataframe projection operations (df.with_column(), df.select(), etc.).
Example
>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64())
... def add_constant(x: daft.Series, c=10):
...     return [v + c for v in x.to_pylist()]
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", add_constant(df["x"], c=20))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 21    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 22    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 23    │
╰───────┴───────╯
(Showing first 3 of 3 rows)
Resource Requests#
You can also hint Daft about the resources that your UDF will require to run. For example, the following UDF requires 2 CPUs to run. On a machine/cluster with 8 CPUs, Daft will be able to run up to 4 instances of this UDF at once, giving you a concurrency of 4!
>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=2)
... def udf_needs_2_cpus(x: daft.Series):
...     return x
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯
(Showing first 3 of 3 rows)
Your UDF's resources can also be overridden before you call it, like so:
>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=4)
... def udf_needs_4_cpus(x: daft.Series):
...     return x
>>>
>>> # Override the num_cpus to 2 instead
>>> udf_needs_2_cpus = udf_needs_4_cpus.override_options(num_cpus=2)
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯
(Showing first 3 of 3 rows)
- param return_dtype:
Returned type of the UDF
- type return_dtype:
DataType
- param num_cpus:
Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time). The default None indicates that Daft is free to allocate as many instances of the UDF as it wants to.
- param num_gpus:
Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.
- param memory_bytes:
Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors, this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.
- param batch_size:
Enables batching of the input into batches of at most this size. Results between batches are concatenated (see the sketch after this list).
- returns:
UDF decorator that converts a user-provided Python function into a UDF which can be called on Expressions
- rtype:
Callable[[UserProvidedPythonFunction], UDF]
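Below is a minimal sketch showing the batch_size and memory_bytes hints together (the values of 4 rows and 512 MB are purely illustrative, not recommendations):
>>> import daft
>>>
>>> # Hypothetical sizing: batches of at most 4 rows, ~512 MB of heap hinted per instance
>>> @daft.udf(
...     return_dtype=daft.DataType.int64(),
...     batch_size=4,
...     memory_bytes=512 * 1024 * 1024,
... )
... def square(x: daft.Series):
...     return [v * v for v in x.to_pylist()]
>>>
>>> df = daft.from_pydict({"x": list(range(10))})
>>> df = df.with_column("x_squared", square(df["x"]))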
Using UDFs#
- class StatelessUDF(common_args: CommonUDFArgs, name: str, func: Callable[[...], Union[Series, ndarray, list]], return_dtype: DataType)[source]#
A StatelessUDF is produced by calling @udf over a Python function.
- __call__(*args, **kwargs) → Expression [source]#
Call the UDF using some input Expressions, producing a new Expression that can be used by a DataFrame.
- Parameters:
*args – Positional arguments to be passed to the UDF. These can be either Expressions or Python values.
**kwargs – Keyword arguments to be passed to the UDF. These can be either Expressions or Python values.
- Returns:
A new Expression representing the UDF call, which can be used in DataFrame operations.
- Return type:
Expression
Note
When passing arguments to the UDF, you can use a mix of Expressions (e.g., df["column"]) and Python values. Expressions will be evaluated for each row, while Python values will be passed as-is to the UDF.
Example
>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.float64())
... def multiply_and_add(x: daft.Series, y: float, z: float):
...     return x.to_arrow().to_numpy() * y + z
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("result", multiply_and_add(df["x"], 2.0, z=1.5))
>>> df.show()
╭───────┬─────────╮
│ x     ┆ result  │
│ ---   ┆ ---     │
│ Int64 ┆ Float64 │
╞═══════╪═════════╡
│ 1     ┆ 3.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ 5.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 3     ┆ 7.5     │
╰───────┴─────────╯
(Showing first 3 of 3 rows)
- override_options(*, num_cpus: float | None = <object object>, num_gpus: float | None = <object object>, memory_bytes: int | None = <object object>, batch_size: int | None = <object object>) → StatelessUDF [source]#
Replace the resource requests for running each instance of your UDF.
For instance, if your UDF requires 4 CPUs to run, you can configure it like so:
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... def example_stateless_udf(inputs):
...     # You will have access to 4 CPUs here if you configure your UDF correctly!
...     return inputs
>>>
>>> # Parametrize the UDF to run with 4 CPUs
>>> example_stateless_udf_4CPU = example_stateless_udf.override_options(num_cpus=4)
>>>
>>> df = daft.from_pydict({"foo": [1, 2, 3]})
>>> df = df.with_column("bar", example_stateless_udf_4CPU(df["foo"]))
- Parameters:
num_cpus – Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time).
num_gpus – Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.
memory_bytes – Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors, this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.
batch_size – Enables batching of the input into batches of at most this size. Results between batches are concatenated.
- class StatefulUDF(common_args: CommonUDFArgs, name: str, cls: type, return_dtype: DataType, init_args: Optional[Tuple[Tuple[Any, ...], Dict[str, Any]]] = None, concurrency: Optional[int] = None)[source]#
A StatefulUDF is produced by calling @udf over a Python class, allowing state to be maintained between calls; it can be further parametrized at runtime with custom concurrency, resources, and init args.
- Example of a Stateful UDF:
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... class MyStatefulUdf:
...     def __init__(self, prefix: str = "Goodbye"):
...         self.prefix = prefix
...
...     def __call__(self, name: daft.Series) -> list:
...         return [f"{self.prefix}, {n}!" for n in name.to_pylist()]
>>>
>>> MyHelloStatefulUdf = MyStatefulUdf.with_init_args(prefix="Hello")
>>>
>>> df = daft.from_pydict({"name": ["Alice", "Bob", "Charlie"]})
>>> df = df.with_column("greeting", MyHelloStatefulUdf(df["name"]))
>>> df.show()
╭─────────┬─────────────────╮
│ name    ┆ greeting        │
│ ---     ┆ ---             │
│ Utf8    ┆ Utf8            │
╞═════════╪═════════════════╡
│ Alice   ┆ Hello, Alice!   │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ Bob     ┆ Hello, Bob!     │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ Charlie ┆ Hello, Charlie! │
╰─────────┴─────────────────╯
(Showing first 3 of 3 rows)
The state (in this case, the prefix) is maintained across calls to the UDF. Most commonly, this state is used for things such as ML models which should be downloaded and loaded into memory once for multiple invocations.
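A minimal sketch of that pattern follows; the "model" here is a trivial stand-in so the example stays self-contained, whereas a real UDF would download and construct an actual ML model in __init__:
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.int64())
... class RunModel:
...     def __init__(self):
...         # Expensive setup happens once per UDF instance rather than once per call.
...         self.model = lambda value: value * 2  # stand-in for a loaded model
...
...     def __call__(self, data: daft.Series) -> list:
...         return [self.model(v) for v in data.to_pylist()]
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("prediction", RunModel(df["x"]))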
- __call__(*args, **kwargs) → Expression [source]#
Call the UDF using some input Expressions, producing a new Expression that can be used by a DataFrame.
- Parameters:
*args – Positional arguments to be passed to the UDF. These can be either Expressions or Python values.
**kwargs – Keyword arguments to be passed to the UDF. These can be either Expressions or Python values.
- Returns:
A new Expression representing the UDF call, which can be used in DataFrame operations.
- Return type:
Expression
Note
When passing arguments to the UDF, you can use a mix of Expressions (e.g., df["column"]) and Python values. Expressions will be evaluated for each row, while Python values will be passed as-is to the UDF.
Example
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.float64())
... class MultiplyAndAdd:
...     def __init__(self, multiplier: float = 2.0):
...         self.multiplier = multiplier
...
...     def __call__(self, x: daft.Series, z: float) -> list:
...         return [val * self.multiplier + z for val in x.to_pylist()]
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("result", MultiplyAndAdd(df["x"], z=1.5))
>>> df.show()
╭───────┬─────────╮
│ x     ┆ result  │
│ ---   ┆ ---     │
│ Int64 ┆ Float64 │
╞═══════╪═════════╡
│ 1     ┆ 3.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ 5.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 3     ┆ 7.5     │
╰───────┴─────────╯
(Showing first 3 of 3 rows)
- override_options(*, num_cpus: float | None = <object object>, num_gpus: float | None = <object object>, memory_bytes: int | None = <object object>, batch_size: int | None = <object object>) → StatefulUDF [source]#
Replace the resource requests for running each instance of your UDF.
For instance, if your UDF requires 4 CPUs to run, you can configure it like so:
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... def example_stateless_udf(inputs):
...     # You will have access to 4 CPUs here if you configure your UDF correctly!
...     return inputs
>>>
>>> # Parametrize the UDF to run with 4 CPUs
>>> example_stateless_udf_4CPU = example_stateless_udf.override_options(num_cpus=4)
>>>
>>> df = daft.from_pydict({"foo": [1, 2, 3]})
>>> df = df.with_column("bar", example_stateless_udf_4CPU(df["foo"]))
- Parameters:
num_cpus – Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time).
num_gpus – Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.
memory_bytes – Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors, this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.
batch_size – Enables batching of the input into batches of at most this size. Results between batches are concatenated.
- with_concurrency(concurrency: int) → StatefulUDF [source]#
Override the concurrency of this StatefulUDF, which tells Daft how many instances of your StatefulUDF to run concurrently.
Example:
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string(), num_gpus=1)
... class MyUDFThatNeedsAGPU:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data.to_pylist()]
>>>
>>> # New UDF that will have 8 concurrent running instances (will require 8 total GPUs)
>>> MyUDFThatNeedsAGPU_8_concurrency = MyUDFThatNeedsAGPU.with_concurrency(8)
- with_init_args(*args, **kwargs) → StatefulUDF [source]#
Replace initialization arguments for the Stateful UDF when calling __init__ at runtime on each instance of the UDF.
Example:
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... class MyInitializedClass:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data.to_pylist()]
>>>
>>> # Create a customized version of MyInitializedClass by overriding the init args
>>> MyInitializedClass_CustomInitArgs = MyInitializedClass.with_init_args(text=" my old friend")
>>>
>>> df = daft.from_pydict({"foo": ["hello", "hello", "hello"]})
>>> df = df.with_column("bar_world", MyInitializedClass(df["foo"]))
>>> df = df.with_column("bar_custom", MyInitializedClass_CustomInitArgs(df["foo"]))
>>> df.show()
╭───────┬─────────────┬─────────────────────╮
│ foo   ┆ bar_world   ┆ bar_custom          │
│ ---   ┆ ---         ┆ ---                 │
│ Utf8  ┆ Utf8        ┆ Utf8                │
╞═══════╪═════════════╪═════════════════════╡
│ hello ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello ┆ hello world ┆ hello my old friend │
╰───────┴─────────────┴─────────────────────╯
(Showing first 3 of 3 rows)