csdms

Source code for wmtexe.reporter

"""Logging and reporting tools for a wmt-exe environment."""

import os
import sys
import threading
import time
import logging
import subprocess

import yaml
import requests


# Module-level logger, named after this module via ``__name__``; the
# reporter classes below write their status messages through it.
logger = logging.getLogger(__name__)
"""Logger : Instance of Logging class."""


class TaskCompleted(Exception):
    """Signal that a wmt-exe task has finished.

    The status-polling loops in this module catch this exception and
    use it as the cue to stop polling.
    """
class redirect_output(object):
    """Context manager that redirects stdout and stderr to log files.

    On entry ``sys.stdout`` and ``sys.stderr`` are replaced by freshly
    opened (truncated) log files; on exit the streams that were current
    when the manager was *constructed* are put back and the log files are
    closed.

    Parameters
    ----------
    name : str
        Log file name.
    log_dir : str, optional
        Path to logging directory (default is current directory).
    join : bool, optional
        Set to True to combine logs (default is False). When False,
        stderr goes to ``<name>.err`` next to the stdout log.
    """

    def __init__(self, name, log_dir='.', join=False):
        # Remember the streams to restore on exit.
        self._stdout = sys.stdout
        self._stderr = sys.stderr

        prefix = os.path.abspath(log_dir)
        self._out_log = os.path.join(prefix, name)
        if join:
            self._err_log = self._out_log
        else:
            self._err_log = os.path.join(prefix, '%s.err' % name)

    def __enter__(self):
        """Open the log files and install them as stdout/stderr.

        Returns
        -------
        tuple of file
            The ``(stdout_log, stderr_log)`` file objects; both are the
            same object when the logs are joined.
        """
        self._out = open(self._out_log, 'w')
        if self._out_log == self._err_log:
            self._err = self._out
        else:
            try:
                self._err = open(self._err_log, 'w')
            except Exception:
                # Don't leak the stdout log if the stderr log can't open.
                self._out.close()
                raise

        sys.stdout = self._out
        sys.stderr = self._err
        return self._out, self._err

    def __exit__(self, type, value, traceback):
        """Restore the original streams, then close the log files."""
        # Restore first so a failing close() can never leave the process
        # with sys.stdout/sys.stderr bound to a closed file.
        sys.stdout = self._stdout
        sys.stderr = self._stderr
        try:
            self._out.close()
        finally:
            if self._err is not self._out:
                self._err.close()
class open_reporter(object):
    """Context manager that runs a background reporter around a task.

    Entering the context starts a `Reporter` thread; leaving it stops and
    joins that thread. If the managed block raised, a final ``'error'``
    report carrying the last 40 numbered lines of the status file is
    sent. The exception itself is not suppressed.

    Parameters
    ----------
    id : str
        A unique UUID for a job.
    server : str
        URL of API server.
    fname : str
        Name of status file.
    """

    def __init__(self, id, server, fname):
        self._args = (id, server, fname)

    def __enter__(self):
        self._reporter = worker = Reporter(*self._args)
        worker.start()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        worker = self._reporter
        worker.stop()
        worker.join()

        if exc_type is None:
            return

        # The task failed: send the tail of its output as an error report.
        status = TaskStatus(*self._args)
        last_output = status.status_with_line_nos(n=40)
        status.report('error', last_output)
def add_line_numbers(lines, start=0, fn=None):
    """Prefix lines with numbers.

    Each line becomes ``'[<number>] <line>'``, or
    ``'[<fn>-<number>] <line>'`` when a file name is given.

    Parameters
    ----------
    lines : list or list-like of str
        Lines to be prefixed with a number.
    start : int, optional
        The index at which line numbers start (default is 0).
    fn : str, optional
        File name to prepend to line number (default is None). An empty
        value is treated the same as None.

    Returns
    -------
    list
        Lines prefixed with numbers.
    """
    # Build the optional file-name part once, and pass it in as a format
    # *argument* rather than splicing it into the template, so that a name
    # containing '{' or '}' is never re-interpreted as a format field.
    label = '{fn}-'.format(fn=fn) if fn else ''
    return [
        '[{label}{ln}] {line}'.format(label=label, ln=line_no, line=line)
        for line_no, line in enumerate(lines, start)
    ]
