mirror of
1
0
Fork 0

Cleanup link logic, with a fix for windows links with \\?\ in them.

* reduce ci matrix

* python runscript without bash

(cherry picked from commit 9b148a6679722db5eb7ffabd3a27a8579f296319)

* change link dest function to handle '\?\' links

* add path normalization for windows support

* Revert "add path normalization for windows support"

This reverts commit 2ab0fc1b3c.

* link variable extraction without normpath

* type annotation

* blacken

* missing black files

* variable renames from '2775765a' outside link function

* from '2775765a' use method for default flags

* fix defaults from method

* variable renames from '2775765a' in link function and method renames

* refactor if clauses into blocks

* maybe fix if refactor

* remove unreachable code

* remove silly disambiguation semantics

* remove silly disambiguation semantics 2

* incremental else swap

* bring source existence check to front

* bring source existence check to front and remove old back check

* refactor almost final case

* check symlink broken cases up front

* add return missing

* flip block order to make things easier to understand
This commit is contained in:
m-richards 2021-02-16 19:32:13 +10:00 committed by GitHub
parent 4c961db076
commit 47f3e07b2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 423 additions and 332 deletions

View File

@ -2,14 +2,12 @@ name: CI
on:
push:
pull_request:
schedule:
- cron: '0 8 * * 6'
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python: [2.7, pypy2, 3.4, 3.5, 3.6, 3.7, 3.8, pypy3]
python: [3.6, 3.7, 3.9]
name: Python ${{ matrix.python }}
steps:
- uses: actions/checkout@v2

View File

@ -0,0 +1,49 @@
ARGS_TO_SCRIPT = (
"python >/dev/null /c/Users/Matt/dotfiles/dotbot/bin/dotbot -d /c/Users/Matt/dotfiles -c install.conf.yaml"
)
# These arguments are only linux/ bash safe
"""Script port from /c/Users/Matt/dotfiles/dotbot/bin/dotbot"""
import sys, os
DOTFILE_DIR = os.path.normpath(os.path.expandvars("%USERPROFILE%/dotfiles/"))
DIRECTORY_ARG = os.path.normpath(DOTFILE_DIR)
CONFIG_FILE_ARG = "install.conf.yaml"
sys.path.append(DOTFILE_DIR)
print(DOTFILE_DIR)
os.chdir(DOTFILE_DIR)
# PROJECT_ROOT_DIRECTORY = os.path.dirname(
# os.path.dirname(os.path.realpath(__file__)))
# print(PROJECT_ROOT_DIRECTORY)
PROJECT_ROOT_DIRECTORY = DOTFILE_DIR
def inject(lib_path):
path = os.path.join(PROJECT_ROOT_DIRECTORY, "lib", lib_path)
sys.path.insert(0, path)
# version dependent libraries
if sys.version_info[0] >= 3:
inject("pyyaml/lib3")
else:
inject("pyyaml/lib")
if os.path.exists(os.path.join(PROJECT_ROOT_DIRECTORY, "dotbot")):
if PROJECT_ROOT_DIRECTORY not in sys.path:
sys.path.insert(0, PROJECT_ROOT_DIRECTORY)
os.putenv("PYTHONPATH", PROJECT_ROOT_DIRECTORY)
import dotbot
def main():
dotbot.cli.main(additional_args=["-d", DIRECTORY_ARG, "-c", CONFIG_FILE_ARG])
if __name__ == "__main__":
main()

View File

@ -1,4 +1,4 @@
from .cli import main
from .plugin import Plugin
__version__ = '1.18.0'
__version__ = "1.18.0"

View File

