mirror of
1
0
Fork 0

Merge remote-tracking branch 'upstream/master' into add-plugin-config-loading

This commit is contained in:
Mike Hennessy 2023-02-24 21:49:57 -05:00
commit 442b8483db
No known key found for this signature in database
101 changed files with 2720 additions and 2405 deletions

View File

@ -15,6 +15,3 @@ indent_size = 4
[*.yml]
indent_size = 2
[*.md]
trim_trailing_whitespace = false

View File

@ -5,17 +5,44 @@ on:
schedule:
- cron: '0 8 * * 6'
jobs:
build:
runs-on: ubuntu-latest
test:
env:
PIP_DISABLE_PIP_VERSION_CHECK: 1
strategy:
fail-fast: false
matrix:
python: [2.7, pypy2, 3.5, 3.6, 3.7, 3.8, 3.9, pypy3]
name: Python ${{ matrix.python }}
os: ["ubuntu-20.04", "macos-latest"]
python: ["2.7", "pypy-2.7", "3.5", "3.6", "3.7", "3.8", "3.9", "3.10", "pypy-3.9"]
include:
- os: "windows-latest"
python: "3.8"
- os: "windows-latest"
python: "3.9"
- os: "windows-latest"
python: "3.10"
runs-on: ${{ matrix.os }}
name: "Test: Python ${{ matrix.python }} on ${{ matrix.os }}"
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
with:
submodules: recursive
- uses: actions/setup-python@v2
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python }}
- run: ./test/test
- name: "Install dependencies"
run: |
python -m pip install --upgrade pip setuptools
python -m pip install tox tox-gh-actions
- name: "Run tests"
run: |
python -m tox
python -m tox -e coverage_report
- uses: codecov/codecov-action@v3
fmt:
name: Format
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- uses: psf/black@stable
- uses: isort/isort-action@v1

7
.gitignore vendored
View File

@ -1,4 +1,11 @@
*.egg-info
*.pyc
.coverage*
.eggs/
.idea/
.tox/
.venv/
build/
coverage.xml
dist/
htmlcov/

View File

