From e7f9616b393e9127d3fcedaa35efc2f86855e5b3 Mon Sep 17 00:00:00 2001 From: Tim Byrne Date: Wed, 11 Jul 2018 07:50:42 -0500 Subject: [PATCH] Rewrite testing system (#119) The new test system is written with py.test. These tests are more comprehensive, run faster by an order of magnitude, and are far more maintainable. The tests themselves conform to PEP8. --- .gitignore | 2 + Dockerfile | 17 +- Makefile | 15 + docker-compose.yml | 7 + pylintrc | 11 + pytest.ini | 3 + test/conftest.py | 559 ++++++++++++++++++++++++++ test/pylintrc | 1 + test/test_alt.py | 345 ++++++++++++++++ test/test_assert_private_dirs.py | 106 +++++ test/test_bootstrap.py | 31 ++ test/test_clean.py | 11 + test/test_clone.py | 274 +++++++++++++ test/test_config.py | 139 +++++++ test/test_cygwin_copy.py | 59 +++ test/test_encryption.py | 392 ++++++++++++++++++ test/test_enter.py | 85 ++++ test/test_git.py | 58 +++ test/test_help.py | 17 + test/test_hooks.py | 90 +++++ test/test_init.py | 78 ++++ test/test_introspect.py | 46 +++ test/test_jinja.py | 186 +++++++++ test/test_list.py | 47 +++ test/test_perms.py | 111 +++++ test/test_syntax.py | 41 ++ test/test_unit_bootstrap_available.py | 33 ++ test/test_unit_configure_paths.py | 80 ++++ test/test_unit_parse_encrypt.py | 175 ++++++++ test/test_unit_query_distro.py | 26 ++ test/test_unit_set_os.py | 36 ++ test/test_unit_x_program.py | 46 +++ test/test_version.py | 35 ++ test/utils.py | 59 +++ 34 files changed, 3218 insertions(+), 3 deletions(-) create mode 100644 docker-compose.yml create mode 100644 pylintrc create mode 100644 pytest.ini create mode 100644 test/conftest.py create mode 120000 test/pylintrc create mode 100644 test/test_alt.py create mode 100644 test/test_assert_private_dirs.py create mode 100644 test/test_bootstrap.py create mode 100644 test/test_clean.py create mode 100644 test/test_clone.py create mode 100644 test/test_config.py create mode 100644 test/test_cygwin_copy.py create mode 100644 test/test_encryption.py create mode 100644 test/test_enter.py create mode 100644 test/test_git.py create mode 100644 test/test_help.py create mode 100644 test/test_hooks.py create mode 100644 test/test_init.py create mode 100644 test/test_introspect.py create mode 100644 test/test_jinja.py create mode 100644 test/test_list.py create mode 100644 test/test_perms.py create mode 100644 test/test_syntax.py create mode 100644 test/test_unit_bootstrap_available.py create mode 100644 test/test_unit_configure_paths.py create mode 100644 test/test_unit_parse_encrypt.py create mode 100644 test/test_unit_query_distro.py create mode 100644 test/test_unit_set_os.py create mode 100644 test/test_unit_x_program.py create mode 100644 test/test_version.py create mode 100644 test/utils.py diff --git a/.gitignore b/.gitignore index 7aa998b..af9e6f5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ .DS_Store +.env .jekyll-metadata +.pytest_cache .sass-cache _site diff --git a/Dockerfile b/Dockerfile index 74700c4..ee29bf2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,20 @@ -FROM ubuntu:yakkety +FROM ubuntu:18.04 MAINTAINER Tim Byrne +# No input during build +ENV DEBIAN_FRONTEND noninteractive + +# UTF8 locale +RUN apt-get update && apt-get install -y locales +RUN locale-gen en_US.UTF-8 +ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' LC_ALL='en_US.UTF-8' + +# Convenience settings for the testbed's root account +RUN echo 'set -o vi' >> /root/.bashrc + # Install prerequisites -RUN apt-get update && apt-get install -y git gnupg1 make shellcheck bats expect curl python-pip lsb-release -RUN pip install envtpl +RUN apt-get update && apt-get install -y git gnupg1 make shellcheck=0.4.6-1 bats expect curl python3-pip lsb-release +RUN pip3 install envtpl pytest==3.6.4 pylint==1.9.2 flake8==3.5.0 # Force GNUPG version 1 at path /usr/bin/gpg RUN ln -fs /usr/bin/gpg1 /usr/bin/gpg diff --git a/Makefile b/Makefile index f544a1d..691b168 100644 --- a/Makefile +++ b/Makefile @@ -23,6 +23,11 @@ test: bats shellcheck parallel: ls test/*bats | time parallel -q -P0 -- docker run --rm -v "$$PWD:/yadm:ro" yadm/testbed bash -c 'bats {}' +.PHONY: pytest +pytest: + @echo Running all pytest tests + @pytest -v + .PHONY: bats bats: @echo Running all bats tests @@ -58,3 +63,13 @@ man: .PHONY: wide wide: man ./yadm.1 + +.PHONY: sync-clock +sync-clock: + docker run --rm --privileged alpine hwclock -s + +.PHONY: .env +.env: + virtualenv --python=python3 .env + .env/bin/pip3 install --upgrade pip setuptools + .env/bin/pip3 install --upgrade pytest pylint==1.9.2 flake8==3.5.0 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..4fe0b86 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,7 @@ +--- +version: '3' +services: + testbed: + volumes: + - .:/yadm:ro + image: yadm/testbed:latest diff --git a/pylintrc b/pylintrc new file mode 100644 index 0000000..2ae18b6 --- /dev/null +++ b/pylintrc @@ -0,0 +1,11 @@ +[BASIC] +good-names=pytestmark + +[DESIGN] +max-args=14 +max-locals=26 +max-attributes=8 +max-statements=65 + +[MESSAGES CONTROL] +disable=redefined-outer-name diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..c7fc1db --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +cache_dir = /tmp +addopts = -ra diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..4fefabe --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,559 @@ +"""Global tests configuration and fixtures""" + +import collections +import copy +import distutils.dir_util # pylint: disable=no-name-in-module,import-error +import os +import platform +import pwd +from subprocess import Popen, PIPE +import pytest + + +@pytest.fixture(scope='session') +def shellcheck_version(): + """Version of shellcheck supported""" + return '0.4.6' + + +@pytest.fixture(scope='session') +def pylint_version(): + """Version of pylint supported""" + return '1.9.2' + + +@pytest.fixture(scope='session') +def flake8_version(): + """Version of flake8 supported""" + return '3.5.0' + + +@pytest.fixture(scope='session') +def tst_user(): + """Test session's user id""" + return pwd.getpwuid(os.getuid()).pw_name + + +@pytest.fixture(scope='session') +def tst_host(): + """Test session's short hostname value""" + return platform.node().split('.')[0] + + +@pytest.fixture(scope='session') +def tst_distro(runner): + """Test session's distro""" + distro = '' + try: + run = runner(command=['lsb_release', '-si'], report=False) + distro = run.out.strip() + except BaseException: + pass + return distro + + +@pytest.fixture(scope='session') +def tst_sys(): + """Test session's uname value""" + return platform.system() + + +@pytest.fixture(scope='session') +def cygwin_sys(): + """CYGWIN uname id""" + return 'CYGWIN_NT-6.1-WOW64' + + +@pytest.fixture(scope='session') +def supported_commands(): + """List of supported commands + + This list should be updated every time yadm learns a new command. + """ + return [ + 'alt', + 'bootstrap', + 'clean', + 'clone', + 'config', + 'decrypt', + 'encrypt', + 'enter', + 'gitconfig', + 'help', + 'init', + 'introspect', + 'list', + 'perms', + 'version', + ] + + +@pytest.fixture(scope='session') +def supported_configs(): + """List of supported config options + + This list should be updated every time yadm learns a new config. + """ + return [ + 'local.class', + 'local.hostname', + 'local.os', + 'local.user', + 'yadm.auto-alt', + 'yadm.auto-perms', + 'yadm.auto-private-dirs', + 'yadm.cygwin-copy', + 'yadm.git-program', + 'yadm.gpg-perms', + 'yadm.gpg-program', + 'yadm.gpg-recipient', + 'yadm.ssh-perms', + ] + + +@pytest.fixture(scope='session') +def supported_switches(): + """List of supported switches + + This list should be updated every time yadm learns a new switch. + """ + return [ + '--yadm-archive', + '--yadm-bootstrap', + '--yadm-config', + '--yadm-dir', + '--yadm-encrypt', + '--yadm-repo', + '-Y', + ] + + +@pytest.fixture(scope='session') +def supported_local_configs(supported_configs): + """List of supported local config options""" + return [c for c in supported_configs if c.startswith('local.')] + + +class Runner(object): + """Class for running commands + + Within yadm tests, this object should be used when running commands that + require: + + * Acting on the status code + * Parsing the output of the command + * Passing input to the command + + Other instances of simply running commands should use os.system(). + """ + + def __init__( + self, + command, + inp=None, + shell=False, + cwd=None, + env=None, + expect=None, + report=True): + if shell: + self.command = ' '.join([str(cmd) for cmd in command]) + else: + self.command = command + self.inp = inp + self.wrap(expect) + process = Popen( + self.command, + stdin=PIPE, + stdout=PIPE, + stderr=PIPE, + shell=shell, + cwd=cwd, + env=env, + ) + input_bytes = self.inp + if self.inp: + input_bytes = self.inp.encode() + (out_bstream, err_bstream) = process.communicate(input=input_bytes) + self.out = out_bstream.decode() + self.err = err_bstream.decode() + self.code = process.wait() + self.success = self.code == 0 + self.failure = self.code != 0 + if report: + self.report() + + def __repr__(self): + return f'Runner({self.command})' + + def report(self): + """Print code/stdout/stderr""" + print(f'{self}') + print(f' RUN: code:{self.code}') + if self.inp: + print(f' RUN: input:\n{self.inp}') + print(f' RUN: stdout:\n{self.out}') + print(f' RUN: stderr:\n{self.err}') + + def wrap(self, expect): + """Wrap command with expect""" + if not expect: + return + cmdline = ' '.join([f'"{w}"' for w in self.command]) + expect_script = f'set timeout 2\nspawn {cmdline}\n' + for question, answer in expect: + expect_script += ( + 'expect {\n' + f'"{question}" {{send "{answer}\\r"}}\n' + 'timeout {close;exit 128}\n' + '}\n') + expect_script += ( + 'expect eof\n' + 'foreach {pid spawnid os_error_flag value} [wait] break\n' + 'exit $value') + self.inp = expect_script + print(f'EXPECT:{expect_script}') + self.command = ['expect'] + + +@pytest.fixture(scope='session') +def runner(): + """Class for running commands""" + return Runner + + +@pytest.fixture(scope='session') +def config_git(): + """Configure global git configuration, if missing""" + os.system( + 'git config user.name || ' + 'git config --global user.name "test"') + os.system( + 'git config user.email || ' + 'git config --global user.email "test@test.test"') + return None + + +@pytest.fixture() +def repo_config(runner, paths): + """Function to query a yadm repo configuration value""" + + def query_func(key): + """Query a yadm repo configuration value""" + run = runner( + command=('git', 'config', '--local', key), + env={'GIT_DIR': paths.repo}, + report=False, + ) + return run.out.rstrip() + + return query_func + + +@pytest.fixture(scope='session') +def yadm(): + """Path to yadm program to be tested""" + full_path = os.path.realpath('yadm') + assert os.path.isfile(full_path), "yadm program file isn't present" + return full_path + + +@pytest.fixture() +def paths(tmpdir, yadm): + """Function scoped test paths""" + dir_root = tmpdir.mkdir('root') + dir_work = dir_root.mkdir('work') + dir_yadm = dir_root.mkdir('yadm') + dir_repo = dir_yadm.mkdir('repo.git') + dir_hooks = dir_yadm.mkdir('hooks') + dir_remote = dir_root.mkdir('remote') + file_archive = dir_yadm.join('files.gpg') + file_bootstrap = dir_yadm.join('bootstrap') + file_config = dir_yadm.join('config') + file_encrypt = dir_yadm.join('encrypt') + paths = collections.namedtuple( + 'Paths', [ + 'pgm', + 'root', + 'work', + 'yadm', + 'repo', + 'hooks', + 'remote', + 'archive', + 'bootstrap', + 'config', + 'encrypt', + ]) + return paths( + yadm, + dir_root, + dir_work, + dir_yadm, + dir_repo, + dir_hooks, + dir_remote, + file_archive, + file_bootstrap, + file_config, + file_encrypt, + ) + + +@pytest.fixture() +def yadm_y(paths): + """Generate custom command_list function""" + def command_list(*args): + """Produce params for running yadm with -Y""" + return [paths.pgm, '-Y', str(paths.yadm)] + list(args) + return command_list + + +class DataFile(object): + """Datafile object""" + + def __init__(self, path, tracked=True, private=False): + self.__path = path + self.__parent = None + self.__tracked = tracked + self.__private = private + + @property + def path(self): + """Path property""" + return self.__path + + @property + def relative(self): + """Relative path property""" + if self.__parent: + return self.__parent.join(self.path) + raise BaseException('Unable to provide relative path, no parent') + + @property + def tracked(self): + """Tracked property""" + return self.__tracked + + @property + def private(self): + """Private property""" + return self.__private + + def relative_to(self, parent): + """Update all relative paths to this py.path""" + self.__parent = parent + return + + +class DataSet(object): + """Dataset object""" + + def __init__(self): + self.__files = list() + self.__dirs = list() + self.__tracked_dirs = list() + self.__private_dirs = list() + self.__relpath = None + + def __repr__(self): + return ( + f'[DS with {len(self)} files; ' + f'{len(self.tracked)} tracked, ' + f'{len(self.private)} private]' + ) + + def __iter__(self): + return iter(self.__files) + + def __len__(self): + return len(self.__files) + + def __contains__(self, datafile): + if [f for f in self.__files if f.path == datafile]: + return True + if datafile in self.__files: + return True + return False + + @property + def files(self): + """List of DataFiles in DataSet""" + return list(self.__files) + + @property + def tracked(self): + """List of tracked DataFiles in DataSet""" + return [f for f in self.__files if f.tracked] + + @property + def private(self): + """List of private DataFiles in DataSet""" + return [f for f in self.__files if f.private] + + @property + def dirs(self): + """List of directories in DataSet""" + return list(self.__dirs) + + @property + def plain_dirs(self): + """List of directories in DataSet not starting with '.'""" + return [d for d in self.dirs if not d.startswith('.')] + + @property + def hidden_dirs(self): + """List of directories in DataSet starting with '.'""" + return [d for d in self.dirs if d.startswith('.')] + + @property + def tracked_dirs(self): + """List of directories in DataSet not starting with '.'""" + return [d for d in self.__tracked_dirs if not d.startswith('.')] + + @property + def private_dirs(self): + """List of directories in DataSet considered 'private'""" + return list(self.__private_dirs) + + def add_file(self, path, tracked=True, private=False): + """Add file to data set""" + if path not in self: + datafile = DataFile(path, tracked, private) + if self.__relpath: + datafile.relative_to(self.__relpath) + self.__files.append(datafile) + + dname = os.path.dirname(path) + if dname and dname not in self.__dirs: + self.__dirs.append(dname) + if tracked: + self.__tracked_dirs.append(dname) + if private: + self.__private_dirs.append(dname) + + def relative_to(self, relpath): + """Update all relative paths to this py.path""" + self.__relpath = relpath + for datafile in self.files: + datafile.relative_to(self.__relpath) + return + + +@pytest.fixture(scope='session') +def ds1_dset(tst_sys, cygwin_sys): + """Meta-data for dataset one files""" + dset = DataSet() + dset.add_file('t1') + dset.add_file('d1/t2') + dset.add_file(f'test_alt##S') + dset.add_file(f'test_alt##S.H') + dset.add_file(f'test_alt##S.H.U') + dset.add_file(f'test_alt##C.S.H.U') + dset.add_file(f'test alt/test alt##S') + dset.add_file(f'test alt/test alt##S.H') + dset.add_file(f'test alt/test alt##S.H.U') + dset.add_file(f'test alt/test alt##C.S.H.U') + dset.add_file(f'test_cygwin_copy##{tst_sys}') + dset.add_file(f'test_cygwin_copy##{cygwin_sys}') + dset.add_file('u1', tracked=False) + dset.add_file('d2/u2', tracked=False) + dset.add_file('.ssh/p1', tracked=False, private=True) + dset.add_file('.ssh/.p2', tracked=False, private=True) + dset.add_file('.gnupg/p3', tracked=False, private=True) + dset.add_file('.gnupg/.p4', tracked=False, private=True) + return dset + + +@pytest.fixture(scope='session') +def ds1_data(tmpdir_factory, config_git, ds1_dset, runner): + """A set of test data, worktree & repo""" + # pylint: disable=unused-argument + # This is ignored because + # @pytest.mark.usefixtures('config_git') + # cannot be applied to another fixture. + + data = tmpdir_factory.mktemp('ds1') + + work = data.mkdir('work') + for datafile in ds1_dset: + work.join(datafile.path).write(datafile.path, ensure=True) + + repo = data.mkdir('repo.git') + env = os.environ.copy() + env['GIT_DIR'] = str(repo) + runner( + command=['git', 'init', '--shared=0600', '--bare', str(repo)], + report=False) + runner( + command=['git', 'config', 'core.bare', 'false'], + env=env, + report=False) + runner( + command=['git', 'config', 'status.showUntrackedFiles', 'no'], + env=env, + report=False) + runner( + command=['git', 'config', 'yadm.managed', 'true'], + env=env, + report=False) + runner( + command=['git', 'config', 'core.worktree', str(work)], + env=env, + report=False) + runner( + command=['git', 'add'] + + [str(work.join(f.path)) for f in ds1_dset if f.tracked], + env=env) + runner( + command=['git', 'commit', '--allow-empty', '-m', 'Initial commit'], + env=env, + report=False) + + data = collections.namedtuple('Data', ['work', 'repo']) + return data(work, repo) + + +@pytest.fixture() +def ds1_work_copy(ds1_data, paths): + """Function scoped copy of ds1_data.work""" + distutils.dir_util.copy_tree( # pylint: disable=no-member + str(ds1_data.work), str(paths.work)) + return None + + +@pytest.fixture() +def ds1_repo_copy(runner, ds1_data, paths): + """Function scoped copy of ds1_data.repo""" + distutils.dir_util.copy_tree( # pylint: disable=no-member + str(ds1_data.repo), str(paths.repo)) + env = os.environ.copy() + env['GIT_DIR'] = str(paths.repo) + runner( + command=['git', 'config', 'core.worktree', str(paths.work)], + env=env, + report=False) + return None + + +@pytest.fixture() +def ds1_copy(ds1_work_copy, ds1_repo_copy): + """Function scoped copy of ds1_data""" + # pylint: disable=unused-argument + # This is ignored because + # @pytest.mark.usefixtures('ds1_work_copy', 'ds1_repo_copy') + # cannot be applied to another fixture. + return None + + +@pytest.fixture() +def ds1(ds1_work_copy, paths, ds1_dset): + """Function scoped ds1_dset w/paths""" + # pylint: disable=unused-argument + # This is ignored because + # @pytest.mark.usefixtures('ds1_copy') + # cannot be applied to another fixture. + dscopy = copy.deepcopy(ds1_dset) + dscopy.relative_to(copy.deepcopy(paths.work)) + return dscopy diff --git a/test/pylintrc b/test/pylintrc new file mode 120000 index 0000000..05334af --- /dev/null +++ b/test/pylintrc @@ -0,0 +1 @@ +../pylintrc \ No newline at end of file diff --git a/test/test_alt.py b/test/test_alt.py new file mode 100644 index 0000000..60b7339 --- /dev/null +++ b/test/test_alt.py @@ -0,0 +1,345 @@ +"""Test alt""" + +import os +import re +import string +import pytest +import utils + +# These test IDs are broken. During the writing of these tests, problems have +# been discovered in the way yadm orders matching files. +BROKEN_TEST_IDS = [ + 'test_wild[tracked-##C.S.H.U-C-S%-H%-U]', + 'test_wild[tracked-##C.S.H.U-C-S-H%-U]', + 'test_wild[encrypted-##C.S.H.U-C-S%-H%-U]', + 'test_wild[encrypted-##C.S.H.U-C-S-H%-U]', + ] + +PRECEDENCE = [ + '##', + '##$tst_sys', + '##$tst_sys.$tst_host', + '##$tst_sys.$tst_host.$tst_user', + '##$tst_class', + '##$tst_class.$tst_sys', + '##$tst_class.$tst_sys.$tst_host', + '##$tst_class.$tst_sys.$tst_host.$tst_user', + ] + +WILD_TEMPLATES = [ + '##$tst_class', + '##$tst_class.$tst_sys', + '##$tst_class.$tst_sys.$tst_host', + '##$tst_class.$tst_sys.$tst_host.$tst_user', + ] + +WILD_TESTED = set() + + +@pytest.mark.parametrize('precedence_index', range(len(PRECEDENCE))) +@pytest.mark.parametrize( + 'tracked, encrypt, exclude', [ + (False, False, False), + (True, False, False), + (False, True, False), + (False, True, True), + ], ids=[ + 'untracked', + 'tracked', + 'encrypted', + 'excluded', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_alt(runner, yadm_y, paths, + tst_sys, tst_host, tst_user, + tracked, encrypt, exclude, + precedence_index): + """Test alternate linking + + This test is done by iterating for the number of templates in PRECEDENCE. + With each iteration, another file is left off the list. So with each + iteration, the template with the "highest precedence" is left out. The file + using the highest precedence should be the one linked. + """ + + # set the class + tst_class = 'testclass' + utils.set_local(paths, 'class', tst_class) + + # process the templates in PRECEDENCE + precedence = list() + for template in PRECEDENCE: + precedence.append( + string.Template(template).substitute( + tst_class=tst_class, + tst_host=tst_host, + tst_sys=tst_sys, + tst_user=tst_user, + ) + ) + + # create files using a subset of files + for suffix in precedence[0:precedence_index+1]: + utils.create_alt_files(paths, suffix, tracked=tracked, + encrypt=encrypt, exclude=exclude) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + precedence[precedence_index] + if tracked or (encrypt and not exclude): + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + else: + assert not paths.work.join(file_path).exists() + assert str(paths.work.join(source_file)) not in linked + + +def short_template(template): + """Translate template into something short for test IDs""" + return string.Template(template).substitute( + tst_class='C', + tst_host='H', + tst_sys='S', + tst_user='U', + ) + + +@pytest.mark.parametrize('wild_user', [True, False], ids=['U%', 'U']) +@pytest.mark.parametrize('wild_host', [True, False], ids=['H%', 'H']) +@pytest.mark.parametrize('wild_sys', [True, False], ids=['S%', 'S']) +@pytest.mark.parametrize('wild_class', [True, False], ids=['C%', 'C']) +@pytest.mark.parametrize('template', WILD_TEMPLATES, ids=short_template) +@pytest.mark.parametrize( + 'tracked, encrypt', [ + (True, False), + (False, True), + ], ids=[ + 'tracked', + 'encrypted', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_wild(request, runner, yadm_y, paths, + tst_sys, tst_host, tst_user, + tracked, encrypt, + wild_class, wild_host, wild_sys, wild_user, + template): + """Test wild linking + + These tests are done by creating permutations of the possible files using + WILD_TEMPLATES. Each case is then tested (while skipping the already tested + permutations for efficiency). + """ + + if request.node.name in BROKEN_TEST_IDS: + pytest.xfail( + 'This test is known to be broken. ' + 'This bug needs to be fixed.') + + tst_class = 'testclass' + + # determine the "wild" version of the suffix + str_class = '%' if wild_class else tst_class + str_host = '%' if wild_host else tst_host + str_sys = '%' if wild_sys else tst_sys + str_user = '%' if wild_user else tst_user + wild_suffix = string.Template(template).substitute( + tst_class=str_class, + tst_host=str_host, + tst_sys=str_sys, + tst_user=str_user, + ) + + # determine the "standard" version of the suffix + std_suffix = string.Template(template).substitute( + tst_class=tst_class, + tst_host=tst_host, + tst_sys=tst_sys, + tst_user=tst_user, + ) + + # skip over duplicate tests (this seems to be the simplest way to cover the + # permutations of tests, while skipping duplicates.) + test_key = f'{tracked}{encrypt}{wild_suffix}{std_suffix}' + if test_key in WILD_TESTED: + return + else: + WILD_TESTED.add(test_key) + + # set the class + utils.set_local(paths, 'class', tst_class) + + # create files using the wild suffix + utils.create_alt_files(paths, wild_suffix, tracked=tracked, + encrypt=encrypt, exclude=False) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + wild_suffix + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + + # create files using the standard suffix + utils.create_alt_files(paths, std_suffix, tracked=tracked, + encrypt=encrypt, exclude=False) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + std_suffix + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + + +@pytest.mark.usefixtures('ds1_copy') +def test_local_override(runner, yadm_y, paths, + tst_sys, tst_host, tst_user): + """Test local overrides""" + + # define local overrides + utils.set_local(paths, 'class', 'or-class') + utils.set_local(paths, 'hostname', 'or-hostname') + utils.set_local(paths, 'os', 'or-os') + utils.set_local(paths, 'user', 'or-user') + + # create files, the first would normally be the most specific version + # however, the second is the overridden version which should be preferred. + utils.create_alt_files( + paths, f'##or-class.{tst_sys}.{tst_host}.{tst_user}') + utils.create_alt_files( + paths, '##or-class.or-os.or-hostname.or-user') + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + '##or-class.or-os.or-hostname.or-user' + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + + +@pytest.mark.parametrize('suffix', ['AAA', 'ZZZ', 'aaa', 'zzz']) +@pytest.mark.usefixtures('ds1_copy') +def test_class_case(runner, yadm_y, paths, tst_sys, suffix): + """Test range of class cases""" + + # set the class + utils.set_local(paths, 'class', suffix) + + # create files + endings = [suffix] + if tst_sys == 'Linux': + # Only create all of these side-by-side on Linux, which is + # unquestionably case-sensitive. This would break tests on + # case-insensitive systems. + endings = ['AAA', 'ZZZ', 'aaa', 'zzz'] + for ending in endings: + utils.create_alt_files(paths, f'##{ending}') + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + f'##{suffix}' + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + + +@pytest.mark.parametrize('autoalt', [None, 'true', 'false']) +@pytest.mark.usefixtures('ds1_copy') +def test_auto_alt(runner, yadm_y, paths, autoalt): + """Test setting auto-alt""" + + # set the value of auto-alt + if autoalt: + os.system(' '.join(yadm_y('config', 'yadm.auto-alt', autoalt))) + + # create file + suffix = '##' + utils.create_alt_files(paths, suffix) + + # run status to possibly trigger linking + run = runner(yadm_y('status')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + suffix + if autoalt == 'false': + assert not paths.work.join(file_path).exists() + else: + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + # no linking output when run via auto-alt + assert str(paths.work.join(source_file)) not in linked + + +@pytest.mark.parametrize('delimiter', ['.', '_']) +@pytest.mark.usefixtures('ds1_copy') +def test_delimiter(runner, yadm_y, paths, + tst_sys, tst_host, tst_user, delimiter): + """Test delimiters used""" + + suffix = '##' + delimiter.join([tst_sys, tst_host, tst_user]) + + # create file + utils.create_alt_files(paths, suffix) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + # only a delimiter of '.' is valid + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + suffix + if delimiter == '.': + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + else: + assert not paths.work.join(file_path).exists() + assert str(paths.work.join(source_file)) not in linked + + +def linked_list(output): + """Parse output, and return list of linked files""" + linked = dict() + for line in output.splitlines(): + match = re.match('Linking (.+) to (.+)$', line) + if match: + linked[match.group(2)] = match.group(1) + return linked.values() diff --git a/test/test_assert_private_dirs.py b/test/test_assert_private_dirs.py new file mode 100644 index 0000000..65cb0b7 --- /dev/null +++ b/test/test_assert_private_dirs.py @@ -0,0 +1,106 @@ +"""Test asserting private directories""" + +import os +import re +import pytest + +pytestmark = pytest.mark.usefixtures('ds1_copy') +PRIVATE_DIRS = ['.gnupg', '.ssh'] + + +def test_pdirs_missing(runner, yadm_y, paths): + """Private dirs (private dirs missing) + + When a git command is run + And private directories are missing + Create private directories prior to command + """ + + # confirm directories are missing at start + for pdir in PRIVATE_DIRS: + path = paths.work.join(pdir) + if path.exists(): + path.remove() + assert not path.exists() + + # run status + run = runner(command=yadm_y('status'), env={'DEBUG': 'yes'}) + assert run.success + assert run.err == '' + assert 'On branch master' in run.out + + # confirm directories are created + # and are protected + for pdir in PRIVATE_DIRS: + path = paths.work.join(pdir) + assert path.exists() + assert oct(path.stat().mode).endswith('00'), 'Directory is not secured' + + # confirm directories are created before command is run: + assert re.search( + r'Creating.+\.gnupg.+Creating.+\.ssh.+Running git command git status', + run.out, re.DOTALL), 'directories created before command is run' + + +def test_pdirs_missing_apd_false(runner, yadm_y, paths): + """Private dirs (private dirs missing / yadm.auto-private-dirs=false) + + When a git command is run + And private directories are missing + But auto-private-dirs is false + Do not create private dirs + """ + + # confirm directories are missing at start + for pdir in PRIVATE_DIRS: + path = paths.work.join(pdir) + if path.exists(): + path.remove() + assert not path.exists() + + # set configuration + os.system(' '.join(yadm_y( + 'config', '--bool', 'yadm.auto-private-dirs', 'false'))) + + # run status + run = runner(command=yadm_y('status')) + assert run.success + assert run.err == '' + assert 'On branch master' in run.out + + # confirm directories are STILL missing + for pdir in PRIVATE_DIRS: + assert not paths.work.join(pdir).exists() + + +def test_pdirs_exist_apd_false(runner, yadm_y, paths): + """Private dirs (private dirs exist / yadm.auto-perms=false) + + When a git command is run + And private directories exist + And yadm is configured not to auto update perms + Do not alter directories + """ + + # create permissive directories + for pdir in PRIVATE_DIRS: + path = paths.work.join(pdir) + if not path.isdir(): + path.mkdir() + path.chmod(0o777) + assert oct(path.stat().mode).endswith('77'), 'Directory is secure.' + + # set configuration + os.system(' '.join(yadm_y( + 'config', '--bool', 'yadm.auto-perms', 'false'))) + + # run status + run = runner(command=yadm_y('status')) + assert run.success + assert run.err == '' + assert 'On branch master' in run.out + + # created directories are STILL permissive + for pdir in PRIVATE_DIRS: + path = paths.work.join(pdir) + assert oct(path.stat().mode).endswith('77'), 'Directory is secure' diff --git a/test/test_bootstrap.py b/test/test_bootstrap.py new file mode 100644 index 0000000..2adbe33 --- /dev/null +++ b/test/test_bootstrap.py @@ -0,0 +1,31 @@ +"""Test bootstrap""" + +import pytest + + +@pytest.mark.parametrize( + 'exists, executable, code, expect', [ + (False, False, 1, 'Cannot execute bootstrap'), + (True, False, 1, 'is not an executable program'), + (True, True, 123, 'Bootstrap successful'), + ], ids=[ + 'missing', + 'not executable', + 'executable', + ]) +def test_bootstrap( + runner, yadm_y, paths, exists, executable, code, expect): + """Test bootstrap command""" + if exists: + paths.bootstrap.write('') + if executable: + paths.bootstrap.write( + '#!/bin/bash\n' + f'echo {expect}\n' + f'exit {code}\n' + ) + paths.bootstrap.chmod(0o775) + run = runner(command=yadm_y('bootstrap')) + assert run.code == code + assert run.err == '' + assert expect in run.out diff --git a/test/test_clean.py b/test/test_clean.py new file mode 100644 index 0000000..9a2221a --- /dev/null +++ b/test/test_clean.py @@ -0,0 +1,11 @@ +"""Test clean""" + + +def test_clean_command(runner, yadm_y): + """Run with clean command""" + run = runner(command=yadm_y('clean')) + # do nothing, this is a dangerous Git command when managing dot files + # report the command as disabled and exit as a failure + assert run.failure + assert run.err == '' + assert 'disabled' in run.out diff --git a/test/test_clone.py b/test/test_clone.py new file mode 100644 index 0000000..90febe1 --- /dev/null +++ b/test/test_clone.py @@ -0,0 +1,274 @@ +"""Test clone""" + +import os +import re +import pytest + +BOOTSTRAP_CODE = 123 +BOOTSTRAP_MSG = 'Bootstrap successful' + + +@pytest.mark.usefixtures('remote') +@pytest.mark.parametrize( + 'good_remote, repo_exists, force, conflicts', [ + (False, False, False, False), + (True, False, False, False), + (True, True, False, False), + (True, True, True, False), + (True, False, False, True), + ], ids=[ + 'bad remote', + 'simple', + 'existing repo', + '-f', + 'conflicts', + ]) +def test_clone( + runner, paths, yadm_y, repo_config, ds1, + good_remote, repo_exists, force, conflicts): + """Test basic clone operation""" + + # determine remote url + remote_url = f'file://{paths.remote}' + if not good_remote: + remote_url = 'file://bad_remote' + + old_repo = None + if repo_exists: + # put a repo in the way + paths.repo.mkdir() + old_repo = paths.repo.join('old_repo') + old_repo.write('old_repo') + + if conflicts: + ds1.tracked[0].relative.write('conflict') + assert ds1.tracked[0].relative.exists() + + # run the clone command + args = ['clone', '-w', paths.work] + if force: + args += ['-f'] + args += [remote_url] + run = runner(command=yadm_y(*args)) + + if not good_remote: + # clone should fail + assert run.failure + assert run.err != '' + assert 'Unable to fetch origin' in run.out + assert not paths.repo.exists() + elif repo_exists and not force: + # can't overwrite data + assert run.failure + assert run.err == '' + assert 'Git repo already exists' in run.out + else: + # clone should succeed, and repo should be configured properly + assert successful_clone(run, paths, repo_config) + + # ensure conflicts are handled properly + if conflicts: + assert 'NOTE' in run.out + assert 'Merging origin/master failed' in run.out + assert 'Conflicts preserved' in run.out + + # confirm correct Git origin + run = runner( + command=('git', 'remote', '-v', 'show'), + env={'GIT_DIR': paths.repo}) + assert run.success + assert run.err == '' + assert f'origin\t{remote_url}' in run.out + + # ensure conflicts are really preserved + if conflicts: + # test to see if the work tree is actually "clean" + run = runner( + command=yadm_y('status', '-uno', '--porcelain'), + cwd=paths.work) + assert run.success + assert run.err == '' + assert run.out == '', 'worktree has unexpected changes' + + # test to see if the conflicts are stashed + run = runner(command=yadm_y('stash', 'list'), cwd=paths.work) + assert run.success + assert run.err == '' + assert 'Conflicts preserved' in run.out, 'conflicts not stashed' + + # verify content of the stashed conflicts + run = runner(command=yadm_y('stash', 'show', '-p'), cwd=paths.work) + assert run.success + assert run.err == '' + assert '\n+conflict' in run.out, 'conflicts not stashed' + + # another force-related assertion + if old_repo: + if force: + assert not old_repo.exists() + else: + assert old_repo.exists() + + +@pytest.mark.usefixtures('remote') +@pytest.mark.parametrize( + 'bs_exists, bs_param, answer', [ + (False, '--bootstrap', None), + (True, '--bootstrap', None), + (True, '--no-bootstrap', None), + (True, None, 'n'), + (True, None, 'y'), + ], ids=[ + 'force, missing', + 'force, existing', + 'prevent', + 'existing, answer n', + 'existing, answer y', + ]) +def test_clone_bootstrap( + runner, paths, yadm_y, repo_config, bs_exists, bs_param, answer): + """Test bootstrap clone features""" + + # establish a bootstrap + create_bootstrap(paths, bs_exists) + + # run the clone command + args = ['clone', '-w', paths.work] + if bs_param: + args += [bs_param] + args += [f'file://{paths.remote}'] + expect = [] + if answer: + expect.append(('Would you like to execute it now', answer)) + run = runner(command=yadm_y(*args), expect=expect) + + if answer: + assert 'Would you like to execute it now' in run.out + + expected_code = 0 + if bs_exists and bs_param != '--no-bootstrap': + expected_code = BOOTSTRAP_CODE + + if answer == 'y': + expected_code = BOOTSTRAP_CODE + assert BOOTSTRAP_MSG in run.out + elif answer == 'n': + expected_code = 0 + assert BOOTSTRAP_MSG not in run.out + + assert successful_clone(run, paths, repo_config, expected_code) + + if not bs_exists: + assert BOOTSTRAP_MSG not in run.out + + +def create_bootstrap(paths, exists): + """Create bootstrap file for test""" + if exists: + paths.bootstrap.write( + '#!/bin/sh\n' + f'echo {BOOTSTRAP_MSG}\n' + f'exit {BOOTSTRAP_CODE}\n') + paths.bootstrap.chmod(0o775) + assert paths.bootstrap.exists() + else: + assert not paths.bootstrap.exists() + + +@pytest.mark.usefixtures('remote') +@pytest.mark.parametrize( + 'private_type, in_repo, in_work', [ + ('ssh', False, True), + ('gnupg', False, True), + ('ssh', True, True), + ('gnupg', True, True), + ('ssh', True, False), + ('gnupg', True, False), + ], ids=[ + 'open ssh, not tracked', + 'open gnupg, not tracked', + 'open ssh, tracked', + 'open gnupg, tracked', + 'missing ssh, tracked', + 'missing gnupg, tracked', + ]) +def test_clone_perms( + runner, yadm_y, paths, repo_config, + private_type, in_repo, in_work): + """Test clone permission-related functions""" + + # update remote repo to include private data + if in_repo: + rpath = paths.work.mkdir(f'.{private_type}').join('related') + rpath.write('related') + os.system(f'GIT_DIR="{paths.remote}" git add {rpath}') + os.system(f'GIT_DIR="{paths.remote}" git commit -m "{rpath}"') + rpath.remove() + + # ensure local private data is insecure at the start + if in_work: + pdir = paths.work.join(f'.{private_type}') + if not pdir.exists(): + pdir.mkdir() + pfile = pdir.join('existing') + pfile.write('existing') + pdir.chmod(0o777) + pfile.chmod(0o777) + else: + paths.work.remove() + paths.work.mkdir() + + run = runner( + yadm_y('clone', '-d', '-w', paths.work, f'file://{paths.remote}')) + + assert successful_clone(run, paths, repo_config) + if in_work: + # private directories which already exist, should be left as they are, + # which in this test is "insecure". + assert re.search( + f'initial private dir perms drwxrwxrwx.+.{private_type}', + run.out) + assert re.search( + f'pre-merge private dir perms drwxrwxrwx.+.{private_type}', + run.out) + assert re.search( + f'post-merge private dir perms drwxrwxrwx.+.{private_type}', + run.out) + else: + # private directories which are created, should be done prior to + # merging, and with secure permissions. + assert 'initial private dir perms' not in run.out + assert re.search( + f'pre-merge private dir perms drwx------.+.{private_type}', + run.out) + assert re.search( + f'post-merge private dir perms drwx------.+.{private_type}', + run.out) + + # standard perms still apply afterwards unless disabled with auto.perms + assert oct( + paths.work.join(f'.{private_type}').stat().mode).endswith('00'), ( + f'.{private_type} has not been secured by auto.perms') + + +def successful_clone(run, paths, repo_config, expected_code=0): + """Assert clone is successful""" + assert run.code == expected_code + assert 'Initialized' in run.out + assert oct(paths.repo.stat().mode).endswith('00'), 'Repo is not secured' + assert repo_config('core.bare') == 'false' + assert repo_config('status.showUntrackedFiles') == 'no' + assert repo_config('yadm.managed') == 'true' + return True + + +@pytest.fixture() +def remote(paths, ds1_repo_copy): + """Function scoped remote (based on ds1)""" + # pylint: disable=unused-argument + # This is ignored because + # @pytest.mark.usefixtures('ds1_remote_copy') + # cannot be applied to another fixture. + paths.remote.remove() + paths.repo.move(paths.remote) + return None diff --git a/test/test_config.py b/test/test_config.py new file mode 100644 index 0000000..4e44b1c --- /dev/null +++ b/test/test_config.py @@ -0,0 +1,139 @@ +"""Test config""" + +import os +import pytest + +TEST_SECTION = 'test' +TEST_ATTRIBUTE = 'attribute' +TEST_KEY = f'{TEST_SECTION}.{TEST_ATTRIBUTE}' +TEST_VALUE = 'testvalue' +TEST_FILE = f'[{TEST_SECTION}]\n\t{TEST_ATTRIBUTE} = {TEST_VALUE}' + + +def test_config_no_params(runner, yadm_y, supported_configs): + """No parameters + + Display instructions + Display supported configs + Exit with 0 + """ + + run = runner(yadm_y('config')) + + assert run.success + assert run.err == '' + assert 'Please read the CONFIGURATION section' in run.out + for config in supported_configs: + assert config in run.out + + +def test_config_read_missing(runner, yadm_y): + """Read missing attribute + + Display an empty value + Exit with 0 + """ + + run = runner(yadm_y('config', TEST_KEY)) + + assert run.success + assert run.err == '' + assert run.out == '' + + +def test_config_write(runner, yadm_y, paths): + """Write attribute + + Display no output + Update configuration file + Exit with 0 + """ + + run = runner(yadm_y('config', TEST_KEY, TEST_VALUE)) + + assert run.success + assert run.err == '' + assert run.out == '' + assert paths.config.read().strip() == TEST_FILE + + +def test_config_read(runner, yadm_y, paths): + """Read attribute + + Display value + Exit with 0 + """ + + paths.config.write(TEST_FILE) + run = runner(yadm_y('config', TEST_KEY)) + + assert run.success + assert run.err == '' + assert run.out.strip() == TEST_VALUE + + +def test_config_update(runner, yadm_y, paths): + """Update attribute + + Display no output + Update configuration file + Exit with 0 + """ + + paths.config.write(TEST_FILE) + + run = runner(yadm_y('config', TEST_KEY, TEST_VALUE + 'extra')) + + assert run.success + assert run.err == '' + assert run.out == '' + + assert paths.config.read().strip() == TEST_FILE + 'extra' + + +@pytest.mark.usefixtures('ds1_repo_copy') +def test_config_local_read(runner, yadm_y, paths, supported_local_configs): + """Read local attribute + + Display value from the repo config + Exit with 0 + """ + + # populate test values + for config in supported_local_configs: + os.system( + f'GIT_DIR="{paths.repo}" ' + f'git config --local "{config}" "value_of_{config}"') + + # run yadm config + for config in supported_local_configs: + run = runner(yadm_y('config', config)) + assert run.success + assert run.err == '' + assert run.out.strip() == f'value_of_{config}' + + +@pytest.mark.usefixtures('ds1_repo_copy') +def test_config_local_write(runner, yadm_y, paths, supported_local_configs): + """Write local attribute + + Display no output + Write value to the repo config + Exit with 0 + """ + + # run yadm config + for config in supported_local_configs: + run = runner(yadm_y('config', config, f'value_of_{config}')) + assert run.success + assert run.err == '' + assert run.out == '' + + # verify test values + for config in supported_local_configs: + run = runner( + command=('git', 'config', config), + env={'GIT_DIR': paths.repo}) + assert run.success + assert run.err == '' + assert run.out.strip() == f'value_of_{config}' diff --git a/test/test_cygwin_copy.py b/test/test_cygwin_copy.py new file mode 100644 index 0000000..82b08ba --- /dev/null +++ b/test/test_cygwin_copy.py @@ -0,0 +1,59 @@ +"""Test yadm.cygwin_copy""" + +import os +import pytest + + +@pytest.mark.parametrize( + 'setting, is_cygwin, expect_link, pre_existing', [ + (None, False, True, None), + (True, False, True, None), + (False, False, True, None), + (None, True, True, None), + (True, True, False, None), + (False, True, True, None), + (True, True, False, 'link'), + (True, True, False, 'file'), + ], + ids=[ + 'unset, non-cygwin', + 'true, non-cygwin', + 'false, non-cygwin', + 'unset, cygwin', + 'true, cygwin', + 'false, cygwin', + 'pre-existing symlink', + 'pre-existing file', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_cygwin_copy( + runner, yadm_y, paths, cygwin_sys, tst_sys, + setting, is_cygwin, expect_link, pre_existing): + """Test yadm.cygwin_copy""" + + if setting is not None: + os.system(' '.join(yadm_y('config', 'yadm.cygwin-copy', str(setting)))) + + expected_content = f'test_cygwin_copy##{tst_sys}' + alt_path = paths.work.join('test_cygwin_copy') + if pre_existing == 'symlink': + alt_path.mklinkto(expected_content) + elif pre_existing == 'file': + alt_path.write('wrong content') + + uname_path = paths.root.join('tmp').mkdir() + if is_cygwin: + uname = uname_path.join('uname') + uname.write(f'#!/bin/sh\necho "{cygwin_sys}"\n') + uname.chmod(0o777) + expected_content = f'test_cygwin_copy##{cygwin_sys}' + env = os.environ.copy() + env['PATH'] = ':'.join([str(uname_path), env['PATH']]) + + run = runner(yadm_y('alt'), env=env) + assert run.success + assert run.err == '' + assert 'Linking' in run.out + + assert alt_path.read() == expected_content + assert alt_path.islink() == expect_link diff --git a/test/test_encryption.py b/test/test_encryption.py new file mode 100644 index 0000000..f107ad5 --- /dev/null +++ b/test/test_encryption.py @@ -0,0 +1,392 @@ +"""Test encryption""" + +import os +import pipes +import pytest + +KEY_FILE = 'test/test_key' +KEY_FINGERPRINT = 'F8BBFC746C58945442349BCEBA54FFD04C599B1A' +KEY_NAME = 'yadm-test1' +KEY_TRUST = 'test/ownertrust.txt' +PASSPHRASE = 'ExamplePassword' + +pytestmark = pytest.mark.usefixtures('config_git') + + +def add_asymmetric_key(): + """Add asymmetric key""" + os.system(f'gpg --import {pipes.quote(KEY_FILE)}') + os.system(f'gpg --import-ownertrust < {pipes.quote(KEY_TRUST)}') + + +def remove_asymmetric_key(): + """Remove asymmetric key""" + os.system( + f'gpg --batch --yes ' + f'--delete-secret-keys {pipes.quote(KEY_FINGERPRINT)}') + os.system(f'gpg --batch --yes --delete-key {pipes.quote(KEY_FINGERPRINT)}') + + +@pytest.fixture +def asymmetric_key(): + """Fixture for asymmetric key, removed in teardown""" + add_asymmetric_key() + yield KEY_NAME + remove_asymmetric_key() + + +@pytest.fixture +def encrypt_targets(yadm_y, paths): + """Fixture for setting up data to encrypt + + This fixture: + * inits an empty repo + * creates test files in the work tree + * creates a ".yadm/encrypt" file for testing: + * standard files + * standard globs + * directories + * comments + * empty lines and lines with just space + * exclusions + * returns a list of expected encrypted files + """ + + # init empty yadm repo + os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f'))) + + expected = [] + + # standard files w/ dirs & spaces + paths.work.join('inc file1').write('inc file1') + expected.append('inc file1') + paths.encrypt.write('inc file1\n') + paths.work.join('inc dir').mkdir() + paths.work.join('inc dir/inc file2').write('inc file2') + expected.append('inc dir/inc file2') + paths.encrypt.write('inc dir/inc file2\n', mode='a') + + # standard globs w/ dirs & spaces + paths.work.join('globs file1').write('globs file1') + expected.append('globs file1') + paths.work.join('globs dir').mkdir() + paths.work.join('globs dir/globs file2').write('globs file2') + expected.append('globs dir/globs file2') + paths.encrypt.write('globs*\n', mode='a') + + # blank lines + paths.encrypt.write('\n \n\t\n', mode='a') + + # comments + paths.work.join('commentfile1').write('commentfile1') + paths.encrypt.write('#commentfile1\n', mode='a') + paths.encrypt.write(' #commentfile1\n', mode='a') + + # exclusions + paths.work.join('extest').mkdir() + paths.encrypt.write('extest/*\n', mode='a') # include within extest + paths.work.join('extest/inglob1').write('inglob1') + paths.work.join('extest/exglob1').write('exglob1') + paths.work.join('extest/exglob2').write('exglob2') + paths.encrypt.write('!extest/ex*\n', mode='a') # exclude the ex* + expected.append('extest/inglob1') # should be left with only in* + + return expected + + +@pytest.fixture(scope='session') +def decrypt_targets(tmpdir_factory, runner): + """Fixture for setting data to decrypt + + This fixture: + * creates symmetric/asymmetric encrypted archives + * creates a list of expected decrypted files + """ + + tmpdir = tmpdir_factory.mktemp('decrypt_targets') + symmetric = tmpdir.join('symmetric.tar.gz.gpg') + asymmetric = tmpdir.join('asymmetric.tar.gz.gpg') + + expected = [] + + tmpdir.join('decrypt1').write('decrypt1') + expected.append('decrypt1') + tmpdir.join('decrypt2').write('decrypt2') + expected.append('decrypt2') + tmpdir.join('subdir').mkdir() + tmpdir.join('subdir/decrypt3').write('subdir/decrypt3') + expected.append('subdir/decrypt3') + + run = runner( + ['tar', 'cvf', '-'] + + expected + + ['|', 'gpg', '--batch', '--yes', '-c'] + + ['--passphrase', pipes.quote(PASSPHRASE)] + + ['--output', pipes.quote(str(symmetric))], + cwd=tmpdir, + shell=True) + assert run.success + + add_asymmetric_key() + run = runner( + ['tar', 'cvf', '-'] + + expected + + ['|', 'gpg', '--batch', '--yes', '-e'] + + ['-r', pipes.quote(KEY_NAME)] + + ['--output', pipes.quote(str(asymmetric))], + cwd=tmpdir, + shell=True) + assert run.success + remove_asymmetric_key() + + return { + 'asymmetric': asymmetric, + 'expected': expected, + 'symmetric': symmetric, + } + + +@pytest.mark.parametrize( + 'mismatched_phrase', [False, True], + ids=['matching_phrase', 'mismatched_phrase']) +@pytest.mark.parametrize( + 'missing_encrypt', [False, True], + ids=['encrypt_exists', 'encrypt_missing']) +@pytest.mark.parametrize( + 'overwrite', [False, True], + ids=['clean', 'overwrite']) +def test_symmetric_encrypt( + runner, yadm_y, paths, encrypt_targets, + overwrite, missing_encrypt, mismatched_phrase): + """Test symmetric encryption""" + + if missing_encrypt: + paths.encrypt.remove() + + matched_phrase = PASSPHRASE + if mismatched_phrase: + matched_phrase = 'mismatched' + + if overwrite: + paths.archive.write('existing archive') + + run = runner(yadm_y('encrypt'), expect=[ + ('passphrase:', PASSPHRASE), + ('passphrase:', matched_phrase), + ]) + + if missing_encrypt or mismatched_phrase: + assert run.failure + else: + assert run.success + assert run.err == '' + + if missing_encrypt: + assert 'does not exist' in run.out + elif mismatched_phrase: + assert 'invalid passphrase' in run.out + else: + assert encrypted_data_valid(runner, paths.archive, encrypt_targets) + + +@pytest.mark.parametrize( + 'wrong_phrase', [False, True], + ids=['correct_phrase', 'wrong_phrase']) +@pytest.mark.parametrize( + 'archive_exists', [True, False], + ids=['archive_exists', 'archive_missing']) +@pytest.mark.parametrize( + 'dolist', [False, True], + ids=['decrypt', 'list']) +def test_symmetric_decrypt( + runner, yadm_y, paths, decrypt_targets, + dolist, archive_exists, wrong_phrase): + """Test decryption""" + + # init empty yadm repo + os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f'))) + + phrase = PASSPHRASE + if wrong_phrase: + phrase = 'wrong-phrase' + + if archive_exists: + decrypt_targets['symmetric'].copy(paths.archive) + + # to test overwriting + paths.work.join('decrypt1').write('pre-existing file') + + args = [] + + if dolist: + args.append('-l') + run = runner(yadm_y('decrypt') + args, expect=[('passphrase:', phrase)]) + + if archive_exists and not wrong_phrase: + assert run.success + assert run.err == '' + if dolist: + for filename in decrypt_targets['expected']: + if filename != 'decrypt1': # this one should exist + assert not paths.work.join(filename).exists() + assert filename in run.out + else: + for filename in decrypt_targets['expected']: + assert paths.work.join(filename).read() == filename + else: + assert run.failure + + +@pytest.mark.usefixtures('asymmetric_key') +@pytest.mark.parametrize( + 'ask', [False, True], + ids=['no_ask', 'ask']) +@pytest.mark.parametrize( + 'key_exists', [True, False], + ids=['key_exists', 'key_missing']) +@pytest.mark.parametrize( + 'overwrite', [False, True], + ids=['clean', 'overwrite']) +def test_asymmetric_encrypt( + runner, yadm_y, paths, encrypt_targets, + overwrite, key_exists, ask): + """Test asymmetric encryption""" + + # specify encryption recipient + if ask: + os.system(' '.join(yadm_y('config', 'yadm.gpg-recipient', 'ASK'))) + expect = [('Enter the user ID', KEY_NAME), ('Enter the user ID', '')] + else: + os.system(' '.join(yadm_y('config', 'yadm.gpg-recipient', KEY_NAME))) + expect = [] + + if overwrite: + paths.archive.write('existing archive') + + if not key_exists: + remove_asymmetric_key() + + run = runner(yadm_y('encrypt'), expect=expect) + + if key_exists: + assert run.success + assert encrypted_data_valid(runner, paths.archive, encrypt_targets) + else: + assert run.failure + assert 'Unable to write' in run.out + + if ask: + assert 'Enter the user ID' in run.out + + +@pytest.mark.usefixtures('asymmetric_key') +@pytest.mark.parametrize( + 'key_exists', [True, False], + ids=['key_exists', 'key_missing']) +@pytest.mark.parametrize( + 'dolist', [False, True], + ids=['decrypt', 'list']) +def test_asymmetric_decrypt( + runner, yadm_y, paths, decrypt_targets, + dolist, key_exists): + """Test decryption""" + + # init empty yadm repo + os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f'))) + + decrypt_targets['asymmetric'].copy(paths.archive) + + # to test overwriting + paths.work.join('decrypt1').write('pre-existing file') + + if not key_exists: + remove_asymmetric_key() + + args = [] + + if dolist: + args.append('-l') + run = runner(yadm_y('decrypt') + args) + + if key_exists: + assert run.success + if dolist: + for filename in decrypt_targets['expected']: + if filename != 'decrypt1': # this one should exist + assert not paths.work.join(filename).exists() + assert filename in run.out + else: + for filename in decrypt_targets['expected']: + assert paths.work.join(filename).read() == filename + else: + assert run.failure + assert 'Unable to extract encrypted files' in run.out + + +@pytest.mark.parametrize( + 'untracked', + [False, 'y', 'n'], + ids=['tracked', 'untracked_answer_y', 'untracked_answer_n']) +def test_offer_to_add(runner, yadm_y, paths, encrypt_targets, untracked): + """Test offer to add encrypted archive + + All the other encryption tests use an archive outside of the work tree. + However, the archive is often inside the work tree, and if it is, there + should be an offer to add it to the repo if it is not tracked. + """ + + worktree_archive = paths.work.join('worktree-archive.tar.gpg') + expect = [ + ('passphrase:', PASSPHRASE), + ('passphrase:', PASSPHRASE), + ] + + if untracked: + expect.append(('add it now', untracked)) + else: + worktree_archive.write('exists') + os.system(' '.join(yadm_y('add', str(worktree_archive)))) + + run = runner( + yadm_y('encrypt', '--yadm-archive', str(worktree_archive)), + expect=expect + ) + + assert run.success + assert run.err == '' + assert encrypted_data_valid(runner, worktree_archive, encrypt_targets) + + run = runner( + yadm_y('status', '--porcelain', '-uall', str(worktree_archive))) + assert run.success + assert run.err == '' + + if untracked == 'y': + # should be added to the index + assert f'A {worktree_archive.basename}' in run.out + elif untracked == 'n': + # should NOT be added to the index + assert f'?? {worktree_archive.basename}' in run.out + else: + # should appear modified in the index + assert f'AM {worktree_archive.basename}' in run.out + + +def encrypted_data_valid(runner, encrypted, expected): + """Verify encrypted data matches expectations""" + run = runner([ + 'gpg', + '--passphrase', pipes.quote(PASSPHRASE), + '-d', pipes.quote(str(encrypted)), + '2>/dev/null', + '|', 'tar', 't'], shell=True, report=False) + file_count = 0 + for filename in run.out.splitlines(): + if filename.endswith('/'): + continue + file_count += 1 + assert filename in expected, ( + f'Unexpected file in archive: {filename}') + assert file_count == len(expected), ( + 'Number of files in archive does not match expected') + return True diff --git a/test/test_enter.py b/test/test_enter.py new file mode 100644 index 0000000..202df32 --- /dev/null +++ b/test/test_enter.py @@ -0,0 +1,85 @@ +"""Test enter""" + +import os +import warnings +import pytest + + +@pytest.mark.parametrize( + 'shell, success', [ + ('delete', True), + ('', False), + ('/usr/bin/env', True), + ('noexec', False), + ], ids=[ + 'shell-missing', + 'shell-empty', + 'shell-env', + 'shell-noexec', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_enter(runner, yadm_y, paths, shell, success): + """Enter tests""" + env = os.environ.copy() + if shell == 'delete': + # remove shell + if 'SHELL' in env: + del env['SHELL'] + elif shell == 'noexec': + # specify a non-executable path + noexec = paths.root.join('noexec') + noexec.write('') + noexec.chmod(0o664) + env['SHELL'] = str(noexec) + else: + env['SHELL'] = shell + run = runner(command=yadm_y('enter'), env=env) + assert run.success == success + assert run.err == '' + prompt = f'yadm shell ({paths.repo})' + if success: + assert run.out.startswith('Entering yadm repo') + assert run.out.rstrip().endswith('Leaving yadm repo') + if shell == 'delete': + # When SHELL is empty (unlikely), it is attempted to be run anyway. + # This is a but which must be fixed. + warnings.warn('Unhandled bug: SHELL executed when empty', Warning) + else: + assert f'PROMPT={prompt}' in run.out + assert f'PS1={prompt}' in run.out + assert f'GIT_DIR={paths.repo}' in run.out + if not success: + assert 'does not refer to an executable' in run.out + if 'env' in shell: + assert f'GIT_DIR={paths.repo}' in run.out + assert 'PROMPT=yadm shell' in run.out + assert 'PS1=yadm shell' in run.out + + +@pytest.mark.parametrize( + 'shell, opts, path', [ + ('bash', '--norc', '\\w'), + ('csh', '-f', '%~'), + ('zsh', '-f', '%~'), + ], ids=[ + 'bash', + 'csh', + 'zsh', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_enter_shell_ops(runner, yadm_y, paths, shell, opts, path): + """Enter tests for specific shell options""" + + # Create custom shell to detect options passed + custom_shell = paths.root.join(shell) + custom_shell.write('#!/bin/sh\necho OPTS=$*\necho PROMPT=$PROMPT') + custom_shell.chmod(0o775) + + env = os.environ.copy() + env['SHELL'] = custom_shell + + run = runner(command=yadm_y('enter'), env=env) + assert run.success + assert run.err == '' + assert f'OPTS={opts}' in run.out + assert f'PROMPT=yadm shell ({paths.repo}) {path} >' in run.out diff --git a/test/test_git.py b/test/test_git.py new file mode 100644 index 0000000..427c54a --- /dev/null +++ b/test/test_git.py @@ -0,0 +1,58 @@ +"""Test git""" + +import re +import pytest + + +@pytest.mark.usefixtures('ds1_copy') +def test_git(runner, yadm_y, paths): + """Test series of passthrough git commands + + Passthru unknown commands to Git + Git command 'add' - badfile + Git command 'add' + Git command 'status' + Git command 'commit' + Git command 'log' + """ + + # passthru unknown commands to Git + run = runner(command=yadm_y('bogus')) + assert run.failure + assert "git: 'bogus' is not a git command." in run.err + assert "See 'git --help'" in run.err + assert run.out == '' + + # git command 'add' - badfile + run = runner(command=yadm_y('add', '-v', 'does_not_exist')) + assert run.code == 128 + assert "pathspec 'does_not_exist' did not match any files" in run.err + assert run.out == '' + + # git command 'add' + newfile = paths.work.join('test_git') + newfile.write('test_git') + run = runner(command=yadm_y('add', '-v', str(newfile))) + assert run.success + assert run.err == '' + assert "add 'test_git'" in run.out + + # git command 'status' + run = runner(command=yadm_y('status')) + assert run.success + assert run.err == '' + assert re.search(r'new file:\s+test_git', run.out) + + # git command 'commit' + run = runner(command=yadm_y('commit', '-m', 'Add test_git')) + assert run.success + assert run.err == '' + assert '1 file changed' in run.out + assert '1 insertion' in run.out + assert re.search(r'create mode .+ test_git', run.out) + + # git command 'log' + run = runner(command=yadm_y('log', '--oneline')) + assert run.success + assert run.err == '' + assert 'Add test_git' in run.out diff --git a/test/test_help.py b/test/test_help.py new file mode 100644 index 0000000..79a7652 --- /dev/null +++ b/test/test_help.py @@ -0,0 +1,17 @@ +"""Test help""" + + +def test_missing_command(runner, yadm_y): + """Run without any command""" + run = runner(command=yadm_y()) + assert run.failure + assert run.err == '' + assert run.out.startswith('Usage: yadm') + + +def test_help_command(runner, yadm_y): + """Run with help command""" + run = runner(command=yadm_y('help')) + assert run.failure + assert run.err == '' + assert run.out.startswith('Usage: yadm') diff --git a/test/test_hooks.py b/test/test_hooks.py new file mode 100644 index 0000000..f1df91e --- /dev/null +++ b/test/test_hooks.py @@ -0,0 +1,90 @@ +"""Test hooks""" + +import pytest + + +@pytest.mark.parametrize( + 'pre, pre_code, post, post_code', [ + (False, 0, False, 0), + (True, 0, False, 0), + (True, 5, False, 0), + (False, 0, True, 0), + (False, 0, True, 5), + (True, 0, True, 0), + (True, 5, True, 5), + ], ids=[ + 'no-hooks', + 'pre-success', + 'pre-fail', + 'post-success', + 'post-fail', + 'pre-post-success', + 'pre-post-fail', + ]) +def test_hooks( + runner, yadm_y, paths, + pre, pre_code, post, post_code): + """Test pre/post hook""" + + # generate hooks + if pre: + create_hook(paths, 'pre_version', pre_code) + if post: + create_hook(paths, 'post_version', post_code) + + # run yadm + run = runner(yadm_y('version')) + # when a pre hook fails, yadm should exit with the hook's code + assert run.code == pre_code + assert run.err == '' + + if pre: + assert 'HOOK:pre_version' in run.out + # if pre hook is missing or successful, yadm itself should exit 0 + if run.success: + if post: + assert 'HOOK:post_version' in run.out + else: + # when a pre hook fails, yadm should not run the command + assert 'version will not be run' in run.out + # when a pre hook fails, yadm should not run the post hook + assert 'HOOK:post_version' not in run.out + + +# repo fixture is needed to test the population of YADM_HOOK_WORK +@pytest.mark.usefixtures('ds1_repo_copy') +def test_hook_env(runner, yadm_y, paths): + """Test hook environment""" + + # test will be done with a non existent "git" passthru command + # which should exit with a failing code + cmd = 'passthrucmd' + + # write the hook + hook = paths.hooks.join(f'post_{cmd}') + hook.write('#!/bin/sh\nenv\n') + hook.chmod(0o755) + + run = runner(yadm_y(cmd, 'extra_args')) + + # expect passthru to fail + assert run.failure + assert f"'{cmd}' is not a git command" in run.err + + # verify hook environment + assert 'YADM_HOOK_EXIT=1\n' in run.out + assert f'YADM_HOOK_COMMAND={cmd}\n' in run.out + assert f'YADM_HOOK_FULL_COMMAND={cmd} extra_args\n' in run.out + assert f'YADM_HOOK_REPO={paths.repo}\n' in run.out + assert f'YADM_HOOK_WORK={paths.work}\n' in run.out + + +def create_hook(paths, name, code): + """Create hook""" + hook = paths.hooks.join(name) + hook.write( + '#!/bin/sh\n' + f'echo HOOK:{name}\n' + f'exit {code}\n' + ) + hook.chmod(0o755) diff --git a/test/test_init.py b/test/test_init.py new file mode 100644 index 0000000..1519b38 --- /dev/null +++ b/test/test_init.py @@ -0,0 +1,78 @@ +"""Test init""" + +import pytest + + +@pytest.mark.parametrize( + 'alt_work, repo_present, force', [ + (False, False, False), + (True, False, False), + (False, True, False), + (False, True, True), + (True, True, True), + ], ids=[ + 'simple', + '-w', + 'existing repo', + '-f', + '-w & -f', + ]) +@pytest.mark.usefixtures('ds1_work_copy') +def test_init( + runner, yadm_y, paths, repo_config, alt_work, repo_present, force): + """Test init + + Repos should have attribs: + - 0600 permissions + - not bare + - worktree = $HOME + - showUntrackedFiles = no + - yadm.managed = true + """ + + # these tests will assume this for $HOME + home = str(paths.root.mkdir('HOME')) + + # ds1_work_copy comes WITH an empty repo dir present. + old_repo = paths.repo.join('old_repo') + if repo_present: + # Let's put some data in it, so we can confirm that data is gone when + # forced to be overwritten. + old_repo.write('old repo data') + assert old_repo.isfile() + else: + paths.repo.remove() + + # command args + args = ['init'] + if alt_work: + args.extend(['-w', paths.work]) + if force: + args.append('-f') + + # run init + run = runner(yadm_y(*args), env={'HOME': home}) + assert run.err == '' + + if repo_present and not force: + assert run.failure + assert 'repo already exists' in run.out + assert old_repo.isfile(), 'Missing original repo' + else: + assert run.success + assert 'Initialized empty shared Git repository' in run.out + + if repo_present: + assert not old_repo.isfile(), 'Original repo still exists' + + if alt_work: + assert repo_config('core.worktree') == paths.work + else: + assert repo_config('core.worktree') == home + + # uniform repo assertions + assert oct(paths.repo.stat().mode).endswith('00'), ( + 'Repo is not secure') + assert repo_config('core.bare') == 'false' + assert repo_config('status.showUntrackedFiles') == 'no' + assert repo_config('yadm.managed') == 'true' diff --git a/test/test_introspect.py b/test/test_introspect.py new file mode 100644 index 0000000..9026e98 --- /dev/null +++ b/test/test_introspect.py @@ -0,0 +1,46 @@ +"""Test introspect""" + +import pytest + + +@pytest.mark.parametrize( + 'name', [ + '', + 'invalid', + 'commands', + 'configs', + 'repo', + 'switches', + ]) +def test_introspect_category( + runner, yadm_y, paths, name, + supported_commands, supported_configs, supported_switches): + """Validate introspection category""" + if name: + run = runner(command=yadm_y('introspect', name)) + else: + run = runner(command=yadm_y('introspect')) + + assert run.success + assert run.err == '' + + expected = [] + if name == 'commands': + expected = supported_commands + elif name == 'config': + expected = supported_configs + elif name == 'switches': + expected = supported_switches + + # assert values + if name in ('', 'invalid'): + assert run.out == '' + if name == 'repo': + assert run.out.rstrip() == paths.repo + + # make sure every expected value is present + for value in expected: + assert value in run.out + # make sure nothing extra is present + if expected: + assert len(run.out.split()) == len(expected) diff --git a/test/test_jinja.py b/test/test_jinja.py new file mode 100644 index 0000000..6e43f35 --- /dev/null +++ b/test/test_jinja.py @@ -0,0 +1,186 @@ +"""Test jinja""" + +import os +import re +import pytest +import utils + + +@pytest.fixture(scope='module') +def envtpl_present(runner): + """Is envtpl present and working?""" + try: + run = runner(command=['envtpl', '-h']) + if run.success: + return True + except BaseException: + pass + return False + + +@pytest.mark.usefixtures('ds1_copy') +def test_local_override(runner, yadm_y, paths, + tst_distro, envtpl_present): + """Test local overrides""" + if not envtpl_present: + pytest.skip('Unable to test without envtpl.') + + # define local overrides + utils.set_local(paths, 'class', 'or-class') + utils.set_local(paths, 'hostname', 'or-hostname') + utils.set_local(paths, 'os', 'or-os') + utils.set_local(paths, 'user', 'or-user') + + template = ( + 'j2-{{ YADM_CLASS }}-' + '{{ YADM_OS }}-{{ YADM_HOSTNAME }}-' + '{{ YADM_USER }}-{{ YADM_DISTRO }}' + ) + expected = f'j2-or-class-or-os-or-hostname-or-user-{tst_distro}' + + utils.create_alt_files(paths, '##yadm.j2', content=template) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + created = created_list(run.out) + + # assert the proper creation has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + '##yadm.j2' + assert paths.work.join(file_path).isfile() + lines = paths.work.join(file_path).readlines(cr=False) + assert lines[0] == source_file + assert lines[1] == expected + assert str(paths.work.join(source_file)) in created + + +@pytest.mark.parametrize('autoalt', [None, 'true', 'false']) +@pytest.mark.usefixtures('ds1_copy') +def test_auto_alt(runner, yadm_y, paths, autoalt, tst_sys, + envtpl_present): + """Test setting auto-alt""" + + if not envtpl_present: + pytest.skip('Unable to test without envtpl.') + + # set the value of auto-alt + if autoalt: + os.system(' '.join(yadm_y('config', 'yadm.auto-alt', autoalt))) + + # create file + jinja_suffix = '##yadm.j2' + utils.create_alt_files(paths, jinja_suffix, content='{{ YADM_OS }}') + + # run status to possibly trigger linking + run = runner(yadm_y('status')) + assert run.success + assert run.err == '' + created = created_list(run.out) + + # assert the proper creation has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + jinja_suffix + if autoalt == 'false': + assert not paths.work.join(file_path).exists() + else: + assert paths.work.join(file_path).isfile() + lines = paths.work.join(file_path).readlines(cr=False) + assert lines[0] == source_file + assert lines[1] == tst_sys + # no created output when run via auto-alt + assert str(paths.work.join(source_file)) not in created + + +@pytest.mark.usefixtures('ds1_copy') +def test_jinja_envtpl_missing(runner, paths): + """Test operation when envtpl is missing""" + + script = f""" + YADM_TEST=1 source {paths.pgm} + process_global_args -Y "{paths.yadm}" + set_operating_system + configure_paths + ENVTPL_PROGRAM='envtpl_missing' main alt + """ + + utils.create_alt_files(paths, '##yadm.j2') + + run = runner(command=['bash'], inp=script) + assert run.success + assert run.err == '' + assert f'envtpl not available, not creating' in run.out + + +@pytest.mark.parametrize( + 'tracked, encrypt, exclude', [ + (False, False, False), + (True, False, False), + (False, True, False), + (False, True, True), + ], ids=[ + 'untracked', + 'tracked', + 'encrypted', + 'excluded', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_jinja(runner, yadm_y, paths, + tst_sys, tst_host, tst_user, tst_distro, + tracked, encrypt, exclude, + envtpl_present): + """Test jinja processing""" + + if not envtpl_present: + pytest.skip('Unable to test without envtpl.') + + jinja_suffix = '##yadm.j2' + + # set the class + tst_class = 'testclass' + utils.set_local(paths, 'class', tst_class) + + template = ( + 'j2-{{ YADM_CLASS }}-' + '{{ YADM_OS }}-{{ YADM_HOSTNAME }}-' + '{{ YADM_USER }}-{{ YADM_DISTRO }}' + ) + expected = ( + f'j2-{tst_class}-' + f'{tst_sys}-{tst_host}-' + f'{tst_user}-{tst_distro}' + ) + + utils.create_alt_files(paths, jinja_suffix, content=template, + tracked=tracked, encrypt=encrypt, exclude=exclude) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + created = created_list(run.out) + + # assert the proper creation has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + jinja_suffix + if tracked or (encrypt and not exclude): + assert paths.work.join(file_path).isfile() + lines = paths.work.join(file_path).readlines(cr=False) + assert lines[0] == source_file + assert lines[1] == expected + assert str(paths.work.join(source_file)) in created + else: + assert not paths.work.join(file_path).exists() + assert str(paths.work.join(source_file)) not in created + + +def created_list(output): + """Parse output, and return list of created files""" + + created = dict() + for line in output.splitlines(): + match = re.match('Creating (.+) from template (.+)$', line) + if match: + created[match.group(1)] = match.group(2) + return created.values() diff --git a/test/test_list.py b/test/test_list.py new file mode 100644 index 0000000..44a5573 --- /dev/null +++ b/test/test_list.py @@ -0,0 +1,47 @@ +"""Test list""" + +import os +import pytest + + +@pytest.mark.parametrize( + 'location', [ + 'work', + 'outside', + 'subdir', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_list(runner, yadm_y, paths, ds1, location): + """List tests""" + if location == 'work': + run_dir = paths.work + elif location == 'outside': + run_dir = paths.work.join('..') + elif location == 'subdir': + # first directory with tracked data + run_dir = paths.work.join(ds1.tracked_dirs[0]) + with run_dir.as_cwd(): + # test with '-a' + # should get all tracked files, relative to the work path + run = runner(command=yadm_y('list', '-a')) + assert run.success + assert run.err == '' + returned_files = set(run.out.splitlines()) + expected_files = set([e.path for e in ds1 if e.tracked]) + assert returned_files == expected_files + # test without '-a' + # should get all tracked files, relative to the work path unless in a + # subdir, then those should be a limited set of files, relative to the + # subdir + run = runner(command=yadm_y('list')) + assert run.success + assert run.err == '' + returned_files = set(run.out.splitlines()) + if location == 'subdir': + basepath = os.path.basename(os.getcwd()) + # only expect files within the subdir + # names should be relative to subdir + expected_files = set( + [e.path[len(basepath)+1:] for e in ds1 + if e.tracked and e.path.startswith(basepath)]) + assert returned_files == expected_files diff --git a/test/test_perms.py b/test/test_perms.py new file mode 100644 index 0000000..eb7ad8f --- /dev/null +++ b/test/test_perms.py @@ -0,0 +1,111 @@ +"""Test perms""" + +import os +import warnings +import pytest + + +@pytest.mark.parametrize('autoperms', ['notest', 'unset', 'true', 'false']) +@pytest.mark.usefixtures('ds1_copy') +def test_perms(runner, yadm_y, paths, ds1, autoperms): + """Test perms""" + # set the value of auto-perms + if autoperms != 'notest': + if autoperms != 'unset': + os.system(' '.join(yadm_y('config', 'yadm.auto-perms', autoperms))) + + # privatepaths will hold all paths that should become secured + privatepaths = [paths.work.join('.ssh'), paths.work.join('.gnupg')] + privatepaths += [paths.work.join(private.path) for private in ds1.private] + + # create an archive file + os.system(f'touch "{str(paths.archive)}"') + privatepaths.append(paths.archive) + + # create encrypted file test data + efile1 = paths.work.join('efile1') + efile1.write('efile1') + efile2 = paths.work.join('efile2') + efile2.write('efile2') + paths.encrypt.write('efile1\nefile2\n!efile1\n') + insecurepaths = [efile1] + privatepaths.append(efile2) + + # assert these paths begin unsecured + for private in privatepaths + insecurepaths: + assert not oct(private.stat().mode).endswith('00'), ( + 'Path started secured') + + cmd = 'perms' + if autoperms != 'notest': + cmd = 'status' + run = runner(yadm_y(cmd)) + assert run.success + assert run.err == '' + if cmd == 'perms': + assert run.out == '' + + # these paths should be secured if processing perms + for private in privatepaths: + if '.p2' in private.basename or '.p4' in private.basename: + # Dot files within .ssh/.gnupg are not protected. + # This is a but which must be fixed + warnings.warn('Unhandled bug: private dot files', Warning) + continue + if autoperms == 'false': + assert not oct(private.stat().mode).endswith('00'), ( + 'Path should not be secured') + else: + assert oct(private.stat().mode).endswith('00'), ( + 'Path has not been secured') + + # these paths should never be secured + for private in insecurepaths: + assert not oct(private.stat().mode).endswith('00'), ( + 'Path should not be secured') + + +@pytest.mark.parametrize('sshperms', [None, 'true', 'false']) +@pytest.mark.parametrize('gpgperms', [None, 'true', 'false']) +@pytest.mark.usefixtures('ds1_copy') +def test_perms_control(runner, yadm_y, paths, ds1, sshperms, gpgperms): + """Test fine control of perms""" + # set the value of ssh-perms + if sshperms: + os.system(' '.join(yadm_y('config', 'yadm.ssh-perms', sshperms))) + + # set the value of gpg-perms + if gpgperms: + os.system(' '.join(yadm_y('config', 'yadm.gpg-perms', gpgperms))) + + # privatepaths will hold all paths that should become secured + privatepaths = [paths.work.join('.ssh'), paths.work.join('.gnupg')] + privatepaths += [paths.work.join(private.path) for private in ds1.private] + + # assert these paths begin unsecured + for private in privatepaths: + assert not oct(private.stat().mode).endswith('00'), ( + 'Path started secured') + + run = runner(yadm_y('perms')) + assert run.success + assert run.err == '' + assert run.out == '' + + # these paths should be secured if processing perms + for private in privatepaths: + if '.p2' in private.basename or '.p4' in private.basename: + # Dot files within .ssh/.gnupg are not protected. + # This is a but which must be fixed + warnings.warn('Unhandled bug: private dot files', Warning) + continue + if ( + (sshperms == 'false' and 'ssh' in str(private)) + or + (gpgperms == 'false' and 'gnupg' in str(private)) + ): + assert not oct(private.stat().mode).endswith('00'), ( + 'Path should not be secured') + else: + assert oct(private.stat().mode).endswith('00'), ( + 'Path has not been secured') diff --git a/test/test_syntax.py b/test/test_syntax.py new file mode 100644 index 0000000..5408885 --- /dev/null +++ b/test/test_syntax.py @@ -0,0 +1,41 @@ +"""Syntax checks""" + +import os +import pytest + + +def test_yadm_syntax(runner, yadm): + """Is syntactically valid""" + run = runner(command=['bash', '-n', yadm]) + assert run.success + + +def test_shellcheck(runner, yadm, shellcheck_version): + """Passes shellcheck""" + run = runner(command=['shellcheck', '-V'], report=False) + if f'version: {shellcheck_version}' not in run.out: + pytest.skip('Unsupported shellcheck version') + run = runner(command=['shellcheck', '-s', 'bash', yadm]) + assert run.success + + +def test_pylint(runner, pylint_version): + """Passes pylint""" + run = runner(command=['pylint', '--version'], report=False) + if f'pylint {pylint_version}' not in run.out: + pytest.skip('Unsupported pylint version') + pyfiles = list() + for tfile in os.listdir('test'): + if tfile.endswith('.py'): + pyfiles.append(f'test/{tfile}') + run = runner(command=['pylint'] + pyfiles) + assert run.success + + +def test_flake8(runner, flake8_version): + """Passes flake8""" + run = runner(command=['flake8', '--version'], report=False) + if not run.out.startswith(flake8_version): + pytest.skip('Unsupported flake8 version') + run = runner(command=['flake8', 'test']) + assert run.success diff --git a/test/test_unit_bootstrap_available.py b/test/test_unit_bootstrap_available.py new file mode 100644 index 0000000..f37ac08 --- /dev/null +++ b/test/test_unit_bootstrap_available.py @@ -0,0 +1,33 @@ +"""Unit tests: bootstrap_available""" + + +def test_bootstrap_missing(runner, paths): + """Test result of bootstrap_available, when bootstrap missing""" + run_test(runner, paths, False) + + +def test_bootstrap_no_exec(runner, paths): + """Test result of bootstrap_available, when bootstrap not executable""" + paths.bootstrap.write('') + paths.bootstrap.chmod(0o644) + run_test(runner, paths, False) + + +def test_bootstrap_exec(runner, paths): + """Test result of bootstrap_available, when bootstrap executable""" + paths.bootstrap.write('') + paths.bootstrap.chmod(0o775) + run_test(runner, paths, True) + + +def run_test(runner, paths, success): + """Run bootstrap_available, and test result""" + script = f""" + YADM_TEST=1 source {paths.pgm} + YADM_BOOTSTRAP='{paths.bootstrap}' + bootstrap_available + """ + run = runner(command=['bash'], inp=script) + assert run.success == success + assert run.err == '' + assert run.out == '' diff --git a/test/test_unit_configure_paths.py b/test/test_unit_configure_paths.py new file mode 100644 index 0000000..094ff6b --- /dev/null +++ b/test/test_unit_configure_paths.py @@ -0,0 +1,80 @@ +"""Unit tests: configure_paths""" + +import pytest + +ARCHIVE = 'files.gpg' +BOOTSTRAP = 'bootstrap' +CONFIG = 'config' +ENCRYPT = 'encrypt' +HOME = '/testhome' +REPO = 'repo.git' +YDIR = '.yadm' + + +@pytest.mark.parametrize( + 'override, expect', [ + (None, {}), + ('-Y', {}), + ('--yadm-repo', {'repo': 'YADM_REPO', 'git': 'GIT_DIR'}), + ('--yadm-config', {'config': 'YADM_CONFIG'}), + ('--yadm-encrypt', {'encrypt': 'YADM_ENCRYPT'}), + ('--yadm-archive', {'archive': 'YADM_ARCHIVE'}), + ('--yadm-bootstrap', {'bootstrap': 'YADM_BOOTSTRAP'}), + ], ids=[ + 'default', + 'override yadm dir', + 'override repo', + 'override config', + 'override encrypt', + 'override archive', + 'override bootstrap', + ]) +def test_config(runner, paths, override, expect): + """Test configure_paths""" + opath = 'override' + matches = match_map() + args = [] + if override == '-Y': + matches = match_map('/' + opath) + + if override: + args = [override, '/' + opath] + for ekey in expect.keys(): + matches[ekey] = f'{expect[ekey]}="/{opath}"' + run_test( + runner, paths, + [override, opath], + ['must specify a fully qualified'], 1) + + run_test(runner, paths, args, matches.values(), 0) + + +def match_map(yadm_dir=None): + """Create a dictionary of matches, relative to yadm_dir""" + if not yadm_dir: + yadm_dir = '/'.join([HOME, YDIR]) + return { + 'yadm': f'YADM_DIR="{yadm_dir}"', + 'repo': f'YADM_REPO="{yadm_dir}/{REPO}"', + 'config': f'YADM_CONFIG="{yadm_dir}/{CONFIG}"', + 'encrypt': f'YADM_ENCRYPT="{yadm_dir}/{ENCRYPT}"', + 'archive': f'YADM_ARCHIVE="{yadm_dir}/{ARCHIVE}"', + 'bootstrap': f'YADM_BOOTSTRAP="{yadm_dir}/{BOOTSTRAP}"', + 'git': f'GIT_DIR="{yadm_dir}/{REPO}"', + } + + +def run_test(runner, paths, args, expected_matches, expected_code=0): + """Run proces global args, and run configure_paths""" + argstring = ' '.join(['"'+a+'"' for a in args]) + script = f""" + YADM_TEST=1 HOME="{HOME}" source {paths.pgm} + process_global_args {argstring} + configure_paths + declare -p | grep -E '(YADM|GIT)_' + """ + run = runner(command=['bash'], inp=script) + assert run.code == expected_code + assert run.err == '' + for match in expected_matches: + assert match in run.out diff --git a/test/test_unit_parse_encrypt.py b/test/test_unit_parse_encrypt.py new file mode 100644 index 0000000..914d907 --- /dev/null +++ b/test/test_unit_parse_encrypt.py @@ -0,0 +1,175 @@ +"""Unit tests: parse_encrypt""" + +import pytest + + +def test_not_called(runner, paths): + """Test parse_encrypt (not called)""" + run = run_parse_encrypt(runner, paths, skip_parse=True) + assert run.success + assert run.err == '' + assert 'EIF:unparsed' in run.out, 'EIF should be unparsed' + assert 'EIF_COUNT:1' in run.out, 'Only value of EIF should be unparsed' + + +def test_short_circuit(runner, paths): + """Test parse_encrypt (short-circuit)""" + run = run_parse_encrypt(runner, paths, twice=True) + assert run.success + assert run.err == '' + assert 'PARSE_ENCRYPT_SHORT=parse_encrypt() not reprocessed' in run.out, ( + 'parse_encrypt() should short-circuit') + + +@pytest.mark.parametrize( + 'encrypt', [ + ('missing'), + ('empty'), + ]) +def test_empty(runner, paths, encrypt): + """Test parse_encrypt (file missing/empty)""" + + # write encrypt file + if encrypt == 'missing': + assert not paths.encrypt.exists(), 'Encrypt should be missing' + else: + paths.encrypt.write('') + assert paths.encrypt.exists(), 'Encrypt should exist' + assert paths.encrypt.size() == 0, 'Encrypt should be empty' + + # run parse_encrypt + run = run_parse_encrypt(runner, paths) + assert run.success + assert run.err == '' + + # validate parsing result + assert 'EIF_COUNT:0' in run.out, 'EIF should be empty' + + +@pytest.mark.usefixtures('ds1_repo_copy') +def test_file_parse_encrypt(runner, paths): + """Test parse_encrypt + + Test an array of supported features of the encrypt configuration. + """ + + edata = '' + expected = set() + + # empty line + edata += '\n' + + # simple comments + edata += '# a simple comment\n' + edata += ' # a comment with leading space\n' + + # unreferenced directory + paths.work.join('unreferenced').mkdir() + + # simple files + edata += 'simple_file\n' + edata += 'simple.file\n' + paths.work.join('simple_file').write('') + paths.work.join('simple.file').write('') + paths.work.join('simple_file2').write('') + paths.work.join('simple.file2').write('') + expected.add('simple_file') + expected.add('simple.file') + + # simple files in directories + edata += 'simple_dir/simple_file\n' + paths.work.join('simple_dir/simple_file').write('', ensure=True) + paths.work.join('simple_dir/simple_file2').write('', ensure=True) + expected.add('simple_dir/simple_file') + + # paths with spaces + edata += 'with space/with space\n' + paths.work.join('with space/with space').write('', ensure=True) + paths.work.join('with space/with space2').write('', ensure=True) + expected.add('with space/with space') + + # hidden files + edata += '.hidden\n' + paths.work.join('.hidden').write('') + expected.add('.hidden') + + # hidden files in directories + edata += '.hidden_dir/.hidden_file\n' + paths.work.join('.hidden_dir/.hidden_file').write('', ensure=True) + expected.add('.hidden_dir/.hidden_file') + + # wildcards + edata += 'wild*\n' + paths.work.join('wildcard1').write('', ensure=True) + paths.work.join('wildcard2').write('', ensure=True) + expected.add('wildcard1') + expected.add('wildcard2') + + edata += 'dirwild*\n' + paths.work.join('dirwildcard/file1').write('', ensure=True) + paths.work.join('dirwildcard/file2').write('', ensure=True) + expected.add('dirwildcard') + + # excludes + edata += 'exclude*\n' + edata += 'ex ex/*\n' + paths.work.join('exclude_file1').write('') + paths.work.join('exclude_file2.ex').write('') + paths.work.join('exclude_file3.ex3').write('') + expected.add('exclude_file1') + expected.add('exclude_file3.ex3') + edata += '!*.ex\n' + edata += '!ex ex/*.txt\n' + paths.work.join('ex ex/file4').write('', ensure=True) + paths.work.join('ex ex/file5.txt').write('', ensure=True) + paths.work.join('ex ex/file6.text').write('', ensure=True) + expected.add('ex ex/file4') + expected.add('ex ex/file6.text') + + # write encrypt file + print(f'ENCRYPT:\n---\n{edata}---\n') + paths.encrypt.write(edata) + assert paths.encrypt.isfile() + + # run parse_encrypt + run = run_parse_encrypt(runner, paths) + assert run.success + assert run.err == '' + + assert f'EIF_COUNT:{len(expected)}' in run.out, 'EIF count wrong' + for expected_file in expected: + assert f'EIF:{expected_file}\n' in run.out + + +def run_parse_encrypt( + runner, paths, + skip_parse=False, + twice=False): + """Run parse_encrypt + + A count of ENCRYPT_INCLUDE_FILES will be reported as EIF_COUNT:X. All + values of ENCRYPT_INCLUDE_FILES will be reported as individual EIF:value + lines. + """ + parse_cmd = 'parse_encrypt' + if skip_parse: + parse_cmd = '' + if twice: + parse_cmd = 'parse_encrypt; parse_encrypt' + script = f""" + YADM_TEST=1 source {paths.pgm} + YADM_ENCRYPT={paths.encrypt} + export YADM_ENCRYPT + GIT_DIR={paths.repo} + export GIT_DIR + {parse_cmd} + export ENCRYPT_INCLUDE_FILES + export PARSE_ENCRYPT_SHORT + env + echo EIF_COUNT:${{#ENCRYPT_INCLUDE_FILES[@]}} + for value in "${{ENCRYPT_INCLUDE_FILES[@]}}"; do + echo "EIF:$value" + done + """ + run = runner(command=['bash'], inp=script) + return run diff --git a/test/test_unit_query_distro.py b/test/test_unit_query_distro.py new file mode 100644 index 0000000..3c53c54 --- /dev/null +++ b/test/test_unit_query_distro.py @@ -0,0 +1,26 @@ +"""Unit tests: query_distro""" + + +def test_lsb_release_present(runner, yadm, tst_distro): + """Match lsb_release -si when present""" + script = f""" + YADM_TEST=1 source {yadm} + query_distro + """ + run = runner(command=['bash'], inp=script) + assert run.success + assert run.err == '' + assert run.out.rstrip() == tst_distro + + +def test_lsb_release_missing(runner, yadm): + """Empty value when missing""" + script = f""" + YADM_TEST=1 source {yadm} + LSB_RELEASE_PROGRAM="missing_lsb_release" + query_distro + """ + run = runner(command=['bash'], inp=script) + assert run.success + assert run.err == '' + assert run.out.rstrip() == '' diff --git a/test/test_unit_set_os.py b/test/test_unit_set_os.py new file mode 100644 index 0000000..d2f2a2a --- /dev/null +++ b/test/test_unit_set_os.py @@ -0,0 +1,36 @@ +"""Unit tests: set_operating_system""" + +import pytest + + +@pytest.mark.parametrize( + 'proc_value, expected_os', [ + ('missing', 'uname'), + ('has Microsoft inside', 'WSL'), + ('another value', 'uname'), + ], ids=[ + '/proc/version missing', + '/proc/version includes MS', + '/proc/version excludes MS', + ]) +def test_set_operating_system( + runner, paths, tst_sys, proc_value, expected_os): + """Run set_operating_system and test result""" + + # Normally /proc/version (set in PROC_VERSION) is inspected to identify + # WSL. During testing, we will override that value. + proc_version = paths.root.join('proc_version') + if proc_value != 'missing': + proc_version.write(proc_value) + script = f""" + YADM_TEST=1 source {paths.pgm} + PROC_VERSION={proc_version} + set_operating_system + echo $OPERATING_SYSTEM + """ + run = runner(command=['bash'], inp=script) + assert run.success + assert run.err == '' + if expected_os == 'uname': + expected_os = tst_sys + assert run.out.rstrip() == expected_os diff --git a/test/test_unit_x_program.py b/test/test_unit_x_program.py new file mode 100644 index 0000000..3233a3d --- /dev/null +++ b/test/test_unit_x_program.py @@ -0,0 +1,46 @@ +"""Unit tests: yadm.[git,gpg]-program""" + +import os +import pytest + + +@pytest.mark.parametrize( + 'executable, success, value, match', [ + (None, True, 'program', None), + ('cat', True, 'cat', None), + ('badprogram', False, None, 'badprogram'), + ], ids=[ + 'executable missing', + 'valid alternative', + 'invalid alternative', + ]) +@pytest.mark.parametrize('program', ['git', 'gpg']) +def test_x_program( + runner, yadm_y, paths, program, executable, success, value, match): + """Set yadm.X-program, and test result of require_X""" + + # set configuration + if executable: + os.system(' '.join(yadm_y( + 'config', f'yadm.{program}-program', executable))) + + # test require_[git,gpg] + script = f""" + YADM_TEST=1 source {paths.pgm} + YADM_CONFIG="{paths.config}" + require_{program} + echo ${program.upper()}_PROGRAM + """ + run = runner(command=['bash'], inp=script) + assert run.success == success + assert run.err == '' + + # [GIT,GPG]_PROGRAM set correctly + if value == 'program': + assert run.out.rstrip() == program + elif value: + assert run.out.rstrip() == value + + # error reported about bad config + if match: + assert match in run.out diff --git a/test/test_version.py b/test/test_version.py new file mode 100644 index 0000000..023eb82 --- /dev/null +++ b/test/test_version.py @@ -0,0 +1,35 @@ +"""Test version""" + +import re +import pytest + + +@pytest.fixture(scope='module') +def expected_version(yadm): + """ + Expected semantic version number. This is taken directly out of yadm, + searching for the VERSION= string. + """ + yadm_version = re.findall( + r'VERSION=([^\n]+)', + open(yadm).read()) + if yadm_version: + return yadm_version[0] + pytest.fail(f'version not found in {yadm}') + return 'not found' + + +def test_semantic_version(expected_version): + """Version is semantic""" + # semantic version conforms to MAJOR.MINOR.PATCH + assert re.search(r'^\d+\.\d+\.\d+$', expected_version), ( + 'does not conform to MAJOR.MINOR.PATCH') + + +def test_reported_version( + runner, yadm_y, expected_version): + """Report correct version""" + run = runner(command=yadm_y('version')) + assert run.success + assert run.err == '' + assert run.out == f'yadm {expected_version}\n' diff --git a/test/utils.py b/test/utils.py new file mode 100644 index 0000000..411dde1 --- /dev/null +++ b/test/utils.py @@ -0,0 +1,59 @@ +"""Testing Utilities + +This module holds values/functions common to multiple tests. +""" + +import os + +ALT_FILE1 = 'test_alt' +ALT_FILE2 = 'test alt/test alt' + + +def set_local(paths, variable, value): + """Set local override""" + os.system( + f'GIT_DIR={str(paths.repo)} ' + f'git config --local "local.{variable}" "{value}"' + ) + + +def create_alt_files(paths, suffix, + preserve=False, tracked=True, + encrypt=False, exclude=False, + content=None): + """Create new files, and add to the repo + + This is used for testing alternate files. In each case, a suffix is + appended to two standard file paths. Particulars of the file creation and + repo handling are dependent upon the function arguments. + """ + + if not preserve: + if paths.work.join(ALT_FILE1).exists(): + paths.work.join(ALT_FILE1).remove(rec=1, ignore_errors=True) + assert not paths.work.join(ALT_FILE1).exists() + if paths.work.join(ALT_FILE2).exists(): + paths.work.join(ALT_FILE2).remove(rec=1, ignore_errors=True) + assert not paths.work.join(ALT_FILE2).exists() + + new_file1 = paths.work.join(ALT_FILE1 + suffix) + new_file1.write(ALT_FILE1 + suffix, ensure=True) + new_file2 = paths.work.join(ALT_FILE2 + suffix) + new_file2.write(ALT_FILE2 + suffix, ensure=True) + if content: + new_file1.write('\n' + content, mode='a', ensure=True) + new_file2.write('\n' + content, mode='a', ensure=True) + assert new_file1.exists() + assert new_file2.exists() + + if tracked: + for path in (new_file1, new_file2): + os.system(f'GIT_DIR={str(paths.repo)} git add "{path}"') + os.system(f'GIT_DIR={str(paths.repo)} git commit -m "Add test files"') + + if encrypt: + paths.encrypt.write(f'{ALT_FILE1 + suffix}\n', mode='a') + paths.encrypt.write(f'{ALT_FILE2 + suffix}\n', mode='a') + if exclude: + paths.encrypt.write(f'!{ALT_FILE1 + suffix}\n', mode='a') + paths.encrypt.write(f'!{ALT_FILE2 + suffix}\n', mode='a')