@ -11,47 +11,48 @@ from .util import module
import dotbot
import yaml
def add_options(parser):
parser.add_argument('-Q', '--super-quiet', action='store_true',
help='suppress almost all output')
parser.add_argument('-q', '--quiet', action='store_true',
help='suppress most output')
parser.add_argument('-v', '--verbose', action='store_true',
help='enable verbose output')
parser.add_argument('-d', '--base-directory',
help='execute commands from within BASEDIR',
metavar='BASEDIR')
parser.add_argument('-c', '--config-file',
help='run commands given in CONFIGFILE', metavar='CONFIGFILE')
parser.add_argument('-p', '--plugin', action='append', dest='plugins', default=[],
help='load PLUGIN as a plugin', metavar='PLUGIN')
parser.add_argument('--disable-built-in-plugins',
action='store_true', help='disable built-in plugins')
parser.add_argument('--plugin-dir', action='append', dest='plugin_dirs', default=[],
metavar='PLUGIN_DIR', help='load all plugins in PLUGIN_DIR')
parser.add_argument('--only', nargs='+',
help='only run specified directives', metavar='DIRECTIVE')
parser.add_argument('--except', nargs='+', dest='skip',
help='skip specified directives', metavar='DIRECTIVE')
parser.add_argument('--force-color', dest='force_color', action='store_true',
help='force color output')
parser.add_argument('--no-color', dest='no_color', action='store_true',
help='disable color output')
parser.add_argument('--version', action='store_true',
help='show program\'s version number and exit')
parser.add_argument("-Q", "--super-quiet", action="store_true", help="suppress almost all output")
parser.add_argument("-q", "--quiet", action="store_true", help="suppress most output")
parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose output")
parser.add_argument("-d", "--base-directory", help="execute commands from within BASEDIR", metavar="BASEDIR")
parser.add_argument("-c", "--config-file", help="run commands given in CONFIGFILE", metavar="CONFIGFILE")
parser.add_argument(
"-p", "--plugin", action="append", dest="plugins", default=[], help="load PLUGIN as a plugin", metavar="PLUGIN"
)
parser.add_argument("--disable-built-in-plugins", action="store_true", help="disable built-in plugins")
parser.add_argument(
"--plugin-dir",
action="append",
dest="plugin_dirs",
default=[],
metavar="PLUGIN_DIR",
help="load all plugins in PLUGIN_DIR",
)
parser.add_argument("--only", nargs="+", help="only run specified directives", metavar="DIRECTIVE")
parser.add_argument("--except", nargs="+", dest="skip", help="skip specified directives", metavar="DIRECTIVE")
parser.add_argument("--force-color", dest="force_color", action="store_true", help="force color output")
parser.add_argument("--no-color", dest="no_color", action="store_true", help="disable color output")
parser.add_argument("--version", action="store_true", help="show program's version number and exit")
def read_config(config_file):
reader = ConfigReader(config_file)
return reader.get_config()
def main():
def main(additional_args=None):
log = Messenger()
try:
parser = ArgumentParser()
add_options(parser)
options = parser.parse_args()
if additional_args is not None:
print("got explicit argumenets")
options = parser.parse_args(additional_args)
if options.version:
print('Dotbot version %s (yaml: %s)' % (dotbot.__version__, yaml.__version__))
print("Dotbot version %s (yaml: %s)" % (dotbot.__version__, yaml.__version__))
exit(0)
if options.super_quiet:
log.set_level(Level.WARNING)
@ -75,22 +76,23 @@ def main():
from .plugins import Clean, Create, Link, Shell
plugin_paths = []
for directory in plugin_directories:
for plugin_path in glob.glob(os.path.join(directory, '*.py')):
plugin_paths.append(plugin_path)
for plugin_path in glob.glob(os.path.join(directory, "*.py")):
plugin_paths.append(plugin_path)
for plugin_path in options.plugins:
plugin_paths.append(plugin_path)
for plugin_path in plugin_paths:
abspath = os.path.abspath(plugin_path)
module.load(abspath)
if not options.config_file:
log.error('No configuration file specified')
log.error("No configuration file specified")
exit(1)
# read tasks from config file
tasks = read_config(options.config_file)
if tasks is None:
log.warning('Configuration file is empty, no work to do')
log.warning("Configuration file is empty, no work to do")
tasks = []
if not isinstance(tasks, list):
raise ReadingError('Configuration file must be a list of tasks')
raise ReadingError("Configuration file must be a list of tasks")
if options.base_directory:
base_directory = os.path.abspath(options.base_directory)
else:
@ -100,12 +102,12 @@ def main():
dispatcher = Dispatcher(base_directory, only=options.only, skip=options.skip)
success = dispatcher.dispatch(tasks)
if success:
log.info('\n==> All tasks executed successfully')
log.info("\n==> All tasks executed successfully")
else:
raise DispatchError('\n==> Some tasks were not executed successfully')
raise DispatchError("\n==> Some tasks were not executed successfully")
except (ReadingError, DispatchError) as e:
log.error('%s' % e)
log.error("%s" % e)
exit(1)
except KeyboardInterrupt:
log.error('\n==> Operation aborted')
log.error("\n==> Operation aborted")
exit(1)

View File

@ -3,6 +3,7 @@ import json
import os.path
from .util import string
class ConfigReader(object):
def __init__(self, config_file_path):
self._config = self._read(config_file_path)
@ -11,17 +12,18 @@ class ConfigReader(object):
try:
_, ext = os.path.splitext(config_file_path)
with open(config_file_path) as fin:
if ext == '.json':
if ext == ".json":
data = json.load(fin)
else:
data = yaml.safe_load(fin)
return data
except Exception as e:
msg = string.indent_lines(str(e))
raise ReadingError('Could not read config file:\n%s' % msg)
raise ReadingError("Could not read config file:\n%s" % msg)
def get_config(self):
return self._config
class ReadingError(Exception):
pass

View File

