import os
import glob
import shutil
import dotbot
import subprocess
import copy


class Link(dotbot.Plugin):
    '''
    Symbolically links dotfiles.

    Handles the ``link`` directive. The directive's data is a mapping of
    link destination -> source, where the source is either a plain path
    (or None) or a dict of extended options: ``path``, ``relative``,
    ``force``, ``relink``, ``create`` and ``glob``.
    '''

    _directive = 'link'

    def can_handle(self, directive):
        ''' Returns true if this plugin handles the given directive. '''
        return directive == self._directive

    def handle(self, directive, data):
        '''
        Processes the links described by data.

        Raises ValueError if directive is not the one this plugin handles.
        Returns true if every link was set up successfully.
        '''
        if directive != self._directive:
            raise ValueError('Link cannot handle directive %s' % directive)
        return self._process_links(data)

    def _process_links(self, links):
        '''
        Sets up every destination -> source entry of links.

        Per-entry options fall back to the ``link`` section of the context
        defaults. A failing entry does not stop the remaining entries from
        being processed. Returns true only if every entry succeeded.
        '''
        success = True
        defaults = self._context.defaults().get('link', {})
        for destination, source in links.items():
            destination = os.path.expandvars(destination)
            relative = defaults.get('relative', False)
            force = defaults.get('force', False)
            relink = defaults.get('relink', False)
            create = defaults.get('create', False)
            use_glob = defaults.get('glob', False)
            if isinstance(source, dict):
                # extended config
                relative = source.get('relative', relative)
                force = source.get('force', force)
                relink = source.get('relink', relink)
                create = source.get('create', create)
                use_glob = source.get('glob', use_glob)
                path = self._default_source(destination, source.get('path'))
            else:
                path = self._default_source(destination, source)
            path = os.path.expandvars(os.path.expanduser(path))
            if use_glob:
                self._log.debug("Globbing with path: " + str(path))
                glob_results = glob.glob(path)
                if not glob_results:
                    self._log.warning(
                        "Globbing couldn't find anything matching " +
                        str(path))
                    success = False
                    continue
                # Compare by value: identity checks against int/str
                # literals only work by accident of interpreter caching.
                has_star = path.find('*') != -1
                if not has_star and destination.endswith('/'):
                    self._log.error("Ambiguous action requested.")
                    self._log.error(
                        "No wildcard in glob, directory use undefined: " +
                        destination + " -> " + str(glob_results))
                    self._log.warning(
                        "Did you want to link the directory or into it?")
                    success = False
                    continue
                elif not has_star and len(glob_results) == 1:
                    # perform a normal link operation
                    if create:
                        success &= self._create(destination)
                    if force or relink:
                        success &= self._delete(
                            path, destination, relative, force)
                    success &= self._link(path, destination, relative)
                else:
                    self._log.lowinfo(
                        "Globs from '" + path + "': " + str(glob_results))
                    # Everything before the first '*' is the common prefix;
                    # the remainder of each match is appended to destination.
                    # NOTE(review): when the pattern has no '*' but still
                    # matches several paths (e.g. via '?' or '[...]'), find()
                    # returned -1 and this slice drops only the last
                    # character of the pattern -- confirm intended behavior.
                    glob_base = path[:path.find('*')]
                    for glob_full_item in glob_results:
                        glob_item = glob_full_item[len(glob_base):]
                        glob_link_destination = os.path.join(
                            destination, glob_item)
                        if create:
                            success &= self._create(glob_link_destination)
                        if force or relink:
                            success &= self._delete(
                                glob_full_item, glob_link_destination,
                                relative, force)
                        success &= self._link(
                            glob_full_item, glob_link_destination, relative)
            else:
                if create:
                    success &= self._create(destination)
                if not self._exists(
                        os.path.join(self._context.base_directory(), path)):
                    success = False
                    self._log.warning(
                        'Nonexistent target %s -> %s' % (destination, path))
                    continue
                if force or relink:
                    success &= self._delete(
                        path, destination, relative, force)
                success &= self._link(path, destination, relative)
        if success:
            self._log.info('All links have been set up')
        else:
            self._log.error('Some links were not successfully set up')
        return success

    def _default_source(self, destination, source):
        '''
        Returns source, or a default derived from destination if source is
        None: the destination's basename with one leading dot removed.
        '''
        if source is None:
            basename = os.path.basename(destination)
            if basename.startswith('.'):
                return basename[1:]
            else:
                return basename
        else:
            return source

    def _is_link(self, path):
        '''
        Returns true if the path is a symbolic link.
        '''
        return os.path.islink(os.path.expanduser(path))

    def _link_destination(self, path):
        '''
        Returns the destination of the symbolic link.
        '''
        path = os.path.expanduser(path)
        return os.readlink(path)

    def _exists(self, path):
        '''
        Returns true if the path exists.
        '''
        path = os.path.expanduser(path)
        return os.path.exists(path)

    def _create(self, path):
        '''
        Creates the parent directory of path if it does not exist yet.

        Returns false only if the directory had to be created and creation
        raised OSError.
        '''
        success = True
        parent = os.path.abspath(
            os.path.join(os.path.expanduser(path), os.pardir))
        if not self._exists(parent):
            self._log.debug("Try to create parent: " + str(parent))
            try:
                os.makedirs(parent)
            except OSError:
                self._log.warning('Failed to create directory %s' % parent)
                success = False
            else:
                self._log.lowinfo('Creating directory %s' % parent)
        return success

    def _delete(self, source, path, relative, force):
        '''
        Removes whatever occupies path so that a link to source can be made.

        Nothing is removed when path is already a link to source. A symbolic
        link pointing elsewhere is always unlinked; a regular file or
        directory is removed only when force is true. Returns false if the
        removal raised OSError.
        '''
        success = True
        source = os.path.join(self._context.base_directory(), source)
        fullpath = os.path.expanduser(path)
        if relative:
            source = self._relative_path(source, fullpath)
        points_elsewhere = (self._is_link(path) and
                            self._link_destination(path) != source)
        is_real_file = self._exists(path) and not self._is_link(path)
        if points_elsewhere or is_real_file:
            removed = False
            try:
                if os.path.islink(fullpath):
                    os.unlink(fullpath)
                    removed = True
                elif force:
                    if os.path.isdir(fullpath):
                        shutil.rmtree(fullpath)
                        removed = True
                    else:
                        os.remove(fullpath)
                        removed = True
            except OSError:
                self._log.warning('Failed to remove %s' % path)
                success = False
            else:
                if removed:
                    self._log.lowinfo('Removing %s' % path)
        return success

    def _relative_path(self, source, destination):
        '''
        Returns the relative path to get to the source file from the
        destination file.
        '''
        destination_dir = os.path.dirname(destination)
        return os.path.relpath(source, destination_dir)

    def _link(self, source, link_name, relative):
        '''
        Links link_name to source.

        Returns true if successfully linked files.
        '''
        success = False
        destination = os.path.expanduser(link_name)
        absolute_source = os.path.join(
            self._context.base_directory(), source)
        if relative:
            source = self._relative_path(absolute_source, destination)
        else:
            source = absolute_source
        if (not self._exists(link_name) and self._is_link(link_name) and
                self._link_destination(link_name) != source):
            self._log.warning(
                'Invalid link %s -> %s' %
                (link_name, self._link_destination(link_name)))
        # we need to use absolute_source below because our cwd is the dotfiles
        # directory, and if source is relative, it will be relative to the
        # destination directory
        elif not self._exists(link_name) and self._exists(absolute_source):
            try:
                os.symlink(source, destination)
            except OSError:
                self._log.warning(
                    'Linking failed %s -> %s' % (link_name, source))
            else:
                self._log.lowinfo(
                    'Creating link %s -> %s' % (link_name, source))
                success = True
        elif self._exists(link_name) and not self._is_link(link_name):
            self._log.warning(
                '%s already exists but is a regular file or directory' %
                link_name)
        elif (self._is_link(link_name) and
                self._link_destination(link_name) != source):
            self._log.warning(
                'Incorrect link %s -> %s' %
                (link_name, self._link_destination(link_name)))
        # again, we use absolute_source to check for existence
        elif not self._exists(absolute_source):
            if self._is_link(link_name):
                self._log.warning(
                    'Nonexistent target %s -> %s' % (link_name, source))
            else:
                self._log.warning(
                    'Nonexistent target for %s : %s' % (link_name, source))
        else:
            self._log.lowinfo('Link exists %s -> %s' % (link_name, source))
            success = True
        return success


