User Defined Functions (UDFs)#

User-Defined Functions (UDFs) are a mechanism to run Python code on the data that lives in a DataFrame.

A UDF can be used just like an Expression, allowing users to express computation that Daft executes lazily.

To write a UDF, use the @udf decorator, which can decorate either a Python function or a Python class to produce a UDF.

For more details, please consult the User Guide.
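As a quick sketch of the two forms (illustrative only; the function and class names here are hypothetical):

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.int64())
... def double(x: daft.Series):
...     # Function-style UDF: called on batches of input data
...     return [v * 2 for v in x]
>>>
>>> @daft.udf(return_dtype=daft.DataType.int64())
... class Doubler:
...     def __init__(self):
...         # Expensive setup (e.g. loading model weights) can live here
...         self.factor = 2
...
...     def __call__(self, x: daft.Series):
...         return [v * self.factor for v in x]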

Creating UDFs#

udf(*, return_dtype: Union[DataType, type, str], num_cpus: Optional[float] = None, num_gpus: Optional[float] = None, memory_bytes: Optional[int] = None, batch_size: Optional[int] = None, concurrency: Optional[int] = None) → Callable[[Union[Callable[[...], Union[Series, list, ndarray, Array, ChunkedArray]], type]], UDF][source]#

The @udf decorator converts a Python function or class into a UDF.

UDFs allow users to run arbitrary Python code on the outputs of Expressions.

Note

In most cases, UDFs will be slower than a native kernel/expression because of the required Rust and Python overheads. If your computation can be expressed using Daft expressions, you should do so instead of writing a UDF. If your UDF expresses a common use-case that isn’t already covered by Daft, you should file a ticket or contribute this functionality back to Daft as a kernel!

In the example below, we create a UDF that:

  1. Receives data under the argument name x

  2. Iterates over the x Daft Series

  3. Adds a Python constant value c to every element in x

  4. Returns a new list of Python values which will be coerced to the specified return type: return_dtype=DataType.int64().

We can then call our UDF on a DataFrame using any of the DataFrame projection operations (df.with_column(), df.select(), etc.).

Example

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64())
... def add_constant(x: daft.Series, c=10):
...     return [v + c for v in x]
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", add_constant(df["x"], c=20))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 21    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 22    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 23    │
╰───────┴───────╯

(Showing first 3 of 3 rows)
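As noted above, a computation this simple does not actually need a UDF: the same result can be expressed with a native Daft expression, which avoids the UDF overhead (a minimal sketch of the equivalent):

>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", df["x"] + 20)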

Resource Requests#

You can also give Daft hints about the resources that your UDF requires to run. For example, the following UDF requires 2 CPUs; on a machine/cluster with 8 CPUs, Daft will be able to run up to 4 instances of this UDF at once!

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=2)
... def udf_needs_2_cpus(x: daft.Series):
...     return x
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯

(Showing first 3 of 3 rows)

A UDF's resource requests can also be overridden before you call it, like so:

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=4)
... def udf_needs_4_cpus(x: daft.Series):
...     return x
>>>
>>> # Override the num_cpus to 2 instead
>>> udf_needs_2_cpus = udf_needs_4_cpus.override_options(num_cpus=2)
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯

(Showing first 3 of 3 rows)

Parameters:
  • return_dtype (DataType) – Returned type of the UDF.

  • num_cpus – Number of CPUs to allocate to each running instance of your UDF. Note that this is purely used for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time). The default None indicates that Daft is free to allocate as many instances of the UDF as it wants to.

  • num_gpus – Number of GPUs to allocate to each running instance of your UDF. This is used for placement and also for allocating the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.

  • memory_bytes – Amount of memory to allocate to each running instance of your UDF, in bytes. If your UDF is experiencing out-of-memory errors, this parameter can hint to Daft that each UDF instance requires a certain amount of heap memory for execution.

  • batch_size – Enables batching of the input into batches of at most this size. Results from each batch are concatenated.

  • concurrency – Spin up this many persistent replicas of the UDF to process all partitions. Defaults to None, which spins up one UDF per partition. This is especially useful for expensive initializations that need to be amortized across partitions, such as loading model weights for model batch inference.

Returns:
  UDF decorator – converts a user-provided Python function into a UDF that can be called on Expressions.

Return type:
  Callable[[UserDefinedPyFuncLike], UDF]
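For illustration, here is a sketch of a UDF that combines several of these options; the specific values are arbitrary assumptions, not recommendations:

>>> import daft
>>> @daft.udf(
...     return_dtype=daft.DataType.int64(),
...     batch_size=1024,  # call the UDF on at most 1024 rows at a time
...     memory_bytes=512 * 1024 * 1024,  # hint that each instance needs ~512 MiB
... )
... def batched_double(x: daft.Series):
...     return [v * 2 for v in x]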

Using UDFs#

class UDF(inner: Union[Callable[[...], Union[Series, list, ndarray, Array, ChunkedArray]], type], name: str, return_dtype: DataType, init_args: Optional[Tuple[Tuple[Any, ...], Dict[str, Any]]] = None, concurrency: Optional[int] = None, resource_request: Optional[ResourceRequest] = None, batch_size: Optional[int] = None)[source]#

A class produced by applying the @daft.udf decorator over a Python function or class.

Calling this object produces a daft.Expression that can be used in DataFrame operations (e.g. df.with_column(), df.select()).

Example

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.float64())
... def multiply_and_add(x: daft.Series, y: float, z: float):
...     return x.to_arrow().to_numpy() * y + z
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("result", multiply_and_add(df["x"], 2.0, z=1.5))
>>> df.show()
╭───────┬─────────╮
│ x     ┆ result  │
│ ---   ┆ ---     │
│ Int64 ┆ Float64 │
╞═══════╪═════════╡
│ 1     ┆ 3.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ 5.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 3     ┆ 7.5     │
╰───────┴─────────╯

(Showing first 3 of 3 rows)

__call__(*args, **kwargs) → Expression[source]#

Call self as a function.
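For instance, a minimal sketch (double is a hypothetical UDF defined only for illustration):

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64())
... def double(x: daft.Series):
...     return [v * 2 for v in x]
>>>
>>> expr = double(daft.col("x"))  # calling the UDF yields a daft.Expression
>>> df = daft.from_pydict({"x": [1, 2, 3]}).select(expr)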

override_options(*, num_cpus: float | None = <object object>, num_gpus: float | None = <object object>, memory_bytes: int | None = <object object>, batch_size: int | None = <object object>) → UDF[source]#

Replace the resource requests for running each instance of your UDF.

For instance, if your UDF requires 4 CPUs to run, you can configure it like so:

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... def example_udf(inputs):
...     # You will have access to 4 CPUs here if you configure your UDF correctly!
...     return inputs
>>>
>>> # Parametrize the UDF to run with 4 CPUs
>>> example_udf_4CPU = example_udf.override_options(num_cpus=4)

Parameters:
  • num_cpus – Number of CPUs to allocate to each running instance of your UDF. Note that this is purely used for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time).

  • num_gpus – Number of GPUs to allocate to each running instance of your UDF. This is used for placement and also for allocating the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.

  • memory_bytes – Amount of memory to allocate to each running instance of your UDF, in bytes. If your UDF is experiencing out-of-memory errors, this parameter can hint to Daft that each UDF instance requires a certain amount of heap memory for execution.

  • batch_size – Enables batching of the input into batches of at most this size. Results from each batch are concatenated.

with_concurrency(concurrency: int) → UDF[source]#

Override the concurrency of this UDF, which tells Daft how many instances of your UDF to run concurrently.

Example

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string(), num_gpus=1)
... class MyGpuUdf:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # New UDF that will have 8 concurrent running instances (will require 8 total GPUs)
>>> MyGpuUdf_8_concurrency = MyGpuUdf.with_concurrency(8)

with_init_args(*args, **kwargs) → UDF[source]#

Replace initialization arguments for a class UDF when calling __init__ at runtime on each instance of the UDF.

Example

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... class MyUdfWithInit:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # Create a customized version of MyUdfWithInit by overriding the init args
>>> MyUdfWithInit_CustomInitArgs = MyUdfWithInit.with_init_args(text=" my old friend")
>>>
>>> df = daft.from_pydict({"foo": ["hello", "hello", "hello"]})
>>> df = df.with_column("bar_world", MyUdfWithInit(df["foo"]))
>>> df = df.with_column("bar_custom", MyUdfWithInit_CustomInitArgs(df["foo"]))
>>> df.show()
╭───────┬─────────────┬─────────────────────╮
│ foo   ┆ bar_world   ┆ bar_custom          │
│ ---   ┆ ---         ┆ ---                 │
│ Utf8  ┆ Utf8        ┆ Utf8                │
╞═══════╪═════════════╪═════════════════════╡
│ hello ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello ┆ hello world ┆ hello my old friend │
╰───────┴─────────────┴─────────────────────╯

(Showing first 3 of 3 rows)