Skip to content

vllm.v1.executor.ray_utils

FutureWrapper

Bases: Future

A wrapper around Ray output reference to meet the interface of .execute_model(): The top level (core busy loop) expects .result() api to block and return a single output.

If an aggregator is provided, the outputs from all workers are aggregated upon the result() call. If not, only the first worker's output is returned.

Source code in vllm/v1/executor/ray_utils.py
class FutureWrapper(Future):
    """Future-like adapter over one or more Ray object references.

    The top level (core busy loop) calls ``.execute_model()`` and expects the
    returned object to expose a blocking ``.result()`` that yields a single
    output; this class provides that on top of ``ray.get``.

    With an aggregator, ``result()`` passes the fetched outputs through
    ``aggregator.aggregate(..., output_rank=0)``. Without one, whatever
    ``ray.get`` returns for ``ref_or_refs`` is handed back unchanged.
    """

    def __init__(self, ref_or_refs, aggregator: KVOutputAggregator | None = None):
        super().__init__()
        self.aggregator = aggregator
        self.ref_or_refs = ref_or_refs

    def result(self, timeout=None):
        """Block on the wrapped reference(s) and return the final output.

        ``timeout`` is forwarded as-is to ``ray.get``.
        """
        fetched = ray.get(self.ref_or_refs, timeout=timeout)
        aggregator = self.aggregator
        if aggregator is not None:
            return aggregator.aggregate(fetched, output_rank=0)
        return fetched

RayWorkerWrapper

Bases: WorkerWrapperBase

Ray wrapper for vllm.worker.Worker, allowing Worker to be lazily initialized after Ray sets CUDA_VISIBLE_DEVICES.

