[Spot] Add spot job name in the SKYPILOT_TASK_ID env var #3424

Merged · 25 commits · Apr 5, 2024
12 changes: 9 additions & 3 deletions docs/source/examples/spot-jobs.rst
@@ -44,7 +44,7 @@ We can launch it with the following:
.. code-block:: yaml

# bert_qa.yaml
name: bert_qa
name: bert-qa

resources:
accelerators: V100:1
@@ -121,7 +121,7 @@ Below we show an `example <https://github.com/skypilot-org/skypilot/blob/master/
:emphasize-lines: 12-15,41-44

# bert_qa.yaml
name: bert_qa
name: bert-qa

resources:
accelerators: V100:1
@@ -176,7 +176,7 @@ We also set :code:`--run_name` to :code:`$SKYPILOT_TASK_ID` so that the logs for
to the same run in Weights & Biases.

.. note::
The environment variable :code:`$SKYPILOT_TASK_ID` (example: "sky-2022-10-06-05-17-09-750781_spot_id-22") can be used to identify the same job, i.e., it is kept identical across all
The environment variable :code:`$SKYPILOT_TASK_ID` (example: "sky-managed-2022-10-06-05-17-09-750781_pipeline_eval_8-1") can be used to identify the same job, i.e., it is kept identical across all
recoveries of the job.
It can be accessed in the task's :code:`run` commands or directly in the program itself (e.g., access
via :code:`os.environ` and pass to Weights & Biases for tracking purposes in your training script). It is made available to
@@ -369,6 +369,12 @@ The above YAML file defines a pipeline with two tasks. The first :code:`name: pi

To submit the pipeline, the same command :code:`sky spot launch` is used. The pipeline will be automatically launched and monitored by SkyPilot. You can check the status of the pipeline with :code:`sky spot queue` or :code:`sky spot dashboard`.

.. note::

The :code:`$SKYPILOT_TASK_ID` environment variable is also available in the :code:`run` section of each task. It is unique for each task in the pipeline.
For example, the :code:`$SKYPILOT_TASK_ID` for the :code:`eval` task above is:
"sky-managed-2022-10-06-05-17-09-750781_pipeline_eval_8-1".

.. code-block:: console

$ sky spot launch -n pipeline pipeline.yaml
24 changes: 19 additions & 5 deletions docs/source/running-jobs/environment-variables.rst
@@ -96,7 +96,7 @@ Environment variables for ``setup``


.. list-table::
:widths: 20 70 10
:widths: 20 60 10
:header-rows: 1

* - Name
@@ -108,17 +108,29 @@
* - ``SKYPILOT_SETUP_NODE_IPS``
- A string of IP addresses of the nodes in the cluster, in the same order as the node ranks, with one IP address per line.
- 1.2.3.4
* - ``SKYPILOT_TASK_ID``
- A unique ID assigned to each task.

This environment variable is available only when the task is submitted
with :code:`sky launch --detach-setup`, or run as a managed spot job.

Refer to the description in the :ref:`environment variables for run <env-vars-for-run>`.
- sky-2023-07-06-21-18-31-563597_myclus_1

For managed spot jobs: sky-managed-2023-07-06-21-18-31-563597_my-job-name_1-0
* - ``SKYPILOT_SERVE_REPLICA_ID``
- The ID of a replica within the service (starting from 1). Available only for a :ref:`service <sky-serve>`'s replica task.
- 1

Since setup commands always run on all nodes of a cluster, SkyPilot ensures both of these environment variables (the ranks and the IP list) never change across multiple setups on the same cluster.

.. _env-vars-for-run:

Environment variables for ``run``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. list-table::
:widths: 20 70 10
:widths: 20 60 10
:header-rows: 1

* - Name
@@ -136,13 +148,15 @@ Environment variables for ``run``
more :ref:`here <dist-jobs>`.
- 0
* - ``SKYPILOT_TASK_ID``
- A unique ID assigned to each task.
- A unique ID assigned to each task in the format "sky-<timestamp>_<cluster-name>_<task-id>".
Useful for logging purposes: e.g., use a unique output path on the cluster; pass to Weights & Biases; etc.
Each task's logs are stored on the cluster at ``~/sky_logs/${SKYPILOT_TASK_ID%%_*}/tasks/*.log``.

If a task is run as a :ref:`managed spot job <spot-jobs>`, then all
recoveries of that job will have the same ID value. Read more :ref:`here <spot-jobs-end-to-end>`.
- sky-2023-07-06-21-18-31-563597_myclus_id-1
recoveries of that job will have the same ID value. The ID is in the format "sky-managed-<timestamp>_<job-name>(_<task-name>)_<job-id>-<task-id>", where ``<task-name>`` appears only when a pipeline is used, i.e., when the managed spot job contains more than one task. Read more :ref:`here <spot-jobs-end-to-end>`.
- sky-2023-07-06-21-18-31-563597_myclus_1

For managed spot jobs: sky-managed-2023-07-06-21-18-31-563597_my-job-name_1-0
* - ``SKYPILOT_SERVE_REPLICA_ID``
- The ID of a replica within the service (starting from 1). Available only for a :ref:`service <sky-serve>`'s replica task.
- 1
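As a rough sketch (the values are the table's examples, not real output; the snippet is not part of this PR), the run-timestamp prefix that the log path above extracts with ``${SKYPILOT_TASK_ID%%_*}`` can be recovered the same way in Python:

.. code-block:: python

   import os

   # `${SKYPILOT_TASK_ID%%_*}` in bash keeps everything before the first '_',
   # i.e. the run timestamp used for the per-run log directory.
   task_id = os.environ.get('SKYPILOT_TASK_ID',
                            'sky-2023-07-06-21-18-31-563597_myclus_1')
   run_timestamp = task_id.split('_', 1)[0]
   print(f'~/sky_logs/{run_timestamp}/tasks')  # location of this task's logs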
53 changes: 23 additions & 30 deletions sky/backends/cloud_vm_ray_backend.py
@@ -326,7 +326,7 @@ def add_gang_scheduling_placement_group_and_setup(
stable_cluster_internal_ips: List[str],
setup_cmd: Optional[str] = None,
setup_log_path: Optional[str] = None,
envs: Optional[Dict[str, str]] = None,
env_vars: Optional[Dict[str, str]] = None,
) -> None:
"""Create the gang scheduling placement group for a Task.

@@ -415,7 +415,7 @@ def add_gang_scheduling_placement_group_and_setup(
.remote(
setup_cmd,
os.path.expanduser({setup_log_path!r}),
env_vars={envs!r},
env_vars={env_vars!r},
stream_logs=True,
with_ray=True,
) for i in range(total_num_nodes)]
@@ -484,7 +484,6 @@ def register_run_fn(self, run_fn: str, run_fn_name: str) -> None:
def add_ray_task(self,
bash_script: Optional[str],
task_name: Optional[str],
job_run_id: Optional[str],
ray_resources_dict: Dict[str, float],
log_dir: str,
env_vars: Optional[Dict[str, str]] = None,
@@ -538,11 +537,6 @@ def add_ray_task(self,
if env_vars is not None:
sky_env_vars_dict_str.extend(f'sky_env_vars_dict[{k!r}] = {v!r}'
for k, v in env_vars.items())
if job_run_id is not None:
sky_env_vars_dict_str += [
f'sky_env_vars_dict[{constants.TASK_ID_ENV_VAR!r}]'
f' = {job_run_id!r}',
]
sky_env_vars_dict_str = '\n'.join(sky_env_vars_dict_str)

options_str = ', '.join(options)
@@ -4527,6 +4521,20 @@ def get_storage_mounts_metadata(
storage_metadata, sync_on_reconstruction=False)
return storage_mounts

def _get_task_env_vars(self, task: task_lib.Task, job_id: int,
handle: CloudVmRayResourceHandle) -> Dict[str, str]:
"""Returns the environment variables for the task."""
env_vars = task.envs.copy()
# If it is a managed spot job, the TASK_ID_ENV_VAR will have been
# already set by the controller.
if constants.TASK_ID_ENV_VAR not in env_vars:
env_vars[
constants.TASK_ID_ENV_VAR] = common_utils.get_global_job_id(
self.run_timestamp,
cluster_name=handle.cluster_name,
job_id=str(job_id))
return env_vars

def _execute_task_one_node(self, handle: CloudVmRayResourceHandle,
task: task_lib.Task, job_id: int,
detach_run: bool) -> None:
@@ -4537,6 +4545,8 @@ def _execute_task_one_node(self, handle: CloudVmRayResourceHandle,
internal_ips = handle.internal_ips()
assert internal_ips is not None, 'internal_ips is not cached in handle'

task_env_vars = self._get_task_env_vars(task, job_id, handle)

codegen = RayCodeGen()
codegen.add_prologue(job_id)
codegen.add_gang_scheduling_placement_group_and_setup(
@@ -4545,28 +4555,19 @@
stable_cluster_internal_ips=internal_ips,
setup_cmd=self._setup_cmd,
setup_log_path=os.path.join(log_dir, 'setup.log'),
envs=task.envs,
env_vars=task_env_vars,
)

if callable(task.run):
run_fn_code = textwrap.dedent(inspect.getsource(task.run))
run_fn_name = task.run.__name__
codegen.register_run_fn(run_fn_code, run_fn_name)

# If it is a managed spot job, the TASK_ID_ENV_VAR will have been
# already set by the controller.
job_run_id = task.envs.get(
constants.TASK_ID_ENV_VAR,
common_utils.get_global_job_id(self.run_timestamp,
cluster_name=handle.cluster_name,
job_id=str(job_id)))

command_for_node = task.run if isinstance(task.run, str) else None
codegen.add_ray_task(
bash_script=command_for_node,
env_vars=task.envs,
env_vars=task_env_vars,
task_name=task.name,
job_run_id=job_run_id,
ray_resources_dict=backend_utils.get_task_demands_dict(task),
log_dir=log_dir)

@@ -4593,6 +4594,7 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle,

# If TPU VM Pods is used, #num_nodes should be num_nodes * num_node_ips
num_actual_nodes = task.num_nodes * handle.num_ips_per_node
task_env_vars = self._get_task_env_vars(task, job_id, handle)

codegen = RayCodeGen()
codegen.add_prologue(job_id)
@@ -4602,21 +4604,13 @@
stable_cluster_internal_ips=internal_ips,
setup_cmd=self._setup_cmd,
setup_log_path=os.path.join(log_dir, 'setup.log'),
envs=task.envs)
env_vars=task_env_vars)

if callable(task.run):
run_fn_code = textwrap.dedent(inspect.getsource(task.run))
run_fn_name = task.run.__name__
codegen.register_run_fn(run_fn_code, run_fn_name)

# If it is a managed spot job, the TASK_ID_ENV_VAR will have been
# already set by the controller.
job_run_id = task.envs.get(
constants.TASK_ID_ENV_VAR,
common_utils.get_global_job_id(self.run_timestamp,
cluster_name=handle.cluster_name,
job_id=str(job_id)))

# TODO(zhwu): The resources limitation for multi-node ray.tune and
# horovod should be considered.
for i in range(num_actual_nodes):
@@ -4626,9 +4620,8 @@ def _execute_task_n_nodes(self, handle: CloudVmRayResourceHandle,
# the corresponding node, represented by private IPs.
codegen.add_ray_task(
bash_script=command_for_node,
env_vars=task.envs,
env_vars=task_env_vars,
task_name=task.name,
job_run_id=job_run_id,
ray_resources_dict=backend_utils.get_task_demands_dict(task),
log_dir=log_dir,
gang_scheduling_id=i)
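To make the precedence in ``_get_task_env_vars`` explicit, here is a standalone sketch (hypothetical helper name and values, not code from this PR): a controller-provided ID is kept as-is, otherwise a cluster-local ID is generated from the run timestamp.

# Standalone sketch of the precedence implemented above (hypothetical names).
def resolve_task_id(task_envs: dict, run_timestamp: str, cluster_name: str,
                    job_id: int) -> str:
    if 'SKYPILOT_TASK_ID' in task_envs:
        # Managed spot job: the controller already injected the ID.
        return task_envs['SKYPILOT_TASK_ID']
    # Unmanaged job, e.g. 'sky-2023-07-06-21-18-31-563597_myclus_1'.
    return f'{run_timestamp}_{cluster_name}_{job_id}'

assert resolve_task_id({}, 'sky-2023-07-06-21-18-31-563597', 'myclus',
                       1) == 'sky-2023-07-06-21-18-31-563597_myclus_1'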
10 changes: 7 additions & 3 deletions sky/spot/controller.py
@@ -58,14 +58,18 @@ def __init__(self, job_id: int, dag_yaml: str,
# pylint: disable=line-too-long
# Add a unique identifier to the task environment variables, so that
# the user can have the same id for multiple recoveries.
# Example value: sky-2022-10-04-22-46-52-467694_spot_id-17-1
# Example value: sky-managed-2022-10-04-22-46-52-467694_my-spot-name_17-0
job_id_env_vars = []
for i in range(len(self._dag.tasks)):
task_name = self._dag_name
if len(self._dag.tasks) > 1:
task_name = f'{task_name}_{self._dag.tasks[i].name}'
job_id_env_var = common_utils.get_global_job_id(
self._backend.run_timestamp,
'spot',
f'{task_name}',
str(self._job_id),
task_id=i)
task_id=i,
is_managed_job=True)
job_id_env_vars.append(job_id_env_var)

for i, task in enumerate(self._dag.tasks):
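As a concrete illustration of the controller change (invented timestamp and job id, mirroring the docs example above), the per-task IDs for a two-task pipeline named 'pipeline' with tasks 'train' and 'eval' would look like:

# Illustrative only: the IDs produced by the loop above for job id 8.
dag_name = 'pipeline'
task_names = ['train', 'eval']
timestamp = 'sky-managed-2022-10-06-05-17-09-750781'
ids = [f'{timestamp}_{dag_name}_{name}_8-{i}'
       for i, name in enumerate(task_names)]
# -> ['sky-managed-..._pipeline_train_8-0',
#     'sky-managed-..._pipeline_eval_8-1']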
8 changes: 6 additions & 2 deletions sky/utils/common_utils.py
@@ -200,12 +200,16 @@ def cluster_name_in_hint(cluster_name: str, cluster_name_on_cloud: str) -> str:
def get_global_job_id(job_timestamp: str,
cluster_name: Optional[str],
job_id: str,
task_id: Optional[int] = None) -> str:
task_id: Optional[int] = None,
is_managed_job: bool = False) -> str:
"""Returns a unique job run id for each job run.

A job run is defined as the lifetime of a job that has been launched.
"""
global_job_id = f'{job_timestamp}_{cluster_name}_id-{job_id}'
managed_job_str = 'managed-' if is_managed_job else ''
_, sep, timestamp = job_timestamp.partition('sky-')
job_timestamp = f'{sep}{managed_job_str}{timestamp}'
global_job_id = f'{job_timestamp}_{cluster_name}_{job_id}'
if task_id is not None:
global_job_id += f'-{task_id}'
return global_job_id
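A quick, self-contained check of the new format (invented timestamp; not part of the PR's tests) shows how the 'managed-' marker is spliced in right after the 'sky-' prefix and the task index is appended after the job id:

# Reproduces the string manipulation in get_global_job_id for a managed job.
job_timestamp = 'sky-2023-07-06-21-18-31-563597'
_, sep, ts = job_timestamp.partition('sky-')
managed_id = f'{sep}managed-{ts}_my-job-name_1-0'
assert managed_id == 'sky-managed-2023-07-06-21-18-31-563597_my-job-name_1-0'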