def wc_l(fname, with_wc='wc'):
    """Count the lines in a file by running ``wc -l``.

    Parameters
    ----------
    fname : str
        File name.
    with_wc : str, optional
        The 'wc' command to use (default is `wc`).

    Returns
    -------
    int
        Number of lines in file.

    Notes
    -----
    Nothing is caught here: a failure to run the command, a non-zero
    exit status, or unparsable output propagates to the caller as the
    underlying exception.
    """
    output = subprocess.check_output([with_wc, '-l', fname])
    # `wc -l` prints "<count> <name>"; the count is the first field.
    count_field = output.split()[0]
    return int(count_field)
def tail(fname, n=10, with_tail='tail'):
    """Get the last lines in a file by running ``tail``.

    Parameters
    ----------
    fname : str
        File name.
    n : int, optional
        Number of lines to get (default is 10).
    with_tail : str, optional
        The 'tail' command to use (default is `tail`).

    Returns
    -------
    str
        The last lines in file as text, with surrounding whitespace
        stripped.

    Raises
    ------
    RuntimeError
        If the command exits with a non-zero status (for example when
        the file does not exist). Other errors, such as the command not
        being found, propagate unchanged.
    """
    path = os.path.abspath(fname)

    try:
        lines = subprocess.check_output(
            [with_tail, '-n{n}'.format(n=n), path])
    except subprocess.CalledProcessError:
        raise RuntimeError('Unable to get status. Please try again.')

    # On Python 3 check_output returns bytes, but callers split the result
    # on os.linesep (a str). Decode so the documented str return type
    # holds; undecodable bytes are replaced rather than raising.
    if not isinstance(lines, str):
        lines = lines.decode('utf-8', 'replace')
    return lines.strip()
def tail_with_line_numbers(fname, n=10, with_tail='tail', with_wc='wc'):
    """Get the last lines in a file, each prefixed with its line number.

    The numbering is derived from the file's total line count, so the
    prefixes refer to positions in the whole file rather than in the
    returned excerpt.

    Parameters
    ----------
    fname : str
        File name.
    n : int, optional
        Number of lines to get (default is 10).
    with_tail : str, optional
        The 'tail' command to use (default is `tail`).
    with_wc : str, optional
        The 'wc' command to use (default is `wc`).

    Returns
    -------
    list
        The last lines in file, prefixed with numbers. When the line
        count is not positive, a one-element list holding `fname`.
    """
    n_total = wc_l(fname, with_wc=with_wc)
    if not (n_total > 0):
        return [fname]

    text = tail(fname, n=n, with_tail=with_tail)
    tail_lines = text.split(os.linesep)
    # Number of the first returned line, counted from the top of the file.
    first_line_no = n_total - len(tail_lines)
    return add_line_numbers(tail_lines, first_line_no)
class Reporter(threading.Thread):
    """Event reporter for wmt-exe tasks.

    A thread that polls the task status every two seconds and posts it
    to the server until it is stopped or the task completes.

    Parameters
    ----------
    id : str
        A unique UUID for a job.
    server : str
        URL of API server.
    filename : str
        Name of status file.
    **kwds
        Arbitrary keyword arguments, passed on to `threading.Thread`.
    """

    def __init__(self, id, server, filename, **kwds):
        super(Reporter, self).__init__(**kwds)
        # Deliberately not named `_stop`: on Python 3, threading.Thread
        # has an internal `_stop` method that `join()` calls, and
        # shadowing it with an Event makes `join()` raise TypeError.
        self._stop_event = threading.Event()
        self._args = (id, server, filename)

    def stop(self):
        """Stop reporting on a task."""
        self._stop_event.set()

    def stopped(self):
        """Check whether reporting has stopped.

        Returns
        -------
        bool
            True if reporting has stopped.
        """
        return self._stop_event.is_set()

    def run(self):
        """Start reporting on a task.

        Posts a ``'running'`` report on every poll (including a
        traceback when the status could not be read), and a final
        ``'success'`` report once the loop ends.
        """
        import traceback

        reporter = TaskStatus(*self._args)
        while True:
            try:
                status = reporter.get_status()
            except TaskCompleted:
                break
            except Exception as error:
                reporter.report(
                    'running',
                    '(2) Error getting status ({err})\n{tb}'.format(
                        err=error, tb=traceback.format_exc()))
            else:
                reporter.report('running', '{message}'.format(message=status))

            # Pause two seconds between polls, but wake immediately if
            # stop() is called so that join() does not have to wait out
            # the full interval.
            if self._stop_event.wait(2):
                break

        reporter.report('success', 'completed')