@ -67,7 +67,9 @@ touch install.conf.yaml
```
If you are using PowerShell instead of a POSIX shell, you can use the provided
`install.ps1` script instead of `install`.
`install.ps1` script instead of `install`. On Windows, Dotbot only supports
Python 3.8+, and it requires that your account is [allowed to create symbolic
links][windows-symlinks].
To get started, you just need to fill in the `install.conf.yaml` and Dotbot
will take care of the rest. To help you get started we have [an
@ -481,7 +483,7 @@ Copyright (c) 2014-2021 Anish Athalye. Released under the MIT License. See
[init-dotfiles]: https://github.com/Vaelatern/init-dotfiles
[dotfiles-template]: https://github.com/anishathalye/dotfiles_template
[inspiration]: https://github.com/anishathalye/dotbot/wiki/Users
[managing-dotfiles-post]: http://www.anishathalye.com/2014/08/03/managing-your-dotfiles/
[windows-symlinks]: https://learn.microsoft.com/en-us/windows/security/threat-protection/security-policy-settings/create-symbolic-links
[json2yaml]: https://www.json2yaml.com/
[plugins]: https://github.com/anishathalye/dotbot/wiki/Plugins
[wiki]: https://github.com/anishathalye/dotbot/wiki

View File

@ -1,4 +1,4 @@
from .cli import main
from .plugin import Plugin
__version__ = '1.19.0'
__version__ = "1.19.1"

View File

@ -1,54 +1,87 @@
import os, glob
import sys
from argparse import ArgumentParser, RawTextHelpFormatter
from .config import ConfigReader, ReadingError
from .dispatcher import Dispatcher, DispatchError
from .messenger import Messenger
from .messenger import Level
from .util import module
import dotbot
import glob
import os
import subprocess
import sys
from argparse import ArgumentParser, RawTextHelpFormatter
import dotbot
from .config import ConfigReader, ReadingError
from .dispatcher import Dispatcher, DispatchError
from .messenger import Level, Messenger
from .plugins import Clean, Create, Link, Plugins, Shell
from .util import module
def add_options(parser):
parser.add_argument('-Q', '--super-quiet', action='store_true',
help='suppress almost all output')
parser.add_argument('-q', '--quiet', action='store_true',
help='suppress most output')
parser.add_argument('-v', '--verbose', action='count', default=0,
help='enable verbose output\n'
'-v: typical verbose\n'
'-vv: also, set shell commands stderr/stdout to true')
parser.add_argument('-d', '--base-directory',
help='execute commands from within BASEDIR',
metavar='BASEDIR')
parser.add_argument('-c', '--config-file',
help='run commands given in CONFIGFILE', metavar='CONFIGFILE')
parser.add_argument('-p', '--plugin', action='append', dest='plugins', default=[],
help='load PLUGIN as a plugin', metavar='PLUGIN')
parser.add_argument('--disable-built-in-plugins',
action='store_true', help='disable built-in plugins')
parser.add_argument('--plugin-dir', action='append', dest='plugin_dirs', default=[],
metavar='PLUGIN_DIR', help='load all plugins in PLUGIN_DIR')
parser.add_argument('--only', nargs='+',
help='only run specified directives', metavar='DIRECTIVE')
parser.add_argument('--except', nargs='+', dest='skip',
help='skip specified directives', metavar='DIRECTIVE')
parser.add_argument('--force-color', dest='force_color', action='store_true',
help='force color output')
parser.add_argument('--no-color', dest='no_color', action='store_true',
help='disable color output')
parser.add_argument('--version', action='store_true',
help='show program\'s version number and exit')
parser.add_argument('-x', '--exit-on-failure', dest='exit_on_failure', action='store_true',
help='exit after first failed directive')
parser.add_argument(
"-Q", "--super-quiet", action="store_true", help="suppress almost all output"
)
parser.add_argument("-q", "--quiet", action="store_true", help="suppress most output")
parser.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="enable verbose output\n"
"-v: typical verbose\n"
"-vv: also, set shell commands stderr/stdout to true",
)
parser.add_argument(
"-d", "--base-directory", help="execute commands from within BASEDIR", metavar="BASEDIR"
)
parser.add_argument(
"-c", "--config-file", help="run commands given in CONFIGFILE", metavar="CONFIGFILE"
)
parser.add_argument(
"-p",
"--plugin",
action="append",
dest="plugins",
default=[],
help="load PLUGIN as a plugin",
metavar="PLUGIN",
)
parser.add_argument(
"--disable-built-in-plugins", action="store_true", help="disable built-in plugins"
)
parser.add_argument(
"--plugin-dir",
action="append",
dest="plugin_dirs",
default=[],
metavar="PLUGIN_DIR",
help="load all plugins in PLUGIN_DIR",
)
parser.add_argument(
"--only", nargs="+", help="only run specified directives", metavar="DIRECTIVE"
)
parser.add_argument(
"--except", nargs="+", dest="skip", help="skip specified directives", metavar="DIRECTIVE"
)
parser.add_argument(
"--force-color", dest="force_color", action="store_true", help="force color output"
)
parser.add_argument(
"--no-color", dest="no_color", action="store_true", help="disable color output"
)
parser.add_argument(
"--version", action="store_true", help="show program's version number and exit"
)
parser.add_argument(
"-x",
"--exit-on-failure",
dest="exit_on_failure",
action="store_true",
help="exit after first failed directive",
)
def read_config(config_file):
reader = ConfigReader(config_file)
return reader.get_config()
def main():
log = Messenger()
try:
@ -58,12 +91,15 @@ def main():
if options.version:
try:
with open(os.devnull) as devnull:
git_hash = subprocess.check_output(['git', 'rev-parse', 'HEAD'],
cwd=os.path.dirname(os.path.abspath(__file__)), stderr=devnull)
hash_msg = ' (git %s)' % git_hash[:10]
git_hash = subprocess.check_output(
["git", "rev-parse", "HEAD"],
cwd=os.path.dirname(os.path.abspath(__file__)),
stderr=devnull,
)
hash_msg = " (git %s)" % git_hash[:10]
except (OSError, subprocess.CalledProcessError):
hash_msg = ''
print('Dotbot version %s%s' % (dotbot.__version__, hash_msg))
hash_msg = ""
print("Dotbot version %s%s" % (dotbot.__version__, hash_msg))
exit(0)
if options.super_quiet:
log.set_level(Level.WARNING)
@ -82,43 +118,50 @@ def main():
else:
log.use_color(sys.stdout.isatty())
plugins = []
plugin_directories = list(options.plugin_dirs)
if not options.disable_built_in_plugins:
from .plugins import Clean, Create, Link, Shell, Plugins
plugins.extend([Clean, Create, Link, Plugins, Shell])
plugin_paths = []
for directory in plugin_directories:
for plugin_path in glob.glob(os.path.join(directory, '*.py')):
plugin_paths.append(plugin_path)
for plugin_path in glob.glob(os.path.join(directory, "*.py")):
plugin_paths.append(plugin_path)
for plugin_path in options.plugins:
plugin_paths.append(plugin_path)
for plugin_path in plugin_paths:
abspath = os.path.abspath(plugin_path)
module.load(abspath)
plugins.extend(module.load(abspath))
if not options.config_file:
log.error('No configuration file specified')
log.error("No configuration file specified")
exit(1)
tasks = read_config(options.config_file)
if tasks is None:
log.warning('Configuration file is empty, no work to do')
log.warning("Configuration file is empty, no work to do")
tasks = []
if not isinstance(tasks, list):
raise ReadingError('Configuration file must be a list of tasks')
raise ReadingError("Configuration file must be a list of tasks")
if options.base_directory:
base_directory = os.path.abspath(options.base_directory)
else:
# default to directory of config file
base_directory = os.path.dirname(os.path.abspath(options.config_file))
os.chdir(base_directory)
dispatcher = Dispatcher(base_directory, only=options.only, skip=options.skip,
exit_on_failure=options.exit_on_failure, options=options)
dispatcher = Dispatcher(
base_directory,
only=options.only,
skip=options.skip,
exit_on_failure=options.exit_on_failure,
options=options,
plugins=plugins,
)
success = dispatcher.dispatch(tasks)
if success:
log.info('\n==> All tasks executed successfully')
log.info("\n==> All tasks executed successfully")
else:
raise DispatchError('\n==> Some tasks were not executed successfully')
raise DispatchError("\n==> Some tasks were not executed successfully")
except (ReadingError, DispatchError) as e:
log.error('%s' % e)
log.error("%s" % e)
exit(1)
except KeyboardInterrupt:
log.error('\n==> Operation aborted')
log.error("\n==> Operation aborted")
exit(1)

View File

@ -1,8 +1,11 @@
import yaml
import json
import os.path
import yaml
from .util import string
class ConfigReader(object):
def __init__(self, config_file_path):
self._config = self._read(config_file_path)
@ -11,17 +14,18 @@ class ConfigReader(object):
try:
_, ext = os.path.splitext(config_file_path)
with open(config_file_path) as fin:
if ext == '.json':
if ext == ".json":
data = json.load(fin)
else:
data = yaml.safe_load(fin)
return data
except Exception as e:
msg = string.indent_lines(str(e))
raise ReadingError('Could not read config file:\n%s' % msg)
raise ReadingError("Could not read config file:\n%s" % msg)
def get_config(self):
return self._config
class ReadingError(Exception):
pass

View File

@ -2,10 +2,11 @@ import copy
import os
from argparse import Namespace
class Context(object):
'''
"""
Contextual data and information for plugins.
'''
"""
def __init__(self, base_directory, options=Namespace()):
self._base_directory = base_directory

View File

@ -15,10 +15,12 @@ class Dispatcher(object):
skip=None,
exit_on_failure=False,
options=Namespace(),
plugins=None,
):
self._log = Messenger()
self._setup_context(base_directory, options)
self._load_plugins()
plugins = plugins or []
self._plugins = [plugin(self._context) for plugin in plugins]
self._only = only
self._skip = skip
self._exit = exit_on_failure
@ -60,8 +62,7 @@ class Dispatcher(object):
handled = True
except Exception as err:
self._log.error(
"An error was encountered while executing action %s"
% action
"An error was encountered while executing action %s" % action
)
self._log.debug(err)
if self._exit:
@ -77,9 +78,7 @@ class Dispatcher(object):
if action == "plugins":
# Create a list of loaded plugin names
loaded_plugins = [
plugin.__class__.__name__ for plugin in self._plugins
]
loaded_plugins = [plugin.__class__.__name__ for plugin in self._plugins]
# Load plugins that haven't been loaded yet
for plugin in Plugin.__subclasses__():
@ -88,9 +87,6 @@ class Dispatcher(object):
return success
def _load_plugins(self):
self._plugins = [plugin(self._context) for plugin in Plugin.__subclasses__()]
class DispatchError(Exception):
pass

View File

@ -1,2 +1,2 @@
from .messenger import Messenger
from .level import Level
from .messenger import Messenger

View File

@ -1,8 +1,8 @@
class Color(object):
NONE = ''
RESET = '\033[0m'
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
MAGENTA = '\033[95m'
NONE = ""
RESET = "\033[0m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
MAGENTA = "\033[95m"

View File

@ -1,10 +1,11 @@
from ..util.singleton import Singleton
from ..util.compat import with_metaclass
from ..util.singleton import Singleton
from .color import Color
from .level import Level
class Messenger(with_metaclass(Singleton, object)):
def __init__(self, level = Level.LOWINFO):
def __init__(self, level=Level.LOWINFO):
self.set_level(level)
self.use_color(True)
@ -15,8 +16,8 @@ class Messenger(with_metaclass(Singleton, object)):
self._use_color = yesno
def log(self, level, message):
if (level >= self._level):
print('%s%s%s' % (self._color(level), message, self._reset()))
if level >= self._level:
print("%s%s%s" % (self._color(level), message, self._reset()))
def debug(self, message):
self.log(Level.DEBUG, message)
@ -34,13 +35,13 @@ class Messenger(with_metaclass(Singleton, object)):
self.log(Level.ERROR, message)
def _color(self, level):
'''
"""
Get a color (terminal escape sequence) according to a level.
'''
"""
if not self._use_color:
return ''
return ""
elif level < Level.DEBUG:
return ''
return ""
elif Level.DEBUG <= level < Level.LOWINFO:
return Color.YELLOW
elif Level.LOWINFO <= level < Level.INFO:
@ -53,10 +54,10 @@ class Messenger(with_metaclass(Singleton, object)):
return Color.RED
def _reset(self):
'''
"""
Get a reset color (terminal escape sequence).
'''
"""
if not self._use_color:
return ''
return ""
else:
return Color.RESET

View File

@ -1,25 +1,26 @@
from .messenger import Messenger
from .context import Context
from .messenger import Messenger
class Plugin(object):
'''
"""
Abstract base class for commands that process directives.
'''
"""
def __init__(self, context):
self._context = context
self._log = Messenger()
def can_handle(self, directive):
'''
"""
Returns true if the Plugin can handle the directive.
'''
"""
raise NotImplementedError
def handle(self, directive, data):
'''
"""
Executes the directive.
Returns true if the Plugin successfully handled the directive.
'''
"""
raise NotImplementedError

View File

@ -1,48 +1,52 @@
import os
import dotbot
import sys
from ..plugin import Plugin
class Clean(dotbot.Plugin):
'''
class Clean(Plugin):
"""
Cleans broken symbolic links.
'''
"""
_directive = 'clean'
_directive = "clean"
def can_handle(self, directive):
return directive == self._directive
def handle(self, directive, data):
if directive != self._directive:
raise ValueError('Clean cannot handle directive %s' % directive)
raise ValueError("Clean cannot handle directive %s" % directive)
return self._process_clean(data)
def _process_clean(self, targets):
success = True
defaults = self._context.defaults().get(self._directive, {})
for target in targets:
force = defaults.get('force', False)
recursive = defaults.get('recursive', False)
force = defaults.get("force", False)
recursive = defaults.get("recursive", False)
if isinstance(targets, dict) and isinstance(targets[target], dict):
force = targets[target].get('force', force)
recursive = targets[target].get('recursive', recursive)
force = targets[target].get("force", force)
recursive = targets[target].get("recursive", recursive)
success &= self._clean(target, force, recursive)
if success:
self._log.info('All targets have been cleaned')
self._log.info("All targets have been cleaned")
else:
self._log.error('Some targets were not successfully cleaned')
self._log.error("Some targets were not successfully cleaned")
return success
def _clean(self, target, force, recursive):
'''
"""
Cleans all the broken symbolic links in target if they point to
a subdirectory of the base directory or if forced to clean.
'''
"""
if not os.path.isdir(os.path.expandvars(os.path.expanduser(target))):
self._log.debug('Ignoring nonexistent directory %s' % target)
self._log.debug("Ignoring nonexistent directory %s" % target)
return True
for item in os.listdir(os.path.expandvars(os.path.expanduser(target))):
path = os.path.join(os.path.expandvars(os.path.expanduser(target)), item)
path = os.path.abspath(
os.path.join(os.path.expandvars(os.path.expanduser(target)), item)
)
if recursive and os.path.isdir(path):
# isdir implies not islink -- we don't want to descend into
# symlinked directories. okay to do a recursive call here
@ -50,17 +54,19 @@ class Clean(dotbot.Plugin):
self._clean(path, force, recursive)
if not os.path.exists(path) and os.path.islink(path):
points_at = os.path.join(os.path.dirname(path), os.readlink(path))
if sys.platform[:5] == "win32" and points_at.startswith("\\\\?\\"):
points_at = points_at[4:]
if self._in_directory(path, self._context.base_directory()) or force:
self._log.lowinfo('Removing invalid link %s -> %s' % (path, points_at))
self._log.lowinfo("Removing invalid link %s -> %s" % (path, points_at))
os.remove(path)
else:
self._log.lowinfo('Link %s -> %s not removed.' % (path, points_at))
self._log.lowinfo("Link %s -> %s not removed." % (path, points_at))
return True
def _in_directory(self, path, directory):
'''
"""
Returns true if the path is in the directory.
'''
directory = os.path.join(os.path.realpath(directory), '')
"""
directory = os.path.join(os.path.realpath(directory), "")
path = os.path.realpath(path)
return os.path.commonprefix([path, directory]) == directory

View File

@ -1,56 +1,60 @@
import os
import dotbot
from ..plugin import Plugin
class Create(dotbot.Plugin):
'''
class Create(Plugin):
"""
Create empty paths.
'''
"""
_directive = 'create'
_directive = "create"
def can_handle(self, directive):
return directive == self._directive
def handle(self, directive, data):
if directive != self._directive:
raise ValueError('Create cannot handle directive %s' % directive)
raise ValueError("Create cannot handle directive %s" % directive)
return self._process_paths(data)
def _process_paths(self, paths):
success = True
defaults = self._context.defaults().get('create', {})
defaults = self._context.defaults().get("create", {})
for key in paths:
path = os.path.expandvars(os.path.expanduser(key))
mode = defaults.get('mode', 0o777) # same as the default for os.makedirs
path = os.path.abspath(os.path.expandvars(os.path.expanduser(key)))
mode = defaults.get("mode", 0o777) # same as the default for os.makedirs
if isinstance(paths, dict):
options = paths[key]
if options:
mode = options.get('mode', mode)
mode = options.get("mode", mode)
success &= self._create(path, mode)
if success:
self._log.info('All paths have been set up')
self._log.info("All paths have been set up")
else:
self._log.error('Some paths were not successfully set up')
self._log.error("Some paths were not successfully set up")
return success
def _exists(self, path):
'''
"""
Returns true if the path exists.
'''
"""
path = os.path.expanduser(path)
return os.path.exists(path)
def _create(self, path, mode):
success = True
if not self._exists(path):
self._log.debug('Trying to create path %s with mode %o' % (path, mode))
self._log.debug("Trying to create path %s with mode %o" % (path, mode))
try:
self._log.lowinfo('Creating path %s' % path)
self._log.lowinfo("Creating path %s" % path)
os.makedirs(path, mode)
# On Windows, the *mode* argument to `os.makedirs()` is ignored.
# The mode must be set explicitly in a follow-up call.
os.chmod(path, mode)
except OSError:
self._log.warning('Failed to create path %s' % path)
self._log.warning("Failed to create path %s" % path)
success = False
else:
self._log.lowinfo('Path exists %s' % path)
self._log.lowinfo("Path exists %s" % path)
return success

View File

@ -1,88 +1,100 @@
import os
import sys
import glob
import os
import shutil
import dotbot
import dotbot.util
import subprocess
import sys
from ..plugin import Plugin
from ..util import shell_command
class Link(dotbot.Plugin):
'''
class Link(Plugin):
"""
Symbolically links dotfiles.
'''
"""
_directive = 'link'
_directive = "link"
def can_handle(self, directive):
return directive == self._directive
def handle(self, directive, data):
if directive != self._directive:
raise ValueError('Link cannot handle directive %s' % directive)
raise ValueError("Link cannot handle directive %s" % directive)
return self._process_links(data)
def _process_links(self, links):
success = True
defaults = self._context.defaults().get('link', {})
defaults = self._context.defaults().get("link", {})
for destination, source in links.items():
destination = os.path.expandvars(destination)
relative = defaults.get('relative', False)
relative = defaults.get("relative", False)
# support old "canonicalize-path" key for compatibility
canonical_path = defaults.get('canonicalize', defaults.get('canonicalize-path', True))
force = defaults.get('force', False)
relink = defaults.get('relink', False)
create = defaults.get('create', False)
use_glob = defaults.get('glob', False)
base_prefix = defaults.get('prefix', '')
test = defaults.get('if', None)
ignore_missing = defaults.get('ignore-missing', False)
exclude_paths = defaults.get('exclude', [])
canonical_path = defaults.get("canonicalize", defaults.get("canonicalize-path", True))
force = defaults.get("force", False)
relink = defaults.get("relink", False)
create = defaults.get("create", False)
use_glob = defaults.get("glob", False)
base_prefix = defaults.get("prefix", "")
test = defaults.get("if", None)
ignore_missing = defaults.get("ignore-missing", False)
exclude_paths = defaults.get("exclude", [])
if isinstance(source, dict):
# extended config
test = source.get('if', test)
relative = source.get('relative', relative)
canonical_path = source.get('canonicalize', source.get('canonicalize-path', canonical_path))
force = source.get('force', force)
relink = source.get('relink', relink)
create = source.get('create', create)
use_glob = source.get('glob', use_glob)
base_prefix = source.get('prefix', base_prefix)
ignore_missing = source.get('ignore-missing', ignore_missing)
exclude_paths = source.get('exclude', exclude_paths)
path = self._default_source(destination, source.get('path'))
test = source.get("if", test)
relative = source.get("relative", relative)
canonical_path = source.get(
"canonicalize", source.get("canonicalize-path", canonical_path)
)
force = source.get("force", force)
relink = source.get("relink", relink)
create = source.get("create", create)
use_glob = source.get("glob", use_glob)
base_prefix = source.get("prefix", base_prefix)
ignore_missing = source.get("ignore-missing", ignore_missing)
exclude_paths = source.get("exclude", exclude_paths)
path = self._default_source(destination, source.get("path"))
else:
path = self._default_source(destination, source)
if test is not None and not self._test_success(test):
self._log.lowinfo('Skipping %s' % destination)
self._log.lowinfo("Skipping %s" % destination)
continue
path = os.path.expandvars(os.path.expanduser(path))
path = os.path.normpath(os.path.expandvars(os.path.expanduser(path)))
if use_glob:
glob_results = self._create_glob_results(path, exclude_paths)
if len(glob_results) == 0:
self._log.warning("Globbing couldn't find anything matching " + str(path))
success = False
continue
if len(glob_results) == 1 and destination[-1] == '/':
if len(glob_results) == 1 and destination[-1] == "/":
self._log.error("Ambiguous action requested.")
self._log.error("No wildcard in glob, directory use undefined: " +
destination + " -> " + str(glob_results))
self._log.error(
"No wildcard in glob, directory use undefined: "
+ destination
+ " -> "
+ str(glob_results)
)
self._log.warning("Did you want to link the directory or into it?")
success = False
continue
elif len(glob_results) == 1 and destination[-1] != '/':
elif len(glob_results) == 1 and destination[-1] != "/":
# perform a normal link operation
if create:
success &= self._create(destination)
if force or relink:
success &= self._delete(path, destination, relative, canonical_path, force)
success &= self._link(path, destination, relative, canonical_path, ignore_missing)
success &= self._link(
path, destination, relative, canonical_path, ignore_missing
)
else:
self._log.lowinfo("Globs from '" + path + "': " + str(glob_results))
for glob_full_item in glob_results:
# Find common dirname between pattern and the item:
glob_dirname = os.path.dirname(os.path.commonprefix([path, glob_full_item]))
glob_item = (glob_full_item if len(glob_dirname) == 0 else glob_full_item[len(glob_dirname) + 1:])
glob_item = (
glob_full_item
if len(glob_dirname) == 0
else glob_full_item[len(glob_dirname) + 1 :]
)
# Add prefix to basepath, if provided
if base_prefix:
glob_item = base_prefix + glob_item
@ -91,39 +103,52 @@ class Link(dotbot.Plugin):
if create:
success &= self._create(glob_link_destination)
if force or relink:
success &= self._delete(glob_full_item, glob_link_destination, relative, canonical_path, force)
success &= self._link(glob_full_item, glob_link_destination, relative, canonical_path, ignore_missing)
success &= self._delete(
glob_full_item,
glob_link_destination,
relative,
canonical_path,
force,
)
success &= self._link(
glob_full_item,
glob_link_destination,
relative,
canonical_path,
ignore_missing,
)
else:
if create:
success &= self._create(destination)
if not ignore_missing and not self._exists(os.path.join(self._context.base_directory(), path)):
if not ignore_missing and not self._exists(
os.path.join(self._context.base_directory(), path)
):
# we seemingly check this twice (here and in _link) because
# if the file doesn't exist and force is True, we don't
# want to remove the original (this is tested by
# link-force-leaves-when-nonexistent.bash)
success = False
self._log.warning('Nonexistent source %s -> %s' %
(destination, path))
self._log.warning("Nonexistent source %s -> %s" % (destination, path))
continue
if force or relink:
success &= self._delete(path, destination, relative, canonical_path, force)
success &= self._link(path, destination, relative, canonical_path, ignore_missing)
if success:
self._log.info('All links have been set up')
self._log.info("All links have been set up")
else:
self._log.error('Some links were not successfully set up')
self._log.error("Some links were not successfully set up")
return success
def _test_success(self, command):
ret = dotbot.util.shell_command(command, cwd=self._context.base_directory())
ret = shell_command(command, cwd=self._context.base_directory())
if ret != 0:
self._log.debug('Test \'%s\' returned false' % command)
self._log.debug("Test '%s' returned false" % command)
return ret == 0
def _default_source(self, destination, source):
if source is None:
basename = os.path.basename(destination)
if basename.startswith('.'):
if basename.startswith("."):
return basename[1:]
else:
return basename
@ -131,20 +156,22 @@ class Link(dotbot.Plugin):
return source
def _glob(self, path):
'''
"""
Wrap `glob.glob` in a python agnostic way, catching errors in usage.
'''
if (sys.version_info < (3, 5) and '**' in path):
self._log.error('Link cannot handle recursive glob ("**") for Python < version 3.5: "%s"' % path)
"""
if sys.version_info < (3, 5) and "**" in path:
self._log.error(
'Link cannot handle recursive glob ("**") for Python < version 3.5: "%s"' % path
)
return []
# call glob.glob; only python >= 3.5 supports recursive globs
found = ( glob.glob(path)
if (sys.version_info < (3, 5)) else
glob.glob(path, recursive=True) )
found = glob.glob(path) if (sys.version_info < (3, 5)) else glob.glob(path, recursive=True)
# normalize paths to ensure cross-platform compatibility
found = [os.path.normpath(p) for p in found]
# if using recursive glob (`**`), filter results to return only files:
if '**' in path and not path.endswith(str(os.sep)):
self._log.debug("Excluding directories from recursive glob: " + str(path))
found = [f for f in found if os.path.isfile(f)]
if "**" in path and not path.endswith(str(os.sep)):
self._log.debug("Excluding directories from recursive glob: " + str(path))
found = [f for f in found if os.path.isfile(f)]
# return matched results
return found
@ -156,28 +183,31 @@ class Link(dotbot.Plugin):
exclude = []
for expat in exclude_paths:
self._log.debug("Excluding globs with pattern: " + str(expat))
exclude.extend( self._glob(expat) )
exclude.extend(self._glob(expat))
self._log.debug("Excluded globs from '" + path + "': " + str(exclude))
ret = set(include) - set(exclude)
return list(ret)
def _is_link(self, path):
'''
"""
Returns true if the path is a symbolic link.
'''
"""
return os.path.islink(os.path.expanduser(path))
def _link_destination(self, path):
'''
"""
Returns the destination of the symbolic link.
'''
"""
path = os.path.expanduser(path)
return os.readlink(path)
path = os.readlink(path)
if sys.platform[:5] == "win32" and path.startswith("\\\\?\\"):
path = path[4:]
return path
def _exists(self, path):
'''
"""
Returns true if the path exists.
'''
"""
path = os.path.expanduser(path)
return os.path.exists(path)
@ -189,20 +219,21 @@ class Link(dotbot.Plugin):
try:
os.makedirs(parent)
except OSError:
self._log.warning('Failed to create directory %s' % parent)
self._log.warning("Failed to create directory %s" % parent)
success = False
else:
self._log.lowinfo('Creating directory %s' % parent)
self._log.lowinfo("Creating directory %s" % parent)
return success
def _delete(self, source, path, relative, canonical_path, force):
success = True
source = os.path.join(self._context.base_directory(canonical_path=canonical_path), source)
fullpath = os.path.expanduser(path)
fullpath = os.path.abspath(os.path.expanduser(path))
if relative:
source = self._relative_path(source, fullpath)
if ((self._is_link(path) and self._link_destination(path) != source) or
(self._exists(path) and not self._is_link(path))):
if (self._is_link(path) and self._link_destination(path) != source) or (
self._exists(path) and not self._is_link(path)
):
removed = False
try:
if os.path.islink(fullpath):
@ -216,39 +247,44 @@ class Link(dotbot.Plugin):
os.remove(fullpath)
removed = True
except OSError:
self._log.warning('Failed to remove %s' % path)
self._log.warning("Failed to remove %s" % path)
success = False
else:
if removed:
self._log.lowinfo('Removing %s' % path)
self._log.lowinfo("Removing %s" % path)
return success
def _relative_path(self, source, destination):
'''
"""
Returns the relative path to get to the source file from the
destination file.
'''
"""
destination_dir = os.path.dirname(destination)
return os.path.relpath(source, destination_dir)
def _link(self, source, link_name, relative, canonical_path, ignore_missing):
'''
"""
Links link_name to source.
Returns true if successfully linked files.
'''
"""
success = False
destination = os.path.expanduser(link_name)
destination = os.path.abspath(os.path.expanduser(link_name))
base_directory = self._context.base_directory(canonical_path=canonical_path)
absolute_source = os.path.join(base_directory, source)
link_name = os.path.normpath(link_name)
if relative:
source = self._relative_path(absolute_source, destination)
else:
source = absolute_source
if (not self._exists(link_name) and self._is_link(link_name) and
self._link_destination(link_name) != source):
self._log.warning('Invalid link %s -> %s' %
(link_name, self._link_destination(link_name)))
if (
not self._exists(link_name)
and self._is_link(link_name)
and self._link_destination(link_name) != source
):
self._log.warning(
"Invalid link %s -> %s" % (link_name, self._link_destination(link_name))
)
# we need to use absolute_source below because our cwd is the dotfiles
# directory, and if source is relative, it will be relative to the
# destination directory
@ -256,26 +292,23 @@ class Link(dotbot.Plugin):
try:
os.symlink(source, destination)
except OSError:
self._log.warning('Linking failed %s -> %s' % (link_name, source))
self._log.warning("Linking failed %s -> %s" % (link_name, source))
else:
self._log.lowinfo('Creating link %s -> %s' % (link_name, source))
self._log.lowinfo("Creating link %s -> %s" % (link_name, source))
success = True
elif self._exists(link_name) and not self._is_link(link_name):
self._log.warning(
'%s already exists but is a regular file or directory' %
link_name)
self._log.warning("%s already exists but is a regular file or directory" % link_name)
elif self._is_link(link_name) and self._link_destination(link_name) != source:
self._log.warning('Incorrect link %s -> %s' %
(link_name, self._link_destination(link_name)))
self._log.warning(
"Incorrect link %s -> %s" % (link_name, self._link_destination(link_name))
)
# again, we use absolute_source to check for existence
elif not self._exists(absolute_source):
if self._is_link(link_name):
self._log.warning('Nonexistent source %s -> %s' %
(link_name, source))
self._log.warning("Nonexistent source %s -> %s" % (link_name, source))
else:
self._log.warning('Nonexistent source for %s : %s' %
(link_name, source))
self._log.warning("Nonexistent source for %s : %s" % (link_name, source))
else:
self._log.lowinfo('Link exists %s -> %s' % (link_name, source))
self._log.lowinfo("Link exists %s -> %s" % (link_name, source))
success = True
return success

View File

@ -1,15 +1,13 @@
import os
import subprocess
import dotbot
import dotbot.util
from ..plugin import Plugin
from ..util import shell_command
class Shell(dotbot.Plugin):
'''
class Shell(Plugin):
"""
Run arbitrary shell commands.
'''
"""
_directive = 'shell'
_directive = "shell"
_has_shown_override_message = False
def can_handle(self, directive):
@ -17,62 +15,62 @@ class Shell(dotbot.Plugin):
def handle(self, directive, data):
if directive != self._directive:
raise ValueError('Shell cannot handle directive %s' %
directive)
raise ValueError("Shell cannot handle directive %s" % directive)
return self._process_commands(data)
def _process_commands(self, data):
success = True
defaults = self._context.defaults().get('shell', {})
defaults = self._context.defaults().get("shell", {})
options = self._get_option_overrides()
for item in data:
stdin = defaults.get('stdin', False)
stdout = defaults.get('stdout', False)
stderr = defaults.get('stderr', False)
quiet = defaults.get('quiet', False)
stdin = defaults.get("stdin", False)
stdout = defaults.get("stdout", False)
stderr = defaults.get("stderr", False)
quiet = defaults.get("quiet", False)
if isinstance(item, dict):
cmd = item['command']
msg = item.get('description', None)
stdin = item.get('stdin', stdin)
stdout = item.get('stdout', stdout)
stderr = item.get('stderr', stderr)
quiet = item.get('quiet', quiet)
cmd = item["command"]
msg = item.get("description", None)
stdin = item.get("stdin", stdin)
stdout = item.get("stdout", stdout)
stderr = item.get("stderr", stderr)
quiet = item.get("quiet", quiet)
elif isinstance(item, list):
cmd = item[0]
msg = item[1] if len(item) > 1 else None
else:
cmd = item
msg = None
if msg is None:
if quiet:
if msg is not None:
self._log.lowinfo("%s" % msg)
elif msg is None:
self._log.lowinfo(cmd)
elif quiet:
self._log.lowinfo('%s' % msg)
else:
self._log.lowinfo('%s [%s]' % (msg, cmd))
stdout = options.get('stdout', stdout)
stderr = options.get('stderr', stderr)
ret = dotbot.util.shell_command(
self._log.lowinfo("%s [%s]" % (msg, cmd))
stdout = options.get("stdout", stdout)
stderr = options.get("stderr", stderr)
ret = shell_command(
cmd,
cwd=self._context.base_directory(),
enable_stdin=stdin,
enable_stdout=stdout,
enable_stderr=stderr
enable_stderr=stderr,
)
if ret != 0:
success = False
self._log.warning('Command [%s] failed' % cmd)
self._log.warning("Command [%s] failed" % cmd)
if success:
self._log.info('All commands have been executed')
self._log.info("All commands have been executed")
else:
self._log.error('Some commands were not successfully executed')
self._log.error("Some commands were not successfully executed")
return success
def _get_option_overrides(self):
ret = {}
options = self._context.options()
if options.verbose > 1:
ret['stderr'] = True
ret['stdout'] = True
ret["stderr"] = True
ret["stdout"] = True
if not self._has_shown_override_message:
self._log.debug("Shell: Found cli option to force show stderr and stdout.")
self._has_shown_override_message = True

View File

@ -1,15 +1,15 @@
import os
import subprocess
import platform
import subprocess
def shell_command(command, cwd=None, enable_stdin=False, enable_stdout=False, enable_stderr=False):
with open(os.devnull, 'w') as devnull_w, open(os.devnull, 'r') as devnull_r:
with open(os.devnull, "w") as devnull_w, open(os.devnull, "r") as devnull_r:
stdin = None if enable_stdin else devnull_r
stdout = None if enable_stdout else devnull_w
stderr = None if enable_stderr else devnull_w
executable = os.environ.get('SHELL')
if platform.system() == 'Windows':
executable = os.environ.get("SHELL")
if platform.system() == "Windows":
# We avoid setting the executable kwarg on Windows because it does
# not have the desired effect when combined with shell=True. It
# will result in the correct program being run (e.g. bash), but it
@ -30,5 +30,5 @@ def shell_command(command, cwd=None, enable_stdin=False, enable_stdout=False, en
stdin=stdin,
stdout=stdout,
stderr=stderr,
cwd=cwd
cwd=cwd,
)

View File

@ -2,4 +2,5 @@ def with_metaclass(meta, *bases):
class metaclass(meta):
def __new__(cls, name, this_bases, d):
return meta(name, bases, d)
return type.__new__(metaclass, 'temporary_class', (), {})
return type.__new__(metaclass, "temporary_class", (), {})

View File

@ -1,13 +1,27 @@
import sys, os.path
import os