@ -1,10 +1,11 @@
import copy
import os
class Context(object):
'''
"""
Contextual data and information for plugins.
'''
"""
def __init__(self, base_directory):
self._base_directory = base_directory

View File

@ -4,6 +4,7 @@ from .messenger import Messenger
from .context import Context
import traceback
class Dispatcher(object):
def __init__(self, base_directory, only=None, skip=None):
self._log = Messenger()
@ -13,25 +14,27 @@ class Dispatcher(object):
self._skip = skip
def _setup_context(self, base_directory):
path = os.path.abspath(
os.path.expanduser(base_directory))
path = os.path.abspath(os.path.expanduser(base_directory))
if not os.path.exists(path):
raise DispatchError('Nonexistent base directory')
raise DispatchError("Nonexistent base directory")
self._context = Context(path)
def dispatch(self, tasks):
success = True
for task in tasks:
for action in task.keys():
if (self._only is not None and action not in self._only \
or self._skip is not None and action in self._skip) \
and action != 'defaults':
self._log.info('Skipping action %s' % action)
if (
self._only is not None
and action not in self._only
or self._skip is not None
and action in self._skip
) and action != "defaults":
self._log.info("Skipping action %s" % action)
continue
handled = False
# print("\tcurrent action", action)
if action == 'defaults':
self._context.set_defaults(task[action]) # replace, not update
if action == "defaults":
self._context.set_defaults(task[action]) # replace, not update
handled = True
# keep going, let other plugins handle this if they want
for plugin in self._plugins:
@ -44,9 +47,7 @@ class Dispatcher(object):
except Exception as err:
print("failure", err)
traceback.print_exception(type(err), err, err.__traceback__)
self._log.error(
'An error was encountered while executing action "%s"' %
action)
self._log.error('An error was encountered while executing action "%s"' % action)
self._log.debug(err)
if not handled:
success = False
@ -54,8 +55,8 @@ class Dispatcher(object):
return success
def _load_plugins(self):
self._plugins = [plugin(self._context)
for plugin in Plugin.__subclasses__()]
self._plugins = [plugin(self._context) for plugin in Plugin.__subclasses__()]
class DispatchError(Exception):
pass

View File

@ -1,8 +1,8 @@
class Color(object):
NONE = ''
RESET = '\033[0m'
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
MAGENTA = '\033[95m'
NONE = ""
RESET = "\033[0m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
MAGENTA = "\033[95m"

View File

@ -3,8 +3,9 @@ from ..util.compat import with_metaclass
from .color import Color
from .level import Level
class Messenger(with_metaclass(Singleton, object)):
def __init__(self, level = Level.LOWINFO):
def __init__(self, level=Level.LOWINFO):
self.set_level(level)
self.use_color(True)
@ -15,8 +16,8 @@ class Messenger(with_metaclass(Singleton, object)):
self._use_color = yesno
def log(self, level, message):
if (level >= self._level):
print('%s%s%s' % (self._color(level), message, self._reset()))
if level >= self._level:
print("%s%s%s" % (self._color(level), message, self._reset()))
def debug(self, message):
self.log(Level.DEBUG, message)
@ -34,13 +35,13 @@ class Messenger(with_metaclass(Singleton, object)):
self.log(Level.ERROR, message)
def _color(self, level):
'''
"""
Get a color (terminal escape sequence) according to a level.
'''
"""
if not self._use_color:
return ''
return ""
elif level < Level.DEBUG:
return ''
return ""
elif Level.DEBUG <= level < Level.LOWINFO:
return Color.YELLOW
elif Level.LOWINFO <= level < Level.INFO:
@ -53,10 +54,10 @@ class Messenger(with_metaclass(Singleton, object)):
return Color.RED
def _reset(self):
'''
"""
Get a reset color (terminal escape sequence).
'''
"""
if not self._use_color:
return ''
return ""
else:
return Color.RESET

View File

@ -1,25 +1,26 @@
from .messenger import Messenger
from .context import Context
class Plugin(object):
'''
"""
Abstract base class for commands that process directives.
'''
"""
def __init__(self, context):
self._context = context
self._log = Messenger()
def can_handle(self, directive):
'''
"""
Returns true if the Plugin can handle the directive.
'''
"""
raise NotImplementedError
def handle(self, directive, data):
'''
"""
Executes the directive.
Returns true if the Plugin successfully handled the directive.
'''
"""
raise NotImplementedError

View File

