diff --git a/.gitignore b/.gitignore index 7aa998b..af9e6f5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ .DS_Store +.env .jekyll-metadata +.pytest_cache .sass-cache _site diff --git a/Dockerfile b/Dockerfile index 74700c4..ee29bf2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,20 @@ -FROM ubuntu:yakkety +FROM ubuntu:18.04 MAINTAINER Tim Byrne +# No input during build +ENV DEBIAN_FRONTEND noninteractive + +# UTF8 locale +RUN apt-get update && apt-get install -y locales +RUN locale-gen en_US.UTF-8 +ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' LC_ALL='en_US.UTF-8' + +# Convenience settings for the testbed's root account +RUN echo 'set -o vi' >> /root/.bashrc + # Install prerequisites -RUN apt-get update && apt-get install -y git gnupg1 make shellcheck bats expect curl python-pip lsb-release -RUN pip install envtpl +RUN apt-get update && apt-get install -y git gnupg1 make shellcheck=0.4.6-1 bats expect curl python3-pip lsb-release +RUN pip3 install envtpl pytest==3.6.4 pylint==1.9.2 flake8==3.5.0 # Force GNUPG version 1 at path /usr/bin/gpg RUN ln -fs /usr/bin/gpg1 /usr/bin/gpg diff --git a/Makefile b/Makefile index f544a1d..691b168 100644 --- a/Makefile +++ b/Makefile @@ -23,6 +23,11 @@ test: bats shellcheck parallel: ls test/*bats | time parallel -q -P0 -- docker run --rm -v "$$PWD:/yadm:ro" yadm/testbed bash -c 'bats {}' +.PHONY: pytest +pytest: + @echo Running all pytest tests + @pytest -v + .PHONY: bats bats: @echo Running all bats tests @@ -58,3 +63,13 @@ man: .PHONY: wide wide: man ./yadm.1 + +.PHONY: sync-clock +sync-clock: + docker run --rm --privileged alpine hwclock -s + +.PHONY: .env +.env: + virtualenv --python=python3 .env + .env/bin/pip3 install --upgrade pip setuptools + .env/bin/pip3 install --upgrade pytest pylint==1.9.2 flake8==3.5.0 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..4fe0b86 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,7 @@ +--- +version: '3' +services: + testbed: + volumes: + - .:/yadm:ro + image: yadm/testbed:latest diff --git a/pylintrc b/pylintrc new file mode 100644 index 0000000..2ae18b6 --- /dev/null +++ b/pylintrc @@ -0,0 +1,11 @@ +[BASIC] +good-names=pytestmark + +[DESIGN] +max-args=14 +max-locals=26 +max-attributes=8 +max-statements=65 + +[MESSAGES CONTROL] +disable=redefined-outer-name diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..c7fc1db --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +cache_dir = /tmp +addopts = -ra diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..4fefabe --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,559 @@ +"""Global tests configuration and fixtures""" + +import collections +import copy +import distutils.dir_util # pylint: disable=no-name-in-module,import-error +import os +import platform +import pwd +from subprocess import Popen, PIPE +import pytest + + +@pytest.fixture(scope='session') +def shellcheck_version(): + """Version of shellcheck supported""" + return '0.4.6' + + +@pytest.fixture(scope='session') +def pylint_version(): + """Version of pylint supported""" + return '1.9.2' + + +@pytest.fixture(scope='session') +def flake8_version(): + """Version of flake8 supported""" + return '3.5.0' + + +@pytest.fixture(scope='session') +def tst_user(): + """Test session's user id""" + return pwd.getpwuid(os.getuid()).pw_name + + +@pytest.fixture(scope='session') +def tst_host(): + """Test session's short hostname value""" + return platform.node().split('.')[0] + + +@pytest.fixture(scope='session') +def tst_distro(runner): + """Test session's distro""" + distro = '' + try: + run = runner(command=['lsb_release', '-si'], report=False) + distro = run.out.strip() + except BaseException: + pass + return distro + + +@pytest.fixture(scope='session') +def tst_sys(): + """Test session's uname value""" + return platform.system() + + +@pytest.fixture(scope='session') +def cygwin_sys(): + """CYGWIN uname id""" + return 'CYGWIN_NT-6.1-WOW64' + + +@pytest.fixture(scope='session') +def supported_commands(): + """List of supported commands + + This list should be updated every time yadm learns a new command. + """ + return [ + 'alt', + 'bootstrap', + 'clean', + 'clone', + 'config', + 'decrypt', + 'encrypt', + 'enter', + 'gitconfig', + 'help', + 'init', + 'introspect', + 'list', + 'perms', + 'version', + ] + + +@pytest.fixture(scope='session') +def supported_configs(): + """List of supported config options + + This list should be updated every time yadm learns a new config. + """ + return [ + 'local.class', + 'local.hostname', + 'local.os', + 'local.user', + 'yadm.auto-alt', + 'yadm.auto-perms', + 'yadm.auto-private-dirs', + 'yadm.cygwin-copy', + 'yadm.git-program', + 'yadm.gpg-perms', + 'yadm.gpg-program', + 'yadm.gpg-recipient', + 'yadm.ssh-perms', + ] + + +@pytest.fixture(scope='session') +def supported_switches(): + """List of supported switches + + This list should be updated every time yadm learns a new switch. + """ + return [ + '--yadm-archive', + '--yadm-bootstrap', + '--yadm-config', + '--yadm-dir', + '--yadm-encrypt', + '--yadm-repo', + '-Y', + ] + + +@pytest.fixture(scope='session') +def supported_local_configs(supported_configs): + """List of supported local config options""" + return [c for c in supported_configs if c.startswith('local.')] + + +class Runner(object): + """Class for running commands + + Within yadm tests, this object should be used when running commands that + require: + + * Acting on the status code + * Parsing the output of the command + * Passing input to the command + + Other instances of simply running commands should use os.system(). + """ + + def __init__( + self, + command, + inp=None, + shell=False, + cwd=None, + env=None, + expect=None, + report=True): + if shell: + self.command = ' '.join([str(cmd) for cmd in command]) + else: + self.command = command + self.inp = inp + self.wrap(expect) + process = Popen( + self.command, + stdin=PIPE, + stdout=PIPE, + stderr=PIPE, + shell=shell, + cwd=cwd, + env=env, + ) + input_bytes = self.inp + if self.inp: + input_bytes = self.inp.encode() + (out_bstream, err_bstream) = process.communicate(input=input_bytes) + self.out = out_bstream.decode() + self.err = err_bstream.decode() + self.code = process.wait() + self.success = self.code == 0 + self.failure = self.code != 0 + if report: + self.report() + + def __repr__(self): + return f'Runner({self.command})' + + def report(self): + """Print code/stdout/stderr""" + print(f'{self}') + print(f' RUN: code:{self.code}') + if self.inp: + print(f' RUN: input:\n{self.inp}') + print(f' RUN: stdout:\n{self.out}') + print(f' RUN: stderr:\n{self.err}') + + def wrap(self, expect): + """Wrap command with expect""" + if not expect: + return + cmdline = ' '.join([f'"{w}"' for w in self.command]) + expect_script = f'set timeout 2\nspawn {cmdline}\n' + for question, answer in expect: + expect_script += ( + 'expect {\n' + f'"{question}" {{send "{answer}\\r"}}\n' + 'timeout {close;exit 128}\n' + '}\n') + expect_script += ( + 'expect eof\n' + 'foreach {pid spawnid os_error_flag value} [wait] break\n' + 'exit $value') + self.inp = expect_script + print(f'EXPECT:{expect_script}') + self.command = ['expect'] + + +@pytest.fixture(scope='session') +def runner(): + """Class for running commands""" + return Runner + + +@pytest.fixture(scope='session') +def config_git(): + """Configure global git configuration, if missing""" + os.system( + 'git config user.name || ' + 'git config --global user.name "test"') + os.system( + 'git config user.email || ' + 'git config --global user.email "test@test.test"') + return None + + +@pytest.fixture() +def repo_config(runner, paths): + """Function to query a yadm repo configuration value""" + + def query_func(key): + """Query a yadm repo configuration value""" + run = runner( + command=('git', 'config', '--local', key), + env={'GIT_DIR': paths.repo}, + report=False, + ) + return run.out.rstrip() + + return query_func + + +@pytest.fixture(scope='session') +def yadm(): + """Path to yadm program to be tested""" + full_path = os.path.realpath('yadm') + assert os.path.isfile(full_path), "yadm program file isn't present" + return full_path + + +@pytest.fixture() +def paths(tmpdir, yadm): + """Function scoped test paths""" + dir_root = tmpdir.mkdir('root') + dir_work = dir_root.mkdir('work') + dir_yadm = dir_root.mkdir('yadm') + dir_repo = dir_yadm.mkdir('repo.git') + dir_hooks = dir_yadm.mkdir('hooks') + dir_remote = dir_root.mkdir('remote') + file_archive = dir_yadm.join('files.gpg') + file_bootstrap = dir_yadm.join('bootstrap') + file_config = dir_yadm.join('config') + file_encrypt = dir_yadm.join('encrypt') + paths = collections.namedtuple( + 'Paths', [ + 'pgm', + 'root', + 'work', + 'yadm', + 'repo', + 'hooks', + 'remote', + 'archive', + 'bootstrap', + 'config', + 'encrypt', + ]) + return paths( + yadm, + dir_root, + dir_work, + dir_yadm, + dir_repo, + dir_hooks, + dir_remote, + file_archive, + file_bootstrap, + file_config, + file_encrypt, + ) + + +@pytest.fixture() +def yadm_y(paths): + """Generate custom command_list function""" + def command_list(*args): + """Produce params for running yadm with -Y""" + return [paths.pgm, '-Y', str(paths.yadm)] + list(args) + return command_list + + +class DataFile(object): + """Datafile object""" + + def __init__(self, path, tracked=True, private=False): + self.__path = path + self.__parent = None + self.__tracked = tracked + self.__private = private + + @property + def path(self): + """Path property""" + return self.__path + + @property + def relative(self): + """Relative path property""" + if self.__parent: + return self.__parent.join(self.path) + raise BaseException('Unable to provide relative path, no parent') + + @property + def tracked(self): + """Tracked property""" + return self.__tracked + + @property + def private(self): + """Private property""" + return self.__private + + def relative_to(self, parent): + """Update all relative paths to this py.path""" + self.__parent = parent + return + + +class DataSet(object): + """Dataset object""" + + def __init__(self): + self.__files = list() + self.__dirs = list() + self.__tracked_dirs = list() + self.__private_dirs = list() + self.__relpath = None + + def __repr__(self): + return ( + f'[DS with {len(self)} files; ' + f'{len(self.tracked)} tracked, ' + f'{len(self.private)} private]' + ) + + def __iter__(self): + return iter(self.__files) + + def __len__(self): + return len(self.__files) + + def __contains__(self, datafile): + if [f for f in self.__files if f.path == datafile]: + return True + if datafile in self.__files: + return True + return False + + @property + def files(self): + """List of DataFiles in DataSet""" + return list(self.__files) + + @property + def tracked(self): + """List of tracked DataFiles in DataSet""" + return [f for f in self.__files if f.tracked] + + @property + def private(self): + """List of private DataFiles in DataSet""" + return [f for f in self.__files if f.private] + + @property + def dirs(self): + """List of directories in DataSet""" + return list(self.__dirs) + + @property + def plain_dirs(self): + """List of directories in DataSet not starting with '.'""" + return [d for d in self.dirs if not d.startswith('.')] + + @property + def hidden_dirs(self): + """List of directories in DataSet starting with '.'""" + return [d for d in self.dirs if d.startswith('.')] + + @property + def tracked_dirs(self): + """List of directories in DataSet not starting with '.'""" + return [d for d in self.__tracked_dirs if not d.startswith('.')] + + @property + def private_dirs(self): + """List of directories in DataSet considered 'private'""" + return list(self.__private_dirs) + + def add_file(self, path, tracked=True, private=False): + """Add file to data set""" + if path not in self: + datafile = DataFile(path, tracked, private) + if self.__relpath: + datafile.relative_to(self.__relpath) + self.__files.append(datafile) + + dname = os.path.dirname(path) + if dname and dname not in self.__dirs: + self.__dirs.append(dname) + if tracked: + self.__tracked_dirs.append(dname) + if private: + self.__private_dirs.append(dname) + + def relative_to(self, relpath): + """Update all relative paths to this py.path""" + self.__relpath = relpath + for datafile in self.files: + datafile.relative_to(self.__relpath) + return + + +@pytest.fixture(scope='session') +def ds1_dset(tst_sys, cygwin_sys): + """Meta-data for dataset one files""" + dset = DataSet() + dset.add_file('t1') + dset.add_file('d1/t2') + dset.add_file(f'test_alt##S') + dset.add_file(f'test_alt##S.H') + dset.add_file(f'test_alt##S.H.U') + dset.add_file(f'test_alt##C.S.H.U') + dset.add_file(f'test alt/test alt##S') + dset.add_file(f'test alt/test alt##S.H') + dset.add_file(f'test alt/test alt##S.H.U') + dset.add_file(f'test alt/test alt##C.S.H.U') + dset.add_file(f'test_cygwin_copy##{tst_sys}') + dset.add_file(f'test_cygwin_copy##{cygwin_sys}') + dset.add_file('u1', tracked=False) + dset.add_file('d2/u2', tracked=False) + dset.add_file('.ssh/p1', tracked=False, private=True) + dset.add_file('.ssh/.p2', tracked=False, private=True) + dset.add_file('.gnupg/p3', tracked=False, private=True) + dset.add_file('.gnupg/.p4', tracked=False, private=True) + return dset + + +@pytest.fixture(scope='session') +def ds1_data(tmpdir_factory, config_git, ds1_dset, runner): + """A set of test data, worktree & repo""" + # pylint: disable=unused-argument + # This is ignored because + # @pytest.mark.usefixtures('config_git') + # cannot be applied to another fixture. + + data = tmpdir_factory.mktemp('ds1') + + work = data.mkdir('work') + for datafile in ds1_dset: + work.join(datafile.path).write(datafile.path, ensure=True) + + repo = data.mkdir('repo.git') + env = os.environ.copy() + env['GIT_DIR'] = str(repo) + runner( + command=['git', 'init', '--shared=0600', '--bare', str(repo)], + report=False) + runner( + command=['git', 'config', 'core.bare', 'false'], + env=env, + report=False) + runner( + command=['git', 'config', 'status.showUntrackedFiles', 'no'], + env=env, + report=False) + runner( + command=['git', 'config', 'yadm.managed', 'true'], + env=env, + report=False) + runner( + command=['git', 'config', 'core.worktree', str(work)], + env=env, + report=False) + runner( + command=['git', 'add'] + + [str(work.join(f.path)) for f in ds1_dset if f.tracked], + env=env) + runner( + command=['git', 'commit', '--allow-empty', '-m', 'Initial commit'], + env=env, + report=False) + + data = collections.namedtuple('Data', ['work', 'repo']) + return data(work, repo) + + +@pytest.fixture() +def ds1_work_copy(ds1_data, paths): + """Function scoped copy of ds1_data.work""" + distutils.dir_util.copy_tree( # pylint: disable=no-member + str(ds1_data.work), str(paths.work)) + return None + + +@pytest.fixture() +def ds1_repo_copy(runner, ds1_data, paths): + """Function scoped copy of ds1_data.repo""" + distutils.dir_util.copy_tree( # pylint: disable=no-member + str(ds1_data.repo), str(paths.repo)) + env = os.environ.copy() + env['GIT_DIR'] = str(paths.repo) + runner( + command=['git', 'config', 'core.worktree', str(paths.work)], + env=env, + report=False) + return None + + +@pytest.fixture() +def ds1_copy(ds1_work_copy, ds1_repo_copy): + """Function scoped copy of ds1_data""" + # pylint: disable=unused-argument + # This is ignored because + # @pytest.mark.usefixtures('ds1_work_copy', 'ds1_repo_copy') + # cannot be applied to another fixture. + return None + + +@pytest.fixture() +def ds1(ds1_work_copy, paths, ds1_dset): + """Function scoped ds1_dset w/paths""" + # pylint: disable=unused-argument + # This is ignored because + # @pytest.mark.usefixtures('ds1_copy') + # cannot be applied to another fixture. + dscopy = copy.deepcopy(ds1_dset) + dscopy.relative_to(copy.deepcopy(paths.work)) + return dscopy diff --git a/test/pylintrc b/test/pylintrc new file mode 120000 index 0000000..05334af --- /dev/null +++ b/test/pylintrc @@ -0,0 +1 @@ +../pylintrc \ No newline at end of file diff --git a/test/test_alt.py b/test/test_alt.py new file mode 100644 index 0000000..60b7339 --- /dev/null +++ b/test/test_alt.py @@ -0,0 +1,345 @@ +"""Test alt""" + +import os +import re +import string +import pytest +import utils + +# These test IDs are broken. During the writing of these tests, problems have +# been discovered in the way yadm orders matching files. +BROKEN_TEST_IDS = [ + 'test_wild[tracked-##C.S.H.U-C-S%-H%-U]', + 'test_wild[tracked-##C.S.H.U-C-S-H%-U]', + 'test_wild[encrypted-##C.S.H.U-C-S%-H%-U]', + 'test_wild[encrypted-##C.S.H.U-C-S-H%-U]', + ] + +PRECEDENCE = [ + '##', + '##$tst_sys', + '##$tst_sys.$tst_host', + '##$tst_sys.$tst_host.$tst_user', + '##$tst_class', + '##$tst_class.$tst_sys', + '##$tst_class.$tst_sys.$tst_host', + '##$tst_class.$tst_sys.$tst_host.$tst_user', + ] + +WILD_TEMPLATES = [ + '##$tst_class', + '##$tst_class.$tst_sys', + '##$tst_class.$tst_sys.$tst_host', + '##$tst_class.$tst_sys.$tst_host.$tst_user', + ] + +WILD_TESTED = set() + + +@pytest.mark.parametrize('precedence_index', range(len(PRECEDENCE))) +@pytest.mark.parametrize( + 'tracked, encrypt, exclude', [ + (False, False, False), + (True, False, False), + (False, True, False), + (False, True, True), + ], ids=[ + 'untracked', + 'tracked', + 'encrypted', + 'excluded', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_alt(runner, yadm_y, paths, + tst_sys, tst_host, tst_user, + tracked, encrypt, exclude, + precedence_index): + """Test alternate linking + + This test is done by iterating for the number of templates in PRECEDENCE. + With each iteration, another file is left off the list. So with each + iteration, the template with the "highest precedence" is left out. The file + using the highest precedence should be the one linked. + """ + + # set the class + tst_class = 'testclass' + utils.set_local(paths, 'class', tst_class) + + # process the templates in PRECEDENCE + precedence = list() + for template in PRECEDENCE: + precedence.append( + string.Template(template).substitute( + tst_class=tst_class, + tst_host=tst_host, + tst_sys=tst_sys, + tst_user=tst_user, + ) + ) + + # create files using a subset of files + for suffix in precedence[0:precedence_index+1]: + utils.create_alt_files(paths, suffix, tracked=tracked, + encrypt=encrypt, exclude=exclude) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + precedence[precedence_index] + if tracked or (encrypt and not exclude): + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + else: + assert not paths.work.join(file_path).exists() + assert str(paths.work.join(source_file)) not in linked + + +def short_template(template): + """Translate template into something short for test IDs""" + return string.Template(template).substitute( + tst_class='C', + tst_host='H', + tst_sys='S', + tst_user='U', + ) + + +@pytest.mark.parametrize('wild_user', [True, False], ids=['U%', 'U']) +@pytest.mark.parametrize('wild_host', [True, False], ids=['H%', 'H']) +@pytest.mark.parametrize('wild_sys', [True, False], ids=['S%', 'S']) +@pytest.mark.parametrize('wild_class', [True, False], ids=['C%', 'C']) +@pytest.mark.parametrize('template', WILD_TEMPLATES, ids=short_template) +@pytest.mark.parametrize( + 'tracked, encrypt', [ + (True, False), + (False, True), + ], ids=[ + 'tracked', + 'encrypted', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_wild(request, runner, yadm_y, paths, + tst_sys, tst_host, tst_user, + tracked, encrypt, + wild_class, wild_host, wild_sys, wild_user, + template): + """Test wild linking + + These tests are done by creating permutations of the possible files using + WILD_TEMPLATES. Each case is then tested (while skipping the already tested + permutations for efficiency). + """ + + if request.node.name in BROKEN_TEST_IDS: + pytest.xfail( + 'This test is known to be broken. ' + 'This bug needs to be fixed.') + + tst_class = 'testclass' + + # determine the "wild" version of the suffix + str_class = '%' if wild_class else tst_class + str_host = '%' if wild_host else tst_host + str_sys = '%' if wild_sys else tst_sys + str_user = '%' if wild_user else tst_user + wild_suffix = string.Template(template).substitute( + tst_class=str_class, + tst_host=str_host, + tst_sys=str_sys, + tst_user=str_user, + ) + + # determine the "standard" version of the suffix + std_suffix = string.Template(template).substitute( + tst_class=tst_class, + tst_host=tst_host, + tst_sys=tst_sys, + tst_user=tst_user, + ) + + # skip over duplicate tests (this seems to be the simplest way to cover the + # permutations of tests, while skipping duplicates.) + test_key = f'{tracked}{encrypt}{wild_suffix}{std_suffix}' + if test_key in WILD_TESTED: + return + else: + WILD_TESTED.add(test_key) + + # set the class + utils.set_local(paths, 'class', tst_class) + + # create files using the wild suffix + utils.create_alt_files(paths, wild_suffix, tracked=tracked, + encrypt=encrypt, exclude=False) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + wild_suffix + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + + # create files using the standard suffix + utils.create_alt_files(paths, std_suffix, tracked=tracked, + encrypt=encrypt, exclude=False) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + std_suffix + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + + +@pytest.mark.usefixtures('ds1_copy') +def test_local_override(runner, yadm_y, paths, + tst_sys, tst_host, tst_user): + """Test local overrides""" + + # define local overrides + utils.set_local(paths, 'class', 'or-class') + utils.set_local(paths, 'hostname', 'or-hostname') + utils.set_local(paths, 'os', 'or-os') + utils.set_local(paths, 'user', 'or-user') + + # create files, the first would normally be the most specific version + # however, the second is the overridden version which should be preferred. + utils.create_alt_files( + paths, f'##or-class.{tst_sys}.{tst_host}.{tst_user}') + utils.create_alt_files( + paths, '##or-class.or-os.or-hostname.or-user') + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + '##or-class.or-os.or-hostname.or-user' + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + + +@pytest.mark.parametrize('suffix', ['AAA', 'ZZZ', 'aaa', 'zzz']) +@pytest.mark.usefixtures('ds1_copy') +def test_class_case(runner, yadm_y, paths, tst_sys, suffix): + """Test range of class cases""" + + # set the class + utils.set_local(paths, 'class', suffix) + + # create files + endings = [suffix] + if tst_sys == 'Linux': + # Only create all of these side-by-side on Linux, which is + # unquestionably case-sensitive. This would break tests on + # case-insensitive systems. + endings = ['AAA', 'ZZZ', 'aaa', 'zzz'] + for ending in endings: + utils.create_alt_files(paths, f'##{ending}') + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + f'##{suffix}' + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + + +@pytest.mark.parametrize('autoalt', [None, 'true', 'false']) +@pytest.mark.usefixtures('ds1_copy') +def test_auto_alt(runner, yadm_y, paths, autoalt): + """Test setting auto-alt""" + + # set the value of auto-alt + if autoalt: + os.system(' '.join(yadm_y('config', 'yadm.auto-alt', autoalt))) + + # create file + suffix = '##' + utils.create_alt_files(paths, suffix) + + # run status to possibly trigger linking + run = runner(yadm_y('status')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + suffix + if autoalt == 'false': + assert not paths.work.join(file_path).exists() + else: + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + # no linking output when run via auto-alt + assert str(paths.work.join(source_file)) not in linked + + +@pytest.mark.parametrize('delimiter', ['.', '_']) +@pytest.mark.usefixtures('ds1_copy') +def test_delimiter(runner, yadm_y, paths, + tst_sys, tst_host, tst_user, delimiter): + """Test delimiters used""" + + suffix = '##' + delimiter.join([tst_sys, tst_host, tst_user]) + + # create file + utils.create_alt_files(paths, suffix) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + linked = linked_list(run.out) + + # assert the proper linking has occurred + # only a delimiter of '.' is valid + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + suffix + if delimiter == '.': + assert paths.work.join(file_path).islink() + assert paths.work.join(file_path).read() == source_file + assert str(paths.work.join(source_file)) in linked + else: + assert not paths.work.join(file_path).exists() + assert str(paths.work.join(source_file)) not in linked + + +def linked_list(output): + """Parse output, and return list of linked files""" + linked = dict() + for line in output.splitlines(): + match = re.match('Linking (.+) to (.+)$', line) + if match: + linked[match.group(2)] = match.group(1) + return linked.values() diff --git a/test/test_assert_private_dirs.py b/test/test_assert_private_dirs.py new file mode 100644 index 0000000..65cb0b7 --- /dev/null +++ b/test/test_assert_private_dirs.py @@ -0,0 +1,106 @@ +"""Test asserting private directories""" + +import os +import re +import pytest + +pytestmark = pytest.mark.usefixtures('ds1_copy') +PRIVATE_DIRS = ['.gnupg', '.ssh'] + + +def test_pdirs_missing(runner, yadm_y, paths): + """Private dirs (private dirs missing) + + When a git command is run + And private directories are missing + Create private directories prior to command + """ + + # confirm directories are missing at start + for pdir in PRIVATE_DIRS: + path = paths.work.join(pdir) + if path.exists(): + path.remove() + assert not path.exists() + + # run status + run = runner(command=yadm_y('status'), env={'DEBUG': 'yes'}) + assert run.success + assert run.err == '' + assert 'On branch master' in run.out + + # confirm directories are created + # and are protected + for pdir in PRIVATE_DIRS: + path = paths.work.join(pdir) + assert path.exists() + assert oct(path.stat().mode).endswith('00'), 'Directory is not secured' + + # confirm directories are created before command is run: + assert re.search( + r'Creating.+\.gnupg.+Creating.+\.ssh.+Running git command git status', + run.out, re.DOTALL), 'directories created before command is run' + + +def test_pdirs_missing_apd_false(runner, yadm_y, paths): + """Private dirs (private dirs missing / yadm.auto-private-dirs=false) + + When a git command is run + And private directories are missing + But auto-private-dirs is false + Do not create private dirs + """ + + # confirm directories are missing at start + for pdir in PRIVATE_DIRS: + path = paths.work.join(pdir) + if path.exists(): + path.remove() + assert not path.exists() + + # set configuration + os.system(' '.join(yadm_y( + 'config', '--bool', 'yadm.auto-private-dirs', 'false'))) + + # run status + run = runner(command=yadm_y('status')) + assert run.success + assert run.err == '' + assert 'On branch master' in run.out + + # confirm directories are STILL missing + for pdir in PRIVATE_DIRS: + assert not paths.work.join(pdir).exists() + + +def test_pdirs_exist_apd_false(runner, yadm_y, paths): + """Private dirs (private dirs exist / yadm.auto-perms=false) + + When a git command is run + And private directories exist + And yadm is configured not to auto update perms + Do not alter directories + """ + + # create permissive directories + for pdir in PRIVATE_DIRS: + path = paths.work.join(pdir) + if not path.isdir(): + path.mkdir() + path.chmod(0o777) + assert oct(path.stat().mode).endswith('77'), 'Directory is secure.' + + # set configuration + os.system(' '.join(yadm_y( + 'config', '--bool', 'yadm.auto-perms', 'false'))) + + # run status + run = runner(command=yadm_y('status')) + assert run.success + assert run.err == '' + assert 'On branch master' in run.out + + # created directories are STILL permissive + for pdir in PRIVATE_DIRS: + path = paths.work.join(pdir) + assert oct(path.stat().mode).endswith('77'), 'Directory is secure' diff --git a/test/test_bootstrap.py b/test/test_bootstrap.py new file mode 100644 index 0000000..2adbe33 --- /dev/null +++ b/test/test_bootstrap.py @@ -0,0 +1,31 @@ +"""Test bootstrap""" + +import pytest + + +@pytest.mark.parametrize( + 'exists, executable, code, expect', [ + (False, False, 1, 'Cannot execute bootstrap'), + (True, False, 1, 'is not an executable program'), + (True, True, 123, 'Bootstrap successful'), + ], ids=[ + 'missing', + 'not executable', + 'executable', + ]) +def test_bootstrap( + runner, yadm_y, paths, exists, executable, code, expect): + """Test bootstrap command""" + if exists: + paths.bootstrap.write('') + if executable: + paths.bootstrap.write( + '#!/bin/bash\n' + f'echo {expect}\n' + f'exit {code}\n' + ) + paths.bootstrap.chmod(0o775) + run = runner(command=yadm_y('bootstrap')) + assert run.code == code + assert run.err == '' + assert expect in run.out diff --git a/test/test_clean.py b/test/test_clean.py new file mode 100644 index 0000000..9a2221a --- /dev/null +++ b/test/test_clean.py @@ -0,0 +1,11 @@ +"""Test clean""" + + +def test_clean_command(runner, yadm_y): + """Run with clean command""" + run = runner(command=yadm_y('clean')) + # do nothing, this is a dangerous Git command when managing dot files + # report the command as disabled and exit as a failure + assert run.failure + assert run.err == '' + assert 'disabled' in run.out diff --git a/test/test_clone.py b/test/test_clone.py new file mode 100644 index 0000000..90febe1 --- /dev/null +++ b/test/test_clone.py @@ -0,0 +1,274 @@ +"""Test clone""" + +import os +import re +import pytest + +BOOTSTRAP_CODE = 123 +BOOTSTRAP_MSG = 'Bootstrap successful' + + +@pytest.mark.usefixtures('remote') +@pytest.mark.parametrize( + 'good_remote, repo_exists, force, conflicts', [ + (False, False, False, False), + (True, False, False, False), + (True, True, False, False), + (True, True, True, False), + (True, False, False, True), + ], ids=[ + 'bad remote', + 'simple', + 'existing repo', + '-f', + 'conflicts', + ]) +def test_clone( + runner, paths, yadm_y, repo_config, ds1, + good_remote, repo_exists, force, conflicts): + """Test basic clone operation""" + + # determine remote url + remote_url = f'file://{paths.remote}' + if not good_remote: + remote_url = 'file://bad_remote' + + old_repo = None + if repo_exists: + # put a repo in the way + paths.repo.mkdir() + old_repo = paths.repo.join('old_repo') + old_repo.write('old_repo') + + if conflicts: + ds1.tracked[0].relative.write('conflict') + assert ds1.tracked[0].relative.exists() + + # run the clone command + args = ['clone', '-w', paths.work] + if force: + args += ['-f'] + args += [remote_url] + run = runner(command=yadm_y(*args)) + + if not good_remote: + # clone should fail + assert run.failure + assert run.err != '' + assert 'Unable to fetch origin' in run.out + assert not paths.repo.exists() + elif repo_exists and not force: + # can't overwrite data + assert run.failure + assert run.err == '' + assert 'Git repo already exists' in run.out + else: + # clone should succeed, and repo should be configured properly + assert successful_clone(run, paths, repo_config) + + # ensure conflicts are handled properly + if conflicts: + assert 'NOTE' in run.out + assert 'Merging origin/master failed' in run.out + assert 'Conflicts preserved' in run.out + + # confirm correct Git origin + run = runner( + command=('git', 'remote', '-v', 'show'), + env={'GIT_DIR': paths.repo}) + assert run.success + assert run.err == '' + assert f'origin\t{remote_url}' in run.out + + # ensure conflicts are really preserved + if conflicts: + # test to see if the work tree is actually "clean" + run = runner( + command=yadm_y('status', '-uno', '--porcelain'), + cwd=paths.work) + assert run.success + assert run.err == '' + assert run.out == '', 'worktree has unexpected changes' + + # test to see if the conflicts are stashed + run = runner(command=yadm_y('stash', 'list'), cwd=paths.work) + assert run.success + assert run.err == '' + assert 'Conflicts preserved' in run.out, 'conflicts not stashed' + + # verify content of the stashed conflicts + run = runner(command=yadm_y('stash', 'show', '-p'), cwd=paths.work) + assert run.success + assert run.err == '' + assert '\n+conflict' in run.out, 'conflicts not stashed' + + # another force-related assertion + if old_repo: + if force: + assert not old_repo.exists() + else: + assert old_repo.exists() + + +@pytest.mark.usefixtures('remote') +@pytest.mark.parametrize( + 'bs_exists, bs_param, answer', [ + (False, '--bootstrap', None), + (True, '--bootstrap', None), + (True, '--no-bootstrap', None), + (True, None, 'n'), + (True, None, 'y'), + ], ids=[ + 'force, missing', + 'force, existing', + 'prevent', + 'existing, answer n', + 'existing, answer y', + ]) +def test_clone_bootstrap( + runner, paths, yadm_y, repo_config, bs_exists, bs_param, answer): + """Test bootstrap clone features""" + + # establish a bootstrap + create_bootstrap(paths, bs_exists) + + # run the clone command + args = ['clone', '-w', paths.work] + if bs_param: + args += [bs_param] + args += [f'file://{paths.remote}'] + expect = [] + if answer: + expect.append(('Would you like to execute it now', answer)) + run = runner(command=yadm_y(*args), expect=expect) + + if answer: + assert 'Would you like to execute it now' in run.out + + expected_code = 0 + if bs_exists and bs_param != '--no-bootstrap': + expected_code = BOOTSTRAP_CODE + + if answer == 'y': + expected_code = BOOTSTRAP_CODE + assert BOOTSTRAP_MSG in run.out + elif answer == 'n': + expected_code = 0 + assert BOOTSTRAP_MSG not in run.out + + assert successful_clone(run, paths, repo_config, expected_code) + + if not bs_exists: + assert BOOTSTRAP_MSG not in run.out + + +def create_bootstrap(paths, exists): + """Create bootstrap file for test""" + if exists: + paths.bootstrap.write( + '#!/bin/sh\n' + f'echo {BOOTSTRAP_MSG}\n' + f'exit {BOOTSTRAP_CODE}\n') + paths.bootstrap.chmod(0o775) + assert paths.bootstrap.exists() + else: + assert not paths.bootstrap.exists() + + +@pytest.mark.usefixtures('remote') +@pytest.mark.parametrize( + 'private_type, in_repo, in_work', [ + ('ssh', False, True), + ('gnupg', False, True), + ('ssh', True, True), + ('gnupg', True, True), + ('ssh', True, False), + ('gnupg', True, False), + ], ids=[ + 'open ssh, not tracked', + 'open gnupg, not tracked', + 'open ssh, tracked', + 'open gnupg, tracked', + 'missing ssh, tracked', + 'missing gnupg, tracked', + ]) +def test_clone_perms( + runner, yadm_y, paths, repo_config, + private_type, in_repo, in_work): + """Test clone permission-related functions""" + + # update remote repo to include private data + if in_repo: + rpath = paths.work.mkdir(f'.{private_type}').join('related') + rpath.write('related') + os.system(f'GIT_DIR="{paths.remote}" git add {rpath}') + os.system(f'GIT_DIR="{paths.remote}" git commit -m "{rpath}"') + rpath.remove() + + # ensure local private data is insecure at the start + if in_work: + pdir = paths.work.join(f'.{private_type}') + if not pdir.exists(): + pdir.mkdir() + pfile = pdir.join('existing') + pfile.write('existing') + pdir.chmod(0o777) + pfile.chmod(0o777) + else: + paths.work.remove() + paths.work.mkdir() + + run = runner( + yadm_y('clone', '-d', '-w', paths.work, f'file://{paths.remote}')) + + assert successful_clone(run, paths, repo_config) + if in_work: + # private directories which already exist, should be left as they are, + # which in this test is "insecure". + assert re.search( + f'initial private dir perms drwxrwxrwx.+.{private_type}', + run.out) + assert re.search( + f'pre-merge private dir perms drwxrwxrwx.+.{private_type}', + run.out) + assert re.search( + f'post-merge private dir perms drwxrwxrwx.+.{private_type}', + run.out) + else: + # private directories which are created, should be done prior to + # merging, and with secure permissions. + assert 'initial private dir perms' not in run.out + assert re.search( + f'pre-merge private dir perms drwx------.+.{private_type}', + run.out) + assert re.search( + f'post-merge private dir perms drwx------.+.{private_type}', + run.out) + + # standard perms still apply afterwards unless disabled with auto.perms + assert oct( + paths.work.join(f'.{private_type}').stat().mode).endswith('00'), ( + f'.{private_type} has not been secured by auto.perms') + + +def successful_clone(run, paths, repo_config, expected_code=0): + """Assert clone is successful""" + assert run.code == expected_code + assert 'Initialized' in run.out + assert oct(paths.repo.stat().mode).endswith('00'), 'Repo is not secured' + assert repo_config('core.bare') == 'false' + assert repo_config('status.showUntrackedFiles') == 'no' + assert repo_config('yadm.managed') == 'true' + return True + + +@pytest.fixture() +def remote(paths, ds1_repo_copy): + """Function scoped remote (based on ds1)""" + # pylint: disable=unused-argument + # This is ignored because + # @pytest.mark.usefixtures('ds1_remote_copy') + # cannot be applied to another fixture. + paths.remote.remove() + paths.repo.move(paths.remote) + return None diff --git a/test/test_config.py b/test/test_config.py new file mode 100644 index 0000000..4e44b1c --- /dev/null +++ b/test/test_config.py @@ -0,0 +1,139 @@ +"""Test config""" + +import os +import pytest + +TEST_SECTION = 'test' +TEST_ATTRIBUTE = 'attribute' +TEST_KEY = f'{TEST_SECTION}.{TEST_ATTRIBUTE}' +TEST_VALUE = 'testvalue' +TEST_FILE = f'[{TEST_SECTION}]\n\t{TEST_ATTRIBUTE} = {TEST_VALUE}' + + +def test_config_no_params(runner, yadm_y, supported_configs): + """No parameters + + Display instructions + Display supported configs + Exit with 0 + """ + + run = runner(yadm_y('config')) + + assert run.success + assert run.err == '' + assert 'Please read the CONFIGURATION section' in run.out + for config in supported_configs: + assert config in run.out + + +def test_config_read_missing(runner, yadm_y): + """Read missing attribute + + Display an empty value + Exit with 0 + """ + + run = runner(yadm_y('config', TEST_KEY)) + + assert run.success + assert run.err == '' + assert run.out == '' + + +def test_config_write(runner, yadm_y, paths): + """Write attribute + + Display no output + Update configuration file + Exit with 0 + """ + + run = runner(yadm_y('config', TEST_KEY, TEST_VALUE)) + + assert run.success + assert run.err == '' + assert run.out == '' + assert paths.config.read().strip() == TEST_FILE + + +def test_config_read(runner, yadm_y, paths): + """Read attribute + + Display value + Exit with 0 + """ + + paths.config.write(TEST_FILE) + run = runner(yadm_y('config', TEST_KEY)) + + assert run.success + assert run.err == '' + assert run.out.strip() == TEST_VALUE + + +def test_config_update(runner, yadm_y, paths): + """Update attribute + + Display no output + Update configuration file + Exit with 0 + """ + + paths.config.write(TEST_FILE) + + run = runner(yadm_y('config', TEST_KEY, TEST_VALUE + 'extra')) + + assert run.success + assert run.err == '' + assert run.out == '' + + assert paths.config.read().strip() == TEST_FILE + 'extra' + + +@pytest.mark.usefixtures('ds1_repo_copy') +def test_config_local_read(runner, yadm_y, paths, supported_local_configs): + """Read local attribute + + Display value from the repo config + Exit with 0 + """ + + # populate test values + for config in supported_local_configs: + os.system( + f'GIT_DIR="{paths.repo}" ' + f'git config --local "{config}" "value_of_{config}"') + + # run yadm config + for config in supported_local_configs: + run = runner(yadm_y('config', config)) + assert run.success + assert run.err == '' + assert run.out.strip() == f'value_of_{config}' + + +@pytest.mark.usefixtures('ds1_repo_copy') +def test_config_local_write(runner, yadm_y, paths, supported_local_configs): + """Write local attribute + + Display no output + Write value to the repo config + Exit with 0 + """ + + # run yadm config + for config in supported_local_configs: + run = runner(yadm_y('config', config, f'value_of_{config}')) + assert run.success + assert run.err == '' + assert run.out == '' + + # verify test values + for config in supported_local_configs: + run = runner( + command=('git', 'config', config), + env={'GIT_DIR': paths.repo}) + assert run.success + assert run.err == '' + assert run.out.strip() == f'value_of_{config}' diff --git a/test/test_cygwin_copy.py b/test/test_cygwin_copy.py new file mode 100644 index 0000000..82b08ba --- /dev/null +++ b/test/test_cygwin_copy.py @@ -0,0 +1,59 @@ +"""Test yadm.cygwin_copy""" + +import os +import pytest + + +@pytest.mark.parametrize( + 'setting, is_cygwin, expect_link, pre_existing', [ + (None, False, True, None), + (True, False, True, None), + (False, False, True, None), + (None, True, True, None), + (True, True, False, None), + (False, True, True, None), + (True, True, False, 'link'), + (True, True, False, 'file'), + ], + ids=[ + 'unset, non-cygwin', + 'true, non-cygwin', + 'false, non-cygwin', + 'unset, cygwin', + 'true, cygwin', + 'false, cygwin', + 'pre-existing symlink', + 'pre-existing file', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_cygwin_copy( + runner, yadm_y, paths, cygwin_sys, tst_sys, + setting, is_cygwin, expect_link, pre_existing): + """Test yadm.cygwin_copy""" + + if setting is not None: + os.system(' '.join(yadm_y('config', 'yadm.cygwin-copy', str(setting)))) + + expected_content = f'test_cygwin_copy##{tst_sys}' + alt_path = paths.work.join('test_cygwin_copy') + if pre_existing == 'symlink': + alt_path.mklinkto(expected_content) + elif pre_existing == 'file': + alt_path.write('wrong content') + + uname_path = paths.root.join('tmp').mkdir() + if is_cygwin: + uname = uname_path.join('uname') + uname.write(f'#!/bin/sh\necho "{cygwin_sys}"\n') + uname.chmod(0o777) + expected_content = f'test_cygwin_copy##{cygwin_sys}' + env = os.environ.copy() + env['PATH'] = ':'.join([str(uname_path), env['PATH']]) + + run = runner(yadm_y('alt'), env=env) + assert run.success + assert run.err == '' + assert 'Linking' in run.out + + assert alt_path.read() == expected_content + assert alt_path.islink() == expect_link diff --git a/test/test_encryption.py b/test/test_encryption.py new file mode 100644 index 0000000..f107ad5 --- /dev/null +++ b/test/test_encryption.py @@ -0,0 +1,392 @@ +"""Test encryption""" + +import os +import pipes +import pytest + +KEY_FILE = 'test/test_key' +KEY_FINGERPRINT = 'F8BBFC746C58945442349BCEBA54FFD04C599B1A' +KEY_NAME = 'yadm-test1' +KEY_TRUST = 'test/ownertrust.txt' +PASSPHRASE = 'ExamplePassword' + +pytestmark = pytest.mark.usefixtures('config_git') + + +def add_asymmetric_key(): + """Add asymmetric key""" + os.system(f'gpg --import {pipes.quote(KEY_FILE)}') + os.system(f'gpg --import-ownertrust < {pipes.quote(KEY_TRUST)}') + + +def remove_asymmetric_key(): + """Remove asymmetric key""" + os.system( + f'gpg --batch --yes ' + f'--delete-secret-keys {pipes.quote(KEY_FINGERPRINT)}') + os.system(f'gpg --batch --yes --delete-key {pipes.quote(KEY_FINGERPRINT)}') + + +@pytest.fixture +def asymmetric_key(): + """Fixture for asymmetric key, removed in teardown""" + add_asymmetric_key() + yield KEY_NAME + remove_asymmetric_key() + + +@pytest.fixture +def encrypt_targets(yadm_y, paths): + """Fixture for setting up data to encrypt + + This fixture: + * inits an empty repo + * creates test files in the work tree + * creates a ".yadm/encrypt" file for testing: + * standard files + * standard globs + * directories + * comments + * empty lines and lines with just space + * exclusions + * returns a list of expected encrypted files + """ + + # init empty yadm repo + os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f'))) + + expected = [] + + # standard files w/ dirs & spaces + paths.work.join('inc file1').write('inc file1') + expected.append('inc file1') + paths.encrypt.write('inc file1\n') + paths.work.join('inc dir').mkdir() + paths.work.join('inc dir/inc file2').write('inc file2') + expected.append('inc dir/inc file2') + paths.encrypt.write('inc dir/inc file2\n', mode='a') + + # standard globs w/ dirs & spaces + paths.work.join('globs file1').write('globs file1') + expected.append('globs file1') + paths.work.join('globs dir').mkdir() + paths.work.join('globs dir/globs file2').write('globs file2') + expected.append('globs dir/globs file2') + paths.encrypt.write('globs*\n', mode='a') + + # blank lines + paths.encrypt.write('\n \n\t\n', mode='a') + + # comments + paths.work.join('commentfile1').write('commentfile1') + paths.encrypt.write('#commentfile1\n', mode='a') + paths.encrypt.write(' #commentfile1\n', mode='a') + + # exclusions + paths.work.join('extest').mkdir() + paths.encrypt.write('extest/*\n', mode='a') # include within extest + paths.work.join('extest/inglob1').write('inglob1') + paths.work.join('extest/exglob1').write('exglob1') + paths.work.join('extest/exglob2').write('exglob2') + paths.encrypt.write('!extest/ex*\n', mode='a') # exclude the ex* + expected.append('extest/inglob1') # should be left with only in* + + return expected + + +@pytest.fixture(scope='session') +def decrypt_targets(tmpdir_factory, runner): + """Fixture for setting data to decrypt + + This fixture: + * creates symmetric/asymmetric encrypted archives + * creates a list of expected decrypted files + """ + + tmpdir = tmpdir_factory.mktemp('decrypt_targets') + symmetric = tmpdir.join('symmetric.tar.gz.gpg') + asymmetric = tmpdir.join('asymmetric.tar.gz.gpg') + + expected = [] + + tmpdir.join('decrypt1').write('decrypt1') + expected.append('decrypt1') + tmpdir.join('decrypt2').write('decrypt2') + expected.append('decrypt2') + tmpdir.join('subdir').mkdir() + tmpdir.join('subdir/decrypt3').write('subdir/decrypt3') + expected.append('subdir/decrypt3') + + run = runner( + ['tar', 'cvf', '-'] + + expected + + ['|', 'gpg', '--batch', '--yes', '-c'] + + ['--passphrase', pipes.quote(PASSPHRASE)] + + ['--output', pipes.quote(str(symmetric))], + cwd=tmpdir, + shell=True) + assert run.success + + add_asymmetric_key() + run = runner( + ['tar', 'cvf', '-'] + + expected + + ['|', 'gpg', '--batch', '--yes', '-e'] + + ['-r', pipes.quote(KEY_NAME)] + + ['--output', pipes.quote(str(asymmetric))], + cwd=tmpdir, + shell=True) + assert run.success + remove_asymmetric_key() + + return { + 'asymmetric': asymmetric, + 'expected': expected, + 'symmetric': symmetric, + } + + +@pytest.mark.parametrize( + 'mismatched_phrase', [False, True], + ids=['matching_phrase', 'mismatched_phrase']) +@pytest.mark.parametrize( + 'missing_encrypt', [False, True], + ids=['encrypt_exists', 'encrypt_missing']) +@pytest.mark.parametrize( + 'overwrite', [False, True], + ids=['clean', 'overwrite']) +def test_symmetric_encrypt( + runner, yadm_y, paths, encrypt_targets, + overwrite, missing_encrypt, mismatched_phrase): + """Test symmetric encryption""" + + if missing_encrypt: + paths.encrypt.remove() + + matched_phrase = PASSPHRASE + if mismatched_phrase: + matched_phrase = 'mismatched' + + if overwrite: + paths.archive.write('existing archive') + + run = runner(yadm_y('encrypt'), expect=[ + ('passphrase:', PASSPHRASE), + ('passphrase:', matched_phrase), + ]) + + if missing_encrypt or mismatched_phrase: + assert run.failure + else: + assert run.success + assert run.err == '' + + if missing_encrypt: + assert 'does not exist' in run.out + elif mismatched_phrase: + assert 'invalid passphrase' in run.out + else: + assert encrypted_data_valid(runner, paths.archive, encrypt_targets) + + +@pytest.mark.parametrize( + 'wrong_phrase', [False, True], + ids=['correct_phrase', 'wrong_phrase']) +@pytest.mark.parametrize( + 'archive_exists', [True, False], + ids=['archive_exists', 'archive_missing']) +@pytest.mark.parametrize( + 'dolist', [False, True], + ids=['decrypt', 'list']) +def test_symmetric_decrypt( + runner, yadm_y, paths, decrypt_targets, + dolist, archive_exists, wrong_phrase): + """Test decryption""" + + # init empty yadm repo + os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f'))) + + phrase = PASSPHRASE + if wrong_phrase: + phrase = 'wrong-phrase' + + if archive_exists: + decrypt_targets['symmetric'].copy(paths.archive) + + # to test overwriting + paths.work.join('decrypt1').write('pre-existing file') + + args = [] + + if dolist: + args.append('-l') + run = runner(yadm_y('decrypt') + args, expect=[('passphrase:', phrase)]) + + if archive_exists and not wrong_phrase: + assert run.success + assert run.err == '' + if dolist: + for filename in decrypt_targets['expected']: + if filename != 'decrypt1': # this one should exist + assert not paths.work.join(filename).exists() + assert filename in run.out + else: + for filename in decrypt_targets['expected']: + assert paths.work.join(filename).read() == filename + else: + assert run.failure + + +@pytest.mark.usefixtures('asymmetric_key') +@pytest.mark.parametrize( + 'ask', [False, True], + ids=['no_ask', 'ask']) +@pytest.mark.parametrize( + 'key_exists', [True, False], + ids=['key_exists', 'key_missing']) +@pytest.mark.parametrize( + 'overwrite', [False, True], + ids=['clean', 'overwrite']) +def test_asymmetric_encrypt( + runner, yadm_y, paths, encrypt_targets, + overwrite, key_exists, ask): + """Test asymmetric encryption""" + + # specify encryption recipient + if ask: + os.system(' '.join(yadm_y('config', 'yadm.gpg-recipient', 'ASK'))) + expect = [('Enter the user ID', KEY_NAME), ('Enter the user ID', '')] + else: + os.system(' '.join(yadm_y('config', 'yadm.gpg-recipient', KEY_NAME))) + expect = [] + + if overwrite: + paths.archive.write('existing archive') + + if not key_exists: + remove_asymmetric_key() + + run = runner(yadm_y('encrypt'), expect=expect) + + if key_exists: + assert run.success + assert encrypted_data_valid(runner, paths.archive, encrypt_targets) + else: + assert run.failure + assert 'Unable to write' in run.out + + if ask: + assert 'Enter the user ID' in run.out + + +@pytest.mark.usefixtures('asymmetric_key') +@pytest.mark.parametrize( + 'key_exists', [True, False], + ids=['key_exists', 'key_missing']) +@pytest.mark.parametrize( + 'dolist', [False, True], + ids=['decrypt', 'list']) +def test_asymmetric_decrypt( + runner, yadm_y, paths, decrypt_targets, + dolist, key_exists): + """Test decryption""" + + # init empty yadm repo + os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f'))) + + decrypt_targets['asymmetric'].copy(paths.archive) + + # to test overwriting + paths.work.join('decrypt1').write('pre-existing file') + + if not key_exists: + remove_asymmetric_key() + + args = [] + + if dolist: + args.append('-l') + run = runner(yadm_y('decrypt') + args) + + if key_exists: + assert run.success + if dolist: + for filename in decrypt_targets['expected']: + if filename != 'decrypt1': # this one should exist + assert not paths.work.join(filename).exists() + assert filename in run.out + else: + for filename in decrypt_targets['expected']: + assert paths.work.join(filename).read() == filename + else: + assert run.failure + assert 'Unable to extract encrypted files' in run.out + + +@pytest.mark.parametrize( + 'untracked', + [False, 'y', 'n'], + ids=['tracked', 'untracked_answer_y', 'untracked_answer_n']) +def test_offer_to_add(runner, yadm_y, paths, encrypt_targets, untracked): + """Test offer to add encrypted archive + + All the other encryption tests use an archive outside of the work tree. + However, the archive is often inside the work tree, and if it is, there + should be an offer to add it to the repo if it is not tracked. + """ + + worktree_archive = paths.work.join('worktree-archive.tar.gpg') + expect = [ + ('passphrase:', PASSPHRASE), + ('passphrase:', PASSPHRASE), + ] + + if untracked: + expect.append(('add it now', untracked)) + else: + worktree_archive.write('exists') + os.system(' '.join(yadm_y('add', str(worktree_archive)))) + + run = runner( + yadm_y('encrypt', '--yadm-archive', str(worktree_archive)), + expect=expect + ) + + assert run.success + assert run.err == '' + assert encrypted_data_valid(runner, worktree_archive, encrypt_targets) + + run = runner( + yadm_y('status', '--porcelain', '-uall', str(worktree_archive))) + assert run.success + assert run.err == '' + + if untracked == 'y': + # should be added to the index + assert f'A {worktree_archive.basename}' in run.out + elif untracked == 'n': + # should NOT be added to the index + assert f'?? {worktree_archive.basename}' in run.out + else: + # should appear modified in the index + assert f'AM {worktree_archive.basename}' in run.out + + +def encrypted_data_valid(runner, encrypted, expected): + """Verify encrypted data matches expectations""" + run = runner([ + 'gpg', + '--passphrase', pipes.quote(PASSPHRASE), + '-d', pipes.quote(str(encrypted)), + '2>/dev/null', + '|', 'tar', 't'], shell=True, report=False) + file_count = 0 + for filename in run.out.splitlines(): + if filename.endswith('/'): + continue + file_count += 1 + assert filename in expected, ( + f'Unexpected file in archive: {filename}') + assert file_count == len(expected), ( + 'Number of files in archive does not match expected') + return True diff --git a/test/test_enter.py b/test/test_enter.py new file mode 100644 index 0000000..202df32 --- /dev/null +++ b/test/test_enter.py @@ -0,0 +1,85 @@ +"""Test enter""" + +import os +import warnings +import pytest + + +@pytest.mark.parametrize( + 'shell, success', [ + ('delete', True), + ('', False), + ('/usr/bin/env', True), + ('noexec', False), + ], ids=[ + 'shell-missing', + 'shell-empty', + 'shell-env', + 'shell-noexec', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_enter(runner, yadm_y, paths, shell, success): + """Enter tests""" + env = os.environ.copy() + if shell == 'delete': + # remove shell + if 'SHELL' in env: + del env['SHELL'] + elif shell == 'noexec': + # specify a non-executable path + noexec = paths.root.join('noexec') + noexec.write('') + noexec.chmod(0o664) + env['SHELL'] = str(noexec) + else: + env['SHELL'] = shell + run = runner(command=yadm_y('enter'), env=env) + assert run.success == success + assert run.err == '' + prompt = f'yadm shell ({paths.repo})' + if success: + assert run.out.startswith('Entering yadm repo') + assert run.out.rstrip().endswith('Leaving yadm repo') + if shell == 'delete': + # When SHELL is empty (unlikely), it is attempted to be run anyway. + # This is a but which must be fixed. + warnings.warn('Unhandled bug: SHELL executed when empty', Warning) + else: + assert f'PROMPT={prompt}' in run.out + assert f'PS1={prompt}' in run.out + assert f'GIT_DIR={paths.repo}' in run.out + if not success: + assert 'does not refer to an executable' in run.out + if 'env' in shell: + assert f'GIT_DIR={paths.repo}' in run.out + assert 'PROMPT=yadm shell' in run.out + assert 'PS1=yadm shell' in run.out + + +@pytest.mark.parametrize( + 'shell, opts, path', [ + ('bash', '--norc', '\\w'), + ('csh', '-f', '%~'), + ('zsh', '-f', '%~'), + ], ids=[ + 'bash', + 'csh', + 'zsh', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_enter_shell_ops(runner, yadm_y, paths, shell, opts, path): + """Enter tests for specific shell options""" + + # Create custom shell to detect options passed + custom_shell = paths.root.join(shell) + custom_shell.write('#!/bin/sh\necho OPTS=$*\necho PROMPT=$PROMPT') + custom_shell.chmod(0o775) + + env = os.environ.copy() + env['SHELL'] = custom_shell + + run = runner(command=yadm_y('enter'), env=env) + assert run.success + assert run.err == '' + assert f'OPTS={opts}' in run.out + assert f'PROMPT=yadm shell ({paths.repo}) {path} >' in run.out diff --git a/test/test_git.py b/test/test_git.py new file mode 100644 index 0000000..427c54a --- /dev/null +++ b/test/test_git.py @@ -0,0 +1,58 @@ +"""Test git""" + +import re +import pytest + + +@pytest.mark.usefixtures('ds1_copy') +def test_git(runner, yadm_y, paths): + """Test series of passthrough git commands + + Passthru unknown commands to Git + Git command 'add' - badfile + Git command 'add' + Git command 'status' + Git command 'commit' + Git command 'log' + """ + + # passthru unknown commands to Git + run = runner(command=yadm_y('bogus')) + assert run.failure + assert "git: 'bogus' is not a git command." in run.err + assert "See 'git --help'" in run.err + assert run.out == '' + + # git command 'add' - badfile + run = runner(command=yadm_y('add', '-v', 'does_not_exist')) + assert run.code == 128 + assert "pathspec 'does_not_exist' did not match any files" in run.err + assert run.out == '' + + # git command 'add' + newfile = paths.work.join('test_git') + newfile.write('test_git') + run = runner(command=yadm_y('add', '-v', str(newfile))) + assert run.success + assert run.err == '' + assert "add 'test_git'" in run.out + + # git command 'status' + run = runner(command=yadm_y('status')) + assert run.success + assert run.err == '' + assert re.search(r'new file:\s+test_git', run.out) + + # git command 'commit' + run = runner(command=yadm_y('commit', '-m', 'Add test_git')) + assert run.success + assert run.err == '' + assert '1 file changed' in run.out + assert '1 insertion' in run.out + assert re.search(r'create mode .+ test_git', run.out) + + # git command 'log' + run = runner(command=yadm_y('log', '--oneline')) + assert run.success + assert run.err == '' + assert 'Add test_git' in run.out diff --git a/test/test_help.py b/test/test_help.py new file mode 100644 index 0000000..79a7652 --- /dev/null +++ b/test/test_help.py @@ -0,0 +1,17 @@ +"""Test help""" + + +def test_missing_command(runner, yadm_y): + """Run without any command""" + run = runner(command=yadm_y()) + assert run.failure + assert run.err == '' + assert run.out.startswith('Usage: yadm') + + +def test_help_command(runner, yadm_y): + """Run with help command""" + run = runner(command=yadm_y('help')) + assert run.failure + assert run.err == '' + assert run.out.startswith('Usage: yadm') diff --git a/test/test_hooks.py b/test/test_hooks.py new file mode 100644 index 0000000..f1df91e --- /dev/null +++ b/test/test_hooks.py @@ -0,0 +1,90 @@ +"""Test hooks""" + +import pytest + + +@pytest.mark.parametrize( + 'pre, pre_code, post, post_code', [ + (False, 0, False, 0), + (True, 0, False, 0), + (True, 5, False, 0), + (False, 0, True, 0), + (False, 0, True, 5), + (True, 0, True, 0), + (True, 5, True, 5), + ], ids=[ + 'no-hooks', + 'pre-success', + 'pre-fail', + 'post-success', + 'post-fail', + 'pre-post-success', + 'pre-post-fail', + ]) +def test_hooks( + runner, yadm_y, paths, + pre, pre_code, post, post_code): + """Test pre/post hook""" + + # generate hooks + if pre: + create_hook(paths, 'pre_version', pre_code) + if post: + create_hook(paths, 'post_version', post_code) + + # run yadm + run = runner(yadm_y('version')) + # when a pre hook fails, yadm should exit with the hook's code + assert run.code == pre_code + assert run.err == '' + + if pre: + assert 'HOOK:pre_version' in run.out + # if pre hook is missing or successful, yadm itself should exit 0 + if run.success: + if post: + assert 'HOOK:post_version' in run.out + else: + # when a pre hook fails, yadm should not run the command + assert 'version will not be run' in run.out + # when a pre hook fails, yadm should not run the post hook + assert 'HOOK:post_version' not in run.out + + +# repo fixture is needed to test the population of YADM_HOOK_WORK +@pytest.mark.usefixtures('ds1_repo_copy') +def test_hook_env(runner, yadm_y, paths): + """Test hook environment""" + + # test will be done with a non existent "git" passthru command + # which should exit with a failing code + cmd = 'passthrucmd' + + # write the hook + hook = paths.hooks.join(f'post_{cmd}') + hook.write('#!/bin/sh\nenv\n') + hook.chmod(0o755) + + run = runner(yadm_y(cmd, 'extra_args')) + + # expect passthru to fail + assert run.failure + assert f"'{cmd}' is not a git command" in run.err + + # verify hook environment + assert 'YADM_HOOK_EXIT=1\n' in run.out + assert f'YADM_HOOK_COMMAND={cmd}\n' in run.out + assert f'YADM_HOOK_FULL_COMMAND={cmd} extra_args\n' in run.out + assert f'YADM_HOOK_REPO={paths.repo}\n' in run.out + assert f'YADM_HOOK_WORK={paths.work}\n' in run.out + + +def create_hook(paths, name, code): + """Create hook""" + hook = paths.hooks.join(name) + hook.write( + '#!/bin/sh\n' + f'echo HOOK:{name}\n' + f'exit {code}\n' + ) + hook.chmod(0o755) diff --git a/test/test_init.py b/test/test_init.py new file mode 100644 index 0000000..1519b38 --- /dev/null +++ b/test/test_init.py @@ -0,0 +1,78 @@ +"""Test init""" + +import pytest + + +@pytest.mark.parametrize( + 'alt_work, repo_present, force', [ + (False, False, False), + (True, False, False), + (False, True, False), + (False, True, True), + (True, True, True), + ], ids=[ + 'simple', + '-w', + 'existing repo', + '-f', + '-w & -f', + ]) +@pytest.mark.usefixtures('ds1_work_copy') +def test_init( + runner, yadm_y, paths, repo_config, alt_work, repo_present, force): + """Test init + + Repos should have attribs: + - 0600 permissions + - not bare + - worktree = $HOME + - showUntrackedFiles = no + - yadm.managed = true + """ + + # these tests will assume this for $HOME + home = str(paths.root.mkdir('HOME')) + + # ds1_work_copy comes WITH an empty repo dir present. + old_repo = paths.repo.join('old_repo') + if repo_present: + # Let's put some data in it, so we can confirm that data is gone when + # forced to be overwritten. + old_repo.write('old repo data') + assert old_repo.isfile() + else: + paths.repo.remove() + + # command args + args = ['init'] + if alt_work: + args.extend(['-w', paths.work]) + if force: + args.append('-f') + + # run init + run = runner(yadm_y(*args), env={'HOME': home}) + assert run.err == '' + + if repo_present and not force: + assert run.failure + assert 'repo already exists' in run.out + assert old_repo.isfile(), 'Missing original repo' + else: + assert run.success + assert 'Initialized empty shared Git repository' in run.out + + if repo_present: + assert not old_repo.isfile(), 'Original repo still exists' + + if alt_work: + assert repo_config('core.worktree') == paths.work + else: + assert repo_config('core.worktree') == home + + # uniform repo assertions + assert oct(paths.repo.stat().mode).endswith('00'), ( + 'Repo is not secure') + assert repo_config('core.bare') == 'false' + assert repo_config('status.showUntrackedFiles') == 'no' + assert repo_config('yadm.managed') == 'true' diff --git a/test/test_introspect.py b/test/test_introspect.py new file mode 100644 index 0000000..9026e98 --- /dev/null +++ b/test/test_introspect.py @@ -0,0 +1,46 @@ +"""Test introspect""" + +import pytest + + +@pytest.mark.parametrize( + 'name', [ + '', + 'invalid', + 'commands', + 'configs', + 'repo', + 'switches', + ]) +def test_introspect_category( + runner, yadm_y, paths, name, + supported_commands, supported_configs, supported_switches): + """Validate introspection category""" + if name: + run = runner(command=yadm_y('introspect', name)) + else: + run = runner(command=yadm_y('introspect')) + + assert run.success + assert run.err == '' + + expected = [] + if name == 'commands': + expected = supported_commands + elif name == 'config': + expected = supported_configs + elif name == 'switches': + expected = supported_switches + + # assert values + if name in ('', 'invalid'): + assert run.out == '' + if name == 'repo': + assert run.out.rstrip() == paths.repo + + # make sure every expected value is present + for value in expected: + assert value in run.out + # make sure nothing extra is present + if expected: + assert len(run.out.split()) == len(expected) diff --git a/test/test_jinja.py b/test/test_jinja.py new file mode 100644 index 0000000..6e43f35 --- /dev/null +++ b/test/test_jinja.py @@ -0,0 +1,186 @@ +"""Test jinja""" + +import os +import re +import pytest +import utils + + +@pytest.fixture(scope='module') +def envtpl_present(runner): + """Is envtpl present and working?""" + try: + run = runner(command=['envtpl', '-h']) + if run.success: + return True + except BaseException: + pass + return False + + +@pytest.mark.usefixtures('ds1_copy') +def test_local_override(runner, yadm_y, paths, + tst_distro, envtpl_present): + """Test local overrides""" + if not envtpl_present: + pytest.skip('Unable to test without envtpl.') + + # define local overrides + utils.set_local(paths, 'class', 'or-class') + utils.set_local(paths, 'hostname', 'or-hostname') + utils.set_local(paths, 'os', 'or-os') + utils.set_local(paths, 'user', 'or-user') + + template = ( + 'j2-{{ YADM_CLASS }}-' + '{{ YADM_OS }}-{{ YADM_HOSTNAME }}-' + '{{ YADM_USER }}-{{ YADM_DISTRO }}' + ) + expected = f'j2-or-class-or-os-or-hostname-or-user-{tst_distro}' + + utils.create_alt_files(paths, '##yadm.j2', content=template) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + created = created_list(run.out) + + # assert the proper creation has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + '##yadm.j2' + assert paths.work.join(file_path).isfile() + lines = paths.work.join(file_path).readlines(cr=False) + assert lines[0] == source_file + assert lines[1] == expected + assert str(paths.work.join(source_file)) in created + + +@pytest.mark.parametrize('autoalt', [None, 'true', 'false']) +@pytest.mark.usefixtures('ds1_copy') +def test_auto_alt(runner, yadm_y, paths, autoalt, tst_sys, + envtpl_present): + """Test setting auto-alt""" + + if not envtpl_present: + pytest.skip('Unable to test without envtpl.') + + # set the value of auto-alt + if autoalt: + os.system(' '.join(yadm_y('config', 'yadm.auto-alt', autoalt))) + + # create file + jinja_suffix = '##yadm.j2' + utils.create_alt_files(paths, jinja_suffix, content='{{ YADM_OS }}') + + # run status to possibly trigger linking + run = runner(yadm_y('status')) + assert run.success + assert run.err == '' + created = created_list(run.out) + + # assert the proper creation has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + jinja_suffix + if autoalt == 'false': + assert not paths.work.join(file_path).exists() + else: + assert paths.work.join(file_path).isfile() + lines = paths.work.join(file_path).readlines(cr=False) + assert lines[0] == source_file + assert lines[1] == tst_sys + # no created output when run via auto-alt + assert str(paths.work.join(source_file)) not in created + + +@pytest.mark.usefixtures('ds1_copy') +def test_jinja_envtpl_missing(runner, paths): + """Test operation when envtpl is missing""" + + script = f""" + YADM_TEST=1 source {paths.pgm} + process_global_args -Y "{paths.yadm}" + set_operating_system + configure_paths + ENVTPL_PROGRAM='envtpl_missing' main alt + """ + + utils.create_alt_files(paths, '##yadm.j2') + + run = runner(command=['bash'], inp=script) + assert run.success + assert run.err == '' + assert f'envtpl not available, not creating' in run.out + + +@pytest.mark.parametrize( + 'tracked, encrypt, exclude', [ + (False, False, False), + (True, False, False), + (False, True, False), + (False, True, True), + ], ids=[ + 'untracked', + 'tracked', + 'encrypted', + 'excluded', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_jinja(runner, yadm_y, paths, + tst_sys, tst_host, tst_user, tst_distro, + tracked, encrypt, exclude, + envtpl_present): + """Test jinja processing""" + + if not envtpl_present: + pytest.skip('Unable to test without envtpl.') + + jinja_suffix = '##yadm.j2' + + # set the class + tst_class = 'testclass' + utils.set_local(paths, 'class', tst_class) + + template = ( + 'j2-{{ YADM_CLASS }}-' + '{{ YADM_OS }}-{{ YADM_HOSTNAME }}-' + '{{ YADM_USER }}-{{ YADM_DISTRO }}' + ) + expected = ( + f'j2-{tst_class}-' + f'{tst_sys}-{tst_host}-' + f'{tst_user}-{tst_distro}' + ) + + utils.create_alt_files(paths, jinja_suffix, content=template, + tracked=tracked, encrypt=encrypt, exclude=exclude) + + # run alt to trigger linking + run = runner(yadm_y('alt')) + assert run.success + assert run.err == '' + created = created_list(run.out) + + # assert the proper creation has occurred + for file_path in (utils.ALT_FILE1, utils.ALT_FILE2): + source_file = file_path + jinja_suffix + if tracked or (encrypt and not exclude): + assert paths.work.join(file_path).isfile() + lines = paths.work.join(file_path).readlines(cr=False) + assert lines[0] == source_file + assert lines[1] == expected + assert str(paths.work.join(source_file)) in created + else: + assert not paths.work.join(file_path).exists() + assert str(paths.work.join(source_file)) not in created + + +def created_list(output): + """Parse output, and return list of created files""" + + created = dict() + for line in output.splitlines(): + match = re.match('Creating (.+) from template (.+)$', line) + if match: + created[match.group(1)] = match.group(2) + return created.values() diff --git a/test/test_list.py b/test/test_list.py new file mode 100644 index 0000000..44a5573 --- /dev/null +++ b/test/test_list.py @@ -0,0 +1,47 @@ +"""Test list""" + +import os +import pytest + + +@pytest.mark.parametrize( + 'location', [ + 'work', + 'outside', + 'subdir', + ]) +@pytest.mark.usefixtures('ds1_copy') +def test_list(runner, yadm_y, paths, ds1, location): + """List tests""" + if location == 'work': + run_dir = paths.work + elif location == 'outside': + run_dir = paths.work.join('..') + elif location == 'subdir': + # first directory with tracked data + run_dir = paths.work.join(ds1.tracked_dirs[0]) + with run_dir.as_cwd(): + # test with '-a' + # should get all tracked files, relative to the work path + run = runner(command=yadm_y('list', '-a')) + assert run.success + assert run.err == '' + returned_files = set(run.out.splitlines()) + expected_files = set([e.path for e in ds1 if e.tracked]) + assert returned_files == expected_files + # test without '-a' + # should get all tracked files, relative to the work path unless in a + # subdir, then those should be a limited set of files, relative to the + # subdir + run = runner(command=yadm_y('list')) + assert run.success + assert run.err == '' + returned_files = set(run.out.splitlines()) + if location == 'subdir': + basepath = os.path.basename(os.getcwd()) + # only expect files within the subdir + # names should be relative to subdir + expected_files = set( + [e.path[len(basepath)+1:] for e in ds1 + if e.tracked and e.path.startswith(basepath)]) + assert returned_files == expected_files diff --git a/test/test_perms.py b/test/test_perms.py new file mode 100644 index 0000000..eb7ad8f --- /dev/null +++ b/test/test_perms.py @@ -0,0 +1,111 @@ +"""Test perms""" + +import os +import warnings +import pytest + + +@pytest.mark.parametrize('autoperms', ['notest', 'unset', 'true', 'false']) +@pytest.mark.usefixtures('ds1_copy') +def test_perms(runner, yadm_y, paths, ds1, autoperms): + """Test perms""" + # set the value of auto-perms + if autoperms != 'notest': + if autoperms != 'unset': + os.system(' '.join(yadm_y('config', 'yadm.auto-perms', autoperms))) + + # privatepaths will hold all paths that should become secured + privatepaths = [paths.work.join('.ssh'), paths.work.join('.gnupg')] + privatepaths += [paths.work.join(private.path) for private in ds1.private] + + # create an archive file + os.system(f'touch "{str(paths.archive)}"') + privatepaths.append(paths.archive) + + # create encrypted file test data + efile1 = paths.work.join('efile1') + efile1.write('efile1') + efile2 = paths.work.join('efile2') + efile2.write('efile2') + paths.encrypt.write('efile1\nefile2\n!efile1\n') + insecurepaths = [efile1] + privatepaths.append(efile2) + + # assert these paths begin unsecured + for private in privatepaths + insecurepaths: + assert not oct(private.stat().mode).endswith('00'), ( + 'Path started secured') + + cmd = 'perms' + if autoperms != 'notest': + cmd = 'status' + run = runner(yadm_y(cmd)) + assert run.success + assert run.err == '' + if cmd == 'perms': + assert run.out == '' + + # these paths should be secured if processing perms + for private in privatepaths: + if '.p2' in private.basename or '.p4' in private.basename: + # Dot files within .ssh/.gnupg are not protected. + # This is a but which must be fixed + warnings.warn('Unhandled bug: private dot files', Warning) + continue + if autoperms == 'false': + assert not oct(private.stat().mode).endswith('00'), ( + 'Path should not be secured') + else: + assert oct(private.stat().mode).endswith('00'), ( + 'Path has not been secured') + + # these paths should never be secured + for private in insecurepaths: + assert not oct(private.stat().mode).endswith('00'), ( + 'Path should not be secured') + + +@pytest.mark.parametrize('sshperms', [None, 'true', 'false']) +@pytest.mark.parametrize('gpgperms', [None, 'true', 'false']) +@pytest.mark.usefixtures('ds1_copy') +def test_perms_control(runner, yadm_y, paths, ds1, sshperms, gpgperms): + """Test fine control of perms""" + # set the value of ssh-perms + if sshperms: + os.system(' '.join(yadm_y('config', 'yadm.ssh-perms', sshperms))) + + # set the value of gpg-perms + if gpgperms: + os.system(' '.join(yadm_y('config', 'yadm.gpg-perms', gpgperms))) + + # privatepaths will hold all paths that should become secured + privatepaths = [paths.work.join('.ssh'), paths.work.join('.gnupg')] + privatepaths += [paths.work.join(private.path) for private in ds1.private] + + # assert these paths begin unsecured + for private in privatepaths: + assert not oct(private.stat().mode).endswith('00'), ( + 'Path started secured') + + run = runner(yadm_y('perms')) + assert run.success + assert run.err == '' + assert run.out == '' + + # these paths should be secured if processing perms + for private in privatepaths: + if '.p2' in private.basename or '.p4' in private.basename: + # Dot files within .ssh/.gnupg are not protected. + # This is a but which must be fixed + warnings.warn('Unhandled bug: private dot files', Warning) + continue + if ( + (sshperms == 'false' and 'ssh' in str(private)) + or + (gpgperms == 'false' and 'gnupg' in str(private)) + ): + assert not oct(private.stat().mode).endswith('00'), ( + 'Path should not be secured') + else: + assert oct(private.stat().mode).endswith('00'), ( + 'Path has not been secured') diff --git a/test/test_syntax.py b/test/test_syntax.py new file mode 100644 index 0000000..5408885 --- /dev/null +++ b/test/test_syntax.py @@ -0,0 +1,41 @@ +"""Syntax checks""" + +import os +import pytest + + +def test_yadm_syntax(runner, yadm): + """Is syntactically valid""" + run = runner(command=['bash', '-n', yadm]) + assert run.success + + +def test_shellcheck(runner, yadm, shellcheck_version): + """Passes shellcheck""" + run = runner(command=['shellcheck', '-V'], report=False) + if f'version: {shellcheck_version}' not in run.out: + pytest.skip('Unsupported shellcheck version') + run = runner(command=['shellcheck', '-s', 'bash', yadm]) + assert run.success + + +def test_pylint(runner, pylint_version): + """Passes pylint""" + run = runner(command=['pylint', '--version'], report=False) + if f'pylint {pylint_version}' not in run.out: + pytest.skip('Unsupported pylint version') + pyfiles = list() + for tfile in os.listdir('test'): + if tfile.endswith('.py'): + pyfiles.append(f'test/{tfile}') + run = runner(command=['pylint'] + pyfiles) + assert run.success + + +def test_flake8(runner, flake8_version): + """Passes flake8""" + run = runner(command=['flake8', '--version'], report=False) + if not run.out.startswith(flake8_version): + pytest.skip('Unsupported flake8 version') + run = runner(command=['flake8', 'test']) + assert run.success diff --git a/test/test_unit_bootstrap_available.py b/test/test_unit_bootstrap_available.py new file mode 100644 index 0000000..f37ac08 --- /dev/null +++ b/test/test_unit_bootstrap_available.py @@ -0,0 +1,33 @@ +"""Unit tests: bootstrap_available""" + + +def test_bootstrap_missing(runner, paths): + """Test result of bootstrap_available, when bootstrap missing""" + run_test(runner, paths, False) + + +def test_bootstrap_no_exec(runner, paths): + """Test result of bootstrap_available, when bootstrap not executable""" + paths.bootstrap.write('') + paths.bootstrap.chmod(0o644) + run_test(runner, paths, False) + + +def test_bootstrap_exec(runner, paths): + """Test result of bootstrap_available, when bootstrap executable""" + paths.bootstrap.write('') + paths.bootstrap.chmod(0o775) + run_test(runner, paths, True) + + +def run_test(runner, paths, success): + """Run bootstrap_available, and test result""" + script = f""" + YADM_TEST=1 source {paths.pgm} + YADM_BOOTSTRAP='{paths.bootstrap}' + bootstrap_available + """ + run = runner(command=['bash'], inp=script) + assert run.success == success + assert run.err == '' + assert run.out == '' diff --git a/test/test_unit_configure_paths.py b/test/test_unit_configure_paths.py new file mode 100644 index 0000000..094ff6b --- /dev/null +++ b/test/test_unit_configure_paths.py @@ -0,0 +1,80 @@ +"""Unit tests: configure_paths""" + +import pytest + +ARCHIVE = 'files.gpg' +BOOTSTRAP = 'bootstrap' +CONFIG = 'config' +ENCRYPT = 'encrypt' +HOME = '/testhome' +REPO = 'repo.git' +YDIR = '.yadm' + + +@pytest.mark.parametrize( + 'override, expect', [ + (None, {}), + ('-Y', {}), + ('--yadm-repo', {'repo': 'YADM_REPO', 'git': 'GIT_DIR'}), + ('--yadm-config', {'config': 'YADM_CONFIG'}), + ('--yadm-encrypt', {'encrypt': 'YADM_ENCRYPT'}), + ('--yadm-archive', {'archive': 'YADM_ARCHIVE'}), + ('--yadm-bootstrap', {'bootstrap': 'YADM_BOOTSTRAP'}), + ], ids=[ + 'default', + 'override yadm dir', + 'override repo', + 'override config', + 'override encrypt', + 'override archive', + 'override bootstrap', + ]) +def test_config(runner, paths, override, expect): + """Test configure_paths""" + opath = 'override' + matches = match_map() + args = [] + if override == '-Y': + matches = match_map('/' + opath) + + if override: + args = [override, '/' + opath] + for ekey in expect.keys(): + matches[ekey] = f'{expect[ekey]}="/{opath}"' + run_test( + runner, paths, + [override, opath], + ['must specify a fully qualified'], 1) + + run_test(runner, paths, args, matches.values(), 0) + + +def match_map(yadm_dir=None): + """Create a dictionary of matches, relative to yadm_dir""" + if not yadm_dir: + yadm_dir = '/'.join([HOME, YDIR]) + return { + 'yadm': f'YADM_DIR="{yadm_dir}"', + 'repo': f'YADM_REPO="{yadm_dir}/{REPO}"', + 'config': f'YADM_CONFIG="{yadm_dir}/{CONFIG}"', + 'encrypt': f'YADM_ENCRYPT="{yadm_dir}/{ENCRYPT}"', + 'archive': f'YADM_ARCHIVE="{yadm_dir}/{ARCHIVE}"', + 'bootstrap': f'YADM_BOOTSTRAP="{yadm_dir}/{BOOTSTRAP}"', + 'git': f'GIT_DIR="{yadm_dir}/{REPO}"', + } + + +def run_test(runner, paths, args, expected_matches, expected_code=0): + """Run proces global args, and run configure_paths""" + argstring = ' '.join(['"'+a+'"' for a in args]) + script = f""" + YADM_TEST=1 HOME="{HOME}" source {paths.pgm} + process_global_args {argstring} + configure_paths + declare -p | grep -E '(YADM|GIT)_' + """ + run = runner(command=['bash'], inp=script) + assert run.code == expected_code + assert run.err == '' + for match in expected_matches: + assert match in run.out diff --git a/test/test_unit_parse_encrypt.py b/test/test_unit_parse_encrypt.py new file mode 100644 index 0000000..914d907 --- /dev/null +++ b/test/test_unit_parse_encrypt.py @@ -0,0 +1,175 @@ +"""Unit tests: parse_encrypt""" + +import pytest + + +def test_not_called(runner, paths): + """Test parse_encrypt (not called)""" + run = run_parse_encrypt(runner, paths, skip_parse=True) + assert run.success + assert run.err == '' + assert 'EIF:unparsed' in run.out, 'EIF should be unparsed' + assert 'EIF_COUNT:1' in run.out, 'Only value of EIF should be unparsed' + + +def test_short_circuit(runner, paths): + """Test parse_encrypt (short-circuit)""" + run = run_parse_encrypt(runner, paths, twice=True) + assert run.success + assert run.err == '' + assert 'PARSE_ENCRYPT_SHORT=parse_encrypt() not reprocessed' in run.out, ( + 'parse_encrypt() should short-circuit') + + +@pytest.mark.parametrize( + 'encrypt', [ + ('missing'), + ('empty'), + ]) +def test_empty(runner, paths, encrypt): + """Test parse_encrypt (file missing/empty)""" + + # write encrypt file + if encrypt == 'missing': + assert not paths.encrypt.exists(), 'Encrypt should be missing' + else: + paths.encrypt.write('') + assert paths.encrypt.exists(), 'Encrypt should exist' + assert paths.encrypt.size() == 0, 'Encrypt should be empty' + + # run parse_encrypt + run = run_parse_encrypt(runner, paths) + assert run.success + assert run.err == '' + + # validate parsing result + assert 'EIF_COUNT:0' in run.out, 'EIF should be empty' + + +@pytest.mark.usefixtures('ds1_repo_copy') +def test_file_parse_encrypt(runner, paths): + """Test parse_encrypt + + Test an array of supported features of the encrypt configuration. + """ + + edata = '' + expected = set() + + # empty line + edata += '\n' + + # simple comments + edata += '# a simple comment\n' + edata += ' # a comment with leading space\n' + + # unreferenced directory + paths.work.join('unreferenced').mkdir() + + # simple files + edata += 'simple_file\n' + edata += 'simple.file\n' + paths.work.join('simple_file').write('') + paths.work.join('simple.file').write('') + paths.work.join('simple_file2').write('') + paths.work.join('simple.file2').write('') + expected.add('simple_file') + expected.add('simple.file') + + # simple files in directories + edata += 'simple_dir/simple_file\n' + paths.work.join('simple_dir/simple_file').write('', ensure=True) + paths.work.join('simple_dir/simple_file2').write('', ensure=True) + expected.add('simple_dir/simple_file') + + # paths with spaces + edata += 'with space/with space\n' + paths.work.join('with space/with space').write('', ensure=True) + paths.work.join('with space/with space2').write('', ensure=True) + expected.add('with space/with space') + + # hidden files + edata += '.hidden\n' + paths.work.join('.hidden').write('') + expected.add('.hidden') + + # hidden files in directories + edata += '.hidden_dir/.hidden_file\n' + paths.work.join('.hidden_dir/.hidden_file').write('', ensure=True) + expected.add('.hidden_dir/.hidden_file') + + # wildcards + edata += 'wild*\n' + paths.work.join('wildcard1').write('', ensure=True) + paths.work.join('wildcard2').write('', ensure=True) + expected.add('wildcard1') + expected.add('wildcard2') + + edata += 'dirwild*\n' + paths.work.join('dirwildcard/file1').write('', ensure=True) + paths.work.join('dirwildcard/file2').write('', ensure=True) + expected.add('dirwildcard') + + # excludes + edata += 'exclude*\n' + edata += 'ex ex/*\n' + paths.work.join('exclude_file1').write('') + paths.work.join('exclude_file2.ex').write('') + paths.work.join('exclude_file3.ex3').write('') + expected.add('exclude_file1') + expected.add('exclude_file3.ex3') + edata += '!*.ex\n' + edata += '!ex ex/*.txt\n' + paths.work.join('ex ex/file4').write('', ensure=True) + paths.work.join('ex ex/file5.txt').write('', ensure=True) + paths.work.join('ex ex/file6.text').write('', ensure=True) + expected.add('ex ex/file4') + expected.add('ex ex/file6.text') + + # write encrypt file + print(f'ENCRYPT:\n---\n{edata}---\n') + paths.encrypt.write(edata) + assert paths.encrypt.isfile() + + # run parse_encrypt + run = run_parse_encrypt(runner, paths) + assert run.success + assert run.err == '' + + assert f'EIF_COUNT:{len(expected)}' in run.out, 'EIF count wrong' + for expected_file in expected: + assert f'EIF:{expected_file}\n' in run.out + + +def run_parse_encrypt( + runner, paths, + skip_parse=False, + twice=False): + """Run parse_encrypt + + A count of ENCRYPT_INCLUDE_FILES will be reported as EIF_COUNT:X. All + values of ENCRYPT_INCLUDE_FILES will be reported as individual EIF:value + lines. + """ + parse_cmd = 'parse_encrypt' + if skip_parse: + parse_cmd = '' + if twice: + parse_cmd = 'parse_encrypt; parse_encrypt' + script = f""" + YADM_TEST=1 source {paths.pgm} + YADM_ENCRYPT={paths.encrypt} + export YADM_ENCRYPT + GIT_DIR={paths.repo} + export GIT_DIR + {parse_cmd} + export ENCRYPT_INCLUDE_FILES + export PARSE_ENCRYPT_SHORT + env + echo EIF_COUNT:${{#ENCRYPT_INCLUDE_FILES[@]}} + for value in "${{ENCRYPT_INCLUDE_FILES[@]}}"; do + echo "EIF:$value" + done + """ + run = runner(command=['bash'], inp=script) + return run diff --git a/test/test_unit_query_distro.py b/test/test_unit_query_distro.py new file mode 100644 index 0000000..3c53c54 --- /dev/null +++ b/test/test_unit_query_distro.py @@ -0,0 +1,26 @@ +"""Unit tests: query_distro""" + + +def test_lsb_release_present(runner, yadm, tst_distro): + """Match lsb_release -si when present""" + script = f""" + YADM_TEST=1 source {yadm} + query_distro + """ + run = runner(command=['bash'], inp=script) + assert run.success + assert run.err == '' + assert run.out.rstrip() == tst_distro + + +def test_lsb_release_missing(runner, yadm): + """Empty value when missing""" + script = f""" + YADM_TEST=1 source {yadm} + LSB_RELEASE_PROGRAM="missing_lsb_release" + query_distro + """ + run = runner(command=['bash'], inp=script) + assert run.success + assert run.err == '' + assert run.out.rstrip() == '' diff --git a/test/test_unit_set_os.py b/test/test_unit_set_os.py new file mode 100644 index 0000000..d2f2a2a --- /dev/null +++ b/test/test_unit_set_os.py @@ -0,0 +1,36 @@ +"""Unit tests: set_operating_system""" + +import pytest + + +@pytest.mark.parametrize( + 'proc_value, expected_os', [ + ('missing', 'uname'), + ('has Microsoft inside', 'WSL'), + ('another value', 'uname'), + ], ids=[ + '/proc/version missing', + '/proc/version includes MS', + '/proc/version excludes MS', + ]) +def test_set_operating_system( + runner, paths, tst_sys, proc_value, expected_os): + """Run set_operating_system and test result""" + + # Normally /proc/version (set in PROC_VERSION) is inspected to identify + # WSL. During testing, we will override that value. + proc_version = paths.root.join('proc_version') + if proc_value != 'missing': + proc_version.write(proc_value) + script = f""" + YADM_TEST=1 source {paths.pgm} + PROC_VERSION={proc_version} + set_operating_system + echo $OPERATING_SYSTEM + """ + run = runner(command=['bash'], inp=script) + assert run.success + assert run.err == '' + if expected_os == 'uname': + expected_os = tst_sys + assert run.out.rstrip() == expected_os diff --git a/test/test_unit_x_program.py b/test/test_unit_x_program.py new file mode 100644 index 0000000..3233a3d --- /dev/null +++ b/test/test_unit_x_program.py @@ -0,0 +1,46 @@ +"""Unit tests: yadm.[git,gpg]-program""" + +import os +import pytest + + +@pytest.mark.parametrize( + 'executable, success, value, match', [ + (None, True, 'program', None), + ('cat', True, 'cat', None), + ('badprogram', False, None, 'badprogram'), + ], ids=[ + 'executable missing', + 'valid alternative', + 'invalid alternative', + ]) +@pytest.mark.parametrize('program', ['git', 'gpg']) +def test_x_program( + runner, yadm_y, paths, program, executable, success, value, match): + """Set yadm.X-program, and test result of require_X""" + + # set configuration + if executable: + os.system(' '.join(yadm_y( + 'config', f'yadm.{program}-program', executable))) + + # test require_[git,gpg] + script = f""" + YADM_TEST=1 source {paths.pgm} + YADM_CONFIG="{paths.config}" + require_{program} + echo ${program.upper()}_PROGRAM + """ + run = runner(command=['bash'], inp=script) + assert run.success == success + assert run.err == '' + + # [GIT,GPG]_PROGRAM set correctly + if value == 'program': + assert run.out.rstrip() == program + elif value: + assert run.out.rstrip() == value + + # error reported about bad config + if match: + assert match in run.out diff --git a/test/test_version.py b/test/test_version.py new file mode 100644 index 0000000..023eb82 --- /dev/null +++ b/test/test_version.py @@ -0,0 +1,35 @@ +"""Test version""" + +import re +import pytest + + +@pytest.fixture(scope='module') +def expected_version(yadm): + """ + Expected semantic version number. This is taken directly out of yadm, + searching for the VERSION= string. + """ + yadm_version = re.findall( + r'VERSION=([^\n]+)', + open(yadm).read()) + if yadm_version: + return yadm_version[0] + pytest.fail(f'version not found in {yadm}') + return 'not found' + + +def test_semantic_version(expected_version): + """Version is semantic""" + # semantic version conforms to MAJOR.MINOR.PATCH + assert re.search(r'^\d+\.\d+\.\d+$', expected_version), ( + 'does not conform to MAJOR.MINOR.PATCH') + + +def test_reported_version( + runner, yadm_y, expected_version): + """Report correct version""" + run = runner(command=yadm_y('version')) + assert run.success + assert run.err == '' + assert run.out == f'yadm {expected_version}\n' diff --git a/test/utils.py b/test/utils.py new file mode 100644 index 0000000..411dde1 --- /dev/null +++ b/test/utils.py @@ -0,0 +1,59 @@ +"""Testing Utilities + +This module holds values/functions common to multiple tests. +""" + +import os + +ALT_FILE1 = 'test_alt' +ALT_FILE2 = 'test alt/test alt' + + +def set_local(paths, variable, value): + """Set local override""" + os.system( + f'GIT_DIR={str(paths.repo)} ' + f'git config --local "local.{variable}" "{value}"' + ) + + +def create_alt_files(paths, suffix, + preserve=False, tracked=True, + encrypt=False, exclude=False, + content=None): + """Create new files, and add to the repo + + This is used for testing alternate files. In each case, a suffix is + appended to two standard file paths. Particulars of the file creation and + repo handling are dependent upon the function arguments. + """ + + if not preserve: + if paths.work.join(ALT_FILE1).exists(): + paths.work.join(ALT_FILE1).remove(rec=1, ignore_errors=True) + assert not paths.work.join(ALT_FILE1).exists() + if paths.work.join(ALT_FILE2).exists(): + paths.work.join(ALT_FILE2).remove(rec=1, ignore_errors=True) + assert not paths.work.join(ALT_FILE2).exists() + + new_file1 = paths.work.join(ALT_FILE1 + suffix) + new_file1.write(ALT_FILE1 + suffix, ensure=True) + new_file2 = paths.work.join(ALT_FILE2 + suffix) + new_file2.write(ALT_FILE2 + suffix, ensure=True) + if content: + new_file1.write('\n' + content, mode='a', ensure=True) + new_file2.write('\n' + content, mode='a', ensure=True) + assert new_file1.exists() + assert new_file2.exists() + + if tracked: + for path in (new_file1, new_file2): + os.system(f'GIT_DIR={str(paths.repo)} git add "{path}"') + os.system(f'GIT_DIR={str(paths.repo)} git commit -m "Add test files"') + + if encrypt: + paths.encrypt.write(f'{ALT_FILE1 + suffix}\n', mode='a') + paths.encrypt.write(f'{ALT_FILE2 + suffix}\n', mode='a') + if exclude: + paths.encrypt.write(f'!{ALT_FILE1 + suffix}\n', mode='a') + paths.encrypt.write(f'!{ALT_FILE2 + suffix}\n', mode='a')