Source code for abed.results.models

"""

Models for holding a result cache

"""

import os

from collections import OrderedDict
from six.moves import cPickle

from ..conf import settings
from ..datasets import dataset_name
from ..exceptions import AbedHashCollissionException
from ..utils import mkdir


[docs]class AbedCache(object): """ """ def __init__( self, methods=None, datasets=None, metrics=None, scalars=None, cachefile=None, ): self.methods = set() self.datasets = set() self.metrics = set() self.metric_targets = set() self.scalars = set() self.cache = {} if cachefile is None: self.cachefile = settings.OUTPUT_DIR + os.sep + "abed_cache.pkl" else: self.cachefile = cachefile
[docs] def dump(self): mkdir(os.path.dirname(self.cachefile)) f = open(self.cachefile, "wb") cPickle.dump(self.__dict__, f, 2) f.close()
[docs] def load(self): if not os.path.exists(self.cachefile): raise IOError f = open(self.cachefile, "rb") tmp = cPickle.load(f) f.close() self.__dict__.update(tmp)
[docs] def add_result(self, result): if result.hsh in self.cache: raise AbedHashCollissionException(result.hsh) self.datasets.add(result.dataset) self.methods.add(result.method) self.metrics = self.metrics.union(result.metrics) self.scalars = self.scalars.union(result.scalars) self.metric_targets = self.metric_targets.union(result.metric_targets) self.cache[result.hsh] = result
[docs] def has_result(self, hsh): return hsh in self.cache
[docs] def iter_results_dm(self, dataset, method): for result in self.cache.itervalues(): if result.dataset == dataset and result.method == method: yield result
[docs] def get_metric_values_dm(self, dataset, method, label, metricname): for result in self.cache.itervalues(): if result.dataset == dataset and result.method == method: yield result.get_result(label, metric=metricname)
[docs] def get_scalar_values_dm(self, dataset, method, scalarname): for result in self.cache.itervalues(): if result.dataset == dataset and result.method == method: yield result.get_result(scalarname)
def __repr__(self): return "AbedCache(n_results=%i)" % len(self.cache) def __str__(self): return repr(self) def __iter__(self): for hsh in self.cache: yield self.cache[hsh]
[docs]class AbedResult(object): """ """ def __init__(self, hsh=None, dataset=None, method=None): self.scalars = set() self.metrics = set() self.metric_targets = set() self.results = {} self.hsh = hsh self.dataset = dataset_name(dataset) self.method = method
[docs] def add_result_scalar(self, label, value): self.scalars.add(label) self.results[label] = value
[docs] def add_result_metric(self, label, metric, value): self.metrics.add(metric) self.metric_targets.add(label) if not label in self.results: self.results[label] = {} self.results[label][metric] = value
[docs] def get_result(self, label, metric=None): if metric is None: return self.results[label] else: return self.results[label][metric]
def __str__(self): s = "AbedResult(hsh=%r, dataset=%r, method=%r, results=%r)" % ( self.hsh, self.dataset, self.method, self.results, ) return s def __repr__(self): return str(self)
[docs]class AbedTableTypes: VALUES = "values" RANKS = "ranks"
[docs]class AbedTable(object): """ """ def __init__(self): self.num_columns = 0 self.num_rows = 0 self.headers = None self.rows = None self.higher_better = None self.type = None self.desc = "" self.name = "" self.target = None self.is_metric = True self.is_summary = False if settings.TYPE == "ASSESS": self.metricname = None elif settings.TYPE == "CV_TT": self.trainmetricname = None self.testmetricname = None
[docs] def add_row(self, _id, row): if self.rows is None: self.rows = OrderedDict() if self.rows.has_key(_id): raise KeyError("Existing id in table") self.rows[_id] = row self.num_rows += 1 if self.num_columns == 0 and len(row) > 0: self.num_columns = len(row)
[docs] def table_averages(self): averages = [0.0] * self.num_columns for _id in self.rows.keys(): for i, x in enumerate(self.rows[_id]): averages[i] += float(x) averages = [x / float(self.num_rows) for x in averages] fmtavg = [] for num in averages: rounded = round(num, settings.RESULT_PRECISION) fmt = "%%.%df" % settings.RESULT_PRECISION fmtavg.append(fmt % rounded) return fmtavg
[docs] def table_wins(self): hb = self.higher_better wins = [0] * self.num_columns for _id in self.rows.keys(): best = float("inf") best *= -1 if hb else 1 best_idx = None for i, x in enumerate(self.rows[_id]): val = float(x) if (hb and (val > best)) or ((not hb) and (val < best)): best = val best_idx = i if len([x for x in self.rows[_id] if float(x) == best]) == 1: wins[best_idx] += 1 return wins
[docs] def table_losses(self): hb = self.higher_better losses = [0] * self.num_columns for _id in self.rows.keys(): worst = float("inf") worst *= 1 if hb else -1 worst_idx = None for i, x in enumerate(self.rows[_id]): val = float(x) if (hb and (val < worst)) or ((not hb) and (val > worst)): worst = val worst_idx = i if len([x for x in self.rows[_id] if float(x) == worst]) == 1: losses[worst_idx] += 1 return losses
[docs] def table_ties(self): num_ties = 0 for _id in self.rows.keys(): values = [float(x) for x in self.rows[_id]] num_uniq = len(set(values)) if num_uniq == 1: num_ties += 1 ties = [num_ties] * self.num_columns return ties
[docs] def summary_table(self): at = AbedTable() at.headers = self.headers[:] at.type = self.type at.desc = self.desc at.name = self.name at.target = self.target at.is_metric = self.is_metric if settings.TYPE == "ASSESS": at.metricname = self.metricname elif settings.TYPE == "CV_TT": at.trainmetricname = self.trainmetricname at.testmetricname = self.testmetricname at.add_row("Average", self.table_averages()) at.add_row("Wins", self.table_wins()) at.add_row("Losses", self.table_losses()) at.add_row("Ties", self.table_ties()) at.is_summary = True return at
[docs] def left_insert(self, other): summary = self.summary_table() self.num_columns += other.num_columns self.headers = other.headers + self.headers[1:] for _id, otherrow in other: myrow = self.rows.get(_id, None) if myrow is None: continue self.rows[_id] = otherrow + myrow return summary
def __iter__(self): for _id in self.rows: yield (_id, self.rows[_id])
[docs] def from_csv(self, csvfile): with open(csvfile, "r") as fid: lines = fid.readlines() lines = [x.strip() for x in lines] self.headers = lines[0].split(",") for line in lines[1:]: parts = line.split(",") _id = parts[0] row = parts[1:] self.add_row(_id, row)