@ -3,45 +3,45 @@ import dotbot
class Clean(dotbot.Plugin):
'''
"""
Cleans broken symbolic links.
'''
"""
_directive = 'clean'
_directive = "clean"
def can_handle(self, directive):
return directive == self._directive
def handle(self, directive, data):
if directive != self._directive:
raise ValueError('Clean cannot handle directive %s' % directive)
raise ValueError("Clean cannot handle directive %s" % directive)
return self._process_clean(data)
def _process_clean(self, targets):
success = True
defaults = self._context.defaults().get(self._directive, {})
for target in targets:
force = defaults.get('force', False)
recursive = defaults.get('recursive', False)
force = defaults.get("force", False)
recursive = defaults.get("recursive", False)
if isinstance(targets, dict) and isinstance(targets[target], dict):
force = targets[target].get('force', force)
recursive = targets[target].get('recursive', recursive)
force = targets[target].get("force", force)
recursive = targets[target].get("recursive", recursive)
success &= self._clean(target, force, recursive)
if success:
self._log.info('All targets have been cleaned')
self._log.info("All targets have been cleaned")
else:
self._log.error('Some targets were not successfully cleaned')
self._log.error("Some targets were not successfully cleaned")
return success
def _clean(self, target, force, recursive):
'''
"""
Cleans all the broken symbolic links in target if they point to
a subdirectory of the base directory or if forced to clean.
'''
"""
target_normalised = os.path.normpath(os.path.expandvars(os.path.expanduser(target)))
isdir = os.path.isdir(target_normalised)
if isdir is False:
self._log.debug('Ignoring nonexistent directory %s' % target)
self._log.debug("Ignoring nonexistent directory %s" % target)
return True
for item in os.listdir(target_normalised):
path = os.path.join(target_normalised, item)
@ -53,16 +53,16 @@ class Clean(dotbot.Plugin):
if not os.path.exists(path) and os.path.islink(path):
points_at = os.path.join(os.path.dirname(path), os.readlink(path))
if self._in_directory(path, self._context.base_directory()) or force:
self._log.lowinfo('Removing invalid link %s -> %s' % (path, points_at))
self._log.lowinfo("Removing invalid link %s -> %s" % (path, points_at))
os.remove(path)
else:
self._log.lowinfo('Link %s -> %s not removed.' % (path, points_at))
self._log.lowinfo("Link %s -> %s not removed." % (path, points_at))
return True
def _in_directory(self, path, directory):
'''
"""
Returns true if the path is in the directory.
'''
directory = os.path.join(os.path.realpath(directory), '')
"""
directory = os.path.join(os.path.realpath(directory), "")
path = os.path.realpath(path)
return os.path.commonprefix([path, directory]) == directory

View File

