csdms

Source code for wmtexe.launcher

"""Classes to configure and launch jobs from a wmt-exe environment."""

import os
import sys
import stat
import subprocess
from types import StringTypes


[docs]class Launcher(object): """Job launcher for a wmt-exe environment. Parameters ---------- sim_id : str A unique UUID for the job. server_url : str or None, optional The URL of the WMT API server from which the job was submitted. launch_dir : str, optional The working directory from which the job is started. extra_args : list, optional Extra arguments to be passed to the wmt-slave command. Attributes ---------- launch_dir : str The working directory from which the job is started. script_path : str Path to launch script. sim_id : str A unique UUID for the job. server_url : str or None The URL of the WMT API server from which the job was submitted. """ _script = "{slave_command}" def __init__(self, sim_id, server_url=None, launch_dir='~/.wmt', extra_args=[]): self.sim_id = sim_id self.server_url = server_url self.launch_dir = os.path.expandvars(os.path.expanduser(launch_dir)) self.script_path = os.path.join(self.launch_dir, '%s.sh' % self.sim_id) self._extra_args = extra_args
[docs] def before_launch(self, **kwds): """Perform actions before launching job. Parameters ---------- **kwds Arbitrary keyword arguments. """ try: os.makedirs(self.launch_dir) except OSError: if not os.path.isdir(self.launch_dir): raise with open(self.script_path, 'w') as f: f.write(self.script(**kwds)) os.chmod(self.script_path, stat.S_IXUSR|stat.S_IWUSR|stat.S_IRUSR)
[docs] def after_launch(self, **kwds): """Perform actions after launching job. Parameters ---------- **kwds Arbitrary keyword arguments. """ pass
[docs] def after_success(self, **kwds): """Perform actions after job completes. Parameters ---------- **kwds Arbitrary keyword arguments. """ pass
[docs] def run(self, **kwds): """Perform job setup, launch, and teardown actions. Parameters ---------- **kwds Arbitrary keyword arguments. """ self.before_launch(**kwds) try: self.launch(**kwds) except subprocess.CalledProcessError: raise else: self.after_success(**kwds) finally: self.after_launch(**kwds)
[docs] def launch(self, **kwds): """Launch job with launch command. Parameters ---------- **kwds Arbitrary keyword arguments. """ subprocess.check_output(self.launch_command(**kwds), env={})
[docs] def launch_command(self, **kwds): """The command that runs a job. Parameters ---------- **kwds Arbitrary keyword arguments. Returns ------- str The launch command to execute. """ return self.script_path
[docs] def slave_command(self, extra_args=None): """Create the `wmt-slave` command. Parameters ---------- extra_args : str, optional Additional arguments. Returns ------- str The slave command to execute. """ import shlex from pipes import quote wmt_slave = os.path.join(sys.prefix, 'bin', 'wmt-slave') command = [wmt_slave, quote(self.sim_id)] + self._extra_args if self.server_url: command += ['--server-url={}'.format(self.server_url)] if extra_args: if isinstance(extra_args, StringTypes): extra_args = shlex.split(extra_args) command += [quote(arg) for arg in extra_args] return ' '.join(command)
[docs] def script(self, **kwds): """Generate the launch script. Parameters ---------- *kwds Arbitrary keyword arguments. Returns ------- str The launch script to be written to a file. """ return self._script.format(slave_command=self.slave_command(**kwds))
[docs]class QsubLauncher(Launcher): """WMT job launcher for a PBS scheduler.""" _script = """ #! /bin/bash #PBS -q debug #PBS -l mem=10gb #PBS -j oe #PBS -k oe cd $TMPDIR {slave_command} """.lstrip()
[docs] def launch_command(self, **kwds): """Path to launch script. Parameters ---------- **kwds Arbitrary keyword arguments. Returns ------- str The launch command to execute. """ return ['/opt/torque/bin/qsub', '-o', self.launch_dir, self.script_path]
[docs]class SbatchLauncher(Launcher): """WMT job launcher for a Slurm scheduler. Parameters ---------- sim_id : str A unique UUID for the job. server_url : str or None, optional The URL of the WMT API server from which the job was submitted. launch_dir : str, optional The working directory from which the job is started. extra_args : list, optional Extra arguments to be passed to the wmt-slave command. Attributes ---------- launch_dir : str The working directory from which the job is started. script_path : str Path to launch script. run_script_path : str Path to run script, which submits launch script to scheduler. sim_id : str A unique UUID for the job. server_url : str or None The URL of the WMT API server from which the job was submitted. """ _script = """ #!/usr/bin/env bash #SBATCH --qos=blanca-csdms #SBATCH --job-name=wmt #SBATCH --mem=8000MB export MPLBACKEND=Agg {slave_command} """.lstrip() _run_script = """ #!/usr/bin/env bash module load slurm/blanca sbatch --output={output_file} {script_path} """.lstrip() def __init__(self, *args, **kwds): Launcher.__init__(self, *args, **kwds) self.run_script_path = os.path.expanduser( os.path.join(self.launch_dir, '%s.run.sh' % self.sim_id))
[docs] def before_launch(self, **kwds): """Perform actions before launching job. Parameters ---------- **kwds Arbitrary keyword arguments. """ Launcher.before_launch(self, **kwds) with open(self.run_script_path, 'w') as f: f.write(self.run_script(**kwds)) os.chmod(self.run_script_path, stat.S_IXUSR|stat.S_IWUSR|stat.S_IRUSR)
[docs] def run_script(self, **kwds): """Generate the run script that submits job to scheduler. Parameters ---------- *kwds Arbitrary keyword arguments. Returns ------- str The run script to be written to a file. """ output_file = os.path.expanduser( os.path.join(self.launch_dir, '%s.out' % self.sim_id)) return self._run_script.format(output_file=output_file, script_path=self.script_path)
[docs] def launch_command(self, **kwds): """The command that runs a job. Parameters ---------- **kwds Arbitrary keyword arguments. Returns ------- str The launch command to execute. """ return self.run_script_path
[docs] def launch(self, **kwds): """Launch job with launch command. Note that we override Launcher.launch because we want to inherit the environment from the current process. Parameters ---------- **kwds Arbitrary keyword arguments. """ subprocess.check_output(self.launch_command(**kwds))
[docs]class BashLauncher(Launcher): """WMT job launcher for a bash environment.""" _script = """ #! /bin/bash export PATH={wmt_path} {slave_command} """.lstrip()
[docs] def prepend_path(self): """Places the `bin` directory of executor at the front of the path.""" return os.pathsep.join([os.path.join(sys.prefix, 'bin'), os.environ['PATH']])
[docs] def script(self, **kwds): """Generate the launch script. Parameters ---------- *kwds Arbitrary keyword arguments. Returns ------- str The launch script to be written to a file. """ return self._script.format(wmt_path=self.prepend_path(), slave_command=self.slave_command(**kwds))