Source code in vllm/v1/executor/ray_utils.py
class RayWorkerWrapper(WorkerWrapperBase):
    """Ray wrapper for vllm.worker.Worker, allowing Worker to be
    lazily initialized after Ray sets CUDA_VISIBLE_DEVICES."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # The compiled DAG runs its main execution on a different thread,
        # and that thread has to call set_device itself. This flag records
        # whether set_device has already been called on that thread.
        self.compiled_dag_cuda_device_set = False

    # Type declaration only: the value is assigned elsewhere (presumably by
    # WorkerWrapperBase -- confirm) and may be remapped by adjust_rank().
    rpc_rank: int

    def adjust_rank(self, rank_mapping: dict[int, int]) -> None:
        """
        Adjust the rpc_rank based on the given mapping.
        It is only used during the initialization of the executor,
        to adjust the rpc_rank of workers after we create all workers.

        Ranks that do not appear as a key in ``rank_mapping`` are left
        untouched.
        """
        if self.rpc_rank in rank_mapping:
            self.rpc_rank = rank_mapping[self.rpc_rank]

    def execute_method(self, method: str | bytes, *args, **kwargs):
        """Run ``method`` on this wrapper via ``run_method``.

        Any exception is logged with its traceback and then re-raised
        unchanged, so the caller still observes the original error.
        """
        try:
            return run_method(self, method, args, kwargs)
        except Exception:
            # if the driver worker also execute methods,
            # exceptions in the rest worker may cause deadlock in rpc
            # see https://github.com/vllm-project/vllm/issues/3455
            msg = (
                f"Error executing method {method!r}. "
                "This might cause deadlock in distributed execution."
            )
            logger.exception(msg)
            # Bare raise re-raises the active exception as-is.
            raise

    def get_node_ip(self) -> str:
        """Return the IP address reported by ``get_ip()`` for this process."""
        return get_ip()

    def get_node_and_gpu_ids(self) -> tuple[str, list[int]]:
        """Return the Ray node id and the accelerator ids assigned to this actor.

        Raises:
            RuntimeError: If the current platform defines no Ray device key.
        """
        node_id = ray.get_runtime_context().get_node_id()
        device_key = vllm.platforms.current_platform.ray_device_key
        if not device_key:
            # Exceptions do not apply %-style formatting to extra arguments,
            # so the device name has to be interpolated explicitly.
            raise RuntimeError(
                "current platform "
                f"{vllm.platforms.current_platform.device_name} "
                "does not support ray."
            )
        gpu_ids = ray.get_runtime_context().get_accelerator_ids()[device_key]
        return node_id, gpu_ids

    def setup_device_if_necessary(self):
        """Select the worker's device on the calling thread, at most once."""
        # TODO(swang): This is needed right now because Ray CG executes
        # on a background thread, so we need to reset torch's current
        # device.
        # We can remove this API after it is fixed in compiled graph.
        assert self.worker is not None, "Worker is not initialized"
        if self.compiled_dag_cuda_device_set:
            return

        # Not needed on TPU.
        if not current_platform.is_tpu():
            assert self.worker.device is not None
            current_platform.set_device(self.worker.device)

        self.compiled_dag_cuda_device_set = True

    def execute_model_ray(
        self,
        execute_model_input: tuple["SchedulerOutput", "GrammarOutput"]
        | tuple["SchedulerOutput", "GrammarOutput", "IntermediateTensors"],
    ) -> Union[
        "ModelRunnerOutput",
        tuple["SchedulerOutput", "GrammarOutput", "IntermediateTensors"],
    ]:
        """Execute one model step on behalf of Ray Compiled Graph.

        Args:
            execute_model_input: Either ``(scheduler_output, grammar_output)``
                or that pair followed by the intermediate tensors.

        Returns:
            A ``(scheduler_output, grammar_output, payload)`` tuple when the
            model runner produced intermediate tensors or this is not the
            last pipeline-parallel rank (``payload`` is then the tensors or
            ``None`` respectively); otherwise the model runner output, with
            any ``AsyncModelRunnerOutput`` resolved via ``get_output()``.
        """
        # This method is used by Ray Compiled Graph to execute the model,
        # and it needs a special logic of self.setup_device_if_necessary()
        self.setup_device_if_necessary()
        assert self.worker is not None, "Worker is not initialized"
        if len(execute_model_input) == 3:
            scheduler_output, grammar_output, intermediate_tensors = (
                execute_model_input
            )
        else:
            scheduler_output, grammar_output = execute_model_input
            intermediate_tensors = None
        assert self.worker.model_runner is not None
        output = self.worker.model_runner.execute_model(
            scheduler_output, intermediate_tensors
        )
        if self._is_intermediate_tensors(output):
            if (
                self.worker.model_runner.supports_mm_inputs
                and get_pp_group().is_first_rank
            ):
                # Strip mm_features before Ray forwards it to the next PP Stage.
                # PP Stage>0 only needs the intermediate tensors,
                # not preprocessed multimodal data.

                # scheduled_new_reqs is a required field of SchedulerOutput,
                # so accessing it directly will raise AttributeError if missing.
                for req in scheduler_output.scheduled_new_reqs:
                    req.mm_features = []
            return scheduler_output, grammar_output, output

        if isinstance(output, AsyncModelRunnerOutput):
            output = output.get_output()
        if not self._is_last_rank():
            # Case where there are no scheduled requests
            # but may still be finished requests.
            assert not output or not output.req_ids
            output = scheduler_output, grammar_output, None
        elif output is None:
            output = self.worker.model_runner.sample_tokens(grammar_output)
            # Ensure outputs crossing Ray compiled DAG are serializable.
            # AsyncModelRunnerOutput holds CUDA events and cannot be
            # pickled.
            if isinstance(output, AsyncModelRunnerOutput):
                output = output.get_output()
        return output

    def override_env_vars(self, vars: dict[str, str]):
        """Merge ``vars`` into this process's ``os.environ``."""
        os.environ.update(vars)

    def _is_intermediate_tensors(self, output) -> bool:
        """Return True if ``output`` is an ``IntermediateTensors`` instance."""
        return isinstance(output, IntermediateTensors)

    def _is_last_rank(self) -> bool:
        """Return True if this worker is the last pipeline-parallel rank."""
        return get_pp_group().is_last_rank

adjust_rank

adjust_rank(rank_mapping: dict[int, int]) -> None

Adjust the rpc_rank based on the given mapping. It is only used during the initialization of the executor, to adjust the rpc_rank of workers after we create all workers.

