From e8cb69edaddae53607967d0bddc66886ce592729 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Tue, 26 May 2026 21:32:55 +0200 Subject: [PATCH 1/2] Add Vast.ai-specific profile options - Support setting backend-specific options in profiles, run and fleet configurations. These options are only applied to offers from the respective backends, and ignored for other offers. - Add the following options for the `vastai` backend: - `offer_order` - Controls the order in which offers are considered for provisioning. Use `score` to prioritize the highest overall score first (the default order in the Vast.ai console), or `price` to prioritize the lowest-cost offers first. Lower-cost offers are often less reliable, so consider applying stricter filters when using `price`. Defaults to `score`. - `min_reliability` - The minimum reliability threshold for offers, on a scale from `0` to `1`. Defaults to `0.9`. - `min_score` - The minimum overall score required for offers to be considered. The scoring scale varies and may require experimentation. Starting with a value in the low hundreds is generally recommended. Example: ```yaml type: dev-environment backend_options: - type: vastai offer_order: price min_reliability: 0.97 min_score: 250 ``` --- .../reference/dstack.yml/dev-environment.md | 12 +++ mkdocs/docs/reference/dstack.yml/fleet.md | 12 +++ mkdocs/docs/reference/dstack.yml/service.md | 12 +++ mkdocs/docs/reference/dstack.yml/task.md | 12 +++ pyproject.toml | 5 +- .../core/backends/base/profile_options.py | 23 +++++ .../core/backends/profile_options.py | 5 ++ .../_internal/core/backends/vastai/compute.py | 58 +++++++++--- .../core/backends/vastai/profile_options.py | 63 +++++++++++++ .../_internal/core/compatibility/common.py | 13 ++- .../_internal/core/compatibility/fleets.py | 14 +-- .../_internal/core/compatibility/runs.py | 10 ++- src/dstack/_internal/core/models/fleets.py | 10 ++- src/dstack/_internal/core/models/profiles.py | 21 +++++ src/dstack/_internal/core/models/runs.py | 2 + .../_internal/server/services/fleets.py | 1 + .../services/jobs/configurators/base.py | 1 + .../server/services/requirements/combine.py | 88 ++++++++++++------- src/dstack/_internal/utils/combine.py | 34 +++++++ .../backends/base/test_profile_options.py | 18 ++++ .../core/backends/vastai/test_compute.py | 18 +++- .../backends/vastai/test_profile_options.py | 51 +++++++++++ .../_internal/core/models/test_profiles.py | 29 ++++++ .../_internal/server/routers/test_fleets.py | 6 ++ .../_internal/server/routers/test_runs.py | 6 ++ .../services/requirements/test_combine.py | 71 +++++++++++++++ 26 files changed, 536 insertions(+), 59 deletions(-) create mode 100644 src/dstack/_internal/core/backends/base/profile_options.py create mode 100644 src/dstack/_internal/core/backends/profile_options.py create mode 100644 src/dstack/_internal/core/backends/vastai/profile_options.py create mode 100644 src/dstack/_internal/utils/combine.py create mode 100644 src/tests/_internal/core/backends/base/test_profile_options.py create mode 100644 src/tests/_internal/core/backends/vastai/test_profile_options.py create mode 100644 src/tests/_internal/core/models/test_profiles.py diff --git a/mkdocs/docs/reference/dstack.yml/dev-environment.md b/mkdocs/docs/reference/dstack.yml/dev-environment.md index 0c02374ee3..594b6e65d7 100644 --- a/mkdocs/docs/reference/dstack.yml/dev-environment.md +++ b/mkdocs/docs/reference/dstack.yml/dev-environment.md @@ -149,3 +149,15 @@ The `dev-environment` configuration type allows running [dev environments](../.. * `~/.bashrc`, same as `~/.bashrc:~/.bashrc` * `/opt/myorg`, same as `/opt/myorg/` and `/opt/myorg:/opt/myorg` * `libs/patched_libibverbs.so.1:/lib/x86_64-linux-gnu/libibverbs.so.1` + +### `backend_options` + +Backend-specific options that only take effect for offers of the respective backend. + +#### `backend_options[n][type=vastai]` { #backend_options-vastai data-toc-label="vastai" } + +#SCHEMA# dstack._internal.core.backends.vastai.profile_options.VastAIProfileOptions + overrides: + show_root_heading: false + type: + required: true diff --git a/mkdocs/docs/reference/dstack.yml/fleet.md b/mkdocs/docs/reference/dstack.yml/fleet.md index 76312a52b9..99b985db44 100644 --- a/mkdocs/docs/reference/dstack.yml/fleet.md +++ b/mkdocs/docs/reference/dstack.yml/fleet.md @@ -54,6 +54,18 @@ The `fleet` configuration type allows creating and updating fleets. overrides: show_root_heading: false + ### `backend_options` + + Backend-specific options that only take effect for offers of the respective backend. + + #### `backend_options[n][type=vastai]` { #backend_options-vastai data-toc-label="vastai" } + + #SCHEMA# dstack._internal.core.backends.vastai.profile_options.VastAIProfileOptions + overrides: + show_root_heading: false + type: + required: true + === "SSH fleet" ## Root reference diff --git a/mkdocs/docs/reference/dstack.yml/service.md b/mkdocs/docs/reference/dstack.yml/service.md index 8aba6f827e..5ddfe46dd1 100644 --- a/mkdocs/docs/reference/dstack.yml/service.md +++ b/mkdocs/docs/reference/dstack.yml/service.md @@ -229,3 +229,15 @@ The `service` configuration type allows running [services](../../concepts/servic * `~/.bashrc`, same as `~/.bashrc:~/.bashrc` * `/opt/myorg`, same as `/opt/myorg/` and `/opt/myorg:/opt/myorg` * `libs/patched_libibverbs.so.1:/lib/x86_64-linux-gnu/libibverbs.so.1` + +### `backend_options` + +Backend-specific options that only take effect for offers of the respective backend. + +#### `backend_options[n][type=vastai]` { #backend_options-vastai data-toc-label="vastai" } + +#SCHEMA# dstack._internal.core.backends.vastai.profile_options.VastAIProfileOptions + overrides: + show_root_heading: false + type: + required: true diff --git a/mkdocs/docs/reference/dstack.yml/task.md b/mkdocs/docs/reference/dstack.yml/task.md index 01bc7580b2..96d05c325d 100644 --- a/mkdocs/docs/reference/dstack.yml/task.md +++ b/mkdocs/docs/reference/dstack.yml/task.md @@ -149,3 +149,15 @@ The `task` configuration type allows running [tasks](../../concepts/tasks.md). * `~/.bashrc`, same as `~/.bashrc:~/.bashrc` * `/opt/myorg`, same as `/opt/myorg/` and `/opt/myorg:/opt/myorg` * `libs/patched_libibverbs.so.1:/lib/x86_64-linux-gnu/libibverbs.so.1` + +### `backend_options` + +Backend-specific options that only take effect for offers of the respective backend. + +#### `backend_options[n][type=vastai]` { #backend_options-vastai data-toc-label="vastai" } + +#SCHEMA# dstack._internal.core.backends.vastai.profile_options.VastAIProfileOptions + overrides: + show_root_heading: false + type: + required: true diff --git a/pyproject.toml b/pyproject.toml index e4083604c7..5381277dd1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ "python-multipart>=0.0.16", "filelock", "psutil", - "gpuhunt==0.1.22", + "gpuhunt @ git+https://github.com/dstackai/gpuhunt.git@vastai_offer_order", "argcomplete>=3.5.0", "ignore-python>=0.2.0", "orjson", @@ -67,6 +67,9 @@ artifacts = [ "src/dstack/_internal/server/statics/**", ] +[tool.hatch.metadata] +allow-direct-references = true + [tool.hatch.metadata.hooks.fancy-pypi-readme] content-type = "text/markdown" diff --git a/src/dstack/_internal/core/backends/base/profile_options.py b/src/dstack/_internal/core/backends/base/profile_options.py new file mode 100644 index 0000000000..04fd496c91 --- /dev/null +++ b/src/dstack/_internal/core/backends/base/profile_options.py @@ -0,0 +1,23 @@ +from abc import ABC, abstractmethod +from typing import Generic, Optional, Sequence, Type, TypeVar + +from dstack._internal.core.models.common import CoreModel + +T = TypeVar("T", bound="BackendProfileOptions") + + +class BackendProfileOptions(CoreModel, ABC, Generic[T]): + @abstractmethod + def combine(self, other: T) -> T: ... + + +_OptionsT = TypeVar("_OptionsT", bound="BackendProfileOptions") + + +def get_backend_profile_options( + options: Optional[Sequence[BackendProfileOptions]], + options_type: Type[_OptionsT], +) -> Optional[_OptionsT]: + if not options: + return None + return next((opt for opt in options if isinstance(opt, options_type)), None) diff --git a/src/dstack/_internal/core/backends/profile_options.py b/src/dstack/_internal/core/backends/profile_options.py new file mode 100644 index 0000000000..cc2ae99562 --- /dev/null +++ b/src/dstack/_internal/core/backends/profile_options.py @@ -0,0 +1,5 @@ +from dstack._internal.core.backends.vastai.profile_options import VastAIProfileOptions + +# TODO: when adding options for the first VM-based backend, +# implement the logic to check idle instances against backend options before reusing. +AnyBackendProfileOptions = VastAIProfileOptions diff --git a/src/dstack/_internal/core/backends/vastai/compute.py b/src/dstack/_internal/core/backends/vastai/compute.py index adf11c8d89..915d98f105 100644 --- a/src/dstack/_internal/core/backends/vastai/compute.py +++ b/src/dstack/_internal/core/backends/vastai/compute.py @@ -2,6 +2,7 @@ import gpuhunt from gpuhunt.providers.vastai import VastAIProvider +from typing_extensions import assert_never from dstack._internal.core.backends.base.backend import Compute from dstack._internal.core.backends.base.compute import ( @@ -10,8 +11,15 @@ get_docker_commands, ) from dstack._internal.core.backends.base.offers import get_catalog_offers +from dstack._internal.core.backends.base.profile_options import get_backend_profile_options from dstack._internal.core.backends.vastai.api_client import VastAIAPIClient from dstack._internal.core.backends.vastai.models import VastAIConfig +from dstack._internal.core.backends.vastai.profile_options import ( + VASTAI_DEFAULT_MIN_RELIABILITY, + VASTAI_DEFAULT_OFFER_ORDER, + VastAIOfferOrder, + VastAIProfileOptions, +) from dstack._internal.core.consts import DSTACK_RUNNER_SSH_PORT from dstack._internal.core.errors import ProvisioningError from dstack._internal.core.models.backends.base import BackendType @@ -40,31 +48,55 @@ def __init__(self, config: VastAIConfig): super().__init__() self.config = config self.api_client = VastAIAPIClient(config.creds.api_key) - self.catalog = gpuhunt.Catalog(balance_resources=False, auto_reload=False) - self.catalog.add_provider( + + def _make_catalog(self, options: VastAIProfileOptions) -> gpuhunt.Catalog: + filters = { + "direct_port_count": {"gte": 1}, + "reliability2": { + "gte": options.min_reliability + if options.min_reliability is not None + else VASTAI_DEFAULT_MIN_RELIABILITY + }, + "inet_down": {"gt": 128}, + "verified": {"eq": True}, + "cuda_max_good": {"gte": 12.8}, + "compute_cap": {"gte": 600}, + } + if options.min_score is not None: + filters["score"] = {"gte": options.min_score} + match options.offer_order or VASTAI_DEFAULT_OFFER_ORDER: + case VastAIOfferOrder.SCORE: + order = [("score", "desc")] + case VastAIOfferOrder.PRICE: + # NOTE: dph_base is only one of the price components, + # so we also sort by InstanceOffer.price later for accurate results. + order = [("dph_base", "asc")] + case other: + assert_never(other) + catalog = gpuhunt.Catalog(balance_resources=False, auto_reload=False) + catalog.add_provider( VastAIProvider( - community_cloud=config.allow_community_cloud, - extra_filters={ - "direct_port_count": {"gte": 1}, - "reliability2": {"gte": 0.9}, - "inet_down": {"gt": 128}, - "verified": {"eq": True}, - "cuda_max_good": {"gte": 12.8}, - "compute_cap": {"gte": 600}, - }, + community_cloud=self.config.allow_community_cloud, + extra_filters=filters, + order=order, ) ) + return catalog def get_offers_by_requirements( self, requirements: Requirements ) -> List[InstanceOfferWithAvailability]: + vastai_options = ( + get_backend_profile_options(requirements.backend_options, VastAIProfileOptions) + or VastAIProfileOptions() + ) offers = get_catalog_offers( backend=BackendType.VASTAI, locations=self.config.regions or None, requirements=requirements, # TODO(egor-s): spots currently not supported extra_filter=lambda offer: not offer.instance.resources.spot, - catalog=self.catalog, + catalog=self._make_catalog(vastai_options), ) offers = [ offer.with_availability( @@ -73,6 +105,8 @@ def get_offers_by_requirements( ) for offer in offers ] + if (vastai_options.offer_order or VASTAI_DEFAULT_OFFER_ORDER) == VastAIOfferOrder.PRICE: + offers = sorted(offers, key=lambda o: o.price) return offers def run_job( diff --git a/src/dstack/_internal/core/backends/vastai/profile_options.py b/src/dstack/_internal/core/backends/vastai/profile_options.py new file mode 100644 index 0000000000..4574d05b30 --- /dev/null +++ b/src/dstack/_internal/core/backends/vastai/profile_options.py @@ -0,0 +1,63 @@ +from enum import Enum +from typing import Annotated, Literal, Optional + +from pydantic import Field + +from dstack._internal.core.backends.base.profile_options import BackendProfileOptions +from dstack._internal.utils.combine import get_max_optional, get_single_value_optional + + +class VastAIOfferOrder(str, Enum): + SCORE = "score" + PRICE = "price" + + +VASTAI_DEFAULT_OFFER_ORDER = VastAIOfferOrder.SCORE +VASTAI_DEFAULT_MIN_RELIABILITY = 0.9 + + +class VastAIProfileOptions(BackendProfileOptions["VastAIProfileOptions"]): + type: Literal["vastai"] = "vastai" + offer_order: Annotated[ + Optional[VastAIOfferOrder], + Field( + description=( + "Controls the order in which offers are considered for provisioning." + " Use `score` to prioritize the highest overall score first" + " (the default order in the Vast.ai console)," + " or `price` to prioritize the lowest-cost offers first." + " Lower-cost offers are often less reliable," + " so consider applying stricter filters when using `price`." + f" Defaults to `{VASTAI_DEFAULT_OFFER_ORDER.value}`" + ) + ), + ] = None + min_reliability: Annotated[ + Optional[float], + Field( + description=( + "The minimum reliability threshold for offers, on a scale from `0` to `1`." + f" Defaults to `{VASTAI_DEFAULT_MIN_RELIABILITY}`" + ), + ge=0, + le=1, + ), + ] = None + min_score: Annotated[ + Optional[int], + Field( + description=( + "The minimum overall score required for offers to be considered." + " The scoring scale varies and may require experimentation." + " Starting with a value in the low hundreds is generally recommended" + ), + ge=0, + ), + ] = None + + def combine(self, other: "VastAIProfileOptions") -> "VastAIProfileOptions": + return VastAIProfileOptions( + offer_order=get_single_value_optional(self.offer_order, other.offer_order), + min_reliability=get_max_optional(self.min_reliability, other.min_reliability), + min_score=get_max_optional(self.min_score, other.min_score), + ) diff --git a/src/dstack/_internal/core/compatibility/common.py b/src/dstack/_internal/core/compatibility/common.py index 6a3445a7bc..789e2d120a 100644 --- a/src/dstack/_internal/core/compatibility/common.py +++ b/src/dstack/_internal/core/compatibility/common.py @@ -1,7 +1,18 @@ -from dstack._internal.core.models.common import EntityReference +from typing import Optional + +from dstack._internal.core.models.common import EntityReference, IncludeExcludeSetType from dstack._internal.core.models.profiles import ProfileParams +def get_profile_excludes(profile: Optional[ProfileParams]) -> IncludeExcludeSetType: + excludes: IncludeExcludeSetType = set() + if profile is None: + return excludes + if profile.backend_options is None: + excludes.add("backend_options") + return excludes + + def patch_profile_params(params: ProfileParams) -> None: # If there are no project-prefixed fleets, replace all EntityReference with str # for compatibility with pre-0.20.14 servers that don't support EntityReference. diff --git a/src/dstack/_internal/core/compatibility/fleets.py b/src/dstack/_internal/core/compatibility/fleets.py index 3c76c73e7b..36c933af4f 100644 --- a/src/dstack/_internal/core/compatibility/fleets.py +++ b/src/dstack/_internal/core/compatibility/fleets.py @@ -1,10 +1,7 @@ from typing import Optional -from dstack._internal.core.compatibility.common import patch_profile_params -from dstack._internal.core.models.common import ( - IncludeExcludeDictType, - IncludeExcludeSetType, -) +from dstack._internal.core.compatibility.common import get_profile_excludes, patch_profile_params +from dstack._internal.core.models.common import IncludeExcludeDictType from dstack._internal.core.models.fleets import ApplyFleetPlanInput, FleetSpec @@ -24,6 +21,9 @@ def get_apply_plan_excludes(plan_input: ApplyFleetPlanInput) -> IncludeExcludeDi current_resource = plan_input.current_resource if current_resource is not None: current_resource_excludes = {} + current_resource_spec_excludes = get_fleet_spec_excludes(current_resource.spec) + if current_resource_spec_excludes: + current_resource_excludes["spec"] = current_resource_spec_excludes apply_plan_excludes["current_resource"] = current_resource_excludes return {"plan": apply_plan_excludes} @@ -44,9 +44,11 @@ def get_fleet_spec_excludes(fleet_spec: FleetSpec) -> Optional[IncludeExcludeDic """ spec_excludes: IncludeExcludeDictType = {} configuration_excludes: IncludeExcludeDictType = {} - profile_excludes: IncludeExcludeSetType = set() + profile_excludes = get_profile_excludes(fleet_spec.profile) spec_excludes["autocreated"] = True + if fleet_spec.configuration.backend_options is None: + configuration_excludes["backend_options"] = True if configuration_excludes: spec_excludes["configuration"] = configuration_excludes diff --git a/src/dstack/_internal/core/compatibility/runs.py b/src/dstack/_internal/core/compatibility/runs.py index ba5c96d9cd..cbed73c5bf 100644 --- a/src/dstack/_internal/core/compatibility/runs.py +++ b/src/dstack/_internal/core/compatibility/runs.py @@ -1,6 +1,6 @@ from typing import Optional -from dstack._internal.core.compatibility.common import patch_profile_params +from dstack._internal.core.compatibility.common import get_profile_excludes, patch_profile_params from dstack._internal.core.models.common import ( EntityReference, IncludeExcludeDictType, @@ -82,7 +82,10 @@ def get_run_spec_excludes(run_spec: RunSpec) -> IncludeExcludeDictType: """ spec_excludes: IncludeExcludeDictType = {} configuration_excludes: IncludeExcludeDictType = {} - profile_excludes: IncludeExcludeSetType = set() + profile_excludes = get_profile_excludes(run_spec.profile) + + if run_spec.configuration.backend_options is None: + configuration_excludes["backend_options"] = True if isinstance(run_spec.configuration, ServiceConfiguration): if run_spec.configuration.probes: @@ -148,6 +151,9 @@ def get_job_spec_excludes(job_specs: list[JobSpec]) -> IncludeExcludeDictType: if all(all(p.until_ready == DEFAULT_PROBE_UNTIL_READY for p in s.probes) for s in job_specs): probe_excludes["until_ready"] = True + if all(s.requirements.backend_options is None for s in job_specs): + spec_excludes["requirements"] = {"backend_options": True} + return spec_excludes diff --git a/src/dstack/_internal/core/models/fleets.py b/src/dstack/_internal/core/models/fleets.py index 12869ed5f9..ce636ba8de 100644 --- a/src/dstack/_internal/core/models/fleets.py +++ b/src/dstack/_internal/core/models/fleets.py @@ -7,6 +7,7 @@ from pydantic import Field, root_validator, validator from typing_extensions import Annotated, Literal +from dstack._internal.core.backends.profile_options import AnyBackendProfileOptions from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import ( ApplyAction, @@ -22,6 +23,7 @@ ProfileRetry, SpotPolicy, parse_idle_duration, + validate_backend_options, ) from dstack._internal.core.models.resources import ResourcesSpec from dstack._internal.utils.common import list_enum_values_for_annotation @@ -303,6 +305,10 @@ class BackendFleetConfiguraionProps(CoreModel): ) ), ] = None + backend_options: Annotated[ + Optional[List[AnyBackendProfileOptions]], + Field(description="Backend-specific options, applied only to offers from that backend"), + ] = None @validator("nodes", pre=True) def parse_nodes(cls, v: Optional[Union[dict, str]]) -> Optional[dict]: @@ -317,8 +323,10 @@ def parse_nodes(cls, v: Optional[Union[dict, str]]) -> Optional[dict]: _validate_idle_duration = validator("idle_duration", pre=True, allow_reuse=True)( parse_idle_duration ) - _validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator) + _validate_backend_options = validator("backend_options", allow_reuse=True)( + validate_backend_options + ) class BackendFleetConfigurationPropsConfig(CoreConfig): diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index c09a32f046..f1beebd5b9 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -5,6 +5,7 @@ from pydantic import Field, root_validator, validator from typing_extensions import Annotated, Literal +from dstack._internal.core.backends.profile_options import AnyBackendProfileOptions from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import ( CoreConfig, @@ -99,6 +100,19 @@ def parse_idle_duration(v: Optional[Union[int, str, bool]]) -> Optional[int]: return parse_duration(v) +def validate_backend_options( + v: Optional[List["AnyBackendProfileOptions"]], +) -> Optional[List["AnyBackendProfileOptions"]]: + if v is None: + return v + seen = set() + for opt in v: + if opt.type in seen: + raise ValueError(f"backend_options contains duplicate entry for backend '{opt.type}'") + seen.add(opt.type) + return v + + class RetryEvent(str, Enum): NO_CAPACITY = "no-capacity" INTERRUPTION = "interruption" @@ -387,6 +401,10 @@ class ProfileParams(CoreModel): ) ), ] = None + backend_options: Annotated[ + Optional[List[AnyBackendProfileOptions]], + Field(description="Backend-specific options, applied only to offers from that backend"), + ] = None _validate_max_duration = validator("max_duration", pre=True, allow_reuse=True)( parse_max_duration @@ -399,6 +417,9 @@ class ProfileParams(CoreModel): ) _validate_fleets = validator("fleets", allow_reuse=True, each_item=True)(EntityReference.parse) _validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator) + _validate_backend_options = validator("backend_options", allow_reuse=True)( + validate_backend_options + ) class ProfileProps(CoreModel): diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index e7b40c8a0b..04f4c326d8 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -6,6 +6,7 @@ from pydantic import UUID4, Field, root_validator from typing_extensions import Annotated +from dstack._internal.core.backends.profile_options import AnyBackendProfileOptions from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import ( ApplyAction, @@ -224,6 +225,7 @@ class Requirements(CoreModel): multinode: Optional[bool] = None """Backends can use `multinode` to filter out offers when some offers support multinode and some do not. """ + backend_options: Optional[List[AnyBackendProfileOptions]] = None def pretty_format(self, resources_only: bool = False): res = self.resources.pretty_format() diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 1b2612ebc0..547f91d52c 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -958,6 +958,7 @@ def get_fleet_requirements(fleet_spec: FleetSpec) -> Requirements: spot=get_policy_map(profile.spot_policy, default=SpotPolicy.ONDEMAND), reservation=fleet_spec.configuration.reservation, multinode=fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER, + backend_options=profile.backend_options, ) return requirements diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py index 2298c79fa4..7eca97a91f 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/base.py +++ b/src/dstack/_internal/server/services/jobs/configurators/base.py @@ -336,6 +336,7 @@ def _requirements(self, jobs_per_replica: int) -> Requirements: spot=None if spot_policy == SpotPolicy.AUTO else (spot_policy == SpotPolicy.SPOT), reservation=self.run_spec.merged_profile.reservation, multinode=jobs_per_replica > 1, + backend_options=self.run_spec.merged_profile.backend_options, ) def _retry(self) -> Optional[Retry]: diff --git a/src/dstack/_internal/server/services/requirements/combine.py b/src/dstack/_internal/server/services/requirements/combine.py index 1b402046e0..53cec3457e 100644 --- a/src/dstack/_internal/server/services/requirements/combine.py +++ b/src/dstack/_internal/server/services/requirements/combine.py @@ -1,8 +1,9 @@ -from typing import Callable, Optional, Protocol, TypeVar +from typing import Callable, List, Optional, Protocol, TypeVar from pydantic import BaseModel from typing_extensions import Self +from dstack._internal.core.backends.profile_options import AnyBackendProfileOptions from dstack._internal.core.models.profiles import Profile, SpotPolicy from dstack._internal.core.models.resources import ( CPUSpec, @@ -13,13 +14,14 @@ ResourcesSpec, ) from dstack._internal.core.models.runs import Requirements +from dstack._internal.utils.combine import ( + CombineError, + combine_optional, + get_single_value_optional, +) from dstack._internal.utils.typing import SupportsRichComparison -class CombineError(ValueError): - pass - - def combine_fleet_and_run_profiles( fleet_profile: Profile, run_profile: Profile ) -> Optional[Profile]: @@ -36,7 +38,7 @@ def combine_fleet_and_run_profiles( instance_types=_intersect_lists_optional( fleet_profile.instance_types, run_profile.instance_types ), - reservation=_get_single_value_optional( + reservation=get_single_value_optional( fleet_profile.reservation, run_profile.reservation ), spot_policy=_combine_spot_policy_optional( @@ -47,6 +49,9 @@ def combine_fleet_and_run_profiles( fleet_profile.idle_duration, run_profile.idle_duration ), tags=_combine_tags_optional(fleet_profile.tags, run_profile.tags), + backend_options=_combine_backend_options_optional( + fleet_profile.backend_options, run_profile.backend_options + ), ) except CombineError: return None @@ -60,15 +65,38 @@ def combine_fleet_and_run_requirements( resources=_combine_resources(fleet_requirements.resources, run_requirements.resources), max_price=_get_min_optional(fleet_requirements.max_price, run_requirements.max_price), spot=_combine_spot_optional(fleet_requirements.spot, run_requirements.spot), - reservation=_get_single_value_optional( + reservation=get_single_value_optional( fleet_requirements.reservation, run_requirements.reservation ), multinode=fleet_requirements.multinode or run_requirements.multinode, + backend_options=_combine_backend_options_optional( + fleet_requirements.backend_options, run_requirements.backend_options + ), ) except CombineError: return None +def _combine_backend_options( + value1: List[AnyBackendProfileOptions], + value2: List[AnyBackendProfileOptions], +) -> List[AnyBackendProfileOptions]: + by_type: dict[str, AnyBackendProfileOptions] = {opt.type: opt for opt in value1} + for opt in value2: + if opt.type in by_type: + by_type[opt.type] = by_type[opt.type].combine(opt) + else: + by_type[opt.type] = opt.copy(deep=True) + return list(by_type.values()) + + +def _combine_backend_options_optional( + value1: Optional[List[AnyBackendProfileOptions]], + value2: Optional[List[AnyBackendProfileOptions]], +) -> Optional[List[AnyBackendProfileOptions]]: + return _combine_copy_model_list_optional(value1, value2, _combine_backend_options) + + _T = TypeVar("_T") _ModelT = TypeVar("_ModelT", bound=BaseModel) _CompT = TypeVar("_CompT", bound=SupportsRichComparison) @@ -98,17 +126,7 @@ def _get_min(value1: _CompT, value2: _CompT) -> _CompT: def _get_min_optional(value1: Optional[_CompT], value2: Optional[_CompT]) -> Optional[_CompT]: - return _combine_optional(value1, value2, _get_min) - - -def _get_single_value(value1: _T, value2: _T) -> _T: - if value1 == value2: - return value1 - raise CombineError(f"Values {value1} and {value2} cannot be combined") - - -def _get_single_value_optional(value1: Optional[_T], value2: Optional[_T]) -> Optional[_T]: - return _combine_optional(value1, value2, _get_single_value) + return combine_optional(value1, value2, _get_min) def _combine_spot_policy(value1: SpotPolicy, value2: SpotPolicy) -> SpotPolicy: @@ -124,7 +142,7 @@ def _combine_spot_policy(value1: SpotPolicy, value2: SpotPolicy) -> SpotPolicy: def _combine_spot_policy_optional( value1: Optional[SpotPolicy], value2: Optional[SpotPolicy] ) -> Optional[SpotPolicy]: - return _combine_optional(value1, value2, _combine_spot_policy) + return combine_optional(value1, value2, _combine_spot_policy) def _combine_idle_duration(value1: int, value2: int) -> int: @@ -138,7 +156,7 @@ def _combine_idle_duration(value1: int, value2: int) -> int: def _combine_idle_duration_optional(value1: Optional[int], value2: Optional[int]) -> Optional[int]: - return _combine_optional(value1, value2, _combine_idle_duration) + return combine_optional(value1, value2, _combine_idle_duration) def _combine_tags_optional( @@ -163,7 +181,7 @@ def _combine_resources(value1: ResourcesSpec, value2: ResourcesSpec) -> Resource def _combine_cpu(value1: CPUSpec, value2: CPUSpec) -> CPUSpec: return CPUSpec( - arch=_get_single_value_optional(value1.arch, value2.arch), + arch=get_single_value_optional(value1.arch, value2.arch), count=_combine_range(value1.count, value2.count), ) @@ -180,7 +198,7 @@ def _combine_shm_size_optional( def _combine_gpu(value1: GPUSpec, value2: GPUSpec) -> GPUSpec: return GPUSpec( - vendor=_get_single_value_optional(value1.vendor, value2.vendor), + vendor=get_single_value_optional(value1.vendor, value2.vendor), name=_intersect_lists_optional(value1.name, value2.name), count=_combine_range(value1.count, value2.count), memory=_combine_range_optional(value1.memory, value2.memory), @@ -212,7 +230,7 @@ def _combine_spot(value1: bool, value2: bool) -> bool: def _combine_spot_optional(value1: Optional[bool], value2: Optional[bool]) -> Optional[bool]: - return _combine_optional(value1, value2, _combine_spot) + return combine_optional(value1, value2, _combine_spot) def _combine_range(value1: Range, value2: Range) -> Range: @@ -226,16 +244,6 @@ def _combine_range_optional(value1: Optional[Range], value2: Optional[Range]) -> return _combine_models_optional(value1, value2, _combine_range) -def _combine_optional( - value1: Optional[_T], value2: Optional[_T], combiner: Callable[[_T, _T], _T] -) -> Optional[_T]: - if value1 is None: - return value2 - if value2 is None: - return value1 - return combiner(value1, value2) - - def _combine_models_optional( value1: Optional[_ModelT], value2: Optional[_ModelT], @@ -262,3 +270,17 @@ def _combine_copy_optional( if value2 is None: return value1.copy() return combiner(value1, value2) + + +def _combine_copy_model_list_optional( + value1: Optional[List[_ModelT]], + value2: Optional[List[_ModelT]], + combiner: Callable[[List[_ModelT], List[_ModelT]], List[_ModelT]], +) -> Optional[List[_ModelT]]: + if value1 is None: + if value2 is not None: + return [item.copy(deep=True) for item in value2] + return None + if value2 is None: + return [item.copy(deep=True) for item in value1] + return combiner(value1, value2) diff --git a/src/dstack/_internal/utils/combine.py b/src/dstack/_internal/utils/combine.py new file mode 100644 index 0000000000..cff1e3d5fd --- /dev/null +++ b/src/dstack/_internal/utils/combine.py @@ -0,0 +1,34 @@ +from typing import Callable, Optional, TypeVar + +from dstack._internal.utils.typing import SupportsRichComparison + +_T = TypeVar("_T") +_CompT = TypeVar("_CompT", bound=SupportsRichComparison) + + +class CombineError(ValueError): + pass + + +def combine_optional( + value1: Optional[_T], value2: Optional[_T], combiner: Callable[[_T, _T], _T] +) -> Optional[_T]: + if value1 is None: + return value2 + if value2 is None: + return value1 + return combiner(value1, value2) + + +def get_max_optional(value1: Optional[_CompT], value2: Optional[_CompT]) -> Optional[_CompT]: + return combine_optional(value1, value2, max) + + +def _get_single_value(value1: _T, value2: _T) -> _T: + if value1 == value2: + return value1 + raise CombineError(f"Values {value1!r} and {value2!r} cannot be combined") + + +def get_single_value_optional(value1: Optional[_T], value2: Optional[_T]) -> Optional[_T]: + return combine_optional(value1, value2, _get_single_value) diff --git a/src/tests/_internal/core/backends/base/test_profile_options.py b/src/tests/_internal/core/backends/base/test_profile_options.py new file mode 100644 index 0000000000..635cae4d55 --- /dev/null +++ b/src/tests/_internal/core/backends/base/test_profile_options.py @@ -0,0 +1,18 @@ +from dstack._internal.core.backends.base.profile_options import get_backend_profile_options +from dstack._internal.core.backends.vastai.profile_options import ( + VastAIProfileOptions, +) + + +class TestGetBackendProfileOptions: + def test_returns_none_for_empty_list(self): + assert get_backend_profile_options([], VastAIProfileOptions) is None + + def test_returns_none_for_none(self): + assert get_backend_profile_options(None, VastAIProfileOptions) is None + + def test_returns_matching_option(self): + opts = [VastAIProfileOptions(min_score=500)] + result = get_backend_profile_options(opts, VastAIProfileOptions) + assert isinstance(result, VastAIProfileOptions) + assert result.min_score == 500 diff --git a/src/tests/_internal/core/backends/vastai/test_compute.py b/src/tests/_internal/core/backends/vastai/test_compute.py index e6f5c0a5c8..8925b38b29 100644 --- a/src/tests/_internal/core/backends/vastai/test_compute.py +++ b/src/tests/_internal/core/backends/vastai/test_compute.py @@ -2,19 +2,27 @@ from dstack._internal.core.backends.vastai.compute import VastAICompute from dstack._internal.core.backends.vastai.models import VastAIConfig, VastAICreds +from dstack._internal.core.models.resources import ResourcesSpec +from dstack._internal.core.models.runs import Requirements def _config(community_cloud=None) -> VastAIConfig: return VastAIConfig(creds=VastAICreds(api_key="test"), community_cloud=community_cloud) +def _requirements() -> Requirements: + return Requirements(resources=ResourcesSpec()) + + def test_vastai_compute_enables_community_cloud_by_default(): with ( patch("dstack._internal.core.backends.vastai.compute.VastAIProvider") as vast_provider_cls, patch("dstack._internal.core.backends.vastai.compute.gpuhunt.Catalog") as catalog_cls, + patch("dstack._internal.core.backends.vastai.compute.get_catalog_offers", return_value=[]), ): catalog_instance = catalog_cls.return_value - VastAICompute(_config()) + compute = VastAICompute(_config()) + list(compute.get_offers(_requirements())) vast_provider_cls.assert_called_once() assert vast_provider_cls.call_args.kwargs["community_cloud"] is True catalog_instance.add_provider.assert_called_once() @@ -24,9 +32,11 @@ def test_vastai_compute_can_enable_community_cloud(): with ( patch("dstack._internal.core.backends.vastai.compute.VastAIProvider") as vast_provider_cls, patch("dstack._internal.core.backends.vastai.compute.gpuhunt.Catalog") as catalog_cls, + patch("dstack._internal.core.backends.vastai.compute.get_catalog_offers", return_value=[]), ): catalog_instance = catalog_cls.return_value - VastAICompute(_config(community_cloud=True)) + compute = VastAICompute(_config(community_cloud=True)) + list(compute.get_offers(_requirements())) vast_provider_cls.assert_called_once() assert vast_provider_cls.call_args.kwargs["community_cloud"] is True catalog_instance.add_provider.assert_called_once() @@ -36,9 +46,11 @@ def test_vastai_compute_can_disable_community_cloud(): with ( patch("dstack._internal.core.backends.vastai.compute.VastAIProvider") as vast_provider_cls, patch("dstack._internal.core.backends.vastai.compute.gpuhunt.Catalog") as catalog_cls, + patch("dstack._internal.core.backends.vastai.compute.get_catalog_offers", return_value=[]), ): catalog_instance = catalog_cls.return_value - VastAICompute(_config(community_cloud=False)) + compute = VastAICompute(_config(community_cloud=False)) + list(compute.get_offers(_requirements())) vast_provider_cls.assert_called_once() assert vast_provider_cls.call_args.kwargs["community_cloud"] is False catalog_instance.add_provider.assert_called_once() diff --git a/src/tests/_internal/core/backends/vastai/test_profile_options.py b/src/tests/_internal/core/backends/vastai/test_profile_options.py new file mode 100644 index 0000000000..42f423946e --- /dev/null +++ b/src/tests/_internal/core/backends/vastai/test_profile_options.py @@ -0,0 +1,51 @@ +import pytest + +from dstack._internal.core.backends.vastai.profile_options import ( + VastAIOfferOrder, + VastAIProfileOptions, +) +from dstack._internal.utils.combine import CombineError + + +class TestVastAIProfileOptionsCombine: + def test_combine_empty_options(self): + a = VastAIProfileOptions() + b = VastAIProfileOptions() + result = a.combine(b) + assert result == VastAIProfileOptions() + + def test_combine_all_fields_set(self): + a = VastAIProfileOptions( + offer_order=VastAIOfferOrder.PRICE, + min_reliability=0.7, + min_score=100, + ) + b = VastAIProfileOptions( + offer_order=VastAIOfferOrder.PRICE, + min_reliability=0.95, + min_score=300, + ) + a_combine_b = a.combine(b) + assert a_combine_b.offer_order == VastAIOfferOrder.PRICE + assert a_combine_b.min_reliability == 0.95 + assert a_combine_b.min_score == 300 + b_combine_a = b.combine(a) + assert b_combine_a.offer_order == VastAIOfferOrder.PRICE + assert b_combine_a.min_reliability == 0.95 + assert b_combine_a.min_score == 300 + + def test_combine_one_has_all_fields_set(self): + a = VastAIProfileOptions( + offer_order=VastAIOfferOrder.PRICE, + min_reliability=0.7, + min_score=100, + ) + b = VastAIProfileOptions() + assert a.combine(b) == a + assert b.combine(a) == a + + def test_combine_conflicting_offer_order_raises(self): + a = VastAIProfileOptions(offer_order=VastAIOfferOrder.PRICE) + b = VastAIProfileOptions(offer_order=VastAIOfferOrder.SCORE) + with pytest.raises(CombineError): + a.combine(b) diff --git a/src/tests/_internal/core/models/test_profiles.py b/src/tests/_internal/core/models/test_profiles.py new file mode 100644 index 0000000000..4a1caf8bf8 --- /dev/null +++ b/src/tests/_internal/core/models/test_profiles.py @@ -0,0 +1,29 @@ +import pytest +from pydantic import ValidationError + +from dstack._internal.core.backends.vastai.profile_options import VastAIProfileOptions +from dstack._internal.core.models.profiles import Profile + + +class TestValidateProfileBackendOptions: + def test_duplicate_backend_type_raises_validation_error(self): + with pytest.raises(ValidationError, match="duplicate entry for backend 'vastai'"): + Profile( + backend_options=[ + VastAIProfileOptions(min_score=100), + VastAIProfileOptions(min_score=200), + ] + ) + + def test_single_entry_per_backend_is_valid(self): + profile = Profile(backend_options=[VastAIProfileOptions(min_score=100)]) + assert profile.backend_options is not None + assert len(profile.backend_options) == 1 + + def test_none_backend_options_is_valid(self): + profile = Profile(backend_options=None) + assert profile.backend_options is None + + def test_empty_list_backend_options_is_valid(self): + profile = Profile(backend_options=[]) + assert profile.backend_options == [] diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py index 499460e300..16ce066866 100644 --- a/src/tests/_internal/server/routers/test_fleets.py +++ b/src/tests/_internal/server/routers/test_fleets.py @@ -951,6 +951,7 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async "reservation": None, "blocks": 1, "tags": None, + "backend_options": None, }, "profile": { "backends": None, @@ -973,6 +974,7 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async "reservation": None, "fleets": None, "tags": None, + "backend_options": None, }, "autocreated": False, }, @@ -1069,6 +1071,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A "reservation": None, "blocks": 1, "tags": None, + "backend_options": None, }, "profile": { "backends": None, @@ -1091,6 +1094,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A "reservation": None, "fleets": None, "tags": None, + "backend_options": None, }, "autocreated": False, }, @@ -1286,6 +1290,7 @@ async def test_updates_ssh_fleet(self, test_db, session: AsyncSession, client: A "reservation": None, "blocks": 1, "tags": None, + "backend_options": None, }, "profile": { "backends": None, @@ -1308,6 +1313,7 @@ async def test_updates_ssh_fleet(self, test_db, session: AsyncSession, client: A "reservation": None, "fleets": None, "tags": None, + "backend_options": None, }, "autocreated": False, }, diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 6bdf8b7a29..5e37242bdb 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -212,6 +212,7 @@ def get_dev_env_run_plan_dict( "reservation": None, "fleets": None, "tags": None, + "backend_options": None, "priority": 0, }, "configuration_path": "dstack.yaml", @@ -237,6 +238,7 @@ def get_dev_env_run_plan_dict( "reservation": None, "fleets": None, "tags": None, + "backend_options": None, }, "repo_code_hash": None, "repo_data": { @@ -290,6 +292,7 @@ def get_dev_env_run_plan_dict( "spot": None, "reservation": None, "multinode": False, + "backend_options": None, }, "retry": None, "volumes": volumes, @@ -454,6 +457,7 @@ def get_dev_env_run_dict( "reservation": None, "fleets": None, "tags": None, + "backend_options": None, "priority": 0, }, "configuration_path": "dstack.yaml", @@ -479,6 +483,7 @@ def get_dev_env_run_dict( "reservation": None, "fleets": None, "tags": None, + "backend_options": None, }, "repo_code_hash": None, "repo_data": { @@ -527,6 +532,7 @@ def get_dev_env_run_dict( "spot": None, "reservation": None, "multinode": False, + "backend_options": None, }, "retry": None, "volumes": [], diff --git a/src/tests/_internal/server/services/requirements/test_combine.py b/src/tests/_internal/server/services/requirements/test_combine.py index 294ed3f43a..3680161e83 100644 --- a/src/tests/_internal/server/services/requirements/test_combine.py +++ b/src/tests/_internal/server/services/requirements/test_combine.py @@ -3,6 +3,10 @@ import gpuhunt import pytest +from dstack._internal.core.backends.vastai.profile_options import ( + VastAIOfferOrder, + VastAIProfileOptions, +) from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.profiles import SpotPolicy from dstack._internal.core.models.resources import ( @@ -18,6 +22,7 @@ from dstack._internal.server.services.requirements.combine import ( CombineError, Profile, + _combine_backend_options_optional, _combine_cpu, _combine_gpu_optional, _combine_idle_duration_optional, @@ -101,6 +106,22 @@ def test_prefers_finite_idle_duration_over_off(self): None, id="incompatible_profiles", ), + pytest.param( + Profile(backend_options=[VastAIProfileOptions(min_score=100)]), + Profile(backend_options=[VastAIProfileOptions(min_score=400)]), + Profile(backend_options=[VastAIProfileOptions(min_score=400)]), + id="backend_options_compatible", + ), + pytest.param( + Profile( + backend_options=[VastAIProfileOptions(offer_order=VastAIOfferOrder.PRICE)] + ), + Profile( + backend_options=[VastAIProfileOptions(offer_order=VastAIOfferOrder.SCORE)] + ), + None, + id="backend_options_incompatible", + ), ], ) def test_combines_profiles( @@ -151,6 +172,33 @@ def test_returns_the_same_requirements_if_requirements_identical(self): None, id="incompatible_requirements", ), + pytest.param( + Requirements( + resources=ResourcesSpec(), + backend_options=[VastAIProfileOptions(min_score=100)], + ), + Requirements( + resources=ResourcesSpec(), + backend_options=[VastAIProfileOptions(min_score=400)], + ), + Requirements( + resources=ResourcesSpec(), + backend_options=[VastAIProfileOptions(min_score=400)], + ), + id="backend_options_compatible", + ), + pytest.param( + Requirements( + resources=ResourcesSpec(), + backend_options=[VastAIProfileOptions(offer_order=VastAIOfferOrder.PRICE)], + ), + Requirements( + resources=ResourcesSpec(), + backend_options=[VastAIProfileOptions(offer_order=VastAIOfferOrder.SCORE)], + ), + None, + id="backend_options_incompatible", + ), ], ) def test_combines_requirements( @@ -426,3 +474,26 @@ def test_non_overlapping_memory_ranges_raises_error(self): gpu2 = GPUSpec(count=Range(min=1, max=2), memory=Range(min=Memory(32), max=Memory(64))) with pytest.raises(CombineError): _combine_gpu_optional(gpu1, gpu2) + + +class TestCombineBackendOptionsOptional: + def test_both_none_returns_none(self): + assert _combine_backend_options_optional(None, None) is None + + def test_one_none_returns_copy_of_other(self): + opts = [VastAIProfileOptions(min_score=100)] + combine_none_opts = _combine_backend_options_optional(None, opts) + assert combine_none_opts == opts + assert combine_none_opts is not opts + combine_opts_none = _combine_backend_options_optional(opts, None) + assert combine_opts_none == opts + assert combine_opts_none is not opts + + def test_combines_same_backend_type(self): + opts1 = [VastAIProfileOptions(min_score=100, min_reliability=0.7)] + opts2 = [VastAIProfileOptions(min_score=300, min_reliability=0.95)] + result = _combine_backend_options_optional(opts1, opts2) + assert result is not None + assert len(result) == 1 + assert result[0].min_score == 300 + assert result[0].min_reliability == 0.95 From 305038e5cb2061c46976f5567edf475e1ea0760f Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Wed, 27 May 2026 10:14:17 +0200 Subject: [PATCH 2/2] Pin new gpuhunt version --- pyproject.toml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5381277dd1..221c0bba90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ "python-multipart>=0.0.16", "filelock", "psutil", - "gpuhunt @ git+https://github.com/dstackai/gpuhunt.git@vastai_offer_order", + "gpuhunt==0.1.23", "argcomplete>=3.5.0", "ignore-python>=0.2.0", "orjson", @@ -67,9 +67,6 @@ artifacts = [ "src/dstack/_internal/server/statics/**", ] -[tool.hatch.metadata] -allow-direct-references = true - [tool.hatch.metadata.hooks.fancy-pypi-readme] content-type = "text/markdown"