User-Defined Functions (UDFs)

User-Defined Functions (UDFs) are a mechanism to run Python code on the data that lives in a DataFrame.

A UDF can be used just like an Expression, letting you express computation that Daft executes lazily.

To write a UDF, apply the @udf decorator to either a Python function or a Python class.

For more details, please consult the User Guide.

Creating UDFs

udf(*, return_dtype: Union[DataType, type, str], num_cpus: Optional[float] = None, num_gpus: Optional[float] = None, memory_bytes: Optional[int] = None, batch_size: Optional[int] = None) -> Callable[[Union[Callable[[...], Union[Series, list, ndarray, Array, ChunkedArray]], type]], UDF]

The @udf decorator converts a Python function or class into a UDF.

UDFs allow users to run arbitrary Python code on the outputs of Expressions.

Note

In most cases, UDFs will be slower than a native kernel/expression because of the overhead of crossing between Rust and Python. If your computation can be expressed using Daft expressions, do so instead of writing a UDF. If your UDF expresses a common use case that isn’t already covered by Daft, consider filing a ticket or contributing the functionality back to Daft as a kernel!

The example below creates a UDF that:

  1. Receives data under the argument name x

  2. Iterates over the Daft Series x

  3. Adds a Python constant value c to every element in x

  4. Returns a new list of Python values, which is coerced to the specified return type: return_dtype=DataType.int64()

The UDF can then be called on a DataFrame using any of the DataFrame projection operations (df.with_column(), df.select(), etc.).

Example

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64())
... def add_constant(x: daft.Series, c=10):
...     return [v + c for v in x]
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", add_constant(df["x"], c=20))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 21    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 22    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 23    │
╰───────┴───────╯

(Showing first 3 of 3 rows)

Resource Requests

You can also give Daft a hint about the resources that your UDF requires to run. For example, the following UDF requires 2 CPUs. On a machine or cluster with 8 CPUs, Daft can run up to 4 instances of this UDF at once, giving you a concurrency of 4!

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=2)
... def udf_needs_2_cpus(x: daft.Series):
...     return x
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯

(Showing first 3 of 3 rows)
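
The placement arithmetic in this section (8 CPUs, 2 CPUs per instance, up to 4 concurrent instances) can be sketched in plain Python. This is only an illustrative model and not Daft's scheduler: the helper name max_concurrent_instances is hypothetical, and the sketch assumes CPUs are the only constrained resource.

```python
import math
from typing import Optional


def max_concurrent_instances(total_cpus: float, num_cpus: Optional[float]) -> Optional[int]:
    """Upper bound on simultaneously running UDF instances (illustrative model only)."""
    if num_cpus is None:
        # No CPU request was given, so this model imposes no CPU-based limit.
        return None
    if num_cpus <= 0:
        raise ValueError("num_cpus must be positive")
    # Each instance reserves num_cpus, so only whole multiples fit.
    return math.floor(total_cpus / num_cpus)


print(max_concurrent_instances(8, 2))  # 4
print(max_concurrent_instances(8, 4))  # 2
```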

A UDF’s resource requests can also be overridden before you call it, like so:

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=4)
... def udf_needs_4_cpus(x: daft.Series):
...     return x
>>>
>>> # Override the num_cpus to 2 instead
>>> udf_needs_2_cpus = udf_needs_4_cpus.override_options(num_cpus=2)
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯

(Showing first 3 of 3 rows)

Parameters:
  • return_dtype (DataType) – Returned type of the UDF.

  • num_cpus – Number of CPUs to allocate each running instance of your UDF. Note that this is used purely for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time). The default None indicates that Daft is free to allocate as many instances of the UDF as it wants to.

  • num_gpus – Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.

  • memory_bytes – Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors, this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.

  • batch_size – Enables batching of the input into batches of at most this size. Results between batches are concatenated.

Returns:
  UDF decorator, which converts a user-provided Python function into a UDF that can be called on Expressions.

Return type:
  Callable[[UserDefinedPyFuncLike], UDF]
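
The batch_size behavior described above (input split into batches of at most that size, with per-batch results concatenated) can be modeled in plain Python. This is a minimal sketch of the semantics only, not Daft's implementation; apply_in_batches and add_one are hypothetical names introduced for illustration.

```python
from typing import Callable, List, Optional, Sequence


def apply_in_batches(
    fn: Callable[[Sequence[int]], List[int]],
    values: Sequence[int],
    batch_size: Optional[int] = None,
) -> List[int]:
    """Call fn on slices of at most batch_size items and concatenate the results."""
    if batch_size is None:
        # No batching: fn sees all values in a single call.
        return list(fn(values))
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    results: List[int] = []
    for start in range(0, len(values), batch_size):
        results.extend(fn(values[start:start + batch_size]))
    return results


seen_batch_lengths: List[int] = []


def add_one(batch: Sequence[int]) -> List[int]:
    seen_batch_lengths.append(len(batch))  # record how the input was split
    return [v + 1 for v in batch]


print(apply_in_batches(add_one, [1, 2, 3, 4, 5], batch_size=2))  # [2, 3, 4, 5, 6]
print(seen_batch_lengths)  # [2, 2, 1]
```

The last batch is shorter than batch_size, which is why a function written for batched execution should not assume a fixed input length.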

Using UDFs

class UDF(inner: Union[Callable[[...], Union[Series, list, ndarray, Array, ChunkedArray]], type], name: str, return_dtype: DataType, init_args: Optional[Tuple[Tuple[Any, ...], Dict[str, Any]]] = None, concurrency: Optional[int] = None, resource_request: Optional[ResourceRequest] = None, batch_size: Optional[int] = None)

A class produced by applying the @daft.udf decorator to a Python function or class.

Calling a UDF produces a daft.Expression that can be used in a DataFrame operation.

Example

>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.float64())
... def multiply_and_add(x: daft.Series, y: float, z: float):
...     return x.to_arrow().to_numpy() * y + z
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("result", multiply_and_add(df["x"], 2.0, z=1.5))
>>> df.show()
╭───────┬─────────╮
│ x     ┆ result  │
│ ---   ┆ ---     │
│ Int64 ┆ Float64 │
╞═══════╪═════════╡
│ 1     ┆ 3.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ 5.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 3     ┆ 7.5     │
╰───────┴─────────╯

(Showing first 3 of 3 rows)

__call__(*args, **kwargs) -> Expression

Call self as a function.

override_options(*, num_cpus: float | None = <object object>, num_gpus: float | None = <object object>, memory_bytes: int | None = <object object>, batch_size: int | None = <object object>) -> UDF

Replace the resource requests for running each instance of your UDF.

For instance, if your UDF requires 4 CPUs to run, you can configure it like so:

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... def example_udf(inputs):
...     # You will have access to 4 CPUs here if you configure your UDF correctly!
...     return inputs
>>>
>>> # Parametrize the UDF to run with 4 CPUs
>>> example_udf_4CPU = example_udf.override_options(num_cpus=4)

Parameters:
  • num_cpus – Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time).

  • num_gpus – Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.

  • memory_bytes – Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors, this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.

  • batch_size – Enables batching of the input into batches of at most this size. Results between batches are concatenated.
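
Because num_gpus is described as assigning GPUs through CUDA_VISIBLE_DEVICES, code running inside a UDF could inspect that variable to see which devices it was given. The helper below is a hypothetical illustration using only the standard library; it assumes the usual comma-separated format of the variable and is not part of Daft's API.

```python
import os
from typing import List, Mapping


def visible_gpu_ids(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the device identifiers listed in CUDA_VISIBLE_DEVICES.

    Returns an empty list when the variable is unset or empty.
    """
    raw = env.get("CUDA_VISIBLE_DEVICES", "")
    return [part.strip() for part in raw.split(",") if part.strip()]


# Simulated environments, passed explicitly so the real environment is untouched.
print(visible_gpu_ids({"CUDA_VISIBLE_DEVICES": "2"}))    # ['2']
print(visible_gpu_ids({"CUDA_VISIBLE_DEVICES": "0,1"}))  # ['0', '1']
print(visible_gpu_ids({}))                               # []
```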

with_concurrency(concurrency: int) -> UDF

Override the concurrency of this UDF, which tells Daft how many instances of your UDF to run concurrently.

Example

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string(), num_gpus=1)
... class MyGpuUdf:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # New UDF that will have 8 concurrent running instances (will require 8 total GPUs)
>>> MyGpuUdf_8_concurrency = MyGpuUdf.with_concurrency(8)
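
The comment in the example above (8 concurrent instances at 1 GPU each need 8 GPUs in total) follows from simple multiplication. The sketch below makes that explicit; total_gpus_required and can_place are hypothetical helpers for illustration, not Daft functions.

```python
def total_gpus_required(concurrency: int, num_gpus: float) -> float:
    """GPUs needed to run `concurrency` instances that each request `num_gpus`."""
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")
    return concurrency * num_gpus


def can_place(available_gpus: float, concurrency: int, num_gpus: float) -> bool:
    """True if all requested instances fit on the available GPUs at once."""
    return total_gpus_required(concurrency, num_gpus) <= available_gpus


print(total_gpus_required(8, 1))  # 8
print(can_place(4, 8, 1))         # False
```
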
with_init_args(*args, **kwargs) -> UDF

Replace the initialization arguments that are passed to __init__ when each instance of a class UDF is created at runtime.

Example

>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... class MyUdfWithInit:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # Create a customized version of MyUdfWithInit by overriding the init args
>>> MyUdfWithInit_CustomInitArgs = MyUdfWithInit.with_init_args(text=" my old friend")
>>>
>>> df = daft.from_pydict({"foo": ["hello", "hello", "hello"]})
>>> df = df.with_column("bar_world", MyUdfWithInit(df["foo"]))
>>> df = df.with_column("bar_custom", MyUdfWithInit_CustomInitArgs(df["foo"]))
>>> df.show()
╭───────┬─────────────┬─────────────────────╮
│ foo   ┆ bar_world   ┆ bar_custom          │
│ ---   ┆ ---         ┆ ---                 │
│ Utf8  ┆ Utf8        ┆ Utf8                │
╞═══════╪═════════════╪═════════════════════╡
│ hello ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello ┆ hello world ┆ hello my old friend │
╰───────┴─────────────┴─────────────────────╯

(Showing first 3 of 3 rows)
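
The example above shows that with_init_args leaves the original UDF unchanged and produces a separate, customized one. A toy model of that pattern in plain Python might look like the following. DeferredInit and Greeter are hypothetical names, and this is a sketch of the idea of storing init arguments for later construction, not Daft's actual implementation.

```python
from typing import Any, Dict, Optional, Tuple


class DeferredInit:
    """Toy wrapper that stores init args and builds the instance later."""

    def __init__(self, cls: type, args: Tuple[Any, ...] = (), kwargs: Optional[Dict[str, Any]] = None):
        self.cls = cls
        self.args = args
        self.kwargs = dict(kwargs or {})

    def with_init_args(self, *args: Any, **kwargs: Any) -> "DeferredInit":
        # Return a new wrapper; the original keeps its own arguments.
        return DeferredInit(self.cls, args, kwargs)

    def instantiate(self) -> Any:
        # Construction happens only here, with whatever args were stored.
        return self.cls(*self.args, **self.kwargs)


class Greeter:
    def __init__(self, text: str = " world"):
        self.text = text

    def __call__(self, data):
        return [x + self.text for x in data]


default = DeferredInit(Greeter)
custom = default.with_init_args(text=" my old friend")

print(default.instantiate()(["hello"]))  # ['hello world']
print(custom.instantiate()(["hello"]))   # ['hello my old friend']
```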