Configuration#
Configure the execution backend, adjust Daft's behavior during execution, and control how Daft interacts with storage.
Setting the Runner#
Control the execution backend that Daft will run on by calling these functions once at the start of your application.
set_runner_native #
set_runner_native(
num_threads: int | None = None,
) -> DaftContext
Configure Daft to execute dataframes using native multi-threaded processing.
This is the default execution mode for Daft.
Returns:
Name | Type | Description |
---|---|---|
DaftContext | DaftContext | Updated Daft execution context configured for native execution. |
Note
Can also be configured via environment variable: DAFT_RUNNER=native
Source code in daft/context.py
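A minimal sketch of selecting the native runner at the start of a script; the thread count shown is purely illustrative, and omitting it lets Daft pick a sensible default:

```python
import daft
from daft.context import set_runner_native

# Use the native multi-threaded runner (the default), capped at 8 threads for illustration.
set_runner_native(num_threads=8)

df = daft.from_pydict({"x": [1, 2, 3]})
df.show()
```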
set_runner_ray #
set_runner_ray(
address: str | None = None,
noop_if_initialized: bool = False,
max_task_backlog: int | None = None,
force_client_mode: bool = False,
) -> DaftContext
Configure Daft to execute dataframes using the Ray distributed computing framework.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
address | str | None | Ray cluster address to connect to. If None, connects to or starts a local Ray instance. | None |
noop_if_initialized | bool | If True, skip initialization if Ray is already running. | False |
max_task_backlog | int | None | Maximum number of tasks that can be queued. None means Daft will automatically determine a good default. | None |
force_client_mode | bool | If True, forces Ray to run in client mode. | False |
Returns:
Name | Type | Description |
---|---|---|
DaftContext | DaftContext | Updated Daft execution context configured for Ray. |
Note
Can also be configured via environment variable: DAFT_RUNNER=ray
Source code in daft/context.py
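A minimal sketch of pointing Daft at a Ray cluster; the address below is a placeholder for your own cluster, and passing address=None (or setting DAFT_RUNNER=ray) connects to or starts a local Ray instance instead:

```python
import daft
from daft.context import set_runner_ray

# Connect to an existing Ray cluster; "ray://head-node:10001" is a placeholder address.
set_runner_ray(
    address="ray://head-node:10001",
    noop_if_initialized=True,  # don't re-initialize if Ray is already running
)

df = daft.from_pydict({"x": [1, 2, 3]})
print(df.collect())
```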
Setting Configurations#
Configure Daft in various ways during execution.
set_planning_config #
set_planning_config(
config: PyDaftPlanningConfig | None = None,
default_io_config: IOConfig | None = None,
) -> DaftContext
Globally sets various configuration parameters which control Daft plan construction behavior.
These configuration values are used when a Dataframe is being constructed (e.g. calls to create a Dataframe, or to build on an existing Dataframe).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | PyDaftPlanningConfig | None | A PyDaftPlanningConfig object to set the config to, before applying other kwargs. Defaults to None which indicates that the old (current) config should be used. | None |
default_io_config | IOConfig | None | A default IOConfig to use in the absence of one being explicitly passed into any Expression or DataFrame call. | None |
Source code in daft/context.py
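For example, a default IOConfig can be registered once so that subsequent reads pick it up without passing io_config explicitly. A sketch, where the bucket, region, and path are placeholders:

```python
import daft
from daft.context import set_planning_config
from daft.io import IOConfig, S3Config

# Register a default IOConfig used by any read that doesn't pass one explicitly.
set_planning_config(
    default_io_config=IOConfig(s3=S3Config(region_name="us-west-2", anonymous=True))
)

# Placeholder path; this read now uses the default IOConfig registered above.
df = daft.read_parquet("s3://my-bucket/data/*.parquet")
```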
planning_config_ctx #
planning_config_ctx(**kwargs)
Context manager that wraps set_planning_config to reset the config to its original setting afterwards.
Source code in daft/context.py
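A sketch of scoping a planning-config change with the context manager so the previous config is restored on exit; the IOConfig and path shown are placeholders:

```python
import daft
from daft.context import planning_config_ctx
from daft.io import IOConfig, S3Config

# Temporarily use an anonymous S3 config; the original planning config is restored afterwards.
with planning_config_ctx(default_io_config=IOConfig(s3=S3Config(anonymous=True))):
    df = daft.read_parquet("s3://my-bucket/public-data/*.parquet")  # placeholder path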
set_execution_config #
set_execution_config(
config: PyDaftExecutionConfig | None = None,
scan_tasks_min_size_bytes: int | None = None,
scan_tasks_max_size_bytes: int | None = None,
max_sources_per_scan_task: int | None = None,
broadcast_join_size_bytes_threshold: int | None = None,
parquet_split_row_groups_max_files: int | None = None,
sort_merge_join_sort_with_aligned_boundaries: bool
| None = None,
hash_join_partition_size_leniency: float | None = None,
sample_size_for_sort: int | None = None,
num_preview_rows: int | None = None,
parquet_target_filesize: int | None = None,
parquet_target_row_group_size: int | None = None,
parquet_inflation_factor: float | None = None,
csv_target_filesize: int | None = None,
csv_inflation_factor: float | None = None,
shuffle_aggregation_default_partitions: int
| None = None,
partial_aggregation_threshold: int | None = None,
high_cardinality_aggregation_threshold: float
| None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
default_morsel_size: int | None = None,
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
flight_shuffle_dirs: list[str] | None = None,
enable_ray_tracing: bool | None = None,
scantask_splitting_level: int | None = None,
flotilla: bool | None = None,
) -> DaftContext
Globally sets various configuration parameters which control various aspects of Daft execution.
These configuration values are used when a DataFrame is executed (e.g. calls to DataFrame.write_*, DataFrame.collect(), or DataFrame.show()).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | PyDaftExecutionConfig | None | A PyDaftExecutionConfig object to set the config to, before applying other kwargs. Defaults to None which indicates that the old (current) config should be used. | None |
scan_tasks_min_size_bytes | int | None | Minimum size in bytes when merging ScanTasks when reading files from storage. Increasing this value will make Daft perform more merging of files into a single partition before yielding, which leads to bigger but fewer partitions. (Defaults to 96 MiB) | None |
scan_tasks_max_size_bytes | int | None | Maximum size in bytes when merging ScanTasks when reading files from storage. Increasing this value will increase the upper bound of the size of merged ScanTasks, which leads to bigger but fewer partitions. (Defaults to 384 MiB) | None |
max_sources_per_scan_task | int | None | Maximum number of sources in a single ScanTask. (Defaults to 10) | None |
broadcast_join_size_bytes_threshold | int | None | If one side of a join is smaller than this threshold, a broadcast join will be used. Default is 10 MiB. | None |
parquet_split_row_groups_max_files | int | None | Maximum number of files to read in which the row group splitting should happen. (Defaults to 10) | None |
sort_merge_join_sort_with_aligned_boundaries | bool | None | Whether to use a specialized algorithm for sorting both sides of a sort-merge join such that they have aligned boundaries. This can lead to a faster merge-join at the cost of more skewed sorted join inputs, increasing the risk of OOMs. | None |
hash_join_partition_size_leniency | float | None | If the left side of a hash join is already correctly partitioned and the right side isn't, and the ratio between the left and right size is at least this value, then the right side is repartitioned to have an equal number of partitions as the left. Defaults to 0.5. | None |
sample_size_for_sort | int | None | Number of elements to sample from each partition when running a sort. Defaults to 20. | None |
num_preview_rows | int | None | Number of rows to display when showing a DataFrame preview. Defaults to 8. | None |
parquet_target_filesize | int | None | Target File Size when writing out Parquet Files. Defaults to 512MB | None |
parquet_target_row_group_size | int | None | Target Row Group Size when writing out Parquet Files. Defaults to 128MB | None |
parquet_inflation_factor | float | None | Inflation Factor of parquet files (In-Memory-Size / File-Size) ratio. Defaults to 3.0 | None |
csv_target_filesize | int | None | Target File Size when writing out CSV Files. Defaults to 512MB | None |
csv_inflation_factor | float | None | Inflation Factor of CSV files (In-Memory-Size / File-Size) ratio. Defaults to 0.5 | None |
shuffle_aggregation_default_partitions | int | None | Maximum number of partitions to create when performing aggregations on the Ray Runner. Defaults to 200, unless the number of input partitions is less than 200. | None |
partial_aggregation_threshold | int | None | Threshold for performing partial aggregations on the Native Runner. Defaults to 10000 rows. | None |
high_cardinality_aggregation_threshold | float | None | Threshold selectivity for performing high cardinality aggregations on the Native Runner. Defaults to 0.8. | None |
read_sql_partition_size_bytes | int | None | Target size of partition when reading from SQL databases. Defaults to 512MB | None |
enable_aqe | bool | None | Enables Adaptive Query Execution. Defaults to False. | None |
enable_native_executor | bool | None | Enables the native executor. Defaults to False. | None |
default_morsel_size | int | None | Default size of morsels used for the new local executor. Defaults to 131072 rows. | None |
shuffle_algorithm | str | None | The shuffle algorithm to use. Defaults to "auto", which will let Daft determine the algorithm. Options are "map_reduce" and "pre_shuffle_merge". | None |
pre_shuffle_merge_threshold | int | None | Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB | None |
flight_shuffle_dirs | list[str] | None | The directories to use for flight shuffle. Defaults to ["/tmp"]. | None |
enable_ray_tracing | bool | None | Enable tracing for Ray. | None |
scantask_splitting_level | int | None | How aggressively to split scan tasks; higher levels split more aggressively. | None |
Source code in daft/context.py
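A sketch of tuning a few execution knobs globally; the values shown are illustrative, not recommendations:

```python
from daft.context import set_execution_config

# Illustrative values only: enable adaptive query execution, shrink Parquet output files,
# and show more rows in previews.
set_execution_config(
    enable_aqe=True,
    parquet_target_filesize=256 * 1024 * 1024,  # 256 MB target per Parquet file
    num_preview_rows=20,
)
```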
execution_config_ctx #
execution_config_ctx(**kwargs)
Context manager that wraps set_execution_config to reset the config to its original setting afterwards.
Source code in daft/context.py
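A sketch of scoping an execution-config change to a single block; the previous execution config is restored when the block exits:

```python
import daft
from daft.context import execution_config_ctx

# Temporarily raise the preview row count for this block only.
with execution_config_ctx(num_preview_rows=50):
    daft.from_pydict({"x": list(range(100))}).show()
```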
I/O Configurations#
Configure how Daft interacts with storage (e.g. credentials, retry policies, and various other knobs to control performance and resource usage).
These configurations are most often used as inputs to Daft DataFrame I/O reading functions, such as those in Dataframe Creation.
IOConfig #
IOConfig(
s3: S3Config | None = None,
azure: AzureConfig | None = None,
gcs: GCSConfig | None = None,
http: HTTPConfig | None = None,
)
Configuration for the native I/O layer, e.g. credentials for accessing cloud storage systems.
Methods:
Name | Description |
---|---|
replace | Replaces values if provided, returning a new IOConfig. |
Attributes:
Name | Type | Description |
---|---|---|
azure | AzureConfig | |
gcs | GCSConfig | |
http | HTTPConfig | |
s3 | S3Config | |
Source code in daft/daft/__init__.pyi
replace #
replace(
s3: S3Config | None = None,
azure: AzureConfig | None = None,
gcs: GCSConfig | None = None,
http: HTTPConfig | None = None,
) -> IOConfig
Replaces values if provided, returning a new IOConfig.
Source code in daft/daft/__init__.pyi
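A sketch of building an IOConfig, deriving a variant with replace, and passing it to a read; the bucket, region, and project names are placeholders:

```python
import daft
from daft.io import GCSConfig, IOConfig, S3Config

io_config = IOConfig(s3=S3Config(region_name="us-east-1"))

# replace returns a new IOConfig with the given fields swapped in; the original is untouched.
io_config_with_gcs = io_config.replace(gcs=GCSConfig(project_id="my-project"))

# Placeholder path; the IOConfig supplies credentials and tuning for this read.
df = daft.read_parquet("s3://my-bucket/data/*.parquet", io_config=io_config)
```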
S3Config #
S3Config(
region_name: str | None = None,
endpoint_url: str | None = None,
key_id: str | None = None,
session_token: str | None = None,
access_key: str | None = None,
credentials_provider: Callable[[], S3Credentials]
| None = None,
buffer_time: int | None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
retry_mode: str | None = None,
anonymous: bool | None = None,
use_ssl: bool | None = None,
verify_ssl: bool | None = None,
check_hostname_ssl: bool | None = None,
requester_pays: bool | None = None,
force_virtual_addressing: bool | None = None,
profile_name: str | None = None,
)
I/O configuration for accessing an S3-compatible system.
Methods:
Name | Description |
---|---|
from_env | Creates an S3Config, retrieving credentials and configurations from the current environment. |
provide_cached_credentials | Wrapper around call to |
replace | Replaces values if provided, returning a new S3Config. |
Attributes:
Name | Type | Description |
---|---|---|
access_key | str | None | |
anonymous | bool | |
check_hostname_ssl | bool | |
connect_timeout_ms | int | |
credentials_provider | Callable[[], S3Credentials] | None | |
endpoint_url | str | None | |
force_virtual_addressing | bool | None | |
key_id | str | None | |
max_connections | int | |
num_tries | int | |
profile_name | str | None | |
read_timeout_ms | int | |
region_name | str | None | |
requester_pays | bool | None | |
retry_initial_backoff_ms | int | |
retry_mode | str | None | |
session_token | str | None | |
use_ssl | bool | |
verify_ssl | bool | |
Source code in daft/daft/__init__.pyi
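A sketch of two common S3Config setups, one for anonymous access to a public bucket and one for an S3-compatible service behind a custom endpoint; the endpoint, region, and keys are placeholders:

```python
from daft.io import IOConfig, S3Config

# Anonymous access to a public bucket in a specific region.
public = S3Config(region_name="us-west-2", anonymous=True)

# An S3-compatible service (e.g. a local MinIO) behind a custom endpoint; keys are placeholders.
compatible = S3Config(
    endpoint_url="http://localhost:9000",
    key_id="MY_KEY_ID",
    access_key="MY_SECRET_KEY",
    use_ssl=False,
)

io_config = IOConfig(s3=public)
```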
from_env #
from_env() -> S3Config
Creates an S3Config, retrieving credentials and configurations from the current environment.
Source code in daft/daft/__init__.pyi
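A sketch of picking up credentials from the environment (e.g. AWS environment variables or a configured profile) instead of spelling them out:

```python
from daft.io import IOConfig, S3Config

# Build an S3Config from whatever credentials the current environment provides.
io_config = IOConfig(s3=S3Config.from_env())
```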
provide_cached_credentials #
provide_cached_credentials() -> S3Credentials | None
Wrapper around call to S3Config.credentials_provider
to cache credentials until expiry.
Source code in daft/daft/__init__.pyi
replace #
replace(
region_name: str | None = None,
endpoint_url: str | None = None,
key_id: str | None = None,
session_token: str | None = None,
access_key: str | None = None,
credentials_provider: Callable[[], S3Credentials]
| None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
retry_mode: str | None = None,
anonymous: bool | None = None,
use_ssl: bool | None = None,
verify_ssl: bool | None = None,
check_hostname_ssl: bool | None = None,
requester_pays: bool | None = None,
force_virtual_addressing: bool | None = None,
profile_name: str | None = None,
) -> S3Config
Replaces values if provided, returning a new S3Config.
Source code in daft/daft/__init__.pyi
S3Credentials #
S3Credentials(
key_id: str,
access_key: str,
session_token: str | None = None,
expiry: datetime | None = None,
)
Attributes:
Name | Type | Description |
---|---|---|
access_key | str | |
expiry | datetime | None | |
key_id | str | |
session_token | str | None | |
Source code in daft/daft/__init__.pyi
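A sketch of a custom credentials_provider that returns S3Credentials with an expiry so stale credentials can be refreshed; fetch_temporary_keys() is a hypothetical helper standing in for your own token-fetching logic:

```python
from datetime import datetime, timedelta, timezone

from daft.io import IOConfig, S3Config, S3Credentials

def my_credentials_provider() -> S3Credentials:
    # fetch_temporary_keys() is a hypothetical helper that returns fresh temporary keys.
    key_id, secret, token = fetch_temporary_keys()
    return S3Credentials(
        key_id=key_id,
        access_key=secret,
        session_token=token,
        # Once this expiry passes, the provider is called again for fresh credentials.
        expiry=datetime.now(timezone.utc) + timedelta(hours=1),
    )

io_config = IOConfig(s3=S3Config(credentials_provider=my_credentials_provider))
```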
GCSConfig #
GCSConfig(
project_id: str | None = None,
credentials: str | None = None,
token: str | None = None,
anonymous: bool | None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
)
I/O configuration for accessing Google Cloud Storage.
Methods:
Name | Description |
---|---|
replace | Replaces values if provided, returning a new GCSConfig. |
Attributes:
Name | Type | Description |
---|---|---|
anonymous | bool | |
connect_timeout_ms | int | |
credentials | str | None | |
max_connections | int | |
num_tries | int | |
project_id | str | None | |
read_timeout_ms | int | |
retry_initial_backoff_ms | int | |
token | str | None | |
Source code in daft/daft/__init__.pyi
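A sketch of a GCSConfig wired into an IOConfig; the project ID is a placeholder, and the credentials value is assumed here to point at a service-account key file:

```python
from daft.io import GCSConfig, IOConfig

# Placeholder project; credentials is assumed to reference a service-account key (path shown is illustrative).
gcs = GCSConfig(project_id="my-gcp-project", credentials="/path/to/service-account.json")

io_config = IOConfig(gcs=gcs)
```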
replace #
replace(
project_id: str | None = None,
credentials: str | None = None,
token: str | None = None,
anonymous: bool | None = None,
max_connections: int | None = None,
retry_initial_backoff_ms: int | None = None,
connect_timeout_ms: int | None = None,
read_timeout_ms: int | None = None,
num_tries: int | None = None,
) -> GCSConfig
Replaces values if provided, returning a new GCSConfig.
Source code in daft/daft/__init__.pyi
AzureConfig #
AzureConfig(
storage_account: str | None = None,
access_key: str | None = None,
sas_token: str | None = None,
bearer_token: str | None = None,
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
use_fabric_endpoint: bool | None = None,
anonymous: bool | None = None,
endpoint_url: str | None = None,
use_ssl: bool | None = None,
)
I/O configuration for accessing Azure Blob Storage.
Methods:
Name | Description |
---|---|
replace | Replaces values if provided, returning a new AzureConfig. |
Attributes:
Name | Type | Description |
---|---|---|
access_key | str | None | |
anonymous | bool | None | |
bearer_token | str | None | |
client_id | str | None | |
client_secret | str | None | |
endpoint_url | str | None | |
sas_token | str | None | |
storage_account | str | None | |
tenant_id | str | None | |
use_fabric_endpoint | bool | None | |
use_ssl | bool | None | |
Source code in daft/daft/__init__.pyi
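A sketch of an AzureConfig using a storage account and access key; both values are placeholders, and sas_token or tenant/client credentials are alternative ways to authenticate:

```python
from daft.io import AzureConfig, IOConfig

# Placeholder account and key; swap in sas_token or tenant_id/client_id/client_secret as needed.
azure = AzureConfig(storage_account="mystorageaccount", access_key="MY_ACCESS_KEY")

io_config = IOConfig(azure=azure)
```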
replace #
replace(
storage_account: str | None = None,
access_key: str | None = None,
sas_token: str | None = None,
bearer_token: str | None = None,
tenant_id: str | None = None,
client_id: str | None = None,
client_secret: str | None = None,
use_fabric_endpoint: bool | None = None,
anonymous: bool | None = None,
endpoint_url: str | None = None,
use_ssl: bool | None = None,
) -> AzureConfig
Replaces values if provided, returning a new AzureConfig.
Source code in daft/daft/__init__.pyi