User-Defined Functions (UDFs)
User-Defined Functions (UDFs) are a mechanism to run Python code on the data that lives in a DataFrame.
A UDF can be used just like an Expression, letting users express computation that Daft executes lazily.
To write a UDF, use the @udf decorator, which can decorate either a Python function or a Python class, producing a UDF.
For more details, please consult the User Guide.
Creating UDFs
- udf(*, return_dtype: Union[DataType, type, str], num_cpus: Optional[float] = None, num_gpus: Optional[float] = None, memory_bytes: Optional[int] = None, batch_size: Optional[int] = None) -> Callable[[Union[Callable[[...], Union[Series, list, ndarray, Array, ChunkedArray]], type]], UDF]
@udf decorator to convert a Python function or class into a UDF.
UDFs allow users to run arbitrary Python code on the outputs of Expressions.
Note
In most cases, UDFs will be slower than a native kernel/expression because of the required Rust and Python overheads. If your computation can be expressed using Daft expressions, you should do so instead of writing a UDF. If your UDF expresses a common use-case that isn’t already covered by Daft, you should file a ticket or contribute this functionality back to Daft as a kernel!
In the example below, we create a UDF that:
- Receives data under the argument name x
- Iterates over the x Daft Series
- Adds a Python constant value c to every element in x
- Returns a new list of Python values which will be coerced to the specified return type: return_dtype=DataType.int64()
We can call our UDF on a dataframe using any of the dataframe projection operations (df.with_column(), df.select(), etc.).
Example
>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64())
... def add_constant(x: daft.Series, c=10):
...     return [v + c for v in x]
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", add_constant(df["x"], c=20))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 21    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 22    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 23    │
╰───────┴───────╯
(Showing first 3 of 3 rows)
Resource Requests
You can also hint Daft about the resources that your UDF will require to run. For example, the following UDF requires 2 CPUs to run. On a machine/cluster with 8 CPUs, Daft will be able to run up to 4 instances of this UDF at once, giving you a concurrency of 4!
>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=2)
... def udf_needs_2_cpus(x: daft.Series):
...     return x
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯
(Showing first 3 of 3 rows)
Your UDF’s resources can also be overridden before you call it, like so:
>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.int64(), num_cpus=4)
... def udf_needs_4_cpus(x: daft.Series):
...     return x
>>>
>>> # Override the num_cpus to 2 instead
>>> udf_needs_2_cpus = udf_needs_4_cpus.override_options(num_cpus=2)
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("new_x", udf_needs_2_cpus(df["x"]))
>>> df.show()
╭───────┬───────╮
│ x     ┆ new_x │
│ ---   ┆ ---   │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 1     ┆ 1     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ 2     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ 3     │
╰───────┴───────╯
(Showing first 3 of 3 rows)
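The placement arithmetic described in this section (8 CPUs with num_cpus=2 allows up to 4 instances) can be sketched in plain Python. This is only an illustration of the stated rule, not Daft's scheduler: the helper name max_concurrent_instances is hypothetical, and it assumes CPU count is the only constraint.

```python
import math
from typing import Optional


def max_concurrent_instances(total_cpus: float, num_cpus: Optional[float]) -> Optional[int]:
    """Upper bound on simultaneously running UDF instances from CPU placement alone.

    Hypothetical helper for illustration; not part of Daft.
    Returns None when no CPU request is given, meaning placement imposes no bound.
    """
    if num_cpus is None:
        return None
    if num_cpus <= 0:
        raise ValueError("num_cpus must be positive")
    # Each instance reserves num_cpus, so only whole multiples fit on the machine.
    return math.floor(total_cpus / num_cpus)


print(max_concurrent_instances(8, 2))     # 4
print(max_concurrent_instances(8, 4))     # 2
print(max_concurrent_instances(8, None))  # None
```

In practice other requests (GPUs, memory) and the runner in use may lower this bound further.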
- param return_dtype:
Returned type of the UDF
- type return_dtype:
DataType
- param num_cpus:
Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time). The default None indicates that Daft is free to allocate as many instances of the UDF as it wants to.
- param num_gpus:
Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.
- param memory_bytes:
Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors, this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.
- param batch_size:
Enables batching of the input into batches of at most this size. Results between batches are concatenated.
- returns:
UDF decorator that converts a user-provided Python function or class into a UDF that can be called on Expressions
- rtype:
Callable[[UserDefinedPyFuncLike], UDF]
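The batch_size behaviour described above (inputs split into batches of at most that size, with results concatenated) can be sketched in plain Python. This is a minimal illustration of the semantics only, not Daft's implementation; run_in_batches and add_ten are hypothetical names, and plain lists stand in for Daft Series.

```python
from typing import Callable, List, Optional, Sequence


def run_in_batches(
    fn: Callable[[Sequence[int]], List[int]],
    values: Sequence[int],
    batch_size: Optional[int],
) -> List[int]:
    """Call fn on slices of at most batch_size items and concatenate the results.

    Hypothetical helper for illustration; not part of Daft.
    """
    if batch_size is None:
        # No batching requested: the function sees all the values at once.
        return list(fn(values))
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    out: List[int] = []
    for start in range(0, len(values), batch_size):
        out.extend(fn(values[start:start + batch_size]))
    return out


seen_batch_lengths: List[int] = []


def add_ten(batch: Sequence[int]) -> List[int]:
    # Record how many items arrived so the batching is observable.
    seen_batch_lengths.append(len(batch))
    return [v + 10 for v in batch]


result = run_in_batches(add_ten, [1, 2, 3, 4, 5, 6, 7], batch_size=3)
print(result)              # [11, 12, 13, 14, 15, 16, 17]
print(seen_batch_lengths)  # [3, 3, 1]
```

The last batch may be smaller than batch_size, which is why the parameter is described as an upper limit.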
Using UDFs
- class UDF(inner: Union[Callable[[...], Union[Series, list, ndarray, Array, ChunkedArray]], type], name: str, return_dtype: DataType, init_args: Optional[Tuple[Tuple[Any, ...], Dict[str, Any]]] = None, concurrency: Optional[int] = None, resource_request: Optional[ResourceRequest] = None, batch_size: Optional[int] = None)
A class produced by applying the @daft.udf decorator over a Python function or class.
Calling this class produces a daft.Expression that can be used in a DataFrame function.
Example
>>> import daft
>>> @daft.udf(return_dtype=daft.DataType.float64())
... def multiply_and_add(x: daft.Series, y: float, z: float):
...     return x.to_arrow().to_numpy() * y + z
>>>
>>> df = daft.from_pydict({"x": [1, 2, 3]})
>>> df = df.with_column("result", multiply_and_add(df["x"], 2.0, z=1.5))
>>> df.show()
╭───────┬─────────╮
│ x     ┆ result  │
│ ---   ┆ ---     │
│ Int64 ┆ Float64 │
╞═══════╪═════════╡
│ 1     ┆ 3.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2     ┆ 5.5     │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 3     ┆ 7.5     │
╰───────┴─────────╯
(Showing first 3 of 3 rows)
- override_options(*, num_cpus: float | None = <object object>, num_gpus: float | None = <object object>, memory_bytes: int | None = <object object>, batch_size: int | None = <object object>) -> UDF
Replace the resource requests for running each instance of your UDF.
For instance, if your UDF requires 4 CPUs to run, you can configure it like so:
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... def example_udf(inputs):
...     # You will have access to 4 CPUs here if you configure your UDF correctly!
...     return inputs
>>>
>>> # Parametrize the UDF to run with 4 CPUs
>>> example_udf_4CPU = example_udf.override_options(num_cpus=4)
- Parameters:
num_cpus – Number of CPUs to allocate each running instance of your UDF. Note that this is purely used for placement (e.g. if your machine has 8 CPUs and you specify num_cpus=4, then Daft can run at most 2 instances of your UDF at a time).
num_gpus – Number of GPUs to allocate each running instance of your UDF. This is used for placement and also for allocating the appropriate GPU to each UDF using CUDA_VISIBLE_DEVICES.
memory_bytes – Amount of memory to allocate each running instance of your UDF in bytes. If your UDF is experiencing out-of-memory errors, this parameter can help hint Daft that each UDF requires a certain amount of heap memory for execution.
batch_size – Enables batching of the input into batches of at most this size. Results between batches are concatenated.
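Since num_gpus assigns GPUs to each UDF instance through CUDA_VISIBLE_DEVICES, code inside a UDF can inspect that variable to see which devices it was given. The sketch below uses only the standard library; visible_gpu_ids is a hypothetical helper, and it assumes the variable holds the usual comma-separated list of device identifiers.

```python
import os
from typing import List, Mapping, Optional


def visible_gpu_ids(env: Optional[Mapping[str, str]] = None) -> Optional[List[str]]:
    """Parse CUDA_VISIBLE_DEVICES into a list of device identifiers.

    Hypothetical helper for illustration; not part of Daft.
    Returns None when the variable is unset (no restriction was applied)
    and an empty list when it is set but names no devices.
    """
    env = os.environ if env is None else env
    raw = env.get("CUDA_VISIBLE_DEVICES")
    if raw is None:
        return None
    return [part.strip() for part in raw.split(",") if part.strip()]


# Passing a mapping explicitly keeps the example independent of the real environment.
print(visible_gpu_ids({"CUDA_VISIBLE_DEVICES": "0,2"}))  # ['0', '2']
print(visible_gpu_ids({"CUDA_VISIBLE_DEVICES": ""}))     # []
print(visible_gpu_ids({}))                               # None
```

Inside a real UDF you would call visible_gpu_ids() with no argument so it reads the process environment.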
- with_concurrency(concurrency: int) -> UDF
Override the concurrency of this UDF, which tells Daft how many instances of your UDF to run concurrently.
Example
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string(), num_gpus=1)
... class MyGpuUdf:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # New UDF that will have 8 concurrent running instances (will require 8 total GPUs)
>>> MyGpuUdf_8_concurrency = MyGpuUdf.with_concurrency(8)
- with_init_args(*args, **kwargs) -> UDF
Replace initialization arguments for a class UDF when calling __init__ at runtime on each instance of the UDF.
Example
>>> import daft
>>>
>>> @daft.udf(return_dtype=daft.DataType.string())
... class MyUdfWithInit:
...     def __init__(self, text=" world"):
...         self.text = text
...
...     def __call__(self, data):
...         return [x + self.text for x in data]
>>>
>>> # Create a customized version of MyUdfWithInit by overriding the init args
>>> MyUdfWithInit_CustomInitArgs = MyUdfWithInit.with_init_args(text=" my old friend")
>>>
>>> df = daft.from_pydict({"foo": ["hello", "hello", "hello"]})
>>> df = df.with_column("bar_world", MyUdfWithInit(df["foo"]))
>>> df = df.with_column("bar_custom", MyUdfWithInit_CustomInitArgs(df["foo"]))
>>> df.show()
╭───────┬─────────────┬─────────────────────╮
│ foo   ┆ bar_world   ┆ bar_custom          │
│ ---   ┆ ---         ┆ ---                 │
│ Utf8  ┆ Utf8        ┆ Utf8                │
╞═══════╪═════════════╪═════════════════════╡
│ hello ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello ┆ hello world ┆ hello my old friend │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ hello ┆ hello world ┆ hello my old friend │
╰───────┴─────────────┴─────────────────────╯
(Showing first 3 of 3 rows)
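The idea behind with_init_args, storing constructor arguments now and only calling __init__ later on each running instance, can be sketched without Daft. The LazyClassUdf and Greeter classes below are hypothetical stand-ins written for this illustration and do not reflect how Daft implements class UDFs internally.

```python
from typing import Any, Dict, List, Optional, Tuple


class LazyClassUdf:
    """Hypothetical stand-in for a class UDF handle; not Daft's implementation."""

    def __init__(self, cls: type, args: Tuple[Any, ...] = (), kwargs: Optional[Dict[str, Any]] = None):
        self._cls = cls
        self._args = args
        self._kwargs = dict(kwargs or {})
        self._instance = None  # built on first call, not at definition time

    def with_init_args(self, *args: Any, **kwargs: Any) -> "LazyClassUdf":
        # Return a new handle so the original keeps its own init arguments.
        return LazyClassUdf(self._cls, args, kwargs)

    def __call__(self, data: List[str]) -> List[str]:
        if self._instance is None:
            self._instance = self._cls(*self._args, **self._kwargs)
        return self._instance(data)


class Greeter:
    init_calls = 0  # counts how often __init__ actually runs

    def __init__(self, text: str = " world"):
        Greeter.init_calls += 1
        self.text = text

    def __call__(self, data: List[str]) -> List[str]:
        return [x + self.text for x in data]


default_udf = LazyClassUdf(Greeter)
custom_udf = default_udf.with_init_args(text=" my old friend")

print(Greeter.init_calls)           # 0
print(default_udf(["hello"]))       # ['hello world']
print(custom_udf(["hello", "hi"]))  # ['hello my old friend', 'hi my old friend']
custom_udf(["again"])               # reuses the already-initialized instance
print(Greeter.init_calls)           # 2
```

Deferring construction this way is what makes class UDFs useful for expensive setup, such as loading a model once per instance instead of once per batch.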