class Linkif(Link, dotbot.Plugin):
    '''
    Symbolically links dotfiles conditionally.

    Handles the ``linkif`` directive. Each entry is expected to be a dict
    carrying an ``if`` key whose value is a shell command; the entry is
    linked only when that command exits successfully.
    '''

    _directive = 'linkif'
    _config_key = 'if'

    def handle(self, directive, data):
        '''
        Filters data by each entry's ``if`` command, then links the rest.

        Raises ValueError if directive is not the one this plugin handles.
        Returns true only if every entry had a usable conditional and all
        remaining links were set up successfully.
        '''
        if directive != self._directive:
            raise ValueError('LinkIf cannot handle directive %s' % directive)
        success = True
        # Work on a copy so the caller's configuration is left untouched.
        data_filtered = copy.deepcopy(data)
        for destination, config in data.items():
            # .get() so a dict without the key is reported below instead of
            # raising KeyError.
            condition = (config.get(self._config_key)
                         if isinstance(config, dict) else None)
            if condition is not None:
                if self._test_success(condition):
                    # Strip the conditional; the rest is ordinary link config.
                    del data_filtered[destination][self._config_key]
                else:
                    # remove the item from data
                    del data_filtered[destination]
            else:
                self._log.error(
                    "Could not find valid conditional in linkif 'if'!")
                # NOTE(review): the entry is still handed to _process_links
                # below, as before; only the reported result changes.
                success = False
        links_ok = self._process_links(data_filtered)
        return success and links_ok

    def _test_success(self, testcommand):
        '''
        Runs testcommand through the user's shell ($SHELL, else /bin/sh).

        Returns true if the command exits with status 0, false if it exits
        with a non-zero status. The command's output is captured and
        discarded.
        '''
        try:
            subprocess.check_output(
                testcommand,
                stderr=subprocess.STDOUT,
                shell=True,
                executable=os.environ.get("SHELL", "/bin/sh")
            )
        except subprocess.CalledProcessError as e:
            self._log.debug(
                "Command returned "+str(e.returncode)+": \n" +
                str(testcommand))
            return False
        return True