Source code for abed.fab_util
from fabric.api import env
from fabric.context_managers import cd as fab_cd
from fabric.context_managers import settings as fab_settings
from fabric.operations import hide, run, put, get
from .conf import settings
[docs]class MyFabric(object):
"""
Class to manage env etc.
"""
def __init__(self):
self.host = settings.REMOTE_HOST
self.user = settings.REMOTE_USER
self.name = settings.PROJECT_NAME
self.environment = "staging"
self.project_path = settings.REMOTE_DIR
self.port = settings.REMOTE_PORT
self.data_path = None
self.empty = None
# Use the ssh config of the user. If a password is still requested,
# ensure that the key is added with ``ssh-add /path/to/key``.
env.use_ssh_config = True
[docs] def run(self, command="", warn_only=False, cd=None):
env.host_string = "%s@%s:%s" % (self.user, self.host, self.port)
if self.empty is None:
with hide("output"):
self.empty = str(run("echo"))
if cd is None:
cd = "/home/{}".format(self.user)
with (fab_cd(cd)):
with fab_settings(warn_only=warn_only):
with hide("output"):
output = str(run(command))
text = output[len(self.empty) + 1 :].replace("\r", "").strip()
return text
[docs] def get(self, source, dest):
env.host_string = "%s@%s:%s" % (self.user, self.host, self.port)
get(source, dest)
[docs] def put(self, source, dest):
env.host_string = "%s@%s:%s" % (self.user, self.host, self.port)
put(source, dest)
if settings is None:
myfab = None
else:
myfab = MyFabric()