diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 8735b012d36..a8bd87da0c9 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1650,7 +1650,9 @@ def _update_cluster_status_no_lock( backend.set_autostop(handle, -1, stream_logs=False) except (Exception, SystemExit): # pylint: disable=broad-except logger.debug('Failed to reset autostop.') - global_user_state.set_cluster_autostop_value(handle.cluster_name, -1) + global_user_state.set_cluster_autostop_value(handle.cluster_name, + -1, + to_down=False) # If the user starts part of a STOPPED cluster, we still need a status to # represent the abnormal status. For spot cluster, it can also represent diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index efbca949d12..fc22bdf91a6 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1118,6 +1118,7 @@ def ray_up(): # different order from directly running in the console. The # `--log-style` and `--log-color` flags do not work. To reproduce, # `ray up --log-style pretty --log-color true | tee tmp.out`. + returncode, stdout, stderr = log_lib.run_with_log( # NOTE: --no-restart solves the following bug. Without it, if # 'ray up' (sky launch) twice on a cluster with >1 node, the @@ -1135,7 +1136,15 @@ def ray_up(): line_processor=log_utils.RayUpLineProcessor(), # Reduce BOTO_MAX_RETRIES from 12 to 5 to avoid long hanging # time during 'ray up' if insufficient capacity occurs. - env=dict(os.environ, BOTO_MAX_RETRIES='5'), + env=dict( + os.environ, + BOTO_MAX_RETRIES='5', + # Use environment variables to disable the ray usage stats + # (to avoid the 10-second wait for usage collection + # confirmation), as the ray version on the user's machine + # may be an older version that does not support the + # `--disable-usage-stats` flag. + RAY_USAGE_STATS_ENABLED='0'), require_outputs=True, # Disable stdin to avoid ray outputs mess up the terminal with # misaligned output when multithreading/multiprocessing are used @@ -1335,10 +1344,16 @@ def _ensure_cluster_ray_started(self, 'of the local cluster. Check if ray[default]==1.13.0 ' 'is installed or running correctly.') backend.run_on_head(handle, 'ray stop', use_cached_head_ip=False) + log_lib.run_with_log( ['ray', 'up', '-y', '--restart-only', handle.cluster_yaml], log_abs_path, stream_logs=False, + # Use environment variables to disable the ray usage collection + # (avoid the 10-second wait for usage collection confirmation), + # as the ray version on the user's machine may be an older + # version that does not support the `--disable-usage-stats` + # flag. + env=dict(os.environ, RAY_USAGE_STATS_ENABLED='0'), # Disable stdin to avoid ray outputs mess up the terminal with # misaligned output when multithreading/multiprocessing is used. 
# Refer to: https://github.com/ray-project/ray/blob/d462172be7c5779abf37609aed08af112a533e1e/python/ray/autoscaler/_private/subprocess_output_util.py#L264 # pylint: disable=line-too-long @@ -2608,10 +2623,11 @@ def post_teardown_cleanup(self, def set_autostop(self, handle: ResourceHandle, idle_minutes_to_autostop: Optional[int], + down: bool = False, stream_logs: bool = True) -> None: if idle_minutes_to_autostop is not None: code = autostop_lib.AutostopCodeGen.set_autostop( - idle_minutes_to_autostop, self.NAME) + idle_minutes_to_autostop, self.NAME, down) returncode, _, stderr = self.run_on_head(handle, code, require_outputs=True, @@ -2622,7 +2638,7 @@ def set_autostop(self, stderr=stderr, stream_logs=stream_logs) global_user_state.set_cluster_autostop_value( - handle.cluster_name, idle_minutes_to_autostop) + handle.cluster_name, idle_minutes_to_autostop, down) # TODO(zhwu): Refactor this to a CommandRunner class, so different backends # can support its own command runner. diff --git a/sky/cli.py b/sky/cli.py index 8fe8076e0d6..da458db8172 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -604,6 +604,7 @@ def _launch_with_confirm( detach_run: bool, no_confirm: bool = False, idle_minutes_to_autostop: Optional[int] = None, + down: bool = False, # pylint: disable=redefined-outer-name retry_until_up: bool = False, no_setup: bool = False, node_type: Optional[str] = None, @@ -654,6 +655,7 @@ def _launch_with_confirm( detach_run=detach_run, backend=backend, idle_minutes_to_autostop=idle_minutes_to_autostop, + down=down, retry_until_up=retry_until_up, no_setup=no_setup, ) @@ -991,7 +993,19 @@ def cli(): 'job queue. ' 'Setting this flag is equivalent to ' 'running ``sky launch -d ...`` and then ``sky autostop -i <minutes>``' - '. If not set, the cluster will not be auto-stopped.')) + '. If not set, the cluster will not be autostopped.')) +@click.option( + '--down', + default=False, + is_flag=True, + required=False, + help= + ('Tear down the cluster after all jobs finish (successfully or ' + 'abnormally). If --idle-minutes-to-autostop is also set, the cluster will ' + 'be torn down after the specified idle time. ' + 'Note that if errors occur during provisioning/data syncing/setting up, ' + 'the cluster will not be torn down for debugging purposes.'), +) @click.option( '--retry-until-up', '-r', @@ -1033,6 +1047,7 @@ def launch( env: List[Dict[str, str]], disk_size: Optional[int], idle_minutes_to_autostop: Optional[int], + down: bool, # pylint: disable=redefined-outer-name retry_until_up: bool, yes: bool, no_setup: bool, @@ -1083,6 +1098,7 @@ def launch( detach_run=detach_run, no_confirm=yes, idle_minutes_to_autostop=idle_minutes_to_autostop, + down=down, retry_until_up=retry_until_up, no_setup=no_setup, is_local_cloud=onprem_utils.check_if_local_cloud(cluster)) @@ -1247,6 +1263,10 @@ def status(all: bool, refresh: bool): # pylint: disable=redefined-builtin - STOPPED: The cluster is stopped and the storage is persisted. Use ``sky start`` to restart the cluster. + The autostop column shows the number of minutes of idleness (no jobs + running) after which the cluster will be autostopped. If the time is + followed by '(down)', e.g. '1m (down)', the cluster will be autodowned + rather than autostopped. 
""" cluster_records = core.status(all=all, refresh=refresh) local_clusters = onprem_utils.check_and_get_local_clusters( @@ -1431,7 +1451,7 @@ def cancel(cluster: str, all: bool, jobs: List[int]): # pylint: disable=redefin '-a', default=None, is_flag=True, - help='Tear down all existing clusters.') + help='Stop all existing clusters.') @click.option('--yes', '-y', is_flag=True, @@ -1472,10 +1492,10 @@ def stop( sky stop -a """ - _terminate_or_stop_clusters(clusters, - apply_to_all=all, - terminate=False, - no_confirm=yes) + _down_or_stop_clusters(clusters, + apply_to_all=all, + down=False, + no_confirm=yes) @cli.command(cls=_DocumentedCodeCommand) @@ -1493,12 +1513,21 @@ def stop( type=int, default=None, required=False, - help='Set the idle minutes before auto-stopping the cluster.') -@click.option('--cancel', - default=False, - is_flag=True, - required=False, - help='Cancel the auto-stopping.') + help='Set the idle minutes before autostopping the cluster.') +@click.option( + '--cancel', + default=False, + is_flag=True, + required=False, + help='Cancel the currently active auto{stop,down} setting for the ' + 'cluster.') +@click.option( + '--down', + default=False, + is_flag=True, + required=False, + help='Use autodown (tear down the cluster; non-restartable), instead ' + 'of autostop (restartable).') @click.option('--yes', '-y', is_flag=True, @@ -1511,15 +1540,21 @@ def autostop( all: Optional[bool], # pylint: disable=redefined-builtin idle_minutes: Optional[int], cancel: bool, # pylint: disable=redefined-outer-name + down: bool, # pylint: disable=redefined-outer-name yes: bool, ): - """Schedule or cancel auto-stopping for cluster(s). + """Schedule or cancel an autostop or autodown for cluster(s). CLUSTERS are the name (or glob pattern) of the clusters to stop. If both CLUSTERS and ``--all`` are supplied, the latter takes precedence. + If --down is passed, autodown (tear down the cluster; non-restartable) is + used, rather than autostop (restartable). + ``--idle-minutes`` is the number of minutes of idleness (no pending/running jobs) after which the cluster will be stopped automatically. + Scheduling autostop twice on the same cluster will overwrite the previous + autostop schedule. ``--cancel`` will cancel the autostopping. If the cluster was not scheduled autostop, this will do nothing to autostop. @@ -1527,6 +1562,9 @@ def autostop( If ``--idle-minutes`` and ``--cancel`` are not specified, default to 5 minutes. + When multiple configurations are specified for the same cluster, e.g. using + ``sky autostop`` or ``sky launch -i``, the last one takes precedence. + Examples: .. code-block:: bash @@ -1546,11 +1584,11 @@ def autostop( idle_minutes = -1 elif idle_minutes is None: idle_minutes = 5 - _terminate_or_stop_clusters(clusters, - apply_to_all=all, - terminate=False, - no_confirm=yes, - idle_minutes_to_autostop=idle_minutes) + _down_or_stop_clusters(clusters, + apply_to_all=all, + down=down, + no_confirm=yes, + idle_minutes_to_autostop=idle_minutes) @cli.command(cls=_DocumentedCodeCommand) @@ -1583,7 +1621,17 @@ def autostop( 'job queue. ' 'Setting this flag is equivalent to ' 'running ``sky launch -d ...`` and then ``sky autostop -i ``' - '. If not set, the cluster will not be auto-stopped.')) + '. If not set, the cluster will not be autostopped.')) +@click.option( + '--down', + default=False, + is_flag=True, + required=False, + help= + ('Tear down the cluster after all jobs finish (successfully or ' + 'abnormally). 
If --idle-minutes-to-autostop is also set, the cluster will ' + 'be torn down after the specified idle time.'), +) @click.option( '--retry-until-up', '-r', @@ -1594,8 +1642,13 @@ def autostop( 'if we fail to start the cluster due to unavailability errors.')) @usage_lib.entrypoint # pylint: disable=redefined-builtin -def start(clusters: Tuple[str], all: bool, yes: bool, - idle_minutes_to_autostop: int, retry_until_up: bool): +def start( + clusters: Tuple[str], + all: bool, + yes: bool, + idle_minutes_to_autostop: Optional[int], + down: bool, # pylint: disable=redefined-outer-name + retry_until_up: bool): """Restart cluster(s). If a cluster is previously stopped (status is STOPPED) or failed in @@ -1623,6 +1676,9 @@ def start(clusters: Tuple[str], all: bool, yes: bool, sky start -a """ + if down and idle_minutes_to_autostop is None: + raise click.UsageError( + '--idle-minutes-to-autostop must be set if --down is set.') to_start = [] if not clusters and not all: @@ -1711,7 +1767,10 @@ def start(clusters: Tuple[str], all: bool, yes: bool, for name in to_start: try: - core.start(name, idle_minutes_to_autostop, retry_until_up) + core.start(name, + idle_minutes_to_autostop, + retry_until_up, + down=down) except exceptions.NotSupportedError as e: click.echo(str(e)) click.secho(f'Cluster {name} started.', fg='green') @@ -1752,7 +1811,7 @@ def down( CLUSTER is the name of the cluster (or glob pattern) to tear down. If both CLUSTER and ``--all`` are supplied, the latter takes precedence. - Terminating a cluster will delete all associated resources (all billing + Tearing down a cluster will delete all associated resources (all billing stops), and any data on the attached disks will be lost. For local clusters, `sky down` does not terminate the local cluster, but instead removes the cluster from `sky status` and terminates the calling user's running jobs. @@ -1776,38 +1835,40 @@ def down( sky down -a """ - _terminate_or_stop_clusters(clusters, - apply_to_all=all, - terminate=True, - no_confirm=yes, - purge=purge) + _down_or_stop_clusters(clusters, + apply_to_all=all, + down=True, + no_confirm=yes, + purge=purge) -def _terminate_or_stop_clusters( +def _down_or_stop_clusters( names: Tuple[str], apply_to_all: Optional[bool], - terminate: bool, + down: bool, # pylint: disable=redefined-outer-name no_confirm: bool, purge: bool = False, idle_minutes_to_autostop: Optional[int] = None) -> None: - """Terminates or (auto-)stops a cluster (or all clusters). + """Tears down or (auto-)stops a cluster (or all clusters). Reserved clusters (spot controller) can only be terminated if the cluster name is explicitly and uniquely specified (not via glob) and purge is set to True. 
""" - assert idle_minutes_to_autostop is None or not terminate, ( - idle_minutes_to_autostop, terminate) - command = 'down' if terminate else 'stop' + command = 'down' if down else 'stop' if not names and apply_to_all is None: raise click.UsageError( f'sky {command} requires either a cluster name (see `sky status`) ' 'or --all.') - operation = 'Terminating' if terminate else 'Stopping' + operation = 'Terminating' if down else 'Stopping' if idle_minutes_to_autostop is not None: - verb = 'Scheduling' if idle_minutes_to_autostop >= 0 else 'Cancelling' - operation = f'{verb} auto-stop on' + is_cancel = idle_minutes_to_autostop < 0 + verb = 'Cancelling' if is_cancel else 'Scheduling' + option_str = 'down' if down else 'stop' + if is_cancel: + option_str = '{stop,down}' + operation = f'{verb} auto{option_str} on' if len(names) > 0: reserved_clusters = [ @@ -1819,7 +1880,7 @@ def _terminate_or_stop_clusters( name for name in _get_glob_clusters(names) if name not in backend_utils.SKY_RESERVED_CLUSTER_NAMES ] - if not terminate: + if not down: local_clusters = onprem_utils.check_and_get_local_clusters() # Local clusters are allowed to `sky down`, but not # `sky start/stop`. `sky down` unregisters the local cluster @@ -1836,7 +1897,7 @@ def _terminate_or_stop_clusters( if not purge: msg = (f'{operation} reserved cluster(s) ' f'{reserved_clusters_str} is not supported.') - if terminate: + if down: msg += ( '\nPlease specify --purge (-p) to force-terminate the ' 'reserved cluster(s).') @@ -1898,11 +1959,11 @@ def _terminate_or_stop_clusters( f'[bold cyan]{operation} {len(clusters)} cluster{plural}[/]', total=len(clusters)) - def _terminate_or_stop(name: str): + def _down_or_stop(name: str): success_progress = False if idle_minutes_to_autostop is not None: try: - core.autostop(name, idle_minutes_to_autostop) + core.autostop(name, idle_minutes_to_autostop, down) except (exceptions.NotSupportedError, exceptions.ClusterNotUpError) as e: message = str(e) @@ -1920,7 +1981,7 @@ def _terminate_or_stop(name: str): f'{colorama.Style.RESET_ALL}') else: try: - if terminate: + if down: core.down(name, purge=purge) else: core.stop(name, purge=purge) @@ -1935,7 +1996,7 @@ def _terminate_or_stop(name: str): message = ( f'{colorama.Fore.GREEN}{operation} cluster {name}...done.' f'{colorama.Style.RESET_ALL}') - if not terminate: + if not down: message += ('\n To restart the cluster, run: ' f'{colorama.Style.BRIGHT}sky start {name}' f'{colorama.Style.RESET_ALL}') @@ -1947,7 +2008,7 @@ def _terminate_or_stop(name: str): progress.start() with progress: - subprocess_utils.run_in_parallel(_terminate_or_stop, clusters) + subprocess_utils.run_in_parallel(_down_or_stop, clusters) progress.live.transient = False # Make sure the progress bar not mess up the terminal. progress.refresh() @@ -2787,7 +2848,7 @@ def bench(): help=('Automatically stop the cluster after this many minutes ' 'of idleness after setup/file_mounts. This is equivalent to ' 'running `sky launch -d ...` and then `sky autostop -i `. 
' - 'If not set, the cluster will not be auto-stopped.')) + 'If not set, the cluster will not be autostopped.')) @click.option('--yes', '-y', is_flag=True, @@ -3180,7 +3241,7 @@ def benchmark_down( clusters_to_exclude: List[str], yes: bool, ) -> None: - """Terminate all clusters belonging to a benchmark.""" + """Tear down all clusters belonging to a benchmark.""" record = benchmark_state.get_benchmark_from_name(benchmark) if record is None: raise click.BadParameter(f'Benchmark {benchmark} does not exist.') @@ -3194,10 +3255,10 @@ def benchmark_down( continue to_stop.append(cluster) - _terminate_or_stop_clusters(to_stop, - apply_to_all=False, - terminate=True, - no_confirm=yes) + _down_or_stop_clusters(to_stop, + apply_to_all=False, + down=True, + no_confirm=yes) @bench.command('delete', cls=_DocumentedCodeCommand) @@ -3206,7 +3267,7 @@ def benchmark_down( '-a', default=None, is_flag=True, - help='Tear down all existing clusters.') + help='Delete all benchmark reports from the history.') @click.option('--yes', '-y', is_flag=True, diff --git a/sky/core.py b/sky/core.py index df64637df33..15d20fbaf6d 100644 --- a/sky/core.py +++ b/sky/core.py @@ -45,6 +45,8 @@ def status(all: bool, refresh: bool) -> List[Dict[str, Any]]: 'last_use': (int) timestamp of last use, 'status': (sky.ClusterStatus) cluster status, 'autostop': (int) idle time before autostop, + 'to_down': (bool) whether autodown is used instead of + autostop, 'metadata': (dict) metadata of the cluster, } ] @@ -54,9 +56,12 @@ def status(all: bool, refresh: bool) -> List[Dict[str, Any]]: return cluster_records -def _start(cluster_name: str, - idle_minutes_to_autostop: Optional[int] = None, - retry_until_up: bool = False) -> backends.Backend.ResourceHandle: +def _start( + cluster_name: str, + idle_minutes_to_autostop: Optional[int] = None, + retry_until_up: bool = False, + down: bool = False, # pylint: disable=redefined-outer-name +) -> backends.Backend.ResourceHandle: cluster_status, handle = backend_utils.refresh_cluster_status_handle( cluster_name) @@ -89,14 +94,17 @@ def _start(cluster_name: str, cluster_name=cluster_name, retry_until_up=retry_until_up) if idle_minutes_to_autostop is not None: - backend.set_autostop(handle, idle_minutes_to_autostop) + backend.set_autostop(handle, idle_minutes_to_autostop, down=down) return handle @usage_lib.entrypoint -def start(cluster_name: str, - idle_minutes_to_autostop: Optional[int] = None, - retry_until_up: bool = False): +def start( + cluster_name: str, + idle_minutes_to_autostop: Optional[int] = None, + retry_until_up: bool = False, + down: bool = False, # pylint: disable=redefined-outer-name +): """Start the cluster. Please refer to the sky.cli.start for the document. @@ -105,7 +113,7 @@ def start(cluster_name: str, ValueError: cluster does not exist. sky.exceptions.NotSupportedError: the cluster is not supported. """ - _start(cluster_name, idle_minutes_to_autostop, retry_until_up) + _start(cluster_name, idle_minutes_to_autostop, retry_until_up, down) @usage_lib.entrypoint @@ -173,7 +181,11 @@ def down(cluster_name: str, purge: bool = False): @usage_lib.entrypoint -def autostop(cluster_name: str, idle_minutes_to_autostop: int): +def autostop( + cluster_name: str, + idle_minutes_to_autostop: int, + down: bool = False, # pylint: disable=redefined-outer-name +): """Set the autostop time of the cluster. Please refer to the sky.cli.autostop for the document. 
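[Editor's note: for readers of this patch, here is a minimal usage sketch of how the new `down` flag flows through the Python API changed above. The cluster name and idle minutes are hypothetical; the signatures follow this diff, and this is an illustration rather than an official example.]

    from sky import core

    # Restart a stopped cluster and schedule autodown: tear it down after 10
    # idle minutes. The CLI enforces that --down on `sky start` requires
    # --idle-minutes-to-autostop to be set.
    core.start('my-cluster', idle_minutes_to_autostop=10, down=True)

    # Switch the same cluster back to plain autostop (restartable).
    core.autostop('my-cluster', idle_minutes_to_autostop=5, down=False)

    # Cancel whichever auto{stop,down} setting is active (negative minutes).
    core.autostop('my-cluster', idle_minutes_to_autostop=-1)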
@@ -186,8 +198,12 @@ def autostop(cluster_name: str, idle_minutes_to_autostop: int): sky.exceptions.NotSupportedError: the cluster is not supported. sky.exceptions.ClusterNotUpError: the cluster is not UP. """ - verb = 'Scheduling' if idle_minutes_to_autostop >= 0 else 'Cancelling' - operation = f'{verb} auto-stop on' + is_cancel = idle_minutes_to_autostop < 0 + verb = 'Cancelling' if is_cancel else 'Scheduling' + option_str = 'down' if down else 'stop' + if is_cancel: + option_str = '{stop,down}' + operation = f'{verb} auto{option_str} on' if cluster_name in backend_utils.SKY_RESERVED_CLUSTER_NAMES: raise exceptions.NotSupportedError( f'{operation} sky reserved cluster {cluster_name!r} ' @@ -209,7 +225,7 @@ def autostop(cluster_name: str, idle_minutes_to_autostop: int): raise exceptions.NotSupportedError( f'{colorama.Fore.YELLOW}{operation} cluster ' f'{cluster_name!r}... skipped{colorama.Style.RESET_ALL}' - '\n Auto-stopping is only supported by backend: ' + f'\n auto{option_str} is only supported by backend: ' f'{backends.CloudVmRayBackend.NAME}') if cluster_status != global_user_state.ClusterStatus.UP: with ux_utils.print_exception_no_traceback(): @@ -217,10 +233,10 @@ def autostop(cluster_name: str, idle_minutes_to_autostop: int): f'{colorama.Fore.YELLOW}{operation} cluster ' f'{cluster_name!r} (status: {cluster_status.value})... skipped' f'{colorama.Style.RESET_ALL}' - '\n Auto-stop can only be set/unset for ' + f'\n auto{option_str} can only be set/unset for ' f'{global_user_state.ClusterStatus.UP.value} clusters.') usage_lib.record_cluster_name_for_current_operation(cluster_name) - backend.set_autostop(handle, idle_minutes_to_autostop) + backend.set_autostop(handle, idle_minutes_to_autostop, down) # ================== diff --git a/sky/execution.py b/sky/execution.py index bad89e0f900..412aac0a0d2 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -43,7 +43,6 @@ logger = sky_logging.init_logger(__name__) OptimizeTarget = optimizer.OptimizeTarget -_MAX_SPOT_JOB_LENGTH = 10 # Message thrown when APIs sky.{exec,launch,spot_launch}() received a string # instead of a Dag. CLI (cli.py) is implemented by us so should not trigger @@ -93,13 +92,13 @@ class Stage(enum.Enum): SETUP = enum.auto() PRE_EXEC = enum.auto() EXEC = enum.auto() - TEARDOWN = enum.auto() + DOWN = enum.auto() def _execute( dag: sky.Dag, dryrun: bool = False, - teardown: bool = False, + down: bool = False, stream_logs: bool = True, handle: Any = None, backend: Optional[backends.Backend] = None, @@ -120,8 +119,11 @@ def _execute( dag: sky.Dag. dryrun: bool; if True, only print the provision info (e.g., cluster yaml). - teardown: bool; whether to teardown the launched resources after - execution. + down: bool; whether to tear down the launched resources after all jobs + finish (successfully or abnormally). If idle_minutes_to_autostop is + also set, the cluster will be torn down after the specified idle time. + Note that if errors occur during provisioning/data syncing/setting up, + the cluster will not be torn down for debugging purposes. stream_logs: bool; whether to stream all tasks' outputs to the client. handle: Any; if provided, execution will use an existing backend cluster handle instead of provisioning a new one. @@ -137,7 +139,7 @@ def _execute( cluster_name: Name of the cluster to create/reuse. If None, auto-generate a name. detach_run: bool; whether to detach the process after the job submitted. 
- autostop_idle_minutes: int; if provided, the cluster will be set to + idle_minutes_to_autostop: int; if provided, the cluster will be set to autostop after this many minutes of idleness. no_setup: bool; whether to skip setup commands or not when (re-)launching. """ @@ -157,16 +159,36 @@ def _execute( cluster_name) cluster_exists = existing_handle is not None + stages = stages if stages is not None else list(Stage) + backend = backend if backend is not None else backends.CloudVmRayBackend() - if not isinstance(backend, backends.CloudVmRayBackend ) and idle_minutes_to_autostop is not None: + if isinstance(backend, backends.CloudVmRayBackend): + if down and idle_minutes_to_autostop is None: + # Use auto{stop,down} to terminate the cluster after the task is + # done. + idle_minutes_to_autostop = 0 + if idle_minutes_to_autostop is not None: + if idle_minutes_to_autostop == 0: + # idle_minutes_to_autostop=0 can cause the following problem: + # after we set the autostop in the PRE_EXEC stage with -i 0, + # the cluster may immediately find itself with no tasks + # running and start the auto{stop,down} process before the + # task is submitted in the EXEC stage. + verb = 'torn down' if down else 'stopped' + logger.info(f'{colorama.Fore.LIGHTBLACK_EX}The cluster will ' + f'be {verb} after 1 minute of idleness ' + '(after all jobs finish).' + f'{colorama.Style.RESET_ALL}') + idle_minutes_to_autostop = 1 + stages.remove(Stage.DOWN) + elif idle_minutes_to_autostop is not None: # TODO(zhwu): Autostop is not supported for non-CloudVmRayBackend. with ux_utils.print_exception_no_traceback(): raise ValueError( f'Backend {backend.NAME} does not support autostop, please try ' f'{backends.CloudVmRayBackend.NAME}') - if not cluster_exists and (stages is None or Stage.OPTIMIZE in stages): + if not cluster_exists and Stage.OPTIMIZE in stages: if task.best_resources is None: # TODO: fix this for the situation where number of requested # accelerators is not an integer. @@ -186,7 +208,7 @@ def _execute( task.sync_storage_mounts() try: - if stages is None or Stage.PROVISION in stages: + if Stage.PROVISION in stages: if handle is None: handle = backend.provision(task, task.best_resources, @@ -199,33 +221,35 @@ def _execute( logger.info('Dry run finished.') return - if stages is None or Stage.SYNC_WORKDIR in stages: + if Stage.SYNC_WORKDIR in stages: if task.workdir is not None: backend.sync_workdir(handle, task.workdir) - if stages is None or Stage.SYNC_FILE_MOUNTS in stages: + if Stage.SYNC_FILE_MOUNTS in stages: backend.sync_file_mounts(handle, task.file_mounts, task.storage_mounts) if no_setup: logger.info('Setup commands skipped.') - elif stages is None or Stage.SETUP in stages: + elif Stage.SETUP in stages: backend.setup(handle, task) - if stages is None or Stage.PRE_EXEC in stages: + if Stage.PRE_EXEC in stages: if idle_minutes_to_autostop is not None: - backend.set_autostop(handle, idle_minutes_to_autostop) + backend.set_autostop(handle, + idle_minutes_to_autostop, + down=down) - if stages is None or Stage.EXEC in stages: + if Stage.EXEC in stages: try: global_user_state.update_last_use(handle.get_cluster_name()) backend.execute(handle, task, detach_run) finally: # Enables post_execute() to be run after KeyboardInterrupt. 
- backend.post_execute(handle, teardown) + backend.post_execute(handle, down) - if stages is None or Stage.TEARDOWN in stages: - if teardown: + if Stage.DOWN in stages: + if down and idle_minutes_to_autostop is None: backend.teardown_ephemeral_storage(task) backend.teardown(handle, terminate=True) finally: @@ -253,7 +277,7 @@ def launch( retry_until_up: bool = False, idle_minutes_to_autostop: Optional[int] = None, dryrun: bool = False, - teardown: bool = False, + down: bool = False, stream_logs: bool = True, backend: Optional[backends.Backend] = None, optimize_target: OptimizeTarget = OptimizeTarget.COST, @@ -268,8 +292,27 @@ def launch( auto-generate a name. retry_until_up: whether to retry launching the cluster until it is up. - idle_minutes_to_autostop: if provided, the cluster will be auto-stop - after this many minutes of idleness. + idle_minutes_to_autostop: automatically stop the cluster after this + many minutes of idleness, i.e., no running or pending jobs in the + cluster's job queue. Idleness starts counting after + setup/file_mounts are done; the clock gets reset whenever there + are running/pending jobs in the job queue. Setting this flag is + equivalent to running ``sky.launch(..., detach_run=True, ...)`` + and then ``sky.autostop(idle_minutes=<idle_minutes>)``. If not + set, the cluster will not be autostopped. + down: Tear down the cluster after all jobs finish (successfully or + abnormally). If ``idle_minutes_to_autostop`` is also set, the + cluster will be torn down after the specified idle time. + Note that if errors occur during provisioning/data syncing/setting + up, the cluster will not be torn down for debugging purposes. + dryrun: if True, do not actually launch the cluster. + stream_logs: if True, show the logs in the terminal. + backend: backend to use. If None, use the default backend + (CloudVMRayBackend). + optimize_target: target to optimize for. Choices: OptimizeTarget.COST, + OptimizeTarget.TIME. + detach_run: If True, run setup first (blocking), then detach from the + job's execution. 
no_setup: if true, the cluster will not re-run setup instructions Examples: @@ -290,7 +333,7 @@ def launch( _execute( dag=dag, dryrun=dryrun, - teardown=teardown, + down=down, stream_logs=stream_logs, handle=None, backend=backend, @@ -308,7 +351,7 @@ def exec( # pylint: disable=redefined-builtin dag: sky.Dag, cluster_name: str, dryrun: bool = False, - teardown: bool = False, + down: bool = False, stream_logs: bool = True, backend: Optional[backends.Backend] = None, optimize_target: OptimizeTarget = OptimizeTarget.COST, @@ -327,7 +370,7 @@ def exec( # pylint: disable=redefined-builtin 'Use `sky status` to check the status.') _execute(dag=dag, dryrun=dryrun, - teardown=teardown, + down=down, stream_logs=stream_logs, handle=handle, backend=backend, diff --git a/sky/global_user_state.py b/sky/global_user_state.py index 4a03be70bb4..ae8717420e4 100644 --- a/sky/global_user_state.py +++ b/sky/global_user_state.py @@ -61,6 +61,8 @@ def create_table(cursor, conn): db_utils.add_column_to_table(cursor, conn, 'clusters', 'metadata', 'TEXT DEFAULT "{}"') + db_utils.add_column_to_table(cursor, conn, 'clusters', 'to_down', + 'INTEGER DEFAULT 0') conn.commit() @@ -114,7 +116,7 @@ def add_or_update_cluster(cluster_name: str, status = ClusterStatus.UP if ready else ClusterStatus.INIT _DB.cursor.execute( 'INSERT or REPLACE INTO clusters' - '(name, launched_at, handle, last_use, status, autostop) ' + '(name, launched_at, handle, last_use, status, autostop, to_down) ' 'VALUES (' # name '?, ' @@ -132,7 +134,11 @@ def add_or_update_cluster(cluster_name: str, # Keep the old autostop value if it exists, otherwise set it to # default -1. 'COALESCE(' - '(SELECT autostop FROM clusters WHERE name=? AND status!=?), -1)' + '(SELECT autostop FROM clusters WHERE name=? AND status!=?), -1), ' + # Keep the old to_down value if it exists, otherwise set it to + # default 0. + 'COALESCE(' + '(SELECT to_down FROM clusters WHERE name=? AND status!=?), 0)' ')', ( # name @@ -150,6 +156,8 @@ def add_or_update_cluster(cluster_name: str, # autostop cluster_name, ClusterStatus.STOPPED.value, + cluster_name, + ClusterStatus.STOPPED.value, )) _DB.conn.commit() @@ -211,11 +219,14 @@ def set_cluster_status(cluster_name: str, status: ClusterStatus) -> None: raise ValueError(f'Cluster {cluster_name} not found.') -def set_cluster_autostop_value(cluster_name: str, idle_minutes: int) -> None: - _DB.cursor.execute('UPDATE clusters SET autostop=(?) WHERE name=(?)', ( - idle_minutes, - cluster_name, - )) +def set_cluster_autostop_value(cluster_name: str, idle_minutes: int, + to_down: bool) -> None: + _DB.cursor.execute( + 'UPDATE clusters SET autostop=(?), to_down=(?) 
WHERE name=(?)', ( idle_minutes, int(to_down), cluster_name, )) count = _DB.cursor.rowcount _DB.conn.commit() assert count <= 1, count @@ -248,7 +259,8 @@ def get_cluster_from_name( cluster_name: Optional[str]) -> Optional[Dict[str, Any]]: rows = _DB.cursor.execute('SELECT * FROM clusters WHERE name=(?)', (cluster_name,)) - for name, launched_at, handle, last_use, status, autostop, metadata in rows: + for (name, launched_at, handle, last_use, status, autostop, metadata, + to_down) in rows: record = { 'name': name, 'launched_at': launched_at, @@ -256,6 +268,7 @@ 'last_use': last_use, 'status': ClusterStatus[status], 'autostop': autostop, + 'to_down': bool(to_down), 'metadata': json.loads(metadata), } return record @@ -265,7 +278,8 @@ def get_clusters() -> List[Dict[str, Any]]: rows = _DB.cursor.execute( 'select * from clusters order by launched_at desc') records = [] - for name, launched_at, handle, last_use, status, autostop, metadata in rows: + for (name, launched_at, handle, last_use, status, autostop, metadata, + to_down) in rows: # TODO: use namedtuple instead of dict record = { 'name': name, @@ -274,6 +288,7 @@ 'last_use': last_use, 'status': ClusterStatus[status], 'autostop': autostop, + 'to_down': bool(to_down), 'metadata': json.loads(metadata), } records.append(record) diff --git a/sky/skylet/autostop_lib.py b/sky/skylet/autostop_lib.py index 73c74f3f510..1e6c95f70fa 100644 --- a/sky/skylet/autostop_lib.py +++ b/sky/skylet/autostop_lib.py @@ -10,14 +10,23 @@ class AutostopConfig: + """Autostop configuration.""" - def __init__(self, autostop_idle_minutes: int, boot_time: int, - backend: Optional[str]): + def __init__(self, + autostop_idle_minutes: int, + boot_time: int, + backend: Optional[str], + down: bool = False): assert autostop_idle_minutes < 0 or backend is not None, ( autostop_idle_minutes, backend) self.autostop_idle_minutes = autostop_idle_minutes self.boot_time = boot_time self.backend = backend + self.down = down + + def __setstate__(self, state: dict): + state.setdefault('down', False) + self.__dict__.update(state) def get_autostop_config() -> Optional[AutostopConfig]: @@ -27,9 +36,9 @@ def get_autostop_config() -> Optional[AutostopConfig]: return pickle.loads(config_str) -def set_autostop(idle_minutes: int, backend: Optional[str]) -> None: +def set_autostop(idle_minutes: int, backend: Optional[str], down: bool) -> None: boot_time = psutil.boot_time() - autostop_config = AutostopConfig(idle_minutes, boot_time, backend) + autostop_config = AutostopConfig(idle_minutes, boot_time, backend, down) configs.set_config(AUTOSTOP_CONFIG_KEY, pickle.dumps(autostop_config)) @@ -43,9 +52,10 @@ class AutostopCodeGen: _PREFIX = ['from sky.skylet import autostop_lib'] @classmethod - def set_autostop(cls, idle_minutes: int, backend: str) -> str: + def set_autostop(cls, idle_minutes: int, backend: str, down: bool) -> str: code = [ - f'autostop_lib.set_autostop({idle_minutes}, {backend!r})', + f'autostop_lib.set_autostop({idle_minutes}, {backend!r},' + f' {down})', ] return cls._build(code) diff --git a/sky/skylet/events.py b/sky/skylet/events.py index 7f5fbaba17a..4d736fee9e4 100644 --- a/sky/skylet/events.py +++ b/sky/skylet/events.py @@ -80,7 +80,6 @@ class AutostopEvent(SkyletEvent): """Skylet event for autostop.""" EVENT_INTERVAL_SECONDS = 60 - _NUM_WORKER_PATTERN = re.compile(r'((?:min|max))_workers: (\d+)') _UPSCALING_PATTERN = re.compile(r'upscaling_speed: (\d+)') _CATCH_NODES = 
re.compile(r'cache_stopped_nodes: (.*)') @@ -119,13 +118,16 @@ def _run(self): def _stop_cluster(self, autostop_config): if (autostop_config.backend == cloud_vm_ray_backend.CloudVmRayBackend.NAME): - self._replace_yaml_for_stopping(self.ray_yaml_path) + self._replace_yaml_for_stopping(self.ray_yaml_path, + autostop_config.down) # `ray up` is required to reset the upscaling speed and min/max # workers. Otherwise, `ray down --workers-only` will continuously # scale down and up. - subprocess.run( - ['ray', 'up', '-y', '--restart-only', self.ray_yaml_path], - check=True) + subprocess.run([ + 'ray', 'up', '-y', '--restart-only', '--disable-usage-stats', + self.ray_yaml_path + ], + check=True) # Stop the workers first to avoid orphan workers. subprocess.run( ['ray', 'down', '-y', '--workers-only', self.ray_yaml_path], @@ -135,17 +137,20 @@ def _stop_cluster(self, autostop_config): else: raise NotImplementedError - def _replace_yaml_for_stopping(self, yaml_path: str): + def _replace_yaml_for_stopping(self, yaml_path: str, down: bool): with open(yaml_path, 'r') as f: yaml_str = f.read() - # Update the number of workers to 0. - yaml_str = self._NUM_WORKER_PATTERN.sub(r'\g<1>_workers: 0', yaml_str) yaml_str = self._UPSCALING_PATTERN.sub(r'upscaling_speed: 0', yaml_str) - yaml_str = self._CATCH_NODES.sub(r'cache_stopped_nodes: true', yaml_str) + if down: + yaml_str = self._CATCH_NODES.sub(r'cache_stopped_nodes: false', + yaml_str) + else: + yaml_str = self._CATCH_NODES.sub(r'cache_stopped_nodes: true', + yaml_str) config = yaml.safe_load(yaml_str) # Set the private key with the existed key on the remote instance. config['auth']['ssh_private_key'] = '~/ray_bootstrap_key.pem' # Empty the file_mounts. config['file_mounts'] = dict() common_utils.dump_yaml(yaml_path, config) - logger.debug('Replaced worker num and upscaling speed to 0.') + logger.debug('Set upscaling speed to 0.') diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index 94528e37c4f..6931ee875ce 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -34,7 +34,7 @@ def calc(self, record): def show_status_table(cluster_records: List[Dict[str, Any]], show_all: bool): """Compute cluster table values and display.""" - # TODO(zhwu): Update the information for auto-stop clusters. + # TODO(zhwu): Update the information for autostop clusters. status_columns = [ StatusColumn('NAME', _get_name), @@ -72,8 +72,8 @@ def show_status_table(cluster_records: List[Dict[str, Any]], show_all: bool): if pending_autostop: click.echo( '\n' - f'You have {pending_autostop} clusters with autostop scheduled.' - ' Refresh statuses with: `sky status --refresh`.') + f'You have {pending_autostop} clusters with auto{{stop,down}} ' + 'scheduled. Refresh statuses with: `sky status --refresh`.') else: click.echo('No existing clusters.') @@ -208,10 +208,17 @@ def _get_zone(cluster_status): def _get_autostop(cluster_status): - autostop_str = '-' + autostop_str = '' + separation = '' if cluster_status['autostop'] >= 0: # TODO(zhwu): check the status of the autostop cluster. 
- autostop_str = str(cluster_status['autostop']) + ' min' + autostop_str = str(cluster_status['autostop']) + 'm' + separation = ' ' + + if cluster_status['to_down']: + autostop_str += f'{separation}(down)' + if autostop_str == '': + autostop_str = '-' return autostop_str diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 38973e1d2cc..3560feab72c 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -512,7 +512,7 @@ def test_autostop(): f'sky launch -y -d -c {name} --num-nodes 2 examples/minimal.yaml', f'sky autostop -y {name} -i 1', # Ensure autostop is set. - f'sky status | grep {name} | grep "1 min"', + f'sky status | grep {name} | grep "1m"', 'sleep 180', # Ensure the cluster is STOPPED. f'sky status --refresh | grep {name} | grep STOPPED', @@ -527,6 +527,38 @@ def test_autostop(): run_one_test(test) +# ---------- Testing Autodowning ---------- +def test_autodown(): + name = _get_cluster_name() + test = Test( + 'autodown', + [ + f'sky launch -y -d -c {name} --num-nodes 2 --cloud gcp examples/minimal.yaml', + f'sky autostop -y {name} --down -i 1', + # Ensure autostop is set. + f'sky status | grep {name} | grep "1m (down)"', + 'sleep 240', + # Ensure the cluster is terminated. + f's=$(SKYPILOT_DEBUG=0 sky status --refresh) && printf "$s" && {{ echo $s | grep {name} | grep "The following cluster" | grep "terminated"; }} || {{ echo $s | grep {name} && exit 1 || exit 0; }}', + f'sky launch -y -d -c {name} --cloud aws --num-nodes 2 --down examples/minimal.yaml', + f'sky status | grep {name} | grep UP', # Ensure the cluster is UP. + f'sky exec {name} --cloud aws examples/minimal.yaml', + f'sky status | grep {name} | grep "1m (down)"', + 'sleep 240', + # Ensure the cluster is terminated. + f's=$(SKYPILOT_DEBUG=0 sky status --refresh) && printf "$s" && {{ echo $s | grep {name} | grep "The following cluster" | grep "terminated"; }} || {{ echo $s | grep {name} && exit 1 || exit 0; }}', + f'sky launch -y -d -c {name} --cloud aws --num-nodes 2 --down examples/minimal.yaml', + f'sky autostop -y {name} --cancel', + 'sleep 240', + # Ensure the cluster is still UP. + f's=$(SKYPILOT_DEBUG=0 sky status --refresh) && printf "$s" && echo $s | grep {name} | grep UP', + ], + f'sky down -y {name}', + timeout=20 * 60, + ) + run_one_test(test) + + def _get_cancel_task_with_cloud(name, cloud, timeout=15 * 60): test = Test( f'{cloud}-cancel-task', @@ -731,7 +763,7 @@ def test_inline_spot_env(): [ f'sky spot launch -n {name} -y --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\$SKY_NODE_IPS\\" ]] && [[ ! -z \\"\$SKY_NODE_RANK\\" ]]) || exit 1"', 'sleep 10', - f'sky spot status | grep {name} | grep SUCCEEDED', + f's=$(sky spot status) && printf "$s" && echo "$s" | grep {name} | grep SUCCEEDED', ], f'sky spot cancel -y -n {name}', ) diff --git a/tests/test_spot.py b/tests/test_spot.py index d1bc6a7c5fd..549411a4dcc 100644 --- a/tests/test_spot.py +++ b/tests/test_spot.py @@ -141,7 +141,7 @@ def test_autostop_spot_controller(self, _mock_cluster_state): cli_runner = cli_testing.CliRunner() result = cli_runner.invoke(cli.autostop, [spot.SPOT_CONTROLLER_NAME]) assert result.exit_code == click.UsageError.exit_code - assert ('Scheduling auto-stop on reserved cluster(s) ' + assert ('Scheduling autostop on reserved cluster(s) ' f'\'{spot.SPOT_CONTROLLER_NAME}\' is not supported' in result.output)
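[Editor's note: as a closing illustration, not part of the patch, this sketch shows roughly how the new `down` flag round-trips through the skylet pieces changed above. Signatures match the autostop_lib diff; the backend name string mirrors CloudVmRayBackend.NAME and is an assumption here.]

    from sky.skylet import autostop_lib

    # Client side: CloudVmRayBackend.set_autostop generates this code string
    # and executes it on the cluster head node via run_on_head.
    code = autostop_lib.AutostopCodeGen.set_autostop(5, 'cloudvmray', True)
    # `code` embeds the call: autostop_lib.set_autostop(5, 'cloudvmray', True),
    # which pickles an AutostopConfig with down=True into the skylet config.

    # Head-node side: the skylet's AutostopEvent reads the config back.
    # down=True makes _replace_yaml_for_stopping() write
    # `cache_stopped_nodes: false`, so `ray down` terminates (autodown)
    # rather than stops (autostop) the nodes.
    config = autostop_lib.get_autostop_config()
    if config is not None and config.autostop_idle_minutes >= 0:
        should_terminate = config.down  # defaults to False for old pickles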