Source code in vllm/v1/executor/ray_utils.py
def adjust_rank(self, rank_mapping: dict[int, int]) -> None:
    """
    Remap ``self.rpc_rank`` through ``rank_mapping``.

    Used only while the executor is being initialized, once every worker
    has been created. A rank that is not a key of the mapping is kept
    unchanged.
    """
    current_rank = self.rpc_rank
    if current_rank not in rank_mapping:
        return
    self.rpc_rank = rank_mapping[current_rank]

_verify_bundles

_verify_bundles(
    placement_group: PlacementGroup,
    parallel_config: ParallelConfig,
    device_str: str,
    require_gpu_on_driver: bool = True,
)

Verify a given placement group has bundles located in the right place.

There are 2 rules:

- Warn if all tensor parallel workers cannot fit in a single node.
- Fail if the driver node is not included in the placement group (only when require_gpu_on_driver is True).

Source code in vllm/v1/executor/ray_utils.py
def _verify_bundles(
    placement_group: "PlacementGroup",
    parallel_config: ParallelConfig,
    device_str: str,
    require_gpu_on_driver: bool = True,
):
    """Verify a given placement group has bundles located in the right place.

    There are 2 rules.
    - Warn if all tensor parallel workers cannot fit in a single node.
    - Fail if driver node is not included in a placement group
      (only when require_gpu_on_driver is True).

    Args:
        placement_group: The placement group to inspect.
        parallel_config: Supplies ``tensor_parallel_size`` for the per-node
            capacity check.
        device_str: Device name used in the warning message.
        require_gpu_on_driver: Whether the driver node must own at least one
            bundle of the placement group.

    Raises:
        RuntimeError: If ``require_gpu_on_driver`` is set and no bundle is
            placed on the driver node.
    """
    assert ray.is_initialized(), (
        "Ray is not initialized although distributed-executor-backend is ray."
    )
    pg_data = placement_group_table(placement_group)
    # bundle_idx -> node_id
    bundle_to_node_ids = pg_data["bundles_to_node_id"]
    # bundle_idx -> bundle (e.g., {"GPU": 1})
    bundles = pg_data["bundles"]
    # node_id -> List of bundle (e.g., {"GPU": 1})
    node_id_to_bundle: dict[str, list[dict[str, float]]] = defaultdict(list)

    for bundle_idx, node_id in bundle_to_node_ids.items():
        node_id_to_bundle[node_id].append(bundles[bundle_idx])
    driver_node_id = ray.get_runtime_context().get_node_id()

    if require_gpu_on_driver and driver_node_id not in node_id_to_bundle:
        # Every fragment that interpolates a value must be an f-string;
        # otherwise the placeholder is printed literally.
        raise RuntimeError(
            f"driver node id {driver_node_id} is not included in a placement "
            f"group {placement_group.id}. Node id -> bundles "
            f"{node_id_to_bundle}. "
            "You don't have enough GPUs available in a current node. Check "
            "`ray status` and `ray list nodes` to see if you have available "
            f"GPUs in a node `{driver_node_id}` before starting an vLLM engine."
        )

    # Use a distinct loop name so the bundle_idx -> bundle mapping above is
    # not shadowed.
    for node_id, node_bundles in node_id_to_bundle.items():
        if len(node_bundles) < parallel_config.tensor_parallel_size:
            logger.warning(
                "tensor_parallel_size=%d "
                "is bigger than a reserved number of %ss (%d "
                "%ss) in a node %s. Tensor parallel workers can be "
                "spread out to 2+ nodes which can degrade the performance "
                "unless you have fast interconnect across nodes, like "
                "Infiniband. To resolve this issue, make sure you have more "
                "than %d GPUs available at each node.",
                parallel_config.tensor_parallel_size,
                device_str,
                len(node_bundles),
                device_str,
                node_id,
                parallel_config.tensor_parallel_size,
            )

_wait_until_pg_ready

_wait_until_pg_ready(
    current_placement_group: PlacementGroup,
)

Wait until a placement group is ready.

It logs informative messages if the placement group is not created in time.

