Source code for abed.results.cache

"""
Functions for creating the result cache.

The result cache is essentially a dictionary that maps each task hash to the
metrics we want to know for that hash.
"""

from .models import AbedCache, AbedResult
from .walk import walk_for_cache
from ..conf import settings
from ..io import info, warning


def find_label(line):
    for scalar in settings.SCALARS:
        if scalar in line:
            return scalar
    return "_".join(line.split(" ")[1].split("_")[:-1])
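
# Illustrative examples of how find_label resolves a header line (hypothetical
# labels; assume settings.SCALARS contains "time_total" but not "y_pred"):
#
#   find_label("% time_total")  ->  "time_total"  (matched against SCALARS)
#   find_label("% y_pred_0")    ->  "y_pred"      (fallback: drop the trailing
#                                                  "_<suffix>" from the second
#                                                  word)
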
def parse_result_fileobj(fid, hsh, dataset, method):
    data = {}
    label = None
    for line in fid:
        l = line.strip()
        # Skip comment lines
        if l.startswith("#"):
            continue
        elif l.startswith("%"):
            # A '%' line starts a new labelled block of output
            label = find_label(l)
            if label in settings.SCALARS:
                data[label] = None
            else:
                data[label] = {"true": [], "pred": []}
            continue
        if label in settings.SCALARS:
            # If we already have data for a label, we continue to avoid
            # overwriting it
            if data[label]:
                continue
            try:
                data[label] = float(l)
            except ValueError:
                warning(
                    "Could not parse scalar metric '%s' for "
                    "file with hash %s. Skipping.\nOffending line: %s"
                    % (label, hsh, l)
                )
                continue
        else:
            try:
                if "\t" in l:
                    true, pred = l.split("\t")
                else:
                    true, pred = l.split(" ")
                data[label]["true"].append(float(true))
                data[label]["pred"].append(float(pred))
            except ValueError:
                warning(
                    "Could not parse true/pred metric '%s' for "
                    "file with hash %s. Skipping.\nOffending line: %s"
                    % (label, hsh, l)
                )
                continue
    fid.close()
    ar = AbedResult(hsh, dataset=dataset, method=method)
    for label in data:
        if label in settings.SCALARS:
            ar.add_result_scalar(label, data[label])
        else:
            # Compute every configured metric over the true/pred pairs
            for metric in settings.METRICS:
                metric_func = settings.METRICS[metric]["metric"]
                ar.add_result_metric(
                    label,
                    metric,
                    metric_func(data[label]["true"], data[label]["pred"]),
                )
    return ar
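
# A hypothetical result file in the format this parser expects: "%" lines
# mark a label, scalar labels are followed by a single value, and other
# labels by true/pred pairs separated by a tab or a single space (the label
# names below are made up):
#
#   # comment lines are skipped
#   % time_total
#   12.5
#   % y_pred_0
#   1.0 1.0
#   0.0 1.0
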
def init_result_cache(task_dict):
    ac = AbedCache(
        methods=settings.METHODS,
        datasets=settings.DATASETS,
        metrics=settings.METRICS,
        scalars=settings.SCALARS,
    )
    counter = 0
    for dataset, method, fid, hsh in walk_for_cache(ac):
        result = parse_result_fileobj(fid, hsh, dataset, method)
        if result is None:
            continue
        ac.add_result(result)
        counter += 1
    ac.dump()
    info("Read %i result files into cache." % counter)
    return ac
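
# Note: walk_for_cache (from .walk) yields one (dataset, method, fid, hsh)
# tuple per result file on disk, where fid is a file-like object opened on
# the result file and hsh is the task hash; parse_result_fileobj closes fid
# itself when it is done reading.
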
def update_result_cache(task_dict, skip_cache=False):
    ac = AbedCache()
    try:
        ac.load()
        info("Result cache loaded from disk.")
    except IOError:
        info("Result cache non-existent, generating it.")
        ac = init_result_cache(task_dict)
        return ac
    # User requested skip of cache regeneration
    if skip_cache:
        warning("Skipping cache regeneration check on user request.")
        return ac
    # Updating the result cache is done in two steps:
    # 1. Check if new metrics or scalars are added; if so, regenerate
    #    everything
    # 2. Check if new result files are added; if that's the case, only
    #    generate those
    conf_metrics = set(settings.METRICS.keys())
    cache_metrics = ac.metrics
    diff = conf_metrics - cache_metrics
    if len(diff) > 0:
        ac = init_result_cache(task_dict)
        return ac
    for dataset, method, fid, hsh in walk_for_cache(ac):
        result = parse_result_fileobj(fid, hsh, dataset, method)
        if result is None:
            continue
        ac.add_result(result)
    ac.dump()
    return ac
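
# Typical entry point from the rest of abed (usage sketch; task_dict is the
# task mapping maintained elsewhere in abed):
#
#   ac = update_result_cache(task_dict)                   # load or rebuild
#   ac = update_result_cache(task_dict, skip_cache=True)  # trust cache as-is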