
Ray on Slurm

Ray is bundled as a first-class launcher in dagster-slurm. This page explains how to enable it, how scheduling works across the different execution modes, and how to keep observability when your Ray workers live inside a Slurm allocation.

When to reach for Ray

Use Ray when your asset logic benefits from distributed execution or GPU scheduling but you still want Dagster to orchestrate upstream/downstream dependencies. Typical patterns include:

  • Scaling model training and hyper-parameter sweeps across multiple Slurm nodes.
  • Running Ray data pipelines that need high-throughput IO.
  • Mixing CPU- and GPU-heavy stages in the same asset (leveraging heterogeneous allocations).

If your workload is single-process, the default Bash launcher remains simpler.

Prerequisites

  • dagster-slurm>=1.5.0
  • A pixi environment (local or pre-deployed) that installs ray>=2.48.0
  • Slurm partitions with the resources required by your Ray workers

For containerised development the bundled docker-compose stack already provides these pieces.

Configure the compute resource

Ray support starts with the standard ComputeResource from dagster_slurm. You provide:

  1. An execution mode (local or slurm)
  2. A RayLauncher, either as the default launcher or per asset
  3. Optional run-scoped allocation settings, for when several compatible Ray assets should share one Slurm allocation
definitions/assets.py
import dagster as dg
from dagster_slurm import (
    ComputeResource,
    RayLauncher,
    SlurmQueueConfig,
    SlurmResource,
    SSHConnectionResource,
)

ssh = SSHConnectionResource.from_env(prefix="SLURM_EDGE_NODE_")
slurm = SlurmResource(
    ssh=ssh,
    queue=SlurmQueueConfig(partition="compute", qos="normal", time_limit="01:00:00"),
)

compute = ComputeResource(
    mode="slurm",
    slurm=slurm,
    default_launcher=RayLauncher(
        num_gpus_per_node=4,
    ),
)

Register compute under a resource key (for example resources={"compute": compute} in dg.Definitions) so Dagster's standard resource injection can pass it to your assets.

Control-plane asset example

assets/train.py
import dagster as dg
from dagster_slurm import ComputeResource, RayLauncher


@dg.asset
def train_model(context: dg.AssetExecutionContext, compute: ComputeResource):
    # Per-asset launcher: overrides the resource's default_launcher for this asset only
    ray_launcher = RayLauncher(
        num_cpus_per_node=16,
        num_gpus_per_node=2,
        working_dir="ray_jobs/train_model",
    )

    completed = compute.run(
        context=context,
        payload_path="python -m train.entrypoint",
        launcher=ray_launcher,
        extra_env={"WANDB_MODE": "offline"},  # environment variables for the Ray job
        extras={"experiment": context.run.run_id},  # forwarded to user code via Pipes
    )
    # Generator asset: re-emit the results reported by the user-code process
    yield from completed.get_results()

Key points:

  • Override the launcher per asset when resource requirements differ from the default.
  • payload_path can be a script path or a module invocation (python -m ...). The Ray launcher runs the command on the Ray head node.
  • extras travel to the user-code process via Dagster Pipes, where PipesContext.get().extras (or get_extra(key)) reads them.
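Because extras cross a process boundary, they have to survive serialization. Assuming Pipes encodes extras as JSON along with the rest of its context payload, a local pre-flight check like the hypothetical helper below catches values such as sets or datetime objects before a Slurm job is ever submitted. It is a sketch for illustration, not part of dagster-slurm or Dagster.

```python
import json
from typing import Any, Mapping


def roundtrip_extras(extras: Mapping[str, Any]) -> dict[str, Any]:
    """Hypothetical pre-flight check: JSON-encode extras and decode them again.

    Returns what user code would see if extras are shipped as JSON (the
    assumption this sketch rests on); raises ValueError for values JSON
    cannot represent, such as sets or datetimes.
    """
    try:
        encoded = json.dumps(dict(extras))
    except TypeError as exc:
        raise ValueError(f"extras must be JSON-serializable: {exc}") from exc
    return json.loads(encoded)


if __name__ == "__main__":
    print(roundtrip_extras({"experiment": "run-123", "seeds": (1, 2)}))
    # {'experiment': 'run-123', 'seeds': [1, 2]}
```

Note that tuples come back as lists after the round trip, which is what the entrypoint would observe.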

User-plane entrypoint

train/entrypoint.py
import ray
from dagster_pipes import PipesContext, open_dagster_pipes


@ray.remote(num_gpus=1)
def train_shard(config: dict[str, int]) -> dict[str, float]:
    # Real training logic goes here; each task reserves one GPU
    return {"loss": 0.01, "accuracy": 0.98}


def main() -> None:
    context = PipesContext.get()
    context.log.info("Launching Ray jobs...")
    handles = [train_shard.remote({"shard": i}) for i in range(4)]
    results = ray.get(handles)  # block until every shard has finished
    context.report_asset_materialization(
        metadata={"shards": len(results), "max_accuracy": max(r["accuracy"] for r in results)}
    )
    context.log.info("Training complete")


if __name__ == "__main__":
    with open_dagster_pipes():
        ray.init(address="auto")  # Connect to the cluster inside the Slurm allocation
        main()

The launcher makes the head node address discoverable through Ray's defaults, so ray.init(address="auto") connects to the existing cluster when the entrypoint runs inside the same allocation.
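The ray.init documentation describes the lookup behind this: a concrete address is used as given; otherwise Ray checks the RAY_ADDRESS environment variable, then the cluster most recently started on the machine, and with address="auto" it raises instead of starting a fresh local instance. The function below is a simplified, stdlib-only model of that order, showing why an injected RAY_ADDRESS is enough. It is not Ray's code, and the latest_cluster_file argument is a hypothetical stand-in for Ray's internal bookkeeping.

```python
from pathlib import Path
from typing import Mapping, Optional


def resolve_ray_address(
    address: Optional[str],
    env: Mapping[str, str],
    latest_cluster_file: Path,
) -> Optional[str]:
    """Simplified model of ray.init()'s documented address lookup.

    Returns the address to connect to, or None for "start a new local instance".
    """
    if address is not None and address != "auto":
        return address  # a concrete address such as "head-node:6379" is used as given
    env_address = env.get("RAY_ADDRESS")
    if env_address and env_address != "auto":
        return env_address  # e.g. injected into each asset step by the launcher
    if latest_cluster_file.is_file():
        recorded = latest_cluster_file.read_text().strip()
        if recorded:
            return recorded  # the cluster most recently started on this machine
    if address == "auto":
        # "auto" never falls back to starting a fresh local instance
        raise ConnectionError("address='auto' but no running Ray cluster was found")
    return None


if __name__ == "__main__":
    missing = Path("/nonexistent/ray_current_cluster")
    print(resolve_ray_address("auto", {"RAY_ADDRESS": "10.0.0.5:6379"}, missing))
    # 10.0.0.5:6379
```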

Share one Ray cluster across assets

The stable default is still one Slurm allocation per asset. For consecutive Ray assets with the same resource shape, opt into a run-owned allocation:

definitions/resources.py
from dagster_slurm import (
    ComputeResource,
    RayLauncher,
    SlurmAllocationScope,
    SlurmRunAllocationConfig,
)

# `slurm` is the SlurmResource from the definitions/assets.py example above
compute_ray = ComputeResource(
    mode="slurm",
    slurm=slurm,
    default_launcher=RayLauncher(num_gpus_per_node=1),
    allocation_scope=SlurmAllocationScope.RUN,  # one allocation for the whole Dagster run
    run_allocation=SlurmRunAllocationConfig(
        num_nodes=2,
        gpus_per_node=1,
        cpus_per_task=8,
        mem="64G",
        time_limit="04:00:00",
        partition="gpu",
    ),
)

With this configuration, dagster-slurm submits one run-owned Slurm allocation, starts Ray once inside it, injects RAY_ADDRESS into each compatible asset step, and cancels the allocation when the Dagster run tears down. If an asset asks for a different Slurm shape through extra_slurm_opts, the run fails before submission.
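The fail-fast rule can be pictured as a dictionary comparison. The sketch below is hypothetical: it assumes per-asset options are keyed by sbatch long-option names and treats any option the run allocation does not set as a conflict, since one allocation cannot satisfy two shapes. dagster-slurm's own check may differ in detail.

```python
from typing import Mapping


class AllocationShapeMismatch(ValueError):
    """Raised when per-asset Slurm options conflict with the run-owned allocation."""


def check_shape(run_shape: Mapping[str, str], extra_slurm_opts: Mapping[str, str]) -> None:
    # An option is compatible only if the run allocation already sets the same value;
    # options the run allocation leaves unset also count as conflicts.
    conflicts = sorted(
        (key, run_shape.get(key), value)
        for key, value in extra_slurm_opts.items()
        if run_shape.get(key) != value
    )
    if conflicts:
        details = ", ".join(f"{key}: run={run!r} asset={asset!r}" for key, run, asset in conflicts)
        raise AllocationShapeMismatch(f"incompatible Slurm options: {details}")


if __name__ == "__main__":
    run_shape = {"nodes": "2", "gpus-per-node": "1", "cpus-per-task": "8", "mem": "64G", "partition": "gpu"}
    check_shape(run_shape, {"mem": "64G"})  # same value: accepted
    try:
        check_shape(run_shape, {"mem": "128G", "qos": "high"})
    except AllocationShapeMismatch as err:
        print(err)
        # incompatible Slurm options: mem: run='64G' asset='128G', qos: run=None asset='high'
```

Checking before submission means a mismatched asset costs nothing on the cluster; the alternative, discovering the conflict inside the allocation, would waste queue time.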

In the bundled example project, set SLURM_ALLOCATION_SCOPE=run to apply this mode to compute_ray while leaving the Bash and Spark resources on per-asset Slurm jobs.
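A toggle like SLURM_ALLOCATION_SCOPE usually boils down to parsing an environment variable into an enum. The sketch uses a local stand-in enum because the members of SlurmAllocationScope other than RUN are not documented on this page; the ASSET member and the "asset" default value are hypothetical.

```python
import os
from enum import Enum
from typing import Mapping, Optional


class AllocationScope(Enum):
    """Hypothetical local stand-in for dagster_slurm.SlurmAllocationScope."""

    ASSET = "asset"  # assumed name for the per-asset default
    RUN = "run"


def scope_from_env(env: Optional[Mapping[str, str]] = None) -> AllocationScope:
    """Read SLURM_ALLOCATION_SCOPE, defaulting to one allocation per asset."""
    source = os.environ if env is None else env
    raw = source.get("SLURM_ALLOCATION_SCOPE", "asset").strip().lower()
    try:
        return AllocationScope(raw)
    except ValueError:
        raise ValueError(
            f"SLURM_ALLOCATION_SCOPE must be 'asset' or 'run', got {raw!r}"
        ) from None


if __name__ == "__main__":
    print(scope_from_env({"SLURM_ALLOCATION_SCOPE": "run"}))  # AllocationScope.RUN
```

In a real definitions module, the parsed value would decide whether compute_ray receives allocation_scope=SlurmAllocationScope.RUN and a run_allocation, while the Bash and Spark resources ignore it.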

Execution modes and Ray behaviour

| Mode | Behaviour |
| --- | --- |
| local | Starts a local Ray runtime on the Dagster node. Ideal for development; no Slurm required. |
| slurm + asset scope | Submits a Slurm job per asset; the RayLauncher bootstraps a Ray cluster inside that allocation and tears it down afterwards. |
| slurm + run scope | Submits one Slurm allocation for the Dagster run, starts Ray once, and runs compatible Ray asset steps inside that allocation. |

Heterogeneous Slurm jobs remain experimental and are handled separately from run-scoped Ray allocations.

To mix Ray with other launchers, override launcher= per asset or per step.

Resource sizing

  • num_gpus_per_node determines Ray GPU visibility inside each Slurm node.
  • Use SlurmRunAllocationConfig.cpus_per_task, mem, num_nodes, and queue fields to size run-scoped allocations.
  • ray_start_args lets you pass ray start flags for debugging.
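A quick way to sanity-check sizing is to count how many GPU tasks can run at once. For the run-scoped example above (2 nodes with 1 GPU each) and the entrypoint's four num_gpus=1 shards, two shards run concurrently, so the work takes two rounds if shards take similar time. The hypothetical helper below does that back-of-the-envelope arithmetic for whole-GPU tasks only; it ignores CPU and memory limits and fractional GPU requests.

```python
import math


def gpu_task_rounds(num_nodes: int, gpus_per_node: int, num_tasks: int, gpus_per_task: int = 1) -> int:
    """Rounds needed to run num_tasks whole-GPU Ray tasks on an allocation.

    A task must fit on a single node, so capacity is counted per node.
    Assumes tasks take roughly equal time; ignores CPU and memory limits.
    """
    if gpus_per_task < 1:
        raise ValueError("this sketch only models whole-GPU tasks")
    per_node = gpus_per_node // gpus_per_task
    concurrent = num_nodes * per_node
    if concurrent == 0:
        raise ValueError("no node has enough GPUs for a single task")
    return math.ceil(num_tasks / concurrent)


if __name__ == "__main__":
    # Run-scoped example: 2 nodes x 1 GPU, four num_gpus=1 shards
    print(gpu_task_rounds(num_nodes=2, gpus_per_node=1, num_tasks=4))  # 2
    # A single node with 4 GPUs runs all four shards at once
    print(gpu_task_rounds(num_nodes=1, gpus_per_node=4, num_tasks=4))  # 1
```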

Packaging the environment

Keep ComputeResource.auto_detect_platform=True (the default) so pixi packs the correct platform build for your cluster. For faster startup in production:

  1. Pre-build the environment via pixi run deploy-prod-docker.
  2. Point ComputeResource.pre_deployed_env_path at the deployed environment on the cluster.

Ray workers inherit that environment automatically.

Observability

  • Dagster logs capture Ray's stdout/stderr from the head node. Use the Dagster UI metadata for quick metrics.
  • Enable the Ray dashboard by binding it to all interfaces (ray_args=["--dashboard-host=0.0.0.0"]), then forward its port (8265 by default) from the edge node through an SSH tunnel.
  • Slurm usage metrics (CPU efficiency, memory high-water mark, elapsed time) appear in the asset materialization metadata automatically.
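For the dashboard, a local port forward through the edge node is usually enough, because the edge node can reach the head node even when your workstation cannot. The sketch builds the ssh command with OpenSSH's -N and -L flags and Ray's default dashboard port 8265. The host names are placeholders, and the head node's hostname has to be looked up from the allocation (for example from squeue's node list).

```python
import shlex


def dashboard_tunnel_command(
    edge_node: str,
    head_node: str,
    remote_port: int = 8265,
    local_port: int = 8265,
) -> list[str]:
    """Build an ssh command that forwards the Ray dashboard to localhost.

    -N: run no remote command, only hold the tunnel open.
    -L local_port:head_node:remote_port: the edge node connects to the head
    node on your behalf, so the head need not be reachable from your machine.
    """
    return ["ssh", "-N", "-L", f"{local_port}:{head_node}:{remote_port}", edge_node]


if __name__ == "__main__":
    cmd = dashboard_tunnel_command("user@login.example.org", "gpu-node-017")
    print(shlex.join(cmd))  # ssh -N -L 8265:gpu-node-017:8265 user@login.example.org
    # While the tunnel is open, browse to http://localhost:8265
```

Returning an argument list rather than a shell string lets you pass it straight to subprocess.run without quoting issues; shlex.join is only for display.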


Troubleshooting

  • Cluster never starts – Ensure ray is installed in the packed environment and that your Slurm partition allows the requested CPUs/GPUs. Check the job logs under ${SLURM_DEPLOYMENT_BASE_PATH}/.../run.log.
  • Workers cannot reach the head node – Verify that inter-node networking is allowed on your cluster. Set head_node_address explicitly in RayLauncher if your setup uses custom hostnames.
  • Run-scoped assets reject overrides – Move shared Slurm settings such as nodes, memory, partition, QoS, and GPUs into SlurmRunAllocationConfig. Per-asset overrides must match that shape.
  • Hanging shutdowns – Run-scoped allocations are canceled during Dagster resource teardown. If a local process is interrupted before teardown, use scancel <allocation-job-id> and inspect the allocation log under the remote run base.

Need more? Open an issue with your cluster details so we can extend the launcher helpers.