Source code for abed.fab

"""Functions for using fabric

Abed uses Fabric to synchronize data to and from the compute cluster. The 
various functions that Abed needs are defined here. A helper class is defined 
in :mod:`fab_util`, which is used to define the Fabric context environment that 
is used for the commands.

"""

import os
import time

from fabric.operations import local
from tempfile import gettempdir

from .auto import get_jobid_from_logs
from .conf import settings
from .fab_util import myfab
from .pbs import generate_pbs_text
from .io import info
from .utils import mkdir


def init_data():
    """Push the data to the remote server

    This function is used to synchronize the :setting:`DATADIR` directory to
    the compute cluster. This is done by first locally compressing the entire
    directory as a tar file, then syncing it to a ``datasets`` directory in
    the remote project path, and unpacking it there. The path where the
    unpacked dataset is located is stored in the :class:`MyFabric` class.

    """
    local("tar czf datasets.tar.gz -C {} .".format(settings.DATADIR))
    release_time = time.strftime("%s")
    release_path = "{ppath}/{datapath}/{relpath}".format(
        ppath=myfab.project_path, datapath="datasets", relpath=release_time
    )
    myfab.run("mkdir -p {releasepath}".format(releasepath=release_path))
    myfab.put("./datasets.tar.gz", release_path)
    myfab.run("cd {} && tar xvf datasets.tar.gz".format(release_path))
    myfab.run("cd {} && rm datasets.tar.gz".format(release_path))
    local("rm datasets.tar.gz")
    info("Remote datasets placed in: {}".format(release_path))
    myfab.data_path = release_path
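
# Illustrative result of init_data() (the timestamp directory comes from
# time.strftime("%s"); the value shown is hypothetical):
#
#   <project_path>/datasets/1498732800/   <- remote copy of DATADIR, stored
#                                            in myfab.data_path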


def move_data():
    """Move the data from the previous release to the current one"""
    curr_path = "{}/releases/current".format(myfab.project_path)
    prev_path = "{}/releases/previous".format(myfab.project_path)
    myfab.run("mkdir -p {}/{}/".format(curr_path, "datasets"))
    myfab.run(
        "rsync -av --remove-source-files {}/{}/* {}/{}/".format(
            prev_path, "datasets", curr_path, "datasets"
        )
    )
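
# Sketch of the effect: because rsync is called with --remove-source-files,
# the dataset files are moved rather than copied, so the data does not have
# to be uploaded again for every release:
#
#   releases/previous/datasets/*  ->  releases/current/datasets/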


def setup():
    """Create the remote project directory structure"""
    myfab.run("mkdir -p {}".format(myfab.project_path))
    myfab.run("mkdir -p releases; mkdir -p packages;", cd=myfab.project_path)


def deploy(push_data=False):
    """Deploy the current git master as a new release on the remote server"""
    if push_data:
        assert myfab.data_path is not None
    # upload tar from git
    release_time = time.strftime("%Y_%m_%d_%H_%M_%S")
    sha1 = local("git log master -1 --pretty=format:'%h'", capture=True)
    release_name = "{}_{}".format(release_time, sha1)
    release_path = "{}/releases/{}".format(myfab.project_path, release_name)
    release_file_name = "{}_{}.tar.gz".format(release_time, sha1)
    package_file_path = "{}/packages/{}".format(
        myfab.project_path, release_file_name
    )
    # archive locally
    local(
        "git archive --format=tar master | gzip > "
        "{}.tar.gz".format(release_name)
    )
    myfab.run("mkdir " + release_path)
    # transfer to remote
    myfab.put(release_file_name, package_file_path)
    local("rm " + release_file_name)
    myfab.run("cd {} && tar zxf {}".format(release_path, package_file_path))
    # copy data if pushed
    if push_data:
        myfab.run("mkdir -p {}/{}/".format(release_path, "datasets"))
        myfab.run(
            "cp {}/* {}/{}/".format(myfab.data_path, release_path, "datasets")
        )
    # symlinks
    if not push_data:
        myfab.run(
            "rm -f releases/previous; mv releases/current releases/previous",
            warn_only=True,
            cd=myfab.project_path,
        )
    myfab.run(
        "ln -s {} releases/current".format(release_path),
        cd=myfab.project_path,
    )
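
# Illustrative remote layout after two deploys (timestamps and short hashes
# are hypothetical):
#
#   <project_path>/packages/2017_06_29_10_00_00_ab12cd3.tar.gz
#   <project_path>/releases/2017_06_29_10_00_00_ab12cd3/
#   <project_path>/releases/current  -> releases/2017_06_29_10_00_00_ab12cd3
#   <project_path>/releases/previous -> releases/2017_06_28_09_00_00_9f8e7d6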


def get_files_from_glob(glob_path, glob, dest_dir):
    """Download remote files matching a glob, skipping existing local files"""
    lstxt = myfab.run(
        "shopt -s nullglob && ls -1 %s && shopt -u nullglob" % glob,
        cd=glob_path,
    )
    files = [os.path.join(glob_path, f) for f in lstxt.split("\n")]
    for f in files:
        fname = os.path.basename(f)
        lpath = "%s%s%s" % (dest_dir, os.sep, fname)
        if not os.path.exists(lpath):
            myfab.get(f, dest_dir)
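
# Example call with hypothetical paths: download every result archive from
# the remote "bzips" directory that is not yet present locally:
#
#   get_files_from_glob("/remote/project/releases/current/bzips",
#                       "*.tar.bz2", "./abed_results")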


def get_results(basepath=None):
    """Retrieve the result archives and log files from the remote server"""
    if basepath is None:
        basepath = "{}/releases/current".format(myfab.project_path)
    zip_path = "{}/bzips".format(basepath)
    zip_glob = "*.tar.bz2"
    mkdir(settings.ZIP_DIR)
    get_files_from_glob(zip_path, zip_glob, settings.ZIP_DIR)
    log_path = "{}/logs".format(basepath)
    log_glob = "*"
    mkdir(settings.LOG_DIR)
    get_files_from_glob(log_path, log_glob, settings.LOG_DIR)


def write_and_queue():
    """Write the PBS file, upload it, and submit the job to the queue"""
    temp_pbs = os.path.join(gettempdir(), "abed.pbs")
    with open(temp_pbs, "w") as pbs:
        pbs.write(generate_pbs_text())
    curr_path = "{}/releases/current".format(myfab.project_path)
    myfab.put(temp_pbs, "{}/".format(curr_path))
    myfab.run("mkdir -p {}/logs".format(curr_path))
    myfab.run("qsub -d . -e logs -o logs abed.pbs", cd=curr_path)
    local("rm {}".format(temp_pbs))
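
# A note on the qsub flags used above (assuming a Torque-style PBS system):
# "-d ." sets the job's working directory to the release directory, and
# "-e logs"/"-o logs" direct the job's stderr and stdout files into the
# logs/ directory created just before submission.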


def build_remote():
    """Runs the build command remotely if the program requires compilation"""
    if not settings.NEEDS_BUILD:
        return
    build_path = "{}/releases/current/{}".format(
        myfab.project_path, settings.BUILD_DIR
    )
    myfab.run(settings.BUILD_CMD, cd=build_path)


def fab_push():
    """Deploy the current master, move the data, build, and queue the job"""
    deploy(push_data=False)
    move_data()
    build_remote()
    write_and_queue()


def fab_pull():
    """Pull the results from the remote server"""
    get_results()


def fab_setup():
    """Set up the remote project and push both the code and the data"""
    setup()
    init_data()
    deploy(push_data=True)
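
# Typical workflow, as a sketch:
#
#   fab_setup()  # once: setup() + init_data() + deploy(push_data=True)
#   fab_push()   # per iteration: deploy() + move_data() + build_remote()
#                #                + write_and_queue()
#   fab_pull()   # after the job finishes: get_results()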


def fab_repull():
    """Re-download result archives for all releases listed in the auto file"""
    releasepath = "{}/releases".format(myfab.project_path)
    lstext = myfab.run("ls -1 {}".format(releasepath))
    special = ["current", "previous"]
    paths = [x for x in lstext.split("\n") if x not in special]
    with open(settings.AUTO_FILE, "r") as fid:
        lines = fid.readlines()
    auto_jobids = [x.strip() for x in lines]
    to_pull = []
    for path in paths:
        fullpath = "{}/{}".format(releasepath, path)
        logpath = "{}/{}".format(fullpath, "logs")
        jobid = get_jobid_from_logs(logpath)
        if jobid in auto_jobids:
            to_pull.append(fullpath)
    for path in to_pull:
        zip_path = "{}/bzips/".format(path)
        zip_glob = "*.tar.bz2"
        mkdir(settings.ZIP_DIR)
        get_files_from_glob(zip_path, zip_glob, settings.ZIP_DIR)
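
# fab_repull assumes that :setting:`AUTO_FILE` contains one job id per line
# (as read above); only releases whose job id, recovered from their log
# files via get_jobid_from_logs, appears in that list are re-downloaded.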