@ -3,18 +3,18 @@ import dotbot
class Create(dotbot.Plugin):
'''
"""
Create empty paths.
'''
"""
_directive = 'create'
_directive = "create"
def can_handle(self, directive):
return directive == self._directive
def handle(self, directive, data):
if directive != self._directive:
raise ValueError('Create cannot handle directive %s' % directive)
raise ValueError("Create cannot handle directive %s" % directive)
return self._process_paths(data)
def _process_paths(self, paths):
@ -23,28 +23,28 @@ class Create(dotbot.Plugin):
path = os.path.normpath(os.path.expandvars(os.path.expanduser(path)))
success &= self._create(path)
if success:
self._log.info('All paths have been set up')
self._log.info("All paths have been set up")
else:
self._log.error('Some paths were not successfully set up')
self._log.error("Some paths were not successfully set up")
return success
def _exists(self, path):
'''
"""
Returns true if the path exists.
'''
"""
path = os.path.expanduser(path)
return os.path.exists(path)
def _create(self, path):
success = True
if not self._exists(path):
self._log.debug('Trying to create path %s' % path)
self._log.debug("Trying to create path %s" % path)
try:
self._log.lowinfo('Creating path %s' % path)
self._log.lowinfo("Creating path %s" % path)
os.makedirs(path)
except OSError:
self._log.warning('Failed to create path %s' % path)
self._log.warning("Failed to create path %s" % path)
success = False
else:
self._log.lowinfo('Path exists %s' % path)
self._log.lowinfo("Path exists %s" % path)
return success

View File

@ -7,48 +7,63 @@ import subprocess
class Link(dotbot.Plugin):
'''
"""
Symbolically links dotfiles.
'''
"""
_directive = 'link'
_directive = "link"
def can_handle(self, directive):
return directive == self._directive
def handle(self, directive, data):
if directive != self._directive:
raise ValueError('Link cannot handle directive %s' % directive)
raise ValueError("Link cannot handle directive %s" % directive)
return self._process_links(data)
def _process_links(self, links):
def _get_default_flags(self):
"""Get flags for process links from default file."""
defaults = self._context.defaults().get("link", {})
relative = defaults.get("relative", False)
canonical_path = defaults.get("canonicalize-path", True)
force = defaults.get("force", False)
relink = defaults.get("relink", False)
create = defaults.get("create", False)
use_glob = defaults.get("glob", False)
test = defaults.get("if", None)
ignore_missing = defaults.get("ignore-missing", False)
return relative, canonical_path, force, relink, create, use_glob, test, ignore_missing
def _process_links(self, links_dict):
# print("symlinking\n\t", links)
success = True
defaults = self._context.defaults().get('link', {})
for destination, source in links.items():
(relative_default, canonical_path_default, force_flag_default, relink_flag_default,
create_dir_flag_default, use_glob_default, shell_command_default, ignore_missing_default) = self._get_default_flags()
for destination, source_dict in links_dict.items():
destination = os.path.expandvars(destination)
relative = defaults.get('relative', False)
canonical_path = defaults.get('canonicalize-path', True)
force = defaults.get('force', False)
relink = defaults.get('relink', False)
create = defaults.get('create', False)
use_glob = defaults.get('glob', False)
test = defaults.get('if', None)
ignore_missing = defaults.get('ignore-missing', False)
if isinstance(source, dict):
if isinstance(source_dict, dict):
path = self._default_source(destination, source_dict.get("path"))
# extended config
test = source.get('if', test)
relative = source.get('relative', relative)
canonical_path = source.get('canonicalize-path', canonical_path)
force = source.get('force', force)
relink = source.get('relink', relink)
create = source.get('create', create)
use_glob = source.get('glob', use_glob)
ignore_missing = source.get('ignore-missing', ignore_missing)
path = self._default_source(destination, source.get('path'))
shell_command = source_dict.get("if", shell_command_default)
relative = source_dict.get("relative", relative_default)
canonical_path = source_dict.get("canonicalize-path", canonical_path_default)
force_flag = source_dict.get("force", force_flag_default)
relink_flag = source_dict.get("relink", relink_flag_default)
create_dir_flag = source_dict.get("create", create_dir_flag_default)
use_glob = source_dict.get("glob", use_glob_default)
ignore_missing = source_dict.get("ignore-missing", ignore_missing_default)
else:
path = self._default_source(destination, source)
if test is not None and not self._test_success(test):
self._log.lowinfo('Skipping %s' % destination)
path = self._default_source(destination, source_dict)
(shell_command, relative, canonical_path, force_flag, relink_flag,
create_dir_flag, use_glob, ignore_missing) = (shell_command_default, relative_default, canonical_path_default, force_flag_default, relink_flag_default,
create_dir_flag_default, use_glob_default, ignore_missing_default)
if shell_command is not None and not self._test_success(shell_command):
self._log.lowinfo("Skipping %s" % destination)
continue
path = os.path.expandvars(os.path.expanduser(path))
if use_glob:
@ -58,63 +73,69 @@ class Link(dotbot.Plugin):
self._log.warning("Globbing couldn't find anything matching " + str(path))
success = False
continue
glob_star_loc = path.find('*')
if glob_star_loc == -1 and destination[-1] == '/':
glob_star_loc = path.find("*")
if glob_star_loc == -1 and destination[-1] == "/":
self._log.error("Ambiguous action requested.")
self._log.error("No wildcard in glob, directory use undefined: " +
destination + " -> " + str(glob_results))
self._log.error(
"No wildcard in glob, directory use undefined: " + destination + " -> " + str(glob_results)
)
self._log.warning("Did you want to link the directory or into it?")
success = False
continue
elif glob_star_loc == -1 and len(glob_results) == 1:
# perform a normal link operation
if create:
success &= self._create(destination)
if force or relink:
success &= self._delete(path, destination, relative, canonical_path, force)
if create_dir_flag:
success &= self._create_dir(destination)
if force_flag or relink_flag:
success &= self._delete(path, destination, relative, canonical_path, force_flag)
success &= self._link(path, destination, relative, canonical_path, ignore_missing)
else:
self._log.lowinfo("Globs from '" + path + "': " + str(glob_results))
glob_base = path[:glob_star_loc]
for glob_full_item in glob_results:
glob_item = glob_full_item[len(glob_base):]
glob_item = glob_full_item[len(glob_base) :]
glob_link_destination = os.path.join(destination, glob_item)
if create:
success &= self._create(glob_link_destination)
if force or relink:
success &= self._delete(glob_full_item, glob_link_destination, relative, canonical_path, force)
success &= self._link(glob_full_item, glob_link_destination, relative, canonical_path, ignore_missing)
else:
if create:
success &= self._create(destination)
if not ignore_missing and not self._exists(os.path.join(self._context.base_directory(), path)):
if create_dir_flag:
success &= self._create_dir(glob_link_destination)
if force_flag or relink_flag:
success &= self._delete(
glob_full_item, glob_link_destination, relative, canonical_path, force_flag
)
success &= self._link(
glob_full_item, glob_link_destination, relative, canonical_path, ignore_missing
)
else: # not using glob:
if create_dir_flag:
success &= self._create_dir(destination)
if ignore_missing is False and self._exists(
os.path.join(self._context.base_directory(), path)
) is False:
# we seemingly check this twice (here and in _link) because
# if the file doesn't exist and force is True, we don't
# want to remove the original (this is tested by
# link-force-leaves-when-nonexistent.bash)
success = False
self._log.warning('Nonexistent source %s -> %s' %
(destination, path))
self._log.warning("Nonexistent source %s -> %s" % (destination, path))
continue
if force or relink:
success &= self._delete(path, destination, relative, canonical_path, force)
if force_flag or relink_flag:
success &= self._delete(path, destination, relative, canonical_path, force_flag)
success &= self._link(path, destination, relative, canonical_path, ignore_missing)
if success:
self._log.info('All links have been set up')
self._log.info("All links have been set up")
else:
self._log.error('Some links were not successfully set up')
self._log.error("Some links were not successfully set up")
return success
def _test_success(self, command):
ret = dotbot.util.shell_command(command, cwd=self._context.base_directory())
if ret != 0:
self._log.debug('Test \'%s\' returned false' % command)
self._log.debug("Test '%s' returned false" % command)
return ret == 0
def _default_source(self, destination, source):
if source is None:
basename = os.path.basename(destination)
if basename.startswith('.'):
if basename.startswith("."):
return basename[1:]
else:
return basename
@ -122,26 +143,44 @@ class Link(dotbot.Plugin):
return source
def _is_link(self, path):
'''
"""
Returns true if the path is a symbolic link.
'''
"""
return os.path.islink(os.path.expanduser(path))
def _link_destination(self, path):
'''
Returns the destination of the symbolic link.
'''
def _get_link_destination(self, path):
"""
Returns the destination of the symbolic link. Truncates the \\?\ start to a path if it
is present. This is an identifier which allows >255 character file name links to work.
Since this function is for the point of comparison, it is okay to truncate
"""
# path = os.path.normpath(path)
path = os.path.expanduser(path)
return os.readlink(path)
try:
read_link = os.readlink(path)
# Read link can return paths starting with \\?\ - this allows over the 255 file name
# limit
except OSError as e:
if "[WinError 4390] The file or directory is not a reparse point" in str(e) and os.path.isdir(path):
return "UNLINKED_DIR"
return "OSERROR_READING_LINK"
except Exception as e:
print(e)
return "GENERAL_EXCEPTION_READING_LINK"
else:
if read_link.startswith("\\\\?\\"):
read_link = read_link.replace("\\\\?\\", "")
return read_link
def _exists(self, path):
'''
Returns true if the path exists.
'''
"""
Returns true if the path exists. Returns false if contains dangling symbolic links.
"""
path = os.path.expanduser(path)
return os.path.exists(path)
def _create(self, path):
def _create_dir(self, path):
"""Create all directories in path if they do not already exist."""
success = True
parent = os.path.abspath(os.path.join(os.path.expanduser(path), os.pardir))
if not self._exists(parent):
@ -149,10 +188,10 @@ class Link(dotbot.Plugin):
try:
os.makedirs(parent)
except OSError:
self._log.warning('Failed to create directory %s' % parent)
self._log.warning("Failed to create directory %s" % parent)
success = False
else:
self._log.lowinfo('Creating directory %s' % parent)
self._log.lowinfo("Creating directory %s" % parent)
return success
def _delete(self, source, path, relative, canonical_path, force):
@ -161,8 +200,9 @@ class Link(dotbot.Plugin):
fullpath = os.path.expanduser(path)
if relative:
source = self._relative_path(source, fullpath)
if ((self._is_link(path) and self._link_destination(path) != source) or
(self._exists(path) and not self._is_link(path))):
if (self._is_link(path) and self._get_link_destination(path) != source) or (
self._exists(path) and not self._is_link(path)
):
removed = False
try:
if os.path.islink(fullpath):
@ -176,66 +216,84 @@ class Link(dotbot.Plugin):
os.remove(fullpath)
removed = True
except OSError:
self._log.warning('Failed to remove %s' % path)
self._log.warning("Failed to remove %s" % path)
success = False
else:
if removed:
self._log.lowinfo('Removing %s' % path)
self._log.lowinfo("Removing %s" % path)
return success
def _relative_path(self, source, destination):
'''
"""
Returns the relative path to get to the source file from the
destination file.
'''
"""
destination_dir = os.path.dirname(destination)
return os.path.relpath(source, destination_dir)
def _link(self, source, link_name, relative, canonical_path, ignore_missing):
'''
def _link(self, dotfile_source, target_path_to_link_at, relative_path, canonical_path, ignore_missing):
"""
Links link_name to source.
:param target_path_to_link_at is the file path where we are putting a symlink back to
dotfile_source
Returns true if successfully linked files.
'''
success = False
destination = os.path.expanduser(link_name)
"""
success_flag = False
destination = os.path.expanduser(target_path_to_link_at)
base_directory = self._context.base_directory(canonical_path=canonical_path)
absolute_source = os.path.join(base_directory, source)
if relative:
source = self._relative_path(absolute_source, destination)
absolute_source = os.path.join(base_directory, dotfile_source)
# Check source directory exists unless we ignore missing
if ignore_missing is False and self._exists(absolute_source) is False:
self._log.warning("Nonexistent source %s <-> %s" % (
target_path_to_link_at, dotfile_source))
return success_flag
if relative_path:
dotfile_source = self._relative_path(absolute_source, destination)
else:
source = absolute_source
if (not self._exists(link_name) and self._is_link(link_name) and
self._link_destination(link_name) != source):
self._log.warning('Invalid link %s -> %s' %
(link_name, self._link_destination(link_name)))
# we need to use absolute_source below because our cwd is the dotfiles
# directory, and if source is relative, it will be relative to the
# destination directory
elif not self._exists(link_name) and (ignore_missing or self._exists(absolute_source)):
dotfile_source = absolute_source
target_path_exists: bool = self._exists(target_path_to_link_at)
target_file_is_link: bool = self._is_link(target_path_to_link_at)
# get the file/ folder the symlink (located at the target path) is pointed to
symlink_dest_at_target_path: str = self._get_link_destination(target_path_to_link_at)
# Check case of links are present but incorrect
if target_file_is_link and (symlink_dest_at_target_path != dotfile_source):
if target_path_exists:
self._log.warning("Incorrect link (link exists but target is incorrect) %s -> %s"
% (target_path_to_link_at, symlink_dest_at_target_path))
else:
# Symlink is broken or dangling
self._log.warning("Symlink Invalid %s -> %s" % (target_path_to_link_at,
symlink_dest_at_target_path))
return success_flag
if target_path_exists: # file/ folder we want to put symlink in already exists
if target_file_is_link: # already checked if link pointed to wrong location,
# so if it's a link we know it's correct
self._log.lowinfo("Link exists %s -> %s" % (target_path_to_link_at, dotfile_source))
success_flag = True
return success_flag
else: # Not a link
self._log.warning(
"%s already exists but is a regular file or directory" % target_path_to_link_at)
return success_flag
else:
# target path doesn't exist already, so we try to create the symlink
try:
os.symlink(source, destination)
os.symlink(dotfile_source, destination)
except OSError:
self._log.warning('Linking failed %s -> %s' % (link_name, source))
self._log.warning("Linking failed %s -> %s" % (target_path_to_link_at, dotfile_source))
except Exception as e:
print(
f"SYMLINK FAILED with arguments os.symlink({dotfile_source}, {destination})",
)
raise e
else:
self._log.lowinfo('Creating link %s -> %s' % (link_name, source))
success = True
elif self._exists(link_name) and not self._is_link(link_name):
self._log.warning(
'%s already exists but is a regular file or directory' %
link_name)
elif self._is_link(link_name) and self._link_destination(link_name) != source:
self._log.warning('Incorrect link %s -> %s' %
(link_name, self._link_destination(link_name)))
# again, we use absolute_source to check for existence
elif not self._exists(absolute_source):
if self._is_link(link_name):
self._log.warning('Nonexistent source %s -> %s' %
(link_name, source))
else:
self._log.warning('Nonexistent source for %s : %s' %
(link_name, source))
else:
self._log.lowinfo('Link exists %s -> %s' % (link_name, source))
success = True
return success
self._log.lowinfo("Creating link %s -> %s" % (target_path_to_link_at, dotfile_source))
success_flag = True
return success_flag