class WmtTaskReporter(object):
    """Reporter for wmt-exe tasks.

    Sends status updates for one job to the ``run/update`` endpoint of
    the API server, and logs each update to ``~/.wmt/<id>.log``.

    Parameters
    ----------
    id : str
        A unique UUID for a job.
    server : str
        URL of API server.
    """

    def __init__(self, id, server):
        self._id = id
        self._server = server
        # The curl executable can be overridden through the environment.
        self._curl = os.environ.get('CURL', 'curl')

        log_file = os.path.expanduser('~/.wmt/%s.log' % self.id)
        # basicConfig opens the log file straight away, so make sure its
        # directory exists instead of failing on a fresh account.
        log_dir = os.path.dirname(log_file)
        if not os.path.isdir(log_dir):
            try:
                os.makedirs(log_dir)
            except OSError:
                # Another process may have created it in the meantime.
                if not os.path.isdir(log_dir):
                    raise
        logging.basicConfig(filename=log_file, filemode='w',
                            level=logging.DEBUG)

    @property
    def id(self):
        """Get id of task.

        Returns
        -------
        str
            The task id.
        """
        return self._id

    @property
    def server(self):
        """Get server URL.

        Returns
        -------
        str
            The server URL.
        """
        return self._server

    def _update_url(self):
        """Build the URL of the server's status-update endpoint."""
        # A URL is not a filesystem path: join with '/' explicitly, and
        # tolerate a trailing slash on the configured server URL.
        return '%s/run/update' % self.server.rstrip('/')

    def _curl_command(self, status, message):
        """Build the curl argument list that posts one status update."""
        # Each form field needs its own option. --form-string sends the
        # value literally, so a message that starts with '@' or '<', or
        # contains ';type=', is not interpreted by curl.
        return [
            self._curl, '-i',
            '--form-string', 'uuid=%s' % self.id,
            '--form-string', 'status=%s' % status,
            '--form-string', 'message=%s' % message,
            self._update_url(),
        ]

    def report_error(self, message):
        """Report an error.

        Parameters
        ----------
        message : str
            Error message.

        Returns
        -------
        Response
            Response from server.
        """
        return self.report('error', message)

    def report_success(self, message):
        """Report successful task completion.

        Parameters
        ----------
        message : str
            Success message.

        Returns
        -------
        Response
            Response from server.
        """
        return self.report('success', message)

    def report(self, status, message):
        """Report task status using `requests`.

        Parameters
        ----------
        status : str
            Type of report.
        message : str
            Message for report.

        Returns
        -------
        Response
            Response from server.
        """
        import requests

        logger.info('%s: %s', status, message)

        resp = requests.post(self._update_url(), data={
            'uuid': self.id,
            'status': status,
            'message': message,
        })

        return resp

    def report_with_curl(self, status, message):
        """Report task status using `curl`.

        Parameters
        ----------
        status : str
            Type of report.
        message : str
            Message for report.

        Returns
        -------
        str
            Response from server (headers included, as printed by
            ``curl -i``).

        Raises
        ------
        subprocess.CalledProcessError
            If curl exits with a non-zero status.
        """
        logger.info('%s: %s', status, message)
        return subprocess.check_output(self._curl_command(status, message))
from datetime import datetime
def load_status_from_lines(lines):
    """Load task status from strings.

    The lines are scanned from last to first and the first one that
    parses as a YAML mapping is returned, so the most recent status
    wins.

    Parameters
    ----------
    lines : list or list-like of str
        List of lines containing task status information.

    Returns
    -------
    dict
        The task status, or an empty dict if no line holds a mapping.
    """
    for line in reversed(lines):
        try:
            # safe_load instead of yaml.load: the status file is plain
            # data, so the full loader (which can construct arbitrary
            # Python objects) is not needed, and calling yaml.load
            # without an explicit Loader is rejected by recent PyYAML.
            status = yaml.safe_load(line)
        except yaml.YAMLError:
            # Not valid YAML; keep looking at earlier lines.
            continue
        if isinstance(status, dict):
            return status
    return {}