Source code in vllm/v1/executor/ray_utils.py
def _wait_until_pg_ready(current_placement_group: "PlacementGroup"):
    """Wait until a placement group is ready.

    Blocks for at most ``PG_WAIT_TIMEOUT`` seconds, logging an informative
    message each time a wait interval elapses without the placement group
    becoming ready.

    Raises:
        ValueError: If the placement group is still not ready once the
            timeout has elapsed.
    """
    # Wait until PG is ready - this will block until all
    # requested resources are available, and will time out
    # if they cannot be provisioned.
    placement_group_specs = current_placement_group.bundle_specs

    # A monotonic clock keeps the elapsed-time computation immune to
    # wall-clock adjustments.
    start = time.monotonic()
    pg_ready_ref = current_placement_group.ready()
    wait_interval = 10
    while True:
        remaining = PG_WAIT_TIMEOUT - (time.monotonic() - start)
        if remaining <= 0:
            break

        # Clamp each wait to the remaining budget so the doubling interval
        # cannot push the total wait past PG_WAIT_TIMEOUT.
        ready, _ = ray.wait([pg_ready_ref], timeout=min(wait_interval, remaining))
        if len(ready) > 0:
            break

        # Exponential backoff for warning print.
        wait_interval *= 2
        logger.info(
            "Waiting for creating a placement group of specs for "
            "%d seconds. specs=%s. Check `ray status` and "
            "`ray list nodes` to see if you have enough resources,"
            " and make sure the IP addresses used by ray cluster"
            " are the same as VLLM_HOST_IP environment variable"
            " specified in each node if you are running on a multi-node.",
            int(time.monotonic() - start),
            placement_group_specs,
        )

    try:
        # Non-blocking probe: raises GetTimeoutError if the PG is not ready.
        ray.get(pg_ready_ref, timeout=0)
    except ray.exceptions.GetTimeoutError:
        # Provide more helpful error message when GPU count is exceeded
        total_gpu_required = sum(spec.get("GPU", 0) for spec in placement_group_specs)
        # If more than one GPU is required for the placement group, provide a
        # more specific error message.
        # We use >1 here because multi-GPU (tensor parallel) jobs are more
        # likely to fail due to insufficient cluster resources, and users may
        # need to adjust tensor_parallel_size to fit available GPUs.
        if total_gpu_required > 1:
            raise ValueError(
                f"Cannot provide a placement group requiring "
                f"{total_gpu_required} GPUs "
                f"(placement_group_specs={placement_group_specs}) within "
                f"{PG_WAIT_TIMEOUT} seconds.\n"
                f"Tensor parallel size may exceed available GPUs in your "
                f"cluster. Check resources with `ray status` and "
                f"`ray list nodes`.\n"
                f"If running on K8s with limited GPUs, consider reducing "
                f"--tensor-parallel-size to match available GPU resources."
            ) from None
        else:
            raise ValueError(
                "Cannot provide a placement group of "
                f"{placement_group_specs=} within "
                f"{PG_WAIT_TIMEOUT} seconds. See "
                "`ray status` and `ray list nodes` to make sure the cluster "
                "has enough resources."
            ) from None

assert_ray_available

assert_ray_available()

Raise an exception if Ray is not available.

Source code in vllm/v1/executor/ray_utils.py
def assert_ray_available():
    """Raise an exception if Ray is not available.

    Raises:
        ValueError: If the module-level ``ray`` name is ``None``; the message
            includes the recorded import error.
    """
    if ray is None:
        # The trailing space keeps the two sentences from running together
        # once the adjacent literals are concatenated.
        raise ValueError(
            f"Failed to import Ray: {ray_import_err}. "
            "Please install Ray with `pip install ray`."
        )

build_actor_name

build_actor_name(
    instance_id: str,
    rank: int,
    tp_size: int,
    pp_size: int,
    pcp_size: int,
) -> str

Build a descriptive Ray actor name for dashboard visibility.

