Source code for abed.zips
# -*- coding: utf-8 -*-
"""
Functions for dealing with zips of results
Note:
The bz2file dependency is needed because tar.bz2 files are created with
pbzip2, which results in multiple streams in the tarfile. The Python 2.x
tarfile module does not handle multiple streams, but the bz2file package
does. Unpacking the tarfiles is thus done in two separate steps.
"""
import bz2file
import os
import shutil
import tarfile
import sys
from .conf import settings
from .datasets import dataset_name
from .progress import iter_progress
from .tasks import init_tasks
from .io import error, warning
from .utils import mkdir
splitext = os.path.splitext
basename = os.path.basename
def _unpack_zip(zipfile, all_tasks):
fpath = "%s%s%s" % (settings.ZIP_DIR, os.sep, zipfile)
try:
b = bz2file.BZ2File(fpath)
tar = tarfile.open(fileobj=b)
except tarfile.ReadError:
error("Could not read tarfile: %s" % fpath)
return
mkdir(settings.STAGE_DIR)
tar.extractall(settings.STAGE_DIR)
tar.close()
move_results(all_tasks)
ziplog = settings.ZIP_DIR + os.sep + "abed_unzipped.txt"
with open(ziplog, "a") as fid:
fid.write(zipfile + "\n")
[docs]def unpack_zips():
ziplog = settings.ZIP_DIR + os.sep + "abed_unzipped.txt"
if os.path.exists(ziplog):
with open(ziplog, "r") as fid:
unzipped = [x.strip() for x in fid.readlines()]
else:
unzipped = []
all_tasks = init_tasks()
bzips = [
x
for x in os.listdir(settings.ZIP_DIR)
if x.endswith(".bz2") and not x in unzipped
]
if len(bzips) == 0:
return
for fname in iter_progress(bzips, "Unpacking zips: "):
_unpack_zip(fname, all_tasks)
[docs]def move_results(task_dict):
mkdir(settings.RESULT_DIR)
subdirs = os.listdir(settings.STAGE_DIR)
for subdir in subdirs:
subpath = "%s%s%s" % (settings.STAGE_DIR, os.sep, subdir)
if not os.path.isdir(subpath):
warning("Skipping file in stagedir: %s." % subdir)
continue
files = os.listdir(subpath)
for fname in files:
fpath = "%s%s%s" % (subpath, os.sep, fname)
hsh = basename(fpath)[: -len(settings.RESULT_EXTENSION)]
if settings.TYPE == "RAW":
dset = "dataset"
method = "method"
else:
if not hsh in task_dict:
print("Unknown hash: %s" % hsh, file=sys.stderr)
continue
if settings.TYPE == "ASSESS":
dset = dataset_name(task_dict[hsh]["dataset"])
elif settings.TYPE == "CV_TT":
dset = dataset_name(
(
task_dict[hsh]["train_dataset"],
task_dict[hsh]["test_dataset"],
)
)
method = task_dict[hsh]["method"]
outdir = "%s%s%s%s%s" % (
settings.RESULT_DIR,
os.sep,
dset,
os.sep,
method,
)
mkdir(outdir)
dpath = "%s%s%s" % (outdir, os.sep, fname)
shutil.move(fpath, dpath)
clean_empty_dir(subpath)
[docs]def clean_empty_dir(folder):
try:
os.rmdir(folder)
except OSError:
dirs = (x for x in os.listdir(folder) if os.path.isdir(x))
for d in dirs:
clean_empty_dir(d)