View File

@ -5,36 +5,35 @@ import dotbot.util
class Shell(dotbot.Plugin):
'''
"""
Run arbitrary shell commands.
'''
"""
_directive = 'shell'
_directive = "shell"
def can_handle(self, directive):
return directive == self._directive
def handle(self, directive, data):
if directive != self._directive:
raise ValueError('Shell cannot handle directive %s' %
directive)
raise ValueError("Shell cannot handle directive %s" % directive)
return self._process_commands(data)
def _process_commands(self, data):
success = True
defaults = self._context.defaults().get('shell', {})
defaults = self._context.defaults().get("shell", {})
for item in data:
stdin = defaults.get('stdin', False)
stdout = defaults.get('stdout', False)
stderr = defaults.get('stderr', False)
quiet = defaults.get('quiet', False)
stdin = defaults.get("stdin", False)
stdout = defaults.get("stdout", False)
stderr = defaults.get("stderr", False)
quiet = defaults.get("quiet", False)
if isinstance(item, dict):
cmd = item['command']
msg = item.get('description', None)
stdin = item.get('stdin', stdin)
stdout = item.get('stdout', stdout)
stderr = item.get('stderr', stderr)
quiet = item.get('quiet', quiet)
cmd = item["command"]
msg = item.get("description", None)
stdin = item.get("stdin", stdin)
stdout = item.get("stdout", stdout)
stderr = item.get("stderr", stderr)
quiet = item.get("quiet", quiet)
elif isinstance(item, list):
cmd = item[0]
msg = item[1] if len(item) > 1 else None
@ -44,21 +43,17 @@ class Shell(dotbot.Plugin):
if msg is None:
self._log.lowinfo(cmd)
elif quiet:
self._log.lowinfo('%s' % msg)
self._log.lowinfo("%s" % msg)
else:
self._log.lowinfo('%s [%s]' % (msg, cmd))
self._log.lowinfo("%s [%s]" % (msg, cmd))
ret = dotbot.util.shell_command(
cmd,
cwd=self._context.base_directory(),
enable_stdin=stdin,
enable_stdout=stdout,
enable_stderr=stderr
cmd, cwd=self._context.base_directory(), enable_stdin=stdin, enable_stdout=stdout, enable_stderr=stderr
)
if ret != 0:
success = False
self._log.warning('Command [%s] failed' % cmd)
self._log.warning("Command [%s] failed" % cmd)
if success:
self._log.info('All commands have been executed')
self._log.info("All commands have been executed")
else:
self._log.error('Some commands were not successfully executed')
self._log.error("Some commands were not successfully executed")
return success

View File

@ -4,12 +4,12 @@ import platform
def shell_command(command, cwd=None, enable_stdin=False, enable_stdout=False, enable_stderr=False):
with open(os.devnull, 'w') as devnull_w, open(os.devnull, 'r') as devnull_r:
with open(os.devnull, "w") as devnull_w, open(os.devnull, "r") as devnull_r:
stdin = None if enable_stdin else devnull_r
stdout = None if enable_stdout else devnull_w
stderr = None if enable_stderr else devnull_w
executable = os.environ.get('SHELL')
if platform.system() == 'Windows':
executable = os.environ.get("SHELL")
if platform.system() == "Windows":
# We avoid setting the executable kwarg on Windows because it does
# not have the desired effect when combined with shell=True. It
# will result in the correct program being run (e.g. bash), but it
@ -24,11 +24,5 @@ def shell_command(command, cwd=None, enable_stdin=False, enable_stdout=False, en
# `bash -c "..."`.
executable = None
return subprocess.call(
command,
shell=True,
executable=executable,
stdin=stdin,
stdout=stdout,
stderr=stderr,
cwd=cwd
command, shell=True, executable=executable, stdin=stdin, stdout=stdout, stderr=stderr, cwd=cwd
)

View File

@ -2,4 +2,5 @@ def with_metaclass(meta, *bases):
class metaclass(meta):
def __new__(cls, name, this_bases, d):
return meta(name, bases, d)
return type.__new__(metaclass, 'temporary_class', (), {})
return type.__new__(metaclass, "temporary_class", (), {})

View File

@ -3,27 +3,33 @@ import sys, os.path
# We keep references to loaded modules so they don't get garbage collected.
loaded_modules = []
def load(path):
basename = os.path.basename(path)
module_name, extension = os.path.splitext(basename)
plugin = load_module(module_name, path)
loaded_modules.append(plugin)
basename = os.path.basename(path)
module_name, extension = os.path.splitext(basename)
plugin = load_module(module_name, path)
loaded_modules.append(plugin)
if sys.version_info >= (3, 5):
import importlib.util
import importlib.util
def load_module(module_name, path):
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def load_module(module_name, path):
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
elif sys.version_info >= (3, 3):
from importlib.machinery import SourceFileLoader
from importlib.machinery import SourceFileLoader
def load_module(module_name, path):
return SourceFileLoader(module_name, path).load_module()
def load_module(module_name, path):
return SourceFileLoader(module_name, path).load_module()
else:
import imp
import imp
def load_module(module_name, path):
return imp.load_source(module_name, path)
def load_module(module_name, path):
return imp.load_source(module_name, path)

View File

@ -1,5 +1,6 @@
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)

View File

@ -1,4 +1,4 @@
def indent_lines(string, amount=2, delimiter='\n'):
whitespace = ' ' * amount
sep = '%s%s' % (delimiter, whitespace)
return '%s%s' % (whitespace, sep.join(string.split(delimiter)))
def indent_lines(string, amount=2, delimiter="\n"):
whitespace = " " * amount
sep = "%s%s" % (delimiter, whitespace)
return "%s%s" % (whitespace, sep.join(string.split(delimiter)))

View File

@ -1,5 +1,5 @@
from setuptools import setup, find_packages
from codecs import open # For a consistent encoding
from codecs import open # For a consistent encoding
from os import path
import re
@ -7,81 +7,62 @@ import re
here = path.dirname(__file__)
with open(path.join(here, 'README.md'), encoding='utf-8') as f:
with open(path.join(here, "README.md"), encoding="utf-8") as f:
long_description = f.read()
def read(*names, **kwargs):
with open(
path.join(here, *names),
encoding=kwargs.get("encoding", "utf8")
) as fp:
with open(path.join(here, *names), encoding=kwargs.get("encoding", "utf8")) as fp:
return fp.read()
def find_version(*file_paths):
version_file = read(*file_paths)
version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
version_file, re.M)
version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]", version_file, re.M)
if version_match:
return version_match.group(1)
raise RuntimeError("Unable to find version string.")
setup(
name='dotbot',
version=find_version('dotbot', '__init__.py'),
description='A tool that bootstraps your dotfiles',
name="dotbot",
version=find_version("dotbot", "__init__.py"),
description="A tool that bootstraps your dotfiles",
long_description=long_description,
long_description_content_type='text/markdown',
url='https://github.com/anishathalye/dotbot',
author='Anish Athalye',
author_email='me@anishathalye.com',
license='MIT',
long_description_content_type="text/markdown",
url="https://github.com/anishathalye/dotbot",
author="Anish Athalye",
author_email="me@anishathalye.com",
license="MIT",
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.2',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Topic :: Utilities',
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.2",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Topic :: Utilities",
],
keywords='dotfiles',
keywords="dotfiles",
packages=find_packages(),
setup_requires=[
'setuptools>=38.6.0',
'wheel>=0.31.0',
"setuptools>=38.6.0",
"wheel>=0.31.0",
],
install_requires=[
'PyYAML>=5.3,<6',
"PyYAML>=5.3,<6",
],
# To provide executable scripts, use entry points in preference to the
# "scripts" keyword. Entry points provide cross-platform support and allow
# pip to create the appropriate form of executable for the target platform.
entry_points={
'console_scripts': [
'dotbot=dotbot:main',
"console_scripts": [
"dotbot=dotbot:main",
],
},
)