Source code in vllm/v1/executor/ray_utils.py
def build_actor_name(
    instance_id: str,
    rank: int,
    tp_size: int,
    pp_size: int,
    pcp_size: int,
) -> str:
    """Compose a human-readable Ray actor name for the dashboard.

    The name always starts with ``vllm_Worker_<instance_id>``. A ``_TP``,
    ``_PP`` and/or ``_PCP`` suffix carrying the index derived from ``rank``
    is appended, in that order, for each dimension whose size exceeds 1.
    """
    suffixes: list[str] = []
    if tp_size > 1:
        suffixes.append(f"_TP{rank % tp_size}")
    if pp_size > 1:
        suffixes.append(f"_PP{(rank // tp_size) % pp_size}")
    if pcp_size > 1:
        suffixes.append(f"_PCP{rank // (tp_size * pp_size)}")
    return "".join([f"vllm_Worker_{instance_id}", *suffixes])

get_bundles_for_indices

get_bundles_for_indices(
    placement_group: PlacementGroup,
    bundle_indices: list[int],
    world_size: int,
) -> list[tuple[int, str, str]]

Return GPU bundle indices paired with node IDs and node IPs for explicit bundle indices specified via VLLM_RAY_BUNDLE_INDICES.

Source code in vllm/v1/executor/ray_utils.py
def get_bundles_for_indices(
    placement_group: "PlacementGroup",
    bundle_indices: list[int],
    world_size: int,
) -> list[tuple[int, str, str]]:
    """
    Resolve explicitly requested bundle indices (VLLM_RAY_BUNDLE_INDICES)
    to ``(bundle_index, node_id, node_ip)`` tuples.

    The result follows the order of ``bundle_indices``. The indices must be
    unique and there must be exactly ``world_size`` of them. Node IPs are
    looked up among the nodes Ray currently reports as alive.
    """
    assert len(bundle_indices) == world_size, (
        "VLLM_RAY_BUNDLE_INDICES must have the same size"
        f" as the world size, but got {bundle_indices=} "
        f"and {world_size=}"
    )
    assert len(set(bundle_indices)) == len(bundle_indices), (
        "VLLM_RAY_BUNDLE_INDICES cannot have duplicate values,"
        f" but got {bundle_indices=}"
    )

    bundle_to_node = placement_group_table(placement_group)["bundles_to_node_id"]

    # node_id -> IP address, restricted to alive nodes.
    alive_node_ips = {}
    for node in ray.nodes():
        if node["Alive"]:
            alive_node_ips[node["NodeID"]] = node["NodeManagerAddress"]

    resolved = []
    for bid in bundle_indices:
        owner = bundle_to_node[bid]
        resolved.append((bid, owner, alive_node_ips[owner]))
    return resolved

get_bundles_sorted_by_node

get_bundles_sorted_by_node(
    placement_group: PlacementGroup,
) -> list[tuple[int, str, str]]

Return GPU bundle indices paired with node IDs and node IPs, sorted driver-first.

This utility has to be invoked from the driver node.

Example: 3-node cluster, driver on node-A, PG bundles spread across nodes:

Input:

    [
        (0, node-C),
        (1, node-A),
        (2, node-B),
        (3, node-C),
        (4, node-A),
        (5, node-B),
    ]

Output:

    [
        (1, node-A),
        (4, node-A),
        (2, node-B),
        (5, node-B),
        (0, node-C),
        (3, node-C),
    ]

Source code in vllm/v1/executor/ray_utils.py
def get_bundles_sorted_by_node(
    placement_group: "PlacementGroup",
) -> list[tuple[int, str, str]]:
    """
    List the device-carrying bundles of a placement group as
    ``(bundle_index, node_id, node_ip)`` tuples, driver node first.

    Must be called from the driver node. Bundles on the driver node come
    first; the remaining ones are grouped by node id in ascending order.
    Within a node the original bundle order is preserved.

    Example: 3-node cluster, driver on node-A, PG bundles spread
    across nodes:

      Input: [
          (0, node-C),
          (1, node-A),
          (2, node-B),
          (3, node-C),
          (4, node-A),
          (5, node-B),
      ]
      Output: [
          (1, node-A),
          (4, node-A),
          (2, node-B),
          (5, node-B),
          (0, node-C),
          (3, node-C),
      ]
    """
    bundle_to_node = placement_group_table(placement_group)["bundles_to_node_id"]

    device_key = current_platform.ray_device_key
    if not device_key:
        raise ValueError(
            f"current platform {current_platform.device_name} does not support ray."
        )

    # node_id -> IP address, restricted to alive nodes.
    alive_node_ips = {}
    for node in ray.nodes():
        if node["Alive"]:
            alive_node_ips[node["NodeID"]] = node["NodeManagerAddress"]

    specs = placement_group.bundle_specs
    assert specs is not None
    device_bundles: list[tuple[int, str, str]] = []
    for idx, spec in enumerate(specs):
        # Skip bundles that do not reserve the platform's device resource.
        if not spec.get(device_key):
            continue
        owner = bundle_to_node.get(idx)
        device_bundles.append((idx, owner, alive_node_ips[owner]))

    driver_node = ray.get_runtime_context().get_node_id()

    # False sorts before True, so driver-node bundles lead; the sort is
    # stable, which keeps the per-node bundle order intact.
    device_bundles.sort(key=lambda entry: (entry[1] != driver_node, entry[1]))

    return device_bundles

