Source code for abed.tasks

# -*- coding: utf-8 -*-


"""
Functions for managing tasks

"""

import os
import sys
import random
import hashlib

from itertools import product

from .conf import settings
from .exceptions import (
    AbedHashCollissionException,
    AbedExperimentTypeException,
)
from .results.walk import walk_hashes
from .io import error


[docs]def cartesian(params): return (dict(list(zip(params, x))) for x in product(*params.values()))
[docs]def check_size(): if not sys.maxsize == 9223372036854775807: error( "Running on a non 64-bit system. This may cause problems with " "hashes." ) raise SystemExit
[docs]def task_hash(task): """ This yields a hash of a list by combining the hashes of all list elements. """ as_tuples = sorted(task.items()) hasher = hashlib.blake2b(digest_size=8) for key, value in as_tuples: k = repr(key) v = repr(value) hasher.update(k.encode()) hasher.update(v.encode()) return hasher.hexdigest()
[docs]def init_tasks(): if settings.TYPE == "ASSESS": task_func = init_tasks_assess elif settings.TYPE == "CV_TT": task_func = init_tasks_cv_tt elif settings.TYPE == "RAW": task_func = init_tasks_raw try: return task_func() except AbedHashCollissionException: error( "A hash collision occured. This rarely occurs naturally, so it" " is most likely caused by duplicate tasks in the task list. " "Abed does not currently support duplicate tasks." ) raise SystemExit raise AbedExperimentTypeException
[docs]def init_tasks_assess(): out = {} for dset in settings.DATASETS: for method in settings.METHODS: for prmset in cartesian(settings.PARAMS[method]): task = {key: value for key, value in prmset.items()} task["dataset"] = dset task["method"] = method hsh = task_hash(task) if hsh in out: raise AbedHashCollissionException out[hsh] = task return out
[docs]def init_tasks_cv_tt(): out = {} rng = random.Random(x=settings.CV_BASESEED) for train, test in settings.DATASETS: seed = rng.randint(0, 2 ** 31 - 1) for method in settings.METHODS: for prmset in cartesian(settings.PARAMS[method]): task = {key: value for key, value in prmset.items()} task["train_dataset"] = train task["test_dataset"] = test task["method"] = method task["cv_seed"] = seed hsh = task_hash(task) if hsh in out: raise AbedHashCollissionException out[hsh] = task return out
[docs]def init_tasks_raw(): out = {} with open(settings.RAW_CMD_FILE, "r") as fid: tasks = [x.strip() for x in fid.readlines() if x.strip()] for txttask in tasks: hsh = hash(txttask) hsh %= (sys.maxsize + 1) * 2 if hsh in out: raise AbedHashCollissionException out[hsh] = txttask return out
[docs]def read_tasks(): with open(settings.TASK_FILE, "r") as fid: tasks = [l.strip() for l in fid.readlines() if l.strip()] grid = init_tasks() out = {} for key in tasks: out[key] = grid[key] return out
[docs]def update_tasks(tasks): delcnt = 0 if not os.path.exists(settings.RESULT_DIR): return 0 for hsh in walk_hashes(): try: del tasks[hsh] delcnt += 1 except KeyError: pass return delcnt
[docs]def explain_tasks(all_tasks): for task in sorted(all_tasks.keys()): if settings.TYPE == "RAW": cmd = all_tasks[task] else: d = {k: v for k, v in all_tasks[task].items()} command = settings.COMMANDS[d["method"]] d["datadir"] = "{datadir}" d["execdir"] = "{execdir}" cmd = command.format(**d) print("%s : %s" % (task, cmd))