Source code for abed.models
# -*- coding: utf-8 -*-
import os
import time
from .auto import submitted, get_jobid_from_logs, is_job_marked, mark_job
from .compress import compress_results
from .conf import settings
from .fab import fab_push, fab_pull, fab_repull, fab_setup
from .git_util import git_commit_auto, git_commit_tbd, git_init, git_ok
from .html.view import view_html
from .local import local_move_results
from .results.main import make_results
from .run import mpi_start
from .init import init_config
from .tasks import init_tasks, read_tasks, update_tasks, explain_tasks
from .io import info, error
from .zips import unpack_zips, move_results
[docs]class Abed(object):
commands = [
"auto",
"compress_results",
"explain_tbd_tasks",
"explain_tasks",
"local",
"parse_results",
"move_results",
"process_zips",
"pull",
"push",
"reload_tasks",
"repull",
"run",
"init",
"setup",
"status",
"update_tasks",
"view_results",
]
def __init__(self, skip_init=False, skip_cache=False):
self.task_dict = None
self.skip_cache = skip_cache
if not skip_init:
self.init_tasks()
[docs] def init_tasks(self):
# this takes over init_tasks
if os.path.isfile(settings.TASK_FILE):
self.task_dict = read_tasks()
else:
self.task_dict = init_tasks()
self.write_tasks()
git_commit_tbd()
[docs] def explain_tbd_tasks(self):
explain_tasks(self.task_dict)
[docs] def explain_tasks(self):
explain_tasks(init_tasks())
[docs] def update_tasks(self):
cnt = update_tasks(self.task_dict)
info(
"Task update removed %i completed tasks. Tasks remaining: %i"
% (cnt, len(self.task_dict))
)
self.write_tasks()
git_commit_tbd()
if len(self.task_dict) == 0:
info("All tasks completed. Cool cool cool.")
[docs] def reload_tasks(self):
self.task_dict = init_tasks()
self.update_tasks()
[docs] def write_tasks(self):
with open(settings.TASK_FILE, "w") as fid:
for task in sorted(self.task_dict.keys()):
fid.write("%s\n" % task)
info("Written task file to %s" % settings.TASK_FILE)
[docs] def setup(self):
fab_setup()
[docs] def push(self):
if not git_ok():
error("Git repository has uncommitted changes, not pushing.")
raise SystemExit
fab_push()
[docs] def pull(self, jobid=None):
info("Starting pull")
fab_pull()
info("Starting unpacking of zips")
unpack_zips()
if jobid is None:
jobid = get_jobid_from_logs()
info("Marking job as pulled: %s" % jobid)
mark_job(jobid)
git_commit_auto()
info("Updating tasks")
self.update_tasks()
[docs] def auto(self):
info("Starting auto loop")
while True:
if len(self.task_dict) == 0:
info("Stopping auto loop")
break
if not submitted():
info("No submitted task found, assuming done.")
jobid = get_jobid_from_logs()
info("Found jobid from logs: %s" % jobid)
if not is_job_marked(jobid):
info("Job %s not pulled yet, pulling it" % jobid)
self.pull(jobid=jobid)
if len(self.task_dict) == 0:
break
self.push()
info("Task busy, sleeping for a while ...")
time.sleep(settings.AUTO_SLEEP)
info("Starting parse_results")
self.parse_results()
[docs] def parse_results(self):
# this takes over parse_results.py
info("Starting make_results()")
make_results(self.task_dict, self.skip_cache)
[docs] def run(self):
# this takes over master/worker
if self.task_dict is None:
error("No tasks defined before attempted run. Exiting")
raise SystemExit
mpi_start(self.task_dict)
info("Finished with run command.")
[docs] def init(self):
init_config()
git_init()
[docs] def status(self):
info(
"There are %i tasks left to be done, out of %i tasks defined."
% (len(self.task_dict), len(init_tasks()))
)
[docs] def process_zips(self):
unpack_zips()
[docs] def repull(self):
# use abed_auto.log to repull all zips from previous runs
info("Starting repull based on {}".format(settings.AUTO_FILE))
fab_repull()
info("Unpacking zips")
unpack_zips()
info("Done repulling.")
[docs] def view_results(self):
view_html()
[docs] def move_results(self):
move_results(init_tasks())
[docs] def local(self):
if self.task_dict is None:
error("No tasks defined before attempted run. Exiting")
raise SystemExit
mpi_start(self.task_dict, local=True)
local_move_results(self.task_dict)
[docs] def compress_results(self):
compress_results(init_tasks())