initialize_ray_cluster

initialize_ray_cluster(
    parallel_config: ParallelConfig,
    ray_address: str | None = None,
    require_gpu_on_driver: bool = True,
)

Initialize the distributed cluster with Ray.

It connects to the Ray cluster and creates a placement group for the workers, which includes the specification of the resources for each distributed worker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| parallel_config | ParallelConfig | The configurations for parallel execution. | required |
| ray_address | str \| None | The address of the Ray cluster. If None, uses the default Ray cluster address. | None |
| require_gpu_on_driver | bool | If True (default), require at least one GPU on the current (driver) node and pin the first PG bundle to it. Set to False for executors like RayExecutorV2 where all GPU work is delegated to remote Ray actors. | True |

Source code in vllm/v1/executor/ray_utils.py
def initialize_ray_cluster(
    parallel_config: ParallelConfig,
    ray_address: str | None = None,
    require_gpu_on_driver: bool = True,
) -> None:
    """Initialize the distributed cluster with Ray.

    Connects to the Ray cluster (unless Ray is already initialized) and
    obtains a placement group for the workers: the one set on
    ``parallel_config``, else the current placement group, else a newly
    created one with one single-device bundle per worker. The resulting
    placement group is verified and stored back on
    ``parallel_config.placement_group``.

    As a side effect, ``RAY_USAGE_STATS_ENABLED`` is set to ``"0"`` in
    ``os.environ`` unless it is already ``"1"``.

    Args:
        parallel_config: The configurations for parallel execution.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.
        require_gpu_on_driver: If True (default), require at least one GPU
            on the current (driver) node and pin the first PG bundle to it.
            Set to False for executors like RayExecutorV2 where all GPU work
            is delegated to remote Ray actors.

    Raises:
        ValueError: If Ray is not available, if the current platform has no
            Ray device key, if an existing placement group has a bundle with
            more than one device or too few device bundles for the world
            size, if the driver node has no device while
            ``require_gpu_on_driver`` is set, or if a new placement group
            does not become ready in time.
        RuntimeError: If bundle verification finds that the driver node is
            not part of the placement group while ``require_gpu_on_driver``
            is set.
    """
    assert_ray_available()
    from vllm.platforms import current_platform

    # Disable Ray usage stats collection unless the user explicitly opted in.
    if os.environ.get("RAY_USAGE_STATS_ENABLED", "0") != "1":
        os.environ["RAY_USAGE_STATS_ENABLED"] = "0"

    # Prevalidate GPU requirements before Ray processing. This only warns:
    # device_count() is compared against the whole world size.
    if current_platform.is_cuda() and parallel_config.world_size > 1:
        available_gpus = current_platform.device_count()
        if parallel_config.world_size > available_gpus:
            logger.warning(
                "Tensor parallel size (%d) exceeds available GPUs (%d). "
                "This may result in Ray placement group allocation failures. "
                "Consider reducing tensor_parallel_size to %d or less, "
                "or ensure your Ray cluster has %d GPUs available.",
                parallel_config.world_size,
                available_gpus,
                available_gpus,
                parallel_config.world_size,
            )

    if ray.is_initialized():
        logger.info("Ray is already initialized. Skipping Ray initialization.")
    elif current_platform.is_rocm() or current_platform.is_xpu():
        # Try to connect to an existing Ray instance and create a new one if
        # none is found.
        try:
            ray.init("auto")
        except ConnectionError:
            logger.warning(
                "No existing RAY instance detected. "
                "A new instance will be launched with current node resources."
            )
            ray.init(
                address=ray_address,
                num_gpus=parallel_config.world_size,
                runtime_env=parallel_config.ray_runtime_env,
            )
    else:
        ray.init(address=ray_address, runtime_env=parallel_config.ray_runtime_env)

    # Resource name under which Ray tracks this platform's devices.
    device_str = current_platform.ray_device_key
    if not device_str:
        raise ValueError(
            f"current platform {current_platform.device_name} does not support ray."
        )

    # Create or get the placement group for worker processes. An explicitly
    # configured placement group takes precedence over the ambient one.
    if parallel_config.placement_group:
        current_placement_group = parallel_config.placement_group
    else:
        current_placement_group = ray.util.get_current_placement_group()

    if current_placement_group:
        logger.info("Using the existing placement group")

        # We are in a placement group
        bundles = current_placement_group.bundle_specs
        # Verify that we can use the placement group: every bundle may hold
        # at most one device, and there must be enough device bundles.
        device_bundles = 0
        for bundle in bundles:
            bundle_devices = bundle.get(device_str, 0)
            if bundle_devices > 1:
                raise ValueError(
                    f"Placement group bundle cannot have more than 1 {device_str}."
                )
            if bundle_devices:
                device_bundles += 1
        if parallel_config.world_size > device_bundles:
            raise ValueError(
                f"The number of required {device_str}s exceeds the total "
                f"number of available {device_str}s in the placement group. "
                f"Required number of devices: {parallel_config.world_size}. "
                f"Total number of devices: {device_bundles}."
            )
    else:
        logger.info("No current placement group found. Creating a new placement group.")
        num_devices_in_cluster = ray.cluster_resources().get(device_str, 0)
        # Log a warning message and delay resource allocation failure response.
        # Avoid immediate rejection to allow user-initiated placement group
        # created and wait cluster to be ready
        if parallel_config.world_size > num_devices_in_cluster:
            logger.warning(
                "The number of required %ss exceeds the total "
                "number of available %ss in the placement group.",
                device_str,
                device_str,
            )
        # Create a new placement group: one single-device bundle per worker.
        placement_group_specs: list[dict[str, float]] = [
            {device_str: 1.0} for _ in range(parallel_config.world_size)
        ]

        # vLLM engine is also a worker to execute model with an accelerator,
        # so it requires to have the device in a current node. Check if
        # the current node has at least one device.
        current_ip = get_ip()
        current_node_id = ray.get_runtime_context().get_node_id()
        current_node_resource = available_resources_per_node()[current_node_id]
        # TODO (jeffreywang): require_gpu_on_driver should be always False
        # after deprecating RayDistributedExecutor.
        if require_gpu_on_driver:
            if current_node_resource.get(device_str, 0) < 1:
                raise ValueError(
                    f"Current node has no {device_str} available. "
                    f"{current_node_resource=}. vLLM engine cannot start "
                    f"without {device_str}. Make sure you have at least 1 "
                    f"{device_str} available in a node "
                    f"{current_node_id=} {current_ip=}."
                )
            # Requesting a tiny amount of the node-specific resource forces
            # at least one bundle to be created on the current node.
            placement_group_specs[0][f"node:{current_ip}"] = 0.001

        # By default, Ray packs resources as much as possible.
        current_placement_group = ray.util.placement_group(
            placement_group_specs, strategy="PACK"
        )
        _wait_until_pg_ready(current_placement_group)

    assert current_placement_group is not None
    _verify_bundles(
        current_placement_group, parallel_config, device_str, require_gpu_on_driver
    )
    # Set the placement group in the parallel config
    parallel_config.placement_group = current_placement_group

ray_is_available

ray_is_available() -> bool

Returns True if Ray is available.

Source code in vllm/v1/executor/ray_utils.py
def ray_is_available() -> bool:
    """Report whether Ray is usable, i.e. the module-level ``ray`` is not None."""
    if ray is None:
        return False
    return True