def read_wmt_status(fname):
    """Read the WMT status from the end of a file.

    Only the last two lines of the file are inspected.

    Parameters
    ----------
    fname : str
        WMT status file.

    Returns
    -------
    dict
        The task status, or an empty dict when the file's tail could
        not be read or holds no status mapping.
    """
    try:
        last_two = tail(fname, n=2)
        candidates = last_two.split(os.linesep)
    except RuntimeError:
        return {}
    return load_status_from_lines(candidates)
class TaskStatus(WmtTaskReporter):
    """Task status manager for a wmt-exe environment.

    Builds a status message from the tail of the task's status file and
    reports it to the server.

    Parameters
    ----------
    id : str
        A unique UUID for a job.
    server : str
        URL of API server.
    filename : str
        Name of status file.
    pid : str, optional
        Process id (default is None).
    prefix : str, optional
        Path to base directory (default is current directory).
    """

    def __init__(self, id, server, filename, pid=None, prefix='.'):
        super(TaskStatus, self).__init__(id, server)
        self._status_file = filename
        self._prefix = os.path.abspath(prefix)
        # The tail executable can be overridden through the environment.
        self._tail = os.environ.get('TAIL', 'tail')
        self._pid = pid
        self._start_time = datetime.now()

    @property
    def status_file(self):
        """Get the status file.

        Returns
        -------
        str
            Name of status file.
        """
        return self._status_file

    @property
    def elapsed(self):
        """Get the time elapsed since this object was created.

        Returns
        -------
        float
            The elapsed time, in seconds.
        """
        return (datetime.now() - self._start_time).total_seconds()

    def running(self):
        """Test whether the simulation is running.

        Returns
        -------
        bool
            True if simulation is running. When no pid was given the
            simulation is assumed to be running.
        """
        import errno

        if not self._pid:
            return True

        try:
            # Signal 0 performs the existence/permission check without
            # delivering a signal. The pid may have been given as a
            # string, which os.kill does not accept.
            os.kill(int(self._pid), 0)
        except OSError as error:
            # EPERM means the process exists but belongs to someone
            # else; any other error means it is gone.
            return error.errno == errno.EPERM
        return True

    def status_with_line_nos(self, n=10):
        """Get status prepended with line numbers.

        Parameters
        ----------
        n : int, optional
            Number of lines to display.

        Returns
        -------
        str
            The status as a YAML stream, holding the numbered output
            lines (``stdout``), the elapsed time (``time_elapsed``) and
            whatever status mapping is found in ``_time.txt`` under the
            base directory.
        """
        lines = tail_with_line_numbers(self.status_file, n=n,
                                       with_tail=self._tail)
        if not lines:
            # NOTE(review): tail_with_line_numbers looks like it always
            # returns at least one element, so this branch may be
            # unreachable -- confirm before relying on it.
            dots = '.' * (int(self.elapsed / 10) % 10)
            lines = ['Waiting for stdout{dots}'.format(dots=dots)]
        else:
            lines = [
                'Last {n} lines from stdout:'.format(n=len(lines)),
                ''] + lines

        status = dict(stdout=os.linesep.join(lines),
                      time_elapsed=self.elapsed)
        status.update(read_wmt_status(os.path.join(self._prefix,
                                                   '_time.txt')))
        return yaml.dump(status)

    def get_status(self):
        """Get task status.

        Returns
        -------
        str
            The status as a YAML stream.
        """
        # NOTE: completion detection through running() is currently
        # disabled, so this method never raises TaskCompleted itself.
        return self.status_with_line_nos()

    def report_status(self):
        """Report task status at regular intervals until task completes.

        Polls every two seconds, posting a ``'running'`` report each
        time, and posts a final ``'success'`` report once `get_status`
        raises `TaskCompleted`.
        """
        while True:
            time.sleep(2)
            try:
                status = self.get_status()
            except TaskCompleted:
                break
            except Exception as error:
                # Keep polling: a transient read failure is reported
                # but must not end the loop.
                self.report('running',
                            '(1) Error getting status (%s)' % error)
            else:
                self.report('running', '{message}'.format(message=status))

        self.report('success', 'completed')

    def __call__(self):
        """Run `report_status`, so an instance can be used as a callable."""
        self.report_status()