csdms

Source code for wmtexe.launcher

"""Classes to configure and launch jobs from a wmt-exe environment."""

import os
import sys
import subprocess
from types import StringTypes


[docs]class Launcher(object): """Job launcher for a wmt-exe environment. Parameters ---------- sim_id : str A unique UUID for the job. Attributes ---------- launch_dir : str The working directory from which the job is started. script_path : str Path to launch script. sim_id : str A unique UUID for the job. """ launch_dir = '~/.wmt' _script = "{slave_command}" _extra_args = [] def __init__(self, sim_id): self.sim_id = sim_id self.script_path = os.path.expanduser( os.path.join(self.launch_dir, '%s.sh' % self.sim_id))
[docs] def before_launch(self, **kwds): """Perform actions before launching job. Parameters ---------- **kwds Arbitrary keyword arguments. """ with open(self.script_path, 'w') as f: f.write(self.script(**kwds))
[docs] def after_launch(self, **kwds): """Perform actions after launching job. Parameters ---------- **kwds Arbitrary keyword arguments. """ pass
[docs] def after_success(self, **kwds): """Perform actions after job completes. Parameters ---------- **kwds Arbitrary keyword arguments. """ pass
[docs] def run(self, **kwds): """Perform job setup, launch, and teardown actions. Parameters ---------- **kwds Arbitrary keyword arguments. """ self.before_launch(**kwds) try: self.launch(**kwds) except subprocess.CalledProcessError: raise else: self.after_success(**kwds) finally: self.after_launch(**kwds)
[docs] def launch(self, **kwds): """Launch job with launch command. Parameters ---------- **kwds Arbitrary keyword arguments. """ subprocess.check_output(self.launch_command(**kwds), env={})
[docs] def launch_command(self, **kwds): """Path to launch script. Parameters ---------- **kwds Arbitrary keyword arguments. Returns ------- str The launch command to execute. """ return self.script_path
[docs] def slave_command(self, extra_args=None): """Create the `wmt-slave` command. Parameters ---------- extra_args : str, optional Additional arguments. Returns ------- str The slave command to execute. """ import shlex from pipes import quote wmt_slave = os.path.join(sys.prefix, 'bin', 'wmt-slave') command = [wmt_slave, quote(self.sim_id)] + self._extra_args if extra_args: if isinstance(extra_args, StringTypes): extra_args = shlex.split(extra_args) command += [quote(arg) for arg in extra_args] return ' '.join(command)
[docs] def script(self, **kwds): """Generate the launch script. Parameters ---------- *kwds Arbitrary keyword arguments. Returns ------- str The launch script to be written to a file. """ return self._script.format(slave_command=self.slave_command(**kwds))
class QsubLauncher(Launcher):
    """WMT job launcher for a PBS scheduler."""

    # Each ``#PBS`` directive and each command sits on its own line, with the
    # shebang first.
    _script = "\n".join([
        "#! /bin/bash",
        "#PBS -q debug",
        "#PBS -l mem=10gb",
        "#PBS -j oe",
        "#PBS -k oe",
        "cd $TMPDIR",
        "{slave_command}",
    ])
    # Launcher.slave_command appends these without shell-quoting, so $TMPDIR
    # is expanded by the shell when the script runs.
    _extra_args = ['--exec-dir=$TMPDIR',
                   '--server-url=https://csdms.colorado.edu/wmt/api-testing']

    def launch_command(self, **kwds):
        """Build the `qsub` command that submits the launch script.

        Parameters
        ----------
        **kwds
            Arbitrary keyword arguments (unused here).

        Returns
        -------
        list of str
            The launch command to execute, as an argument list.
        """
        # The command is run without a shell, so ``~`` must be expanded here,
        # the same way it is for `script_path`.
        output_dir = os.path.expanduser(self.launch_dir)
        return ['/opt/torque/bin/qsub', '-o', output_dir, self.script_path]
class BashLauncher(Launcher):
    """WMT job launcher for a bash environment."""

    # The shebang, the (disabled) activation line and the command each need
    # their own line; on a single line the command would be part of a comment.
    _script = "\n".join([
        "#! /bin/bash",
        "# source $(wmt-activate)",
        "{slave_command}",
    ])