Tutorial: Scalable and Distributed ML Workflows with DVC and Ray (Part 1)
Ray + DVC integration example
Training models at the scale of Gemini or GPT-4 requires advanced tools that manage complexity while ensuring efficiency. This tutorial explores how Data Version Control (DVC) can be a game-changer for ambitious projects. DVC simplifies AI development by automating pipelines, managing versions, and tracking experiments while embracing GitOps for reproducibility. It excels in both local and cloud environments for traditional ML workflows. However, the rise of Generative AI and complex deep learning projects demands scalable, distributed training solutions.
This tutorial is divided into two parts. Part 1 sets the foundation for scalable and efficient machine learning workflows by leveraging Ray’s distributed computing capabilities and DVC’s data version control.
In Part 2, we extend the solution to a Ray Cluster on AWS, demonstrating how to adapt the setup for cloud-based distributed computing. This involves configuring AWS resources, deploying Ray clusters in the cloud, and running DVC-managed pipelines at scale.
This guide is tailored for ML Engineers and Team Leads in AI projects who aim to speed up training, optimize resources, and ensure reproducibility across distributed environments. I am looking forward to hearing your feedback and improvements! 🙌
We would like to express our gratitude to Andreas Schuh from HeartFlow for his contribution to this solution and for providing ideas and feedback for the blog posts. 🤝
Table Of Contents
- Why DVC and Ray?
- Tutorial Scope
- 👩💻 Installation
- ⭐ Get Started with Ray
- 🏃♂️ Run DVC Pipeline on a Ray Cluster
- 💬 Discuss the Solution Design
- ☝️ Use DVC to run scripts calling Ray API
- ☝️ Persist DVC stage outputs to keep them available for downstream stages in case of failure
- ☝️ Use DVCLive to track live metrics updates with DVC Studio and DVC Extension for VS Code
- ☝️ Propagate DVC environment variables to Worker nodes
- Copy the model.pth file from the Ray Trial folder to the DVC project repository
- 🎨 Summing Up: DVC + Ray Integration
- References
Why DVC and Ray?
DVC is an open-source tool that brings GitOps and reproducibility to data management, ML experiments, and model development. It connects versioned data sources and code with pipelines, tracks experiments, and registers models — all based on GitOps principles.
Ray is an open-source unified computing framework that makes scaling AI and Python workloads easy — from reinforcement learning to deep learning to tuning and model serving. Ray makes it a breeze to scale your compute-intensive tasks from a single machine to a massive cluster without losing your mind.
DVC and Ray make your ML projects more manageable and prepare them to tackle the challenges of tomorrow’s AI-driven landscape. Let’s explore this dynamic duo and unlock new potentials in your MLOps journey!
💡 Want to learn more about DVC?
Join our online course: Iterative Tools for Data Scientists & Analysts!
Tutorial Scope
This tutorial will guide users through creating automated, scalable, and distributed ML pipelines using DVC (Data Version Control) and Ray. We start by configuring the Ray Cluster for local and cloud environments. Then, we discuss the challenges of running DVC in distributed environments and walk through a few examples of using DVC and Ray together. By the end of the tutorial, you will be able to design, run, and manage ML pipelines distributed over multiple nodes and trackable through version control.
For DVC users, this tutorial offers several advantages:
- Bring Distributed Computing Efficiency to DVC projects
- Easy use of AWS Cloud for Development and Production workflows
- Enable automated pipelines and data versioning in ML projects with Ray
For Ray users, this tutorial aims to highlight the benefits of integrating DVC:
- Enhance Model Training Reproducibility with DVC’s data versioning capabilities
- Streamline ML Pipeline Management through DVC’s structured approach
- Facilitate Efficient Collaboration among teams by leveraging DVC for shared data and model management
High-level solution design
Let’s overview the high-level design of our target solution with DVC and Ray.
- Users can manage Ray Cluster and run DVC pipelines from a “local” environment.
- Ray distributes workloads across multiple workers and can auto-scale cluster nodes.
- During the training, DVCLive logs live updates of metrics and parameters to DVC Studio.
- DVC utilizes S3 to sync state between Worker and Head nodes.
- DVC uses remote storage (AWS S3) to manage data and model artifacts.
- Users commit the results of the experiment to Git and DVC Remote Storage.
Prerequisites
We expect that you:
- Have some experience with Machine Learning or Data Engineering pipelines
- Are familiar with DVC
To follow this tutorial, you’ll need the following tools:
- Git
- Python 3.11 or above
- AWS CLI (if you want to run pipelines in AWS)
👩‍💻 Installation
Creating an ML pipeline that runs distributed tasks is a powerful way to manage and scale your machine learning workflows. With DVC, we can efficiently orchestrate our pipeline stages and handle experiment outputs.
To clone the example repository, you can follow these steps:
git clone https://github.com/iterative/tutorial-mnist-dvc-ray.git
cd tutorial-mnist-dvc-ray
Install Python dependencies:
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
export PYTHONPATH=$PWD
⭐ Get Started with Ray
1 - Overview of the Ray Framework
Ray is a framework for scaling AI and Python applications. For AI and ML applications, Ray helps to scale jobs without needing infrastructure expertise:
- Efficiently parallelize and distribute ML workloads across multiple nodes and GPUs.
- Leverage the ML ecosystem with native and extensible integrations.
Stack of Ray libraries - A Unified Toolkit For ML Workloads (Ray Docs)
In this tutorial, we work with Ray Clusters and Ray AI Libraries (Ray Tune and Ray Train). Ray Cluster is a set of Worker nodes connected to a common Ray Head node.
- The Head node serves as the central coordination point for the Ray cluster. It manages the cluster’s metadata, maintains the cluster state, and handles task scheduling and management.
- Worker nodes are the computational workhorses of the Ray cluster. They are responsible for executing tasks and running computations for applications.
A Ray cluster with two worker nodes. Each node runs Ray helper processes to facilitate distributed scheduling and memory management. The head node runs additional control processes (highlighted in blue). Source: Ray Docs
Ray clusters can be fixed-size or autoscale up and down according to the resources requested by applications running on the cluster.
Ray Tune is a Python Library that automates the hyperparameter tuning process across distributed resources. By integrating Ray Tune into the experiment workflow, we can evaluate numerous hyperparameter combinations in parallel, speeding up the search for optimal model configurations.
Distributed tuning with distributed training per trial. Source: Ray Docs
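To make the idea concrete, here is a minimal sketch of the Tuner API (illustrative only; the objective function and search space below are made up and are not the tutorial's tune.py):

from ray import tune

def objective(config):
    # Hypothetical objective: the score peaks near lr=0.01
    return {"mean_accuracy": 1.0 - abs(config["lr"] - 0.01)}

tuner = tune.Tuner(
    objective,
    param_space={"lr": tune.grid_search([0.001, 0.01, 0.1])},
)
results = tuner.fit()
print(results.get_best_result(metric="mean_accuracy", mode="max").config)

Each trial runs the objective with one sampled configuration, and the ResultGrid returned by fit() lets you pick the best configuration afterwards.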
Ray Train scales model training code from a single machine to a cluster of machines in the cloud and abstracts away the complexities of distributed computing. At a high level, it distributes training jobs among worker nodes and runs them.
Ray Train Overview. Source: Ray Docs
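At the code level, the pattern looks roughly like the sketch below (a minimal illustration, not the tutorial's actual train.py; the loop body, metric values, and config are placeholders):

import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func_per_worker(config):
    # Placeholder training loop; a real one builds the model and data loaders here
    for epoch in range(config["epochs"]):
        ray.train.report({"epoch": epoch, "loss": 0.0})  # dummy metric

trainer = TorchTrainer(
    train_loop_per_worker=train_func_per_worker,
    train_loop_config={"epochs": 2},  # hypothetical config
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
)
result = trainer.fit()

The ScalingConfig is what turns the same per-worker function into a single-machine or multi-node run.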
2 - Start a Ray Cluster
💡 Navigate to the main branch in the repository
To start a Ray Cluster, first initiate the Ray head node. The head node is the primary node in the Ray cluster that manages the worker nodes. Since this is a local setup, your machine will act as both the Head and Worker nodes. Use the following command:
ray start --head
This command starts the Ray cluster with your machine acting as the head node.
To monitor and debug Ray, view the dashboard at http://127.0.0.1:8265/.
💡 Multi-node Ray clusters are only supported on Linux. You may deploy Windows and OSX clusters for development by setting the environment variable RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER=1. Source: Ray Clusters Overview.
3 - Run a test script on the Ray Cluster
You can run a simple test script to ensure your local Ray cluster works correctly. In your project directory, create a file named hello_cluster.py inside the src/test_scripts directory. Add a script to connect to the Ray cluster and print a message. Here's an example script:
import ray

@ray.remote
def hello_world():
    return "Hello Ray cluster"

# Automatically connect to the running Ray cluster.
ray.init()
print(ray.get(hello_world.remote()))
Execute the script using Python. Open your terminal and run:
python src/test_scripts/hello_cluster.py
You should see an output similar to this:
2023-11-14 12:11:17,363 INFO worker.py:1489 -- Connecting to existing Ray cluster at address: 192.168.100.19:6379...
2023-11-14 12:11:17,370 INFO worker.py:1664 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265
Hello Ray cluster
This output indicates that your script has successfully connected to the local Ray cluster and executed the print statement.
🏃♂️ Run DVC Pipeline on a Ray Cluster
At this point, you have a single-node Ray Cluster running on your local machine. Let's start with the DVC pipeline setup.
Goals for this section:
- Design a Solution for DVC + Ray.
- Create a DVC pipeline with two stages: tune and train.
- Modify DVCLive to sync metrics and parameters with DVC Studio.
1 - Design Solution for DVC + Ray
The technical design calls for a structure where ML experiment scripts, managed by DVC, invoke Ray for their computation needs. DVC is the orchestrator, invoking the appropriate Ray functions for distributed processing.
Design POC Solution for DVC + Ray (local)
This diagram outlines the integration of DVC (Data Version Control) with a Ray cluster for running ML experiments in a distributed manner:
- DVC initiates the process by running a stage script. The dvc.yaml pipeline definition is the blueprint for the ML workflow, defining stages that utilize Ray for hyperparameter tuning and subsequent training.
- Ray Job Submission: The stage script (e.g., src/stages/tune.py) starts a Ray application that submits computation jobs to Ray. The src/stages/tune.py script utilizes Ray Tune's Tuner class to define and run the hyperparameter tuning trials.
- The Ray Cluster contains a single Head Node where the actual computation occurs. (Note: in a production cluster, Ray runs the jobs distributed across multiple worker nodes.) Ray saves results for each job (trial) to a local directory on a worker node (outside the DVC project repo).
- After all jobs complete, the stage script retrieves results from Ray’s trial directories to the DVC project repo (if needed).
- DVC manages the outputs of the pipeline, ensuring reproducibility and traceability.
The result is a robust framework for conducting and managing ML experiments that are scalable, reproducible, and efficiently optimized. This framework not only streamlines the experimentation process but also simplifies the transition of models from development to production.
2 - Create a DVC pipeline
In this tutorial, the dvc.yaml file contains only two stages in the ML pipeline: tune and train.
DVC pipeline configuration in dvc.yaml with tune and train stages, and plots sections
Tune Stage
This initial stage is responsible for hyperparameter tuning. It uses Ray to distribute the computation involved in this process. The stage executes a Python script, tune.py, that optimizes hyperparameters using Ray Tune. The output of this stage is best_params.yaml, which contains the best hyperparameters found during the tuning process.
tune:
  cmd: python src/stages/tune.py --config params.yaml
  params:
    - tune
  outs:
    - ${tune.results_dir}/best_params.yaml:
        cache: false
        persist: true
Use two specific configuration parameters for the best_params.yaml output:
- Set cache: false to instruct DVC not to cache the file but version it with Git.
- Set persist: true to instruct DVC not to remove the file before reproducing the stage. It's useful for stage dependencies when you work in an unstable environment (or while debugging) and the stage script can fail for any reason. In this example, even if the tune stage fails, you can run the train stage using best_params.yaml from the previous run.
Train Stage
The Train Stage runs distributed computation via Ray. This stage depends on best_params.yaml generated by the tune stage to access the optimal hyperparameters for training the model. The train stage invokes the train.py script, which trains the model based on the tuned parameters.
train:
  cmd: python src/stages/train.py --config params.yaml
  params:
    - train
  deps:
    - ${tune.results_dir}/best_params.yaml
  outs:
    - ${train.results_dir}/model.pth
The trained model is saved as model.pth, with the path again parameterized to allow flexibility in the output location. The output model is automatically cached and versioned with DVC.
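For illustration, the train stage can read the tuned hyperparameters along these lines (a hedged sketch: the path and keys below are assumptions, and in the actual pipeline the directory is resolved from the ${tune.results_dir} parameter in params.yaml):

import yaml

# Assumed location; resolved from ${tune.results_dir} in the real pipeline
with open("results/tune/best_params.yaml") as f:
    best_params = yaml.safe_load(f)

# Hypothetical keys, e.g. {"lr": 0.01, "batch_size": 64}
print(best_params)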
3 - Run DVC pipelines on Ray Cluster
To execute your automated and distributed ML pipeline with DVC, perform the following steps:
- Set the PYTHONPATH environment variable so Python scripts can access modules within your project's directory.
- Run the DVC pipeline with the dvc exp run command.
export PYTHONPATH=$PWD
dvc exp run
This will start the pipeline, running the tune and train stages as defined in your dvc.yaml file, utilizing distributed computation with Ray.
While the pipeline runs, you can see live updates of metrics and plots in DVC Studio and the DVC Extension for VS Code: DVC generates and renders plots based on your project's data, and anything logged with DVCLive is visualized there.
A few benefits of tracking and visualizing metrics and plots with DVC (see docs):
- Enhanced Experiment Tracking: Compare metrics, parameters, data versions, and plots between experiments in live mode (docs: Visualize and Compare experiments).
- Customized Visualization: Define visualization templates and interactively select the data and titles to be visualized, before or after the experiment is complete (docs: Defining plots).
- Share & Version Control for Metrics: Send live metrics and plots to DVC Studio, push completed experiments (including data, models, and code), and convert an experiment into a persistent branch or commit in your Git repo (docs: Sharing Experiments).
Experiment tracking with DVC Studio and DVC Extension for VS Code
💡 Note: Sometimes, when you run dvc exp run with a local Ray Cluster, the process may get stuck on the Connecting to existing Ray cluster at address: 192.168.100.19:6379... message due to a ConnectionError in Ray. In this case, open a new terminal session, export PYTHONPATH, and run the dvc exp run command there.
💬 Discuss the Solution Design
The section above walks through a simple example of running DVC and Ray together. It's not a production setup, but it's a good starting point for developing and debugging the DVC pipeline with Ray.
Let’s think about what decisions we made and discuss some details:
- Use DVC to run scripts calling the Ray API.
- Persist DVC stage outputs to keep them available for downstream stages in case of failure.
- Use DVCLive to track metrics only on the worker with rank 0.
- Propagate DVC environment variables to Worker nodes.
- Copy the model.pth file from the Ray Trial folder to the DVC project repository.
☝️ Use DVC to run scripts calling Ray API
The Ray framework provides a rich Python API for distributed data processing, model tuning, and training. Wrapping Ray scripts into callable Python modules makes them easy to run as DVC stages, so you get two benefits:
- Get scalability and distributed training with Ray
- Get reproducibility and versioning with DVC
A template of the dvc.yaml for DVC + Ray:
stages:
  first_stage:
    cmd: python first_script_with_ray.py
    ...
  next_stage:
    cmd: python second_script_with_ray.py
    ...
☝️ Persist DVC stage outputs to keep them available for downstream stages in case of failure
Set persist: true to instruct DVC not to remove the file before reproducing the stage. It's useful for stage dependencies when you work in an unstable environment (or while debugging) and the stage script might fail.
stages:
  first_stage:
    cmd: python first_script_with_ray.py
    outs:
      - stage_output.file:
          persist: true
☝️ Use DVCLive to track live metrics updates with DVC Studio and DVC Extension for VS Code
Ray Train lets you use native experiment tracking libraries inside the train_func function. DVCLive is a highly flexible and lightweight library that simplifies experiment tracking in DVC projects.
from dvclive import Live

with Live() as live:
    live.log_metric(metric_name, value)
This solution logs metrics with Live() inside the train_func_per_worker() function.
One significant distinction between distributed and non-distributed training is that multiple processes run in parallel, and under certain configurations they may produce identical results. When all processes report results to the tracking backend, there's a risk of duplicate entries (check the Ray docs for details).
Therefore, a few adjustments should be made to DVCLive.
- Use DVCLive to track metrics only on a worker with a rank of 0.
- Use the DVC_ROOT variable to create the Live(dir=...) object. DVC automatically sets DVC_ROOT to the directory of your DVC repository, and using it here ensures Ray writes metrics inside the repo (docs).
As a result, the DVCLive usage code inside the train_func_per_worker() function looks like the example below.
# train.py
import os
from typing import Dict

import ray
from dvclive import Live


def train_func_per_worker(config: Dict):
    # Initialize DVCLive
    live = None
    rank = ray.train.get_context().get_world_rank()

    # Create a Live object on the rank 0 worker only
    if rank == 0:
        live = Live(
            dir=os.path.join(os.environ.get("DVC_ROOT", ""), "results/dvclive"),
        )

    for epoch in range(epochs):
        # ...epoch training

        # Log metrics with DVCLive
        if live:
            live.log_metric("loss", test_loss)
            live.log_metric("accuracy", accuracy)
            live.next_step()
Utilizing DVCLive in Python code for logging metrics and plots automatically generates the necessary configurations for plots within the dvc.yaml file. Below is an example configuration for metrics and plots:
metrics:
  - results/dvclive/metrics.json
plots:
  - accuracy:
      x: step
      y:
        results/dvclive/plots/metrics/accuracy.tsv: accuracy
      title: Accuracy
      x_label: Step
      y_label: Accuracy
  - loss:
      template: simple
      x: step
      y:
        results/dvclive/plots/metrics/loss.tsv: loss
      title: Loss
      x_label: Step
      y_label: Loss
  - results/tune/plots/images
The train stage logs metrics and plots to results/dvclive. Datapoints for metrics and plots are saved in files and visualized later in DVC Studio and VS Code.
Metrics plot generated by the train stage
The tune stage logs a mean_accuracy_plot.png file to visualize metrics for tuning trials.
Metrics plot generated by the tune stage
☝️ Propagate DVC environment variables to Worker nodes
DVC environment variables are necessary for every Ray worker because they provide essential information and configurations for DVCLive, facilitating experiment tracking. These variables include:
- DVC_STUDIO_REPO_URL: Repository URL where DVC stores versioned data.
- DVC_STUDIO_TOKEN: Authentication token for secure access to DVC Studio.
- DVC_STUDIO_URL: Web interface URL for managing DVC projects.
- DVC_EXP_BASELINE_REV: Baseline revision for comparing experiment results.
- DVC_EXP_NAME: Descriptive identifier for the experiment.
- DVC_ROOT: Root directory of the DVC project on the filesystem.
💡 Note: All environment variables above are set by DVC automatically when running a pipeline.
You don't need to care about DVC environment variables when running DVC in a non-distributed environment. However, running it on a Ray Cluster requires setting them on every worker. In this solution, DVC environment variables are passed via RuntimeEnv to specify a runtime environment for the whole job.
Set up DVC Environment Variables
The code snippet below demonstrates an approach to managing DVC environment variables within a TorchTrainer setup.
import os
from typing import Dict

import ray
from ray.runtime_env import RuntimeEnv
from ray.train.torch import TorchTrainer
from dvclive import Live


def train_func_per_worker(config: Dict):
    # ...
    if rank == 0:
        live = Live(
            dir=os.path.join(os.environ.get("DVC_ROOT", ""), "results/dvclive"),
        )

def train(params: dict) -> None:
    # ...
    trainer = TorchTrainer(
        train_loop_per_worker=train_func_per_worker,
        train_loop_config=train_config,
        ...
    )

if __name__ == "__main__":
    # ...
    # [1] Propagate DVC environment variables from Head Node to Workers
    # =============================================
    DVC_ENV_VARS = {k: v for k, v in os.environ.items() if k.startswith("DVC")}
    ray.init(runtime_env=RuntimeEnv(env_vars=DVC_ENV_VARS))

    train(params)
- To ensure that DVC environment variables are accessible within the training loop across all worker nodes, RuntimeEnv propagates these variables from the head node to the workers.
☝️ Copy the model.pth file from the Ray Trial folder to the DVC project repository
Upon completing the training process, the model.pth file is saved in the Ray Trial folder. Therefore, it's copied to the DVC project repository (as shown in the code example above).
This ensures that the trained model file is appropriately stored within the DVC-managed project structure, facilitating version control and reproducibility.
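For illustration, here is a minimal sketch of what that copy step can look like (assumptions: the worker saved the weights as model.pth inside the Ray checkpoint reported during training, and ${train.results_dir} resolves to results/train):

import os
import shutil

# result is the ray.train.Result returned by trainer.fit()
checkpoint = result.checkpoint
if checkpoint is not None:
    with checkpoint.as_directory() as ckpt_dir:
        os.makedirs("results/train", exist_ok=True)  # assumed ${train.results_dir}
        shutil.copy(
            os.path.join(ckpt_dir, "model.pth"),  # assumed checkpoint layout
            os.path.join("results/train", "model.pth"),
        )

Because the destination matches the outs entry of the train stage, DVC picks up the copied file and versions it as the stage output.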
🎨 Summing Up: DVC + Ray Integration
The DVC + Ray integration presents a comprehensive solution to the challenges of running machine learning experiments at scale. By addressing specific issues related to auto-scaling, execution optimization, live metrics tracking, and data synchronization, this setup ensures that machine learning teams can focus on innovation and experimentation backed by a robust, scalable, and efficient infrastructure.
In Part 1 of the tutorial, we explored the basics of setting up and integrating DVC with Ray for distributed machine learning workflows. We covered the following key topics:
- Introduction to Ray: We discussed Ray’s capabilities for scaling AI and Python applications, focusing on its ability to parallelize and distribute ML workloads across multiple nodes easily.
- Ray Clusters: The architecture of Ray clusters was explained, highlighting the roles of head and worker nodes in managing and executing tasks.
- Ray Tune and Ray Train: We introduced Ray Tune for hyperparameter optimization and Ray Train for scaling model training code, emphasizing their integration into ML workflows.
- Local Ray Cluster Setup: Step-by-step instructions were provided for starting a Ray Cluster locally, showcasing how to test the setup with a simple script.
Key Takeaways
The key takeaway from Part 1 is the foundation it sets for scalable and efficient machine learning workflows. By leveraging Ray’s distributed computing capabilities and DVC’s data version control, we establish a robust framework for managing complex ML experiments. This combination enhances scalability, reproducibility, and collaboration in ML projects.
Looking Ahead to Part 2
In Part 2 of the tutorial, we will extend the solution to a Ray Cluster on AWS, demonstrating how to adapt the setup for cloud-based distributed computing. This will involve configuring AWS resources, deploying Ray clusters in the cloud, and running DVC-managed pipelines at scale. The focus will shift towards managing the increased complexity and leveraging cloud infrastructure to maximize the efficiency and performance of ML experiments.
Stay tuned for detailed instructions on deploying and managing cloud-based Ray clusters with DVC as we take the scalability and efficiency of ML workflows to the next level.
💡 Did you find this tutorial interesting? Please leave your comments and share your experience with DVC and Ray! Join us on Discord 🙌