Rewrite testing system (#119)

The new test system is written with py.test. These tests are more
comprehensive, run faster by an order of magnitude, and are far more
maintainable. The tests themselves conform to PEP8.
This commit is contained in:
Tim Byrne 2018-07-11 07:50:42 -05:00
parent 09a018ea5a
commit e7f9616b39
No known key found for this signature in database
GPG Key ID: 14DB4FC2465A4B12
34 changed files with 3218 additions and 3 deletions

2
.gitignore vendored
View File

@ -1,4 +1,6 @@
.DS_Store
.env
.jekyll-metadata
.pytest_cache
.sass-cache
_site

View File

@ -1,9 +1,20 @@
FROM ubuntu:yakkety
FROM ubuntu:18.04
MAINTAINER Tim Byrne <sultan@locehilios.com>
# No input during build
ENV DEBIAN_FRONTEND noninteractive
# UTF8 locale
RUN apt-get update && apt-get install -y locales
RUN locale-gen en_US.UTF-8
ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' LC_ALL='en_US.UTF-8'
# Convenience settings for the testbed's root account
RUN echo 'set -o vi' >> /root/.bashrc
# Install prerequisites
RUN apt-get update && apt-get install -y git gnupg1 make shellcheck bats expect curl python-pip lsb-release
RUN pip install envtpl
RUN apt-get update && apt-get install -y git gnupg1 make shellcheck=0.4.6-1 bats expect curl python3-pip lsb-release
RUN pip3 install envtpl pytest==3.6.4 pylint==1.9.2 flake8==3.5.0
# Force GNUPG version 1 at path /usr/bin/gpg
RUN ln -fs /usr/bin/gpg1 /usr/bin/gpg

View File

@ -23,6 +23,11 @@ test: bats shellcheck
parallel:
ls test/*bats | time parallel -q -P0 -- docker run --rm -v "$$PWD:/yadm:ro" yadm/testbed bash -c 'bats {}'
.PHONY: pytest
pytest:
@echo Running all pytest tests
@pytest -v
.PHONY: bats
bats:
@echo Running all bats tests
@ -58,3 +63,13 @@ man:
.PHONY: wide
wide:
man ./yadm.1
.PHONY: sync-clock
sync-clock:
docker run --rm --privileged alpine hwclock -s
.PHONY: .env
.env:
virtualenv --python=python3 .env
.env/bin/pip3 install --upgrade pip setuptools
.env/bin/pip3 install --upgrade pytest pylint==1.9.2 flake8==3.5.0

7
docker-compose.yml Normal file
View File

@ -0,0 +1,7 @@
---
version: '3'
services:
testbed:
volumes:
- .:/yadm:ro
image: yadm/testbed:latest

11
pylintrc Normal file
View File

@ -0,0 +1,11 @@
[BASIC]
good-names=pytestmark
[DESIGN]
max-args=14
max-locals=26
max-attributes=8
max-statements=65
[MESSAGES CONTROL]
disable=redefined-outer-name

3
pytest.ini Normal file
View File

@ -0,0 +1,3 @@
[pytest]
cache_dir = /tmp
addopts = -ra

559
test/conftest.py Normal file
View File

@ -0,0 +1,559 @@
"""Global tests configuration and fixtures"""
import collections
import copy
import distutils.dir_util # pylint: disable=no-name-in-module,import-error
import os
import platform
import pwd
from subprocess import Popen, PIPE
import pytest
@pytest.fixture(scope='session')
def shellcheck_version():
"""Version of shellcheck supported"""
return '0.4.6'
@pytest.fixture(scope='session')
def pylint_version():
"""Version of pylint supported"""
return '1.9.2'
@pytest.fixture(scope='session')
def flake8_version():
"""Version of flake8 supported"""
return '3.5.0'
@pytest.fixture(scope='session')
def tst_user():
"""Test session's user id"""
return pwd.getpwuid(os.getuid()).pw_name
@pytest.fixture(scope='session')
def tst_host():
"""Test session's short hostname value"""
return platform.node().split('.')[0]
@pytest.fixture(scope='session')
def tst_distro(runner):
"""Test session's distro"""
distro = ''
try:
run = runner(command=['lsb_release', '-si'], report=False)
distro = run.out.strip()
except BaseException:
pass
return distro
@pytest.fixture(scope='session')
def tst_sys():
"""Test session's uname value"""
return platform.system()
@pytest.fixture(scope='session')
def cygwin_sys():
"""CYGWIN uname id"""
return 'CYGWIN_NT-6.1-WOW64'
@pytest.fixture(scope='session')
def supported_commands():
"""List of supported commands
This list should be updated every time yadm learns a new command.
"""
return [
'alt',
'bootstrap',
'clean',
'clone',
'config',
'decrypt',
'encrypt',
'enter',
'gitconfig',
'help',
'init',
'introspect',
'list',
'perms',
'version',
]
@pytest.fixture(scope='session')
def supported_configs():
"""List of supported config options
This list should be updated every time yadm learns a new config.
"""
return [
'local.class',
'local.hostname',
'local.os',
'local.user',
'yadm.auto-alt',
'yadm.auto-perms',
'yadm.auto-private-dirs',
'yadm.cygwin-copy',
'yadm.git-program',
'yadm.gpg-perms',
'yadm.gpg-program',
'yadm.gpg-recipient',
'yadm.ssh-perms',
]
@pytest.fixture(scope='session')
def supported_switches():
"""List of supported switches
This list should be updated every time yadm learns a new switch.
"""
return [
'--yadm-archive',
'--yadm-bootstrap',
'--yadm-config',
'--yadm-dir',
'--yadm-encrypt',
'--yadm-repo',
'-Y',
]
@pytest.fixture(scope='session')
def supported_local_configs(supported_configs):
"""List of supported local config options"""
return [c for c in supported_configs if c.startswith('local.')]
class Runner(object):
"""Class for running commands
Within yadm tests, this object should be used when running commands that
require:
* Acting on the status code
* Parsing the output of the command
* Passing input to the command
Other instances of simply running commands should use os.system().
"""
def __init__(
self,
command,
inp=None,
shell=False,
cwd=None,
env=None,
expect=None,
report=True):
if shell:
self.command = ' '.join([str(cmd) for cmd in command])
else:
self.command = command
self.inp = inp
self.wrap(expect)
process = Popen(
self.command,
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
shell=shell,
cwd=cwd,
env=env,
)
input_bytes = self.inp
if self.inp:
input_bytes = self.inp.encode()
(out_bstream, err_bstream) = process.communicate(input=input_bytes)
self.out = out_bstream.decode()
self.err = err_bstream.decode()
self.code = process.wait()
self.success = self.code == 0
self.failure = self.code != 0
if report:
self.report()
def __repr__(self):
return f'Runner({self.command})'
def report(self):
"""Print code/stdout/stderr"""
print(f'{self}')
print(f' RUN: code:{self.code}')
if self.inp:
print(f' RUN: input:\n{self.inp}')
print(f' RUN: stdout:\n{self.out}')
print(f' RUN: stderr:\n{self.err}')
def wrap(self, expect):
"""Wrap command with expect"""
if not expect:
return
cmdline = ' '.join([f'"{w}"' for w in self.command])
expect_script = f'set timeout 2\nspawn {cmdline}\n'
for question, answer in expect:
expect_script += (
'expect {\n'
f'"{question}" {{send "{answer}\\r"}}\n'
'timeout {close;exit 128}\n'
'}\n')
expect_script += (
'expect eof\n'
'foreach {pid spawnid os_error_flag value} [wait] break\n'
'exit $value')
self.inp = expect_script
print(f'EXPECT:{expect_script}')
self.command = ['expect']
@pytest.fixture(scope='session')
def runner():
"""Class for running commands"""
return Runner
@pytest.fixture(scope='session')
def config_git():
"""Configure global git configuration, if missing"""
os.system(
'git config user.name || '
'git config --global user.name "test"')
os.system(
'git config user.email || '
'git config --global user.email "test@test.test"')
return None
@pytest.fixture()
def repo_config(runner, paths):
"""Function to query a yadm repo configuration value"""
def query_func(key):
"""Query a yadm repo configuration value"""
run = runner(
command=('git', 'config', '--local', key),
env={'GIT_DIR': paths.repo},
report=False,
)
return run.out.rstrip()
return query_func
@pytest.fixture(scope='session')
def yadm():
"""Path to yadm program to be tested"""
full_path = os.path.realpath('yadm')
assert os.path.isfile(full_path), "yadm program file isn't present"
return full_path
@pytest.fixture()
def paths(tmpdir, yadm):
"""Function scoped test paths"""
dir_root = tmpdir.mkdir('root')
dir_work = dir_root.mkdir('work')
dir_yadm = dir_root.mkdir('yadm')
dir_repo = dir_yadm.mkdir('repo.git')
dir_hooks = dir_yadm.mkdir('hooks')
dir_remote = dir_root.mkdir('remote')
file_archive = dir_yadm.join('files.gpg')
file_bootstrap = dir_yadm.join('bootstrap')
file_config = dir_yadm.join('config')
file_encrypt = dir_yadm.join('encrypt')
paths = collections.namedtuple(
'Paths', [
'pgm',
'root',
'work',
'yadm',
'repo',
'hooks',
'remote',
'archive',
'bootstrap',
'config',
'encrypt',
])
return paths(
yadm,
dir_root,
dir_work,
dir_yadm,
dir_repo,
dir_hooks,
dir_remote,
file_archive,
file_bootstrap,
file_config,
file_encrypt,
)
@pytest.fixture()
def yadm_y(paths):
"""Generate custom command_list function"""
def command_list(*args):
"""Produce params for running yadm with -Y"""
return [paths.pgm, '-Y', str(paths.yadm)] + list(args)
return command_list
class DataFile(object):
"""Datafile object"""
def __init__(self, path, tracked=True, private=False):
self.__path = path
self.__parent = None
self.__tracked = tracked
self.__private = private
@property
def path(self):
"""Path property"""
return self.__path
@property
def relative(self):
"""Relative path property"""
if self.__parent:
return self.__parent.join(self.path)
raise BaseException('Unable to provide relative path, no parent')
@property
def tracked(self):
"""Tracked property"""
return self.__tracked
@property
def private(self):
"""Private property"""
return self.__private
def relative_to(self, parent):
"""Update all relative paths to this py.path"""
self.__parent = parent
return
class DataSet(object):
"""Dataset object"""
def __init__(self):
self.__files = list()
self.__dirs = list()
self.__tracked_dirs = list()
self.__private_dirs = list()
self.__relpath = None
def __repr__(self):
return (
f'[DS with {len(self)} files; '
f'{len(self.tracked)} tracked, '
f'{len(self.private)} private]'
)
def __iter__(self):
return iter(self.__files)
def __len__(self):
return len(self.__files)
def __contains__(self, datafile):
if [f for f in self.__files if f.path == datafile]:
return True
if datafile in self.__files:
return True
return False
@property
def files(self):
"""List of DataFiles in DataSet"""
return list(self.__files)
@property
def tracked(self):
"""List of tracked DataFiles in DataSet"""
return [f for f in self.__files if f.tracked]
@property
def private(self):
"""List of private DataFiles in DataSet"""
return [f for f in self.__files if f.private]
@property
def dirs(self):
"""List of directories in DataSet"""
return list(self.__dirs)
@property
def plain_dirs(self):
"""List of directories in DataSet not starting with '.'"""
return [d for d in self.dirs if not d.startswith('.')]
@property
def hidden_dirs(self):
"""List of directories in DataSet starting with '.'"""
return [d for d in self.dirs if d.startswith('.')]
@property
def tracked_dirs(self):
"""List of directories in DataSet not starting with '.'"""
return [d for d in self.__tracked_dirs if not d.startswith('.')]
@property
def private_dirs(self):
"""List of directories in DataSet considered 'private'"""
return list(self.__private_dirs)
def add_file(self, path, tracked=True, private=False):
"""Add file to data set"""
if path not in self:
datafile = DataFile(path, tracked, private)
if self.__relpath:
datafile.relative_to(self.__relpath)
self.__files.append(datafile)
dname = os.path.dirname(path)
if dname and dname not in self.__dirs:
self.__dirs.append(dname)
if tracked:
self.__tracked_dirs.append(dname)
if private:
self.__private_dirs.append(dname)
def relative_to(self, relpath):
"""Update all relative paths to this py.path"""
self.__relpath = relpath
for datafile in self.files:
datafile.relative_to(self.__relpath)
return
@pytest.fixture(scope='session')
def ds1_dset(tst_sys, cygwin_sys):
"""Meta-data for dataset one files"""
dset = DataSet()
dset.add_file('t1')
dset.add_file('d1/t2')
dset.add_file(f'test_alt##S')
dset.add_file(f'test_alt##S.H')
dset.add_file(f'test_alt##S.H.U')
dset.add_file(f'test_alt##C.S.H.U')
dset.add_file(f'test alt/test alt##S')
dset.add_file(f'test alt/test alt##S.H')
dset.add_file(f'test alt/test alt##S.H.U')
dset.add_file(f'test alt/test alt##C.S.H.U')
dset.add_file(f'test_cygwin_copy##{tst_sys}')
dset.add_file(f'test_cygwin_copy##{cygwin_sys}')
dset.add_file('u1', tracked=False)
dset.add_file('d2/u2', tracked=False)
dset.add_file('.ssh/p1', tracked=False, private=True)
dset.add_file('.ssh/.p2', tracked=False, private=True)
dset.add_file('.gnupg/p3', tracked=False, private=True)
dset.add_file('.gnupg/.p4', tracked=False, private=True)
return dset
@pytest.fixture(scope='session')
def ds1_data(tmpdir_factory, config_git, ds1_dset, runner):
"""A set of test data, worktree & repo"""
# pylint: disable=unused-argument
# This is ignored because
# @pytest.mark.usefixtures('config_git')
# cannot be applied to another fixture.
data = tmpdir_factory.mktemp('ds1')
work = data.mkdir('work')
for datafile in ds1_dset:
work.join(datafile.path).write(datafile.path, ensure=True)
repo = data.mkdir('repo.git')
env = os.environ.copy()
env['GIT_DIR'] = str(repo)
runner(
command=['git', 'init', '--shared=0600', '--bare', str(repo)],
report=False)
runner(
command=['git', 'config', 'core.bare', 'false'],
env=env,
report=False)
runner(
command=['git', 'config', 'status.showUntrackedFiles', 'no'],
env=env,
report=False)
runner(
command=['git', 'config', 'yadm.managed', 'true'],
env=env,
report=False)
runner(
command=['git', 'config', 'core.worktree', str(work)],
env=env,
report=False)
runner(
command=['git', 'add'] +
[str(work.join(f.path)) for f in ds1_dset if f.tracked],
env=env)
runner(
command=['git', 'commit', '--allow-empty', '-m', 'Initial commit'],
env=env,
report=False)
data = collections.namedtuple('Data', ['work', 'repo'])
return data(work, repo)
@pytest.fixture()
def ds1_work_copy(ds1_data, paths):
"""Function scoped copy of ds1_data.work"""
distutils.dir_util.copy_tree( # pylint: disable=no-member
str(ds1_data.work), str(paths.work))
return None
@pytest.fixture()
def ds1_repo_copy(runner, ds1_data, paths):
"""Function scoped copy of ds1_data.repo"""
distutils.dir_util.copy_tree( # pylint: disable=no-member
str(ds1_data.repo), str(paths.repo))
env = os.environ.copy()
env['GIT_DIR'] = str(paths.repo)
runner(
command=['git', 'config', 'core.worktree', str(paths.work)],
env=env,
report=False)
return None
@pytest.fixture()
def ds1_copy(ds1_work_copy, ds1_repo_copy):
"""Function scoped copy of ds1_data"""
# pylint: disable=unused-argument
# This is ignored because
# @pytest.mark.usefixtures('ds1_work_copy', 'ds1_repo_copy')
# cannot be applied to another fixture.
return None
@pytest.fixture()
def ds1(ds1_work_copy, paths, ds1_dset):
"""Function scoped ds1_dset w/paths"""
# pylint: disable=unused-argument
# This is ignored because
# @pytest.mark.usefixtures('ds1_copy')
# cannot be applied to another fixture.
dscopy = copy.deepcopy(ds1_dset)
dscopy.relative_to(copy.deepcopy(paths.work))
return dscopy

1
test/pylintrc Symbolic link
View File

@ -0,0 +1 @@
../pylintrc

345
test/test_alt.py Normal file
View File

@ -0,0 +1,345 @@
"""Test alt"""
import os
import re
import string
import pytest
import utils
# These test IDs are broken. During the writing of these tests, problems have
# been discovered in the way yadm orders matching files.
BROKEN_TEST_IDS = [
'test_wild[tracked-##C.S.H.U-C-S%-H%-U]',
'test_wild[tracked-##C.S.H.U-C-S-H%-U]',
'test_wild[encrypted-##C.S.H.U-C-S%-H%-U]',
'test_wild[encrypted-##C.S.H.U-C-S-H%-U]',
]
PRECEDENCE = [
'##',
'##$tst_sys',
'##$tst_sys.$tst_host',
'##$tst_sys.$tst_host.$tst_user',
'##$tst_class',
'##$tst_class.$tst_sys',
'##$tst_class.$tst_sys.$tst_host',
'##$tst_class.$tst_sys.$tst_host.$tst_user',
]
WILD_TEMPLATES = [
'##$tst_class',
'##$tst_class.$tst_sys',
'##$tst_class.$tst_sys.$tst_host',
'##$tst_class.$tst_sys.$tst_host.$tst_user',
]
WILD_TESTED = set()
@pytest.mark.parametrize('precedence_index', range(len(PRECEDENCE)))
@pytest.mark.parametrize(
'tracked, encrypt, exclude', [
(False, False, False),
(True, False, False),
(False, True, False),
(False, True, True),
], ids=[
'untracked',
'tracked',
'encrypted',
'excluded',
])
@pytest.mark.usefixtures('ds1_copy')
def test_alt(runner, yadm_y, paths,
tst_sys, tst_host, tst_user,
tracked, encrypt, exclude,
precedence_index):
"""Test alternate linking
This test is done by iterating for the number of templates in PRECEDENCE.
With each iteration, another file is left off the list. So with each
iteration, the template with the "highest precedence" is left out. The file
using the highest precedence should be the one linked.
"""
# set the class
tst_class = 'testclass'
utils.set_local(paths, 'class', tst_class)
# process the templates in PRECEDENCE
precedence = list()
for template in PRECEDENCE:
precedence.append(
string.Template(template).substitute(
tst_class=tst_class,
tst_host=tst_host,
tst_sys=tst_sys,
tst_user=tst_user,
)
)
# create files using a subset of files
for suffix in precedence[0:precedence_index+1]:
utils.create_alt_files(paths, suffix, tracked=tracked,
encrypt=encrypt, exclude=exclude)
# run alt to trigger linking
run = runner(yadm_y('alt'))
assert run.success
assert run.err == ''
linked = linked_list(run.out)
# assert the proper linking has occurred
for file_path in (utils.ALT_FILE1, utils.ALT_FILE2):
source_file = file_path + precedence[precedence_index]
if tracked or (encrypt and not exclude):
assert paths.work.join(file_path).islink()
assert paths.work.join(file_path).read() == source_file
assert str(paths.work.join(source_file)) in linked
else:
assert not paths.work.join(file_path).exists()
assert str(paths.work.join(source_file)) not in linked
def short_template(template):
"""Translate template into something short for test IDs"""
return string.Template(template).substitute(
tst_class='C',
tst_host='H',
tst_sys='S',
tst_user='U',
)
@pytest.mark.parametrize('wild_user', [True, False], ids=['U%', 'U'])
@pytest.mark.parametrize('wild_host', [True, False], ids=['H%', 'H'])
@pytest.mark.parametrize('wild_sys', [True, False], ids=['S%', 'S'])
@pytest.mark.parametrize('wild_class', [True, False], ids=['C%', 'C'])
@pytest.mark.parametrize('template', WILD_TEMPLATES, ids=short_template)
@pytest.mark.parametrize(
'tracked, encrypt', [
(True, False),
(False, True),
], ids=[
'tracked',
'encrypted',
])
@pytest.mark.usefixtures('ds1_copy')
def test_wild(request, runner, yadm_y, paths,
tst_sys, tst_host, tst_user,
tracked, encrypt,
wild_class, wild_host, wild_sys, wild_user,
template):
"""Test wild linking
These tests are done by creating permutations of the possible files using
WILD_TEMPLATES. Each case is then tested (while skipping the already tested
permutations for efficiency).
"""
if request.node.name in BROKEN_TEST_IDS:
pytest.xfail(
'This test is known to be broken. '
'This bug needs to be fixed.')
tst_class = 'testclass'
# determine the "wild" version of the suffix
str_class = '%' if wild_class else tst_class
str_host = '%' if wild_host else tst_host
str_sys = '%' if wild_sys else tst_sys
str_user = '%' if wild_user else tst_user
wild_suffix = string.Template(template).substitute(
tst_class=str_class,
tst_host=str_host,
tst_sys=str_sys,
tst_user=str_user,
)
# determine the "standard" version of the suffix
std_suffix = string.Template(template).substitute(
tst_class=tst_class,
tst_host=tst_host,
tst_sys=tst_sys,
tst_user=tst_user,
)
# skip over duplicate tests (this seems to be the simplest way to cover the
# permutations of tests, while skipping duplicates.)
test_key = f'{tracked}{encrypt}{wild_suffix}{std_suffix}'
if test_key in WILD_TESTED:
return
else:
WILD_TESTED.add(test_key)
# set the class
utils.set_local(paths, 'class', tst_class)
# create files using the wild suffix
utils.create_alt_files(paths, wild_suffix, tracked=tracked,
encrypt=encrypt, exclude=False)
# run alt to trigger linking
run = runner(yadm_y('alt'))
assert run.success
assert run.err == ''
linked = linked_list(run.out)
# assert the proper linking has occurred
for file_path in (utils.ALT_FILE1, utils.ALT_FILE2):
source_file = file_path + wild_suffix
assert paths.work.join(file_path).islink()
assert paths.work.join(file_path).read() == source_file
assert str(paths.work.join(source_file)) in linked
# create files using the standard suffix
utils.create_alt_files(paths, std_suffix, tracked=tracked,
encrypt=encrypt, exclude=False)
# run alt to trigger linking
run = runner(yadm_y('alt'))
assert run.success
assert run.err == ''
linked = linked_list(run.out)
# assert the proper linking has occurred
for file_path in (utils.ALT_FILE1, utils.ALT_FILE2):
source_file = file_path + std_suffix
assert paths.work.join(file_path).islink()
assert paths.work.join(file_path).read() == source_file
assert str(paths.work.join(source_file)) in linked
@pytest.mark.usefixtures('ds1_copy')
def test_local_override(runner, yadm_y, paths,
tst_sys, tst_host, tst_user):
"""Test local overrides"""
# define local overrides
utils.set_local(paths, 'class', 'or-class')
utils.set_local(paths, 'hostname', 'or-hostname')
utils.set_local(paths, 'os', 'or-os')
utils.set_local(paths, 'user', 'or-user')
# create files, the first would normally be the most specific version
# however, the second is the overridden version which should be preferred.
utils.create_alt_files(
paths, f'##or-class.{tst_sys}.{tst_host}.{tst_user}')
utils.create_alt_files(
paths, '##or-class.or-os.or-hostname.or-user')
# run alt to trigger linking
run = runner(yadm_y('alt'))
assert run.success
assert run.err == ''
linked = linked_list(run.out)
# assert the proper linking has occurred
for file_path in (utils.ALT_FILE1, utils.ALT_FILE2):
source_file = file_path + '##or-class.or-os.or-hostname.or-user'
assert paths.work.join(file_path).islink()
assert paths.work.join(file_path).read() == source_file
assert str(paths.work.join(source_file)) in linked
@pytest.mark.parametrize('suffix', ['AAA', 'ZZZ', 'aaa', 'zzz'])
@pytest.mark.usefixtures('ds1_copy')
def test_class_case(runner, yadm_y, paths, tst_sys, suffix):
"""Test range of class cases"""
# set the class
utils.set_local(paths, 'class', suffix)
# create files
endings = [suffix]
if tst_sys == 'Linux':
# Only create all of these side-by-side on Linux, which is
# unquestionably case-sensitive. This would break tests on
# case-insensitive systems.
endings = ['AAA', 'ZZZ', 'aaa', 'zzz']
for ending in endings:
utils.create_alt_files(paths, f'##{ending}')
# run alt to trigger linking
run = runner(yadm_y('alt'))
assert run.success
assert run.err == ''
linked = linked_list(run.out)
# assert the proper linking has occurred
for file_path in (utils.ALT_FILE1, utils.ALT_FILE2):
source_file = file_path + f'##{suffix}'
assert paths.work.join(file_path).islink()
assert paths.work.join(file_path).read() == source_file
assert str(paths.work.join(source_file)) in linked
@pytest.mark.parametrize('autoalt', [None, 'true', 'false'])
@pytest.mark.usefixtures('ds1_copy')
def test_auto_alt(runner, yadm_y, paths, autoalt):
"""Test setting auto-alt"""
# set the value of auto-alt
if autoalt:
os.system(' '.join(yadm_y('config', 'yadm.auto-alt', autoalt)))
# create file
suffix = '##'
utils.create_alt_files(paths, suffix)
# run status to possibly trigger linking
run = runner(yadm_y('status'))
assert run.success
assert run.err == ''
linked = linked_list(run.out)
# assert the proper linking has occurred
for file_path in (utils.ALT_FILE1, utils.ALT_FILE2):
source_file = file_path + suffix
if autoalt == 'false':
assert not paths.work.join(file_path).exists()
else:
assert paths.work.join(file_path).islink()
assert paths.work.join(file_path).read() == source_file
# no linking output when run via auto-alt
assert str(paths.work.join(source_file)) not in linked
@pytest.mark.parametrize('delimiter', ['.', '_'])
@pytest.mark.usefixtures('ds1_copy')
def test_delimiter(runner, yadm_y, paths,
tst_sys, tst_host, tst_user, delimiter):
"""Test delimiters used"""
suffix = '##' + delimiter.join([tst_sys, tst_host, tst_user])
# create file
utils.create_alt_files(paths, suffix)
# run alt to trigger linking
run = runner(yadm_y('alt'))
assert run.success
assert run.err == ''
linked = linked_list(run.out)
# assert the proper linking has occurred
# only a delimiter of '.' is valid
for file_path in (utils.ALT_FILE1, utils.ALT_FILE2):
source_file = file_path + suffix
if delimiter == '.':
assert paths.work.join(file_path).islink()
assert paths.work.join(file_path).read() == source_file
assert str(paths.work.join(source_file)) in linked
else:
assert not paths.work.join(file_path).exists()
assert str(paths.work.join(source_file)) not in linked
def linked_list(output):
"""Parse output, and return list of linked files"""
linked = dict()
for line in output.splitlines():
match = re.match('Linking (.+) to (.+)$', line)
if match:
linked[match.group(2)] = match.group(1)
return linked.values()

View File

@ -0,0 +1,106 @@
"""Test asserting private directories"""
import os
import re
import pytest
pytestmark = pytest.mark.usefixtures('ds1_copy')
PRIVATE_DIRS = ['.gnupg', '.ssh']
def test_pdirs_missing(runner, yadm_y, paths):
"""Private dirs (private dirs missing)
When a git command is run
And private directories are missing
Create private directories prior to command
"""
# confirm directories are missing at start
for pdir in PRIVATE_DIRS:
path = paths.work.join(pdir)
if path.exists():
path.remove()
assert not path.exists()
# run status
run = runner(command=yadm_y('status'), env={'DEBUG': 'yes'})
assert run.success
assert run.err == ''
assert 'On branch master' in run.out
# confirm directories are created
# and are protected
for pdir in PRIVATE_DIRS:
path = paths.work.join(pdir)
assert path.exists()
assert oct(path.stat().mode).endswith('00'), 'Directory is not secured'
# confirm directories are created before command is run:
assert re.search(
r'Creating.+\.gnupg.+Creating.+\.ssh.+Running git command git status',
run.out, re.DOTALL), 'directories created before command is run'
def test_pdirs_missing_apd_false(runner, yadm_y, paths):
"""Private dirs (private dirs missing / yadm.auto-private-dirs=false)
When a git command is run
And private directories are missing
But auto-private-dirs is false
Do not create private dirs
"""
# confirm directories are missing at start
for pdir in PRIVATE_DIRS:
path = paths.work.join(pdir)
if path.exists():
path.remove()
assert not path.exists()
# set configuration
os.system(' '.join(yadm_y(
'config', '--bool', 'yadm.auto-private-dirs', 'false')))
# run status
run = runner(command=yadm_y('status'))
assert run.success
assert run.err == ''
assert 'On branch master' in run.out
# confirm directories are STILL missing
for pdir in PRIVATE_DIRS:
assert not paths.work.join(pdir).exists()
def test_pdirs_exist_apd_false(runner, yadm_y, paths):
"""Private dirs (private dirs exist / yadm.auto-perms=false)
When a git command is run
And private directories exist
And yadm is configured not to auto update perms
Do not alter directories
"""
# create permissive directories
for pdir in PRIVATE_DIRS:
path = paths.work.join(pdir)
if not path.isdir():
path.mkdir()
path.chmod(0o777)
assert oct(path.stat().mode).endswith('77'), 'Directory is secure.'
# set configuration
os.system(' '.join(yadm_y(
'config', '--bool', 'yadm.auto-perms', 'false')))
# run status
run = runner(command=yadm_y('status'))
assert run.success
assert run.err == ''
assert 'On branch master' in run.out
# created directories are STILL permissive
for pdir in PRIVATE_DIRS:
path = paths.work.join(pdir)
assert oct(path.stat().mode).endswith('77'), 'Directory is secure'

31
test/test_bootstrap.py Normal file
View File

@ -0,0 +1,31 @@
"""Test bootstrap"""
import pytest
@pytest.mark.parametrize(
'exists, executable, code, expect', [
(False, False, 1, 'Cannot execute bootstrap'),
(True, False, 1, 'is not an executable program'),
(True, True, 123, 'Bootstrap successful'),
], ids=[
'missing',
'not executable',
'executable',
])
def test_bootstrap(
runner, yadm_y, paths, exists, executable, code, expect):
"""Test bootstrap command"""
if exists:
paths.bootstrap.write('')
if executable:
paths.bootstrap.write(
'#!/bin/bash\n'
f'echo {expect}\n'
f'exit {code}\n'
)
paths.bootstrap.chmod(0o775)
run = runner(command=yadm_y('bootstrap'))
assert run.code == code
assert run.err == ''
assert expect in run.out

11
test/test_clean.py Normal file
View File

@ -0,0 +1,11 @@
"""Test clean"""
def test_clean_command(runner, yadm_y):
"""Run with clean command"""
run = runner(command=yadm_y('clean'))
# do nothing, this is a dangerous Git command when managing dot files
# report the command as disabled and exit as a failure
assert run.failure
assert run.err == ''
assert 'disabled' in run.out

274
test/test_clone.py Normal file
View File

@ -0,0 +1,274 @@
"""Test clone"""
import os
import re
import pytest
BOOTSTRAP_CODE = 123
BOOTSTRAP_MSG = 'Bootstrap successful'
@pytest.mark.usefixtures('remote')
@pytest.mark.parametrize(
'good_remote, repo_exists, force, conflicts', [
(False, False, False, False),
(True, False, False, False),
(True, True, False, False),
(True, True, True, False),
(True, False, False, True),
], ids=[
'bad remote',
'simple',
'existing repo',
'-f',
'conflicts',
])
def test_clone(
runner, paths, yadm_y, repo_config, ds1,
good_remote, repo_exists, force, conflicts):
"""Test basic clone operation"""
# determine remote url
remote_url = f'file://{paths.remote}'
if not good_remote:
remote_url = 'file://bad_remote'
old_repo = None
if repo_exists:
# put a repo in the way
paths.repo.mkdir()
old_repo = paths.repo.join('old_repo')
old_repo.write('old_repo')
if conflicts:
ds1.tracked[0].relative.write('conflict')
assert ds1.tracked[0].relative.exists()
# run the clone command
args = ['clone', '-w', paths.work]
if force:
args += ['-f']
args += [remote_url]
run = runner(command=yadm_y(*args))
if not good_remote:
# clone should fail
assert run.failure
assert run.err != ''
assert 'Unable to fetch origin' in run.out
assert not paths.repo.exists()
elif repo_exists and not force:
# can't overwrite data
assert run.failure
assert run.err == ''
assert 'Git repo already exists' in run.out
else:
# clone should succeed, and repo should be configured properly
assert successful_clone(run, paths, repo_config)
# ensure conflicts are handled properly
if conflicts:
assert 'NOTE' in run.out
assert 'Merging origin/master failed' in run.out
assert 'Conflicts preserved' in run.out
# confirm correct Git origin
run = runner(
command=('git', 'remote', '-v', 'show'),
env={'GIT_DIR': paths.repo})
assert run.success
assert run.err == ''
assert f'origin\t{remote_url}' in run.out
# ensure conflicts are really preserved
if conflicts:
# test to see if the work tree is actually "clean"
run = runner(
command=yadm_y('status', '-uno', '--porcelain'),
cwd=paths.work)
assert run.success
assert run.err == ''
assert run.out == '', 'worktree has unexpected changes'
# test to see if the conflicts are stashed
run = runner(command=yadm_y('stash', 'list'), cwd=paths.work)
assert run.success
assert run.err == ''
assert 'Conflicts preserved' in run.out, 'conflicts not stashed'
# verify content of the stashed conflicts
run = runner(command=yadm_y('stash', 'show', '-p'), cwd=paths.work)
assert run.success
assert run.err == ''
assert '\n+conflict' in run.out, 'conflicts not stashed'
# another force-related assertion
if old_repo:
if force:
assert not old_repo.exists()
else:
assert old_repo.exists()
@pytest.mark.usefixtures('remote')
@pytest.mark.parametrize(
'bs_exists, bs_param, answer', [
(False, '--bootstrap', None),
(True, '--bootstrap', None),
(True, '--no-bootstrap', None),
(True, None, 'n'),
(True, None, 'y'),
], ids=[
'force, missing',
'force, existing',
'prevent',
'existing, answer n',
'existing, answer y',
])
def test_clone_bootstrap(
runner, paths, yadm_y, repo_config, bs_exists, bs_param, answer):
"""Test bootstrap clone features"""
# establish a bootstrap
create_bootstrap(paths, bs_exists)
# run the clone command
args = ['clone', '-w', paths.work]
if bs_param:
args += [bs_param]
args += [f'file://{paths.remote}']
expect = []
if answer:
expect.append(('Would you like to execute it now', answer))
run = runner(command=yadm_y(*args), expect=expect)
if answer:
assert 'Would you like to execute it now' in run.out
expected_code = 0
if bs_exists and bs_param != '--no-bootstrap':
expected_code = BOOTSTRAP_CODE
if answer == 'y':
expected_code = BOOTSTRAP_CODE
assert BOOTSTRAP_MSG in run.out
elif answer == 'n':
expected_code = 0
assert BOOTSTRAP_MSG not in run.out
assert successful_clone(run, paths, repo_config, expected_code)
if not bs_exists:
assert BOOTSTRAP_MSG not in run.out
def create_bootstrap(paths, exists):
"""Create bootstrap file for test"""
if exists:
paths.bootstrap.write(
'#!/bin/sh\n'
f'echo {BOOTSTRAP_MSG}\n'
f'exit {BOOTSTRAP_CODE}\n')
paths.bootstrap.chmod(0o775)
assert paths.bootstrap.exists()
else:
assert not paths.bootstrap.exists()
@pytest.mark.usefixtures('remote')
@pytest.mark.parametrize(
'private_type, in_repo, in_work', [
('ssh', False, True),
('gnupg', False, True),
('ssh', True, True),
('gnupg', True, True),
('ssh', True, False),
('gnupg', True, False),
], ids=[
'open ssh, not tracked',
'open gnupg, not tracked',
'open ssh, tracked',
'open gnupg, tracked',
'missing ssh, tracked',
'missing gnupg, tracked',
])
def test_clone_perms(
runner, yadm_y, paths, repo_config,
private_type, in_repo, in_work):
"""Test clone permission-related functions"""
# update remote repo to include private data
if in_repo:
rpath = paths.work.mkdir(f'.{private_type}').join('related')
rpath.write('related')
os.system(f'GIT_DIR="{paths.remote}" git add {rpath}')
os.system(f'GIT_DIR="{paths.remote}" git commit -m "{rpath}"')
rpath.remove()
# ensure local private data is insecure at the start
if in_work:
pdir = paths.work.join(f'.{private_type}')
if not pdir.exists():
pdir.mkdir()
pfile = pdir.join('existing')
pfile.write('existing')
pdir.chmod(0o777)
pfile.chmod(0o777)
else:
paths.work.remove()
paths.work.mkdir()
run = runner(
yadm_y('clone', '-d', '-w', paths.work, f'file://{paths.remote}'))
assert successful_clone(run, paths, repo_config)
if in_work:
# private directories which already exist, should be left as they are,
# which in this test is "insecure".
assert re.search(
f'initial private dir perms drwxrwxrwx.+.{private_type}',
run.out)
assert re.search(
f'pre-merge private dir perms drwxrwxrwx.+.{private_type}',
run.out)
assert re.search(
f'post-merge private dir perms drwxrwxrwx.+.{private_type}',
run.out)
else:
# private directories which are created, should be done prior to
# merging, and with secure permissions.
assert 'initial private dir perms' not in run.out
assert re.search(
f'pre-merge private dir perms drwx------.+.{private_type}',
run.out)
assert re.search(
f'post-merge private dir perms drwx------.+.{private_type}',
run.out)
# standard perms still apply afterwards unless disabled with auto.perms
assert oct(
paths.work.join(f'.{private_type}').stat().mode).endswith('00'), (
f'.{private_type} has not been secured by auto.perms')
def successful_clone(run, paths, repo_config, expected_code=0):
"""Assert clone is successful"""
assert run.code == expected_code
assert 'Initialized' in run.out
assert oct(paths.repo.stat().mode).endswith('00'), 'Repo is not secured'
assert repo_config('core.bare') == 'false'
assert repo_config('status.showUntrackedFiles') == 'no'
assert repo_config('yadm.managed') == 'true'
return True
@pytest.fixture()
def remote(paths, ds1_repo_copy):
"""Function scoped remote (based on ds1)"""
# pylint: disable=unused-argument
# This is ignored because
# @pytest.mark.usefixtures('ds1_remote_copy')
# cannot be applied to another fixture.
paths.remote.remove()
paths.repo.move(paths.remote)
return None

139
test/test_config.py Normal file
View File

@ -0,0 +1,139 @@
"""Test config"""
import os
import pytest
TEST_SECTION = 'test'
TEST_ATTRIBUTE = 'attribute'
TEST_KEY = f'{TEST_SECTION}.{TEST_ATTRIBUTE}'
TEST_VALUE = 'testvalue'
TEST_FILE = f'[{TEST_SECTION}]\n\t{TEST_ATTRIBUTE} = {TEST_VALUE}'
def test_config_no_params(runner, yadm_y, supported_configs):
"""No parameters
Display instructions
Display supported configs
Exit with 0
"""
run = runner(yadm_y('config'))
assert run.success
assert run.err == ''
assert 'Please read the CONFIGURATION section' in run.out
for config in supported_configs:
assert config in run.out
def test_config_read_missing(runner, yadm_y):
"""Read missing attribute
Display an empty value
Exit with 0
"""
run = runner(yadm_y('config', TEST_KEY))
assert run.success
assert run.err == ''
assert run.out == ''
def test_config_write(runner, yadm_y, paths):
"""Write attribute
Display no output
Update configuration file
Exit with 0
"""
run = runner(yadm_y('config', TEST_KEY, TEST_VALUE))
assert run.success
assert run.err == ''
assert run.out == ''
assert paths.config.read().strip() == TEST_FILE
def test_config_read(runner, yadm_y, paths):
"""Read attribute
Display value
Exit with 0
"""
paths.config.write(TEST_FILE)
run = runner(yadm_y('config', TEST_KEY))
assert run.success
assert run.err == ''
assert run.out.strip() == TEST_VALUE
def test_config_update(runner, yadm_y, paths):
"""Update attribute
Display no output
Update configuration file
Exit with 0
"""
paths.config.write(TEST_FILE)
run = runner(yadm_y('config', TEST_KEY, TEST_VALUE + 'extra'))
assert run.success
assert run.err == ''
assert run.out == ''
assert paths.config.read().strip() == TEST_FILE + 'extra'
@pytest.mark.usefixtures('ds1_repo_copy')
def test_config_local_read(runner, yadm_y, paths, supported_local_configs):
"""Read local attribute
Display value from the repo config
Exit with 0
"""
# populate test values
for config in supported_local_configs:
os.system(
f'GIT_DIR="{paths.repo}" '
f'git config --local "{config}" "value_of_{config}"')
# run yadm config
for config in supported_local_configs:
run = runner(yadm_y('config', config))
assert run.success
assert run.err == ''
assert run.out.strip() == f'value_of_{config}'
@pytest.mark.usefixtures('ds1_repo_copy')
def test_config_local_write(runner, yadm_y, paths, supported_local_configs):
"""Write local attribute
Display no output
Write value to the repo config
Exit with 0
"""
# run yadm config
for config in supported_local_configs:
run = runner(yadm_y('config', config, f'value_of_{config}'))
assert run.success
assert run.err == ''
assert run.out == ''
# verify test values
for config in supported_local_configs:
run = runner(
command=('git', 'config', config),
env={'GIT_DIR': paths.repo})
assert run.success
assert run.err == ''
assert run.out.strip() == f'value_of_{config}'

59
test/test_cygwin_copy.py Normal file
View File

@ -0,0 +1,59 @@
"""Test yadm.cygwin_copy"""
import os
import pytest
@pytest.mark.parametrize(
'setting, is_cygwin, expect_link, pre_existing', [
(None, False, True, None),
(True, False, True, None),
(False, False, True, None),
(None, True, True, None),
(True, True, False, None),
(False, True, True, None),
(True, True, False, 'link'),
(True, True, False, 'file'),
],
ids=[
'unset, non-cygwin',
'true, non-cygwin',
'false, non-cygwin',
'unset, cygwin',
'true, cygwin',
'false, cygwin',
'pre-existing symlink',
'pre-existing file',
])
@pytest.mark.usefixtures('ds1_copy')
def test_cygwin_copy(
runner, yadm_y, paths, cygwin_sys, tst_sys,
setting, is_cygwin, expect_link, pre_existing):
"""Test yadm.cygwin_copy"""
if setting is not None:
os.system(' '.join(yadm_y('config', 'yadm.cygwin-copy', str(setting))))
expected_content = f'test_cygwin_copy##{tst_sys}'
alt_path = paths.work.join('test_cygwin_copy')
if pre_existing == 'symlink':
alt_path.mklinkto(expected_content)
elif pre_existing == 'file':
alt_path.write('wrong content')
uname_path = paths.root.join('tmp').mkdir()
if is_cygwin:
uname = uname_path.join('uname')
uname.write(f'#!/bin/sh\necho "{cygwin_sys}"\n')
uname.chmod(0o777)
expected_content = f'test_cygwin_copy##{cygwin_sys}'
env = os.environ.copy()
env['PATH'] = ':'.join([str(uname_path), env['PATH']])
run = runner(yadm_y('alt'), env=env)
assert run.success
assert run.err == ''
assert 'Linking' in run.out
assert alt_path.read() == expected_content
assert alt_path.islink() == expect_link

392
test/test_encryption.py Normal file
View File

@ -0,0 +1,392 @@
"""Test encryption"""
import os
import pipes
import pytest
KEY_FILE = 'test/test_key'
KEY_FINGERPRINT = 'F8BBFC746C58945442349BCEBA54FFD04C599B1A'
KEY_NAME = 'yadm-test1'
KEY_TRUST = 'test/ownertrust.txt'
PASSPHRASE = 'ExamplePassword'
pytestmark = pytest.mark.usefixtures('config_git')
def add_asymmetric_key():
"""Add asymmetric key"""
os.system(f'gpg --import {pipes.quote(KEY_FILE)}')
os.system(f'gpg --import-ownertrust < {pipes.quote(KEY_TRUST)}')
def remove_asymmetric_key():
"""Remove asymmetric key"""
os.system(
f'gpg --batch --yes '
f'--delete-secret-keys {pipes.quote(KEY_FINGERPRINT)}')
os.system(f'gpg --batch --yes --delete-key {pipes.quote(KEY_FINGERPRINT)}')
@pytest.fixture
def asymmetric_key():
"""Fixture for asymmetric key, removed in teardown"""
add_asymmetric_key()
yield KEY_NAME
remove_asymmetric_key()
@pytest.fixture
def encrypt_targets(yadm_y, paths):
"""Fixture for setting up data to encrypt
This fixture:
* inits an empty repo
* creates test files in the work tree
* creates a ".yadm/encrypt" file for testing:
* standard files
* standard globs
* directories
* comments
* empty lines and lines with just space
* exclusions
* returns a list of expected encrypted files
"""
# init empty yadm repo
os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f')))
expected = []
# standard files w/ dirs & spaces
paths.work.join('inc file1').write('inc file1')
expected.append('inc file1')
paths.encrypt.write('inc file1\n')
paths.work.join('inc dir').mkdir()
paths.work.join('inc dir/inc file2').write('inc file2')
expected.append('inc dir/inc file2')
paths.encrypt.write('inc dir/inc file2\n', mode='a')
# standard globs w/ dirs & spaces
paths.work.join('globs file1').write('globs file1')
expected.append('globs file1')
paths.work.join('globs dir').mkdir()
paths.work.join('globs dir/globs file2').write('globs file2')
expected.append('globs dir/globs file2')
paths.encrypt.write('globs*\n', mode='a')
# blank lines
paths.encrypt.write('\n \n\t\n', mode='a')
# comments
paths.work.join('commentfile1').write('commentfile1')
paths.encrypt.write('#commentfile1\n', mode='a')
paths.encrypt.write(' #commentfile1\n', mode='a')
# exclusions
paths.work.join('extest').mkdir()
paths.encrypt.write('extest/*\n', mode='a') # include within extest
paths.work.join('extest/inglob1').write('inglob1')
paths.work.join('extest/exglob1').write('exglob1')
paths.work.join('extest/exglob2').write('exglob2')
paths.encrypt.write('!extest/ex*\n', mode='a') # exclude the ex*
expected.append('extest/inglob1') # should be left with only in*
return expected
@pytest.fixture(scope='session')
def decrypt_targets(tmpdir_factory, runner):
"""Fixture for setting data to decrypt
This fixture:
* creates symmetric/asymmetric encrypted archives
* creates a list of expected decrypted files
"""
tmpdir = tmpdir_factory.mktemp('decrypt_targets')
symmetric = tmpdir.join('symmetric.tar.gz.gpg')
asymmetric = tmpdir.join('asymmetric.tar.gz.gpg')
expected = []
tmpdir.join('decrypt1').write('decrypt1')
expected.append('decrypt1')
tmpdir.join('decrypt2').write('decrypt2')
expected.append('decrypt2')
tmpdir.join('subdir').mkdir()
tmpdir.join('subdir/decrypt3').write('subdir/decrypt3')
expected.append('subdir/decrypt3')
run = runner(
['tar', 'cvf', '-'] +
expected +
['|', 'gpg', '--batch', '--yes', '-c'] +
['--passphrase', pipes.quote(PASSPHRASE)] +
['--output', pipes.quote(str(symmetric))],
cwd=tmpdir,
shell=True)
assert run.success
add_asymmetric_key()
run = runner(
['tar', 'cvf', '-'] +
expected +
['|', 'gpg', '--batch', '--yes', '-e'] +
['-r', pipes.quote(KEY_NAME)] +
['--output', pipes.quote(str(asymmetric))],
cwd=tmpdir,
shell=True)
assert run.success
remove_asymmetric_key()
return {
'asymmetric': asymmetric,
'expected': expected,
'symmetric': symmetric,
}
@pytest.mark.parametrize(
'mismatched_phrase', [False, True],
ids=['matching_phrase', 'mismatched_phrase'])
@pytest.mark.parametrize(
'missing_encrypt', [False, True],
ids=['encrypt_exists', 'encrypt_missing'])
@pytest.mark.parametrize(
'overwrite', [False, True],
ids=['clean', 'overwrite'])
def test_symmetric_encrypt(
runner, yadm_y, paths, encrypt_targets,
overwrite, missing_encrypt, mismatched_phrase):
"""Test symmetric encryption"""
if missing_encrypt:
paths.encrypt.remove()
matched_phrase = PASSPHRASE
if mismatched_phrase:
matched_phrase = 'mismatched'
if overwrite:
paths.archive.write('existing archive')
run = runner(yadm_y('encrypt'), expect=[
('passphrase:', PASSPHRASE),
('passphrase:', matched_phrase),
])
if missing_encrypt or mismatched_phrase:
assert run.failure
else:
assert run.success
assert run.err == ''
if missing_encrypt:
assert 'does not exist' in run.out
elif mismatched_phrase:
assert 'invalid passphrase' in run.out
else:
assert encrypted_data_valid(runner, paths.archive, encrypt_targets)
@pytest.mark.parametrize(
'wrong_phrase', [False, True],
ids=['correct_phrase', 'wrong_phrase'])
@pytest.mark.parametrize(
'archive_exists', [True, False],
ids=['archive_exists', 'archive_missing'])
@pytest.mark.parametrize(
'dolist', [False, True],
ids=['decrypt', 'list'])
def test_symmetric_decrypt(
runner, yadm_y, paths, decrypt_targets,
dolist, archive_exists, wrong_phrase):
"""Test decryption"""
# init empty yadm repo
os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f')))
phrase = PASSPHRASE
if wrong_phrase:
phrase = 'wrong-phrase'
if archive_exists:
decrypt_targets['symmetric'].copy(paths.archive)
# to test overwriting
paths.work.join('decrypt1').write('pre-existing file')
args = []
if dolist:
args.append('-l')
run = runner(yadm_y('decrypt') + args, expect=[('passphrase:', phrase)])
if archive_exists and not wrong_phrase:
assert run.success
assert run.err == ''
if dolist:
for filename in decrypt_targets['expected']:
if filename != 'decrypt1': # this one should exist
assert not paths.work.join(filename).exists()
assert filename in run.out
else:
for filename in decrypt_targets['expected']:
assert paths.work.join(filename).read() == filename
else:
assert run.failure
@pytest.mark.usefixtures('asymmetric_key')
@pytest.mark.parametrize(
'ask', [False, True],
ids=['no_ask', 'ask'])
@pytest.mark.parametrize(
'key_exists', [True, False],
ids=['key_exists', 'key_missing'])
@pytest.mark.parametrize(
'overwrite', [False, True],
ids=['clean', 'overwrite'])
def test_asymmetric_encrypt(
runner, yadm_y, paths, encrypt_targets,
overwrite, key_exists, ask):
"""Test asymmetric encryption"""
# specify encryption recipient
if ask:
os.system(' '.join(yadm_y('config', 'yadm.gpg-recipient', 'ASK')))
expect = [('Enter the user ID', KEY_NAME), ('Enter the user ID', '')]
else:
os.system(' '.join(yadm_y('config', 'yadm.gpg-recipient', KEY_NAME)))
expect = []
if overwrite:
paths.archive.write('existing archive')
if not key_exists:
remove_asymmetric_key()
run = runner(yadm_y('encrypt'), expect=expect)
if key_exists:
assert run.success
assert encrypted_data_valid(runner, paths.archive, encrypt_targets)
else:
assert run.failure
assert 'Unable to write' in run.out
if ask:
assert 'Enter the user ID' in run.out
@pytest.mark.usefixtures('asymmetric_key')
@pytest.mark.parametrize(
'key_exists', [True, False],
ids=['key_exists', 'key_missing'])
@pytest.mark.parametrize(
'dolist', [False, True],
ids=['decrypt', 'list'])
def test_asymmetric_decrypt(
runner, yadm_y, paths, decrypt_targets,
dolist, key_exists):
"""Test decryption"""
# init empty yadm repo
os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f')))
decrypt_targets['asymmetric'].copy(paths.archive)
# to test overwriting
paths.work.join('decrypt1').write('pre-existing file')
if not key_exists:
remove_asymmetric_key()
args = []
if dolist:
args.append('-l')
run = runner(yadm_y('decrypt') + args)
if key_exists:
assert run.success
if dolist:
for filename in decrypt_targets['expected']:
if filename != 'decrypt1': # this one should exist
assert not paths.work.join(filename).exists()
assert filename in run.out
else:
for filename in decrypt_targets['expected']:
assert paths.work.join(filename).read() == filename
else:
assert run.failure
assert 'Unable to extract encrypted files' in run.out
@pytest.mark.parametrize(
'untracked',
[False, 'y', 'n'],
ids=['tracked', 'untracked_answer_y', 'untracked_answer_n'])
def test_offer_to_add(runner, yadm_y, paths, encrypt_targets, untracked):
"""Test offer to add encrypted archive
All the other encryption tests use an archive outside of the work tree.
However, the archive is often inside the work tree, and if it is, there
should be an offer to add it to the repo if it is not tracked.
"""
worktree_archive = paths.work.join('worktree-archive.tar.gpg')
expect = [
('passphrase:', PASSPHRASE),
('passphrase:', PASSPHRASE),
]
if untracked:
expect.append(('add it now', untracked))
else:
worktree_archive.write('exists')
os.system(' '.join(yadm_y('add', str(worktree_archive))))
run = runner(
yadm_y('encrypt', '--yadm-archive', str(worktree_archive)),
expect=expect
)
assert run.success
assert run.err == ''
assert encrypted_data_valid(runner, worktree_archive, encrypt_targets)
run = runner(
yadm_y('status', '--porcelain', '-uall', str(worktree_archive)))
assert run.success
assert run.err == ''
if untracked == 'y':
# should be added to the index
assert f'A {worktree_archive.basename}' in run.out
elif untracked == 'n':
# should NOT be added to the index
assert f'?? {worktree_archive.basename}' in run.out
else:
# should appear modified in the index
assert f'AM {worktree_archive.basename}' in run.out
def encrypted_data_valid(runner, encrypted, expected):
"""Verify encrypted data matches expectations"""
run = runner([
'gpg',
'--passphrase', pipes.quote(PASSPHRASE),
'-d', pipes.quote(str(encrypted)),
'2>/dev/null',
'|', 'tar', 't'], shell=True, report=False)
file_count = 0
for filename in run.out.splitlines():
if filename.endswith('/'):
continue
file_count += 1
assert filename in expected, (
f'Unexpected file in archive: {filename}')
assert file_count == len(expected), (
'Number of files in archive does not match expected')
return True

85
test/test_enter.py Normal file
View File

@ -0,0 +1,85 @@
"""Test enter"""
import os
import warnings
import pytest
@pytest.mark.parametrize(
'shell, success', [
('delete', True),
('', False),
('/usr/bin/env', True),
('noexec', False),
], ids=[
'shell-missing',
'shell-empty',
'shell-env',
'shell-noexec',
])
@pytest.mark.usefixtures('ds1_copy')
def test_enter(runner, yadm_y, paths, shell, success):
"""Enter tests"""
env = os.environ.copy()
if shell == 'delete':
# remove shell
if 'SHELL' in env:
del env['SHELL']
elif shell == 'noexec':
# specify a non-executable path
noexec = paths.root.join('noexec')
noexec.write('')
noexec.chmod(0o664)
env['SHELL'] = str(noexec)
else:
env['SHELL'] = shell
run = runner(command=yadm_y('enter'), env=env)
assert run.success == success
assert run.err == ''
prompt = f'yadm shell ({paths.repo})'
if success:
assert run.out.startswith('Entering yadm repo')
assert run.out.rstrip().endswith('Leaving yadm repo')
if shell == 'delete':
# When SHELL is empty (unlikely), it is attempted to be run anyway.
# This is a but which must be fixed.
warnings.warn('Unhandled bug: SHELL executed when empty', Warning)
else:
assert f'PROMPT={prompt}' in run.out
assert f'PS1={prompt}' in run.out
assert f'GIT_DIR={paths.repo}' in run.out
if not success:
assert 'does not refer to an executable' in run.out
if 'env' in shell:
assert f'GIT_DIR={paths.repo}' in run.out
assert 'PROMPT=yadm shell' in run.out
assert 'PS1=yadm shell' in run.out
@pytest.mark.parametrize(
'shell, opts, path', [
('bash', '--norc', '\\w'),
('csh', '-f', '%~'),
('zsh', '-f', '%~'),
], ids=[
'bash',
'csh',
'zsh',
])
@pytest.mark.usefixtures('ds1_copy')
def test_enter_shell_ops(runner, yadm_y, paths, shell, opts, path):
"""Enter tests for specific shell options"""
# Create custom shell to detect options passed
custom_shell = paths.root.join(shell)
custom_shell.write('#!/bin/sh\necho OPTS=$*\necho PROMPT=$PROMPT')
custom_shell.chmod(0o775)
env = os.environ.copy()
env['SHELL'] = custom_shell
run = runner(command=yadm_y('enter'), env=env)
assert run.success
assert run.err == ''
assert f'OPTS={opts}' in run.out
assert f'PROMPT=yadm shell ({paths.repo}) {path} >' in run.out

58
test/test_git.py Normal file
View File

@ -0,0 +1,58 @@
"""Test git"""
import re
import pytest
@pytest.mark.usefixtures('ds1_copy')
def test_git(runner, yadm_y, paths):
"""Test series of passthrough git commands
Passthru unknown commands to Git
Git command 'add' - badfile
Git command 'add'
Git command 'status'
Git command 'commit'
Git command 'log'
"""
# passthru unknown commands to Git
run = runner(command=yadm_y('bogus'))
assert run.failure
assert "git: 'bogus' is not a git command." in run.err
assert "See 'git --help'" in run.err
assert run.out == ''
# git command 'add' - badfile
run = runner(command=yadm_y('add', '-v', 'does_not_exist'))
assert run.code == 128
assert "pathspec 'does_not_exist' did not match any files" in run.err
assert run.out == ''
# git command 'add'
newfile = paths.work.join('test_git')
newfile.write('test_git')
run = runner(command=yadm_y('add', '-v', str(newfile)))
assert run.success
assert run.err == ''
assert "add 'test_git'" in run.out
# git command 'status'
run = runner(command=yadm_y('status'))
assert run.success
assert run.err == ''
assert re.search(r'new file:\s+test_git', run.out)
# git command 'commit'
run = runner(command=yadm_y('commit', '-m', 'Add test_git'))
assert run.success
assert run.err == ''
assert '1 file changed' in run.out
assert '1 insertion' in run.out
assert re.search(r'create mode .+ test_git', run.out)
# git command 'log'
run = runner(command=yadm_y('log', '--oneline'))
assert run.success
assert run.err == ''
assert 'Add test_git' in run.out

17
test/test_help.py Normal file
View File

@ -0,0 +1,17 @@
"""Test help"""
def test_missing_command(runner, yadm_y):
"""Run without any command"""
run = runner(command=yadm_y())
assert run.failure
assert run.err == ''
assert run.out.startswith('Usage: yadm')
def test_help_command(runner, yadm_y):
"""Run with help command"""
run = runner(command=yadm_y('help'))
assert run.failure
assert run.err == ''
assert run.out.startswith('Usage: yadm')

90
test/test_hooks.py Normal file
View File

@ -0,0 +1,90 @@
"""Test hooks"""
import pytest
@pytest.mark.parametrize(
'pre, pre_code, post, post_code', [
(False, 0, False, 0),
(True, 0, False, 0),
(True, 5, False, 0),
(False, 0, True, 0),
(False, 0, True, 5),
(True, 0, True, 0),
(True, 5, True, 5),
], ids=[
'no-hooks',
'pre-success',
'pre-fail',
'post-success',
'post-fail',
'pre-post-success',
'pre-post-fail',
])
def test_hooks(
runner, yadm_y, paths,
pre, pre_code, post, post_code):
"""Test pre/post hook"""
# generate hooks
if pre:
create_hook(paths, 'pre_version', pre_code)
if post:
create_hook(paths, 'post_version', post_code)
# run yadm
run = runner(yadm_y('version'))
# when a pre hook fails, yadm should exit with the hook's code
assert run.code == pre_code
assert run.err == ''
if pre:
assert 'HOOK:pre_version' in run.out
# if pre hook is missing or successful, yadm itself should exit 0
if run.success:
if post:
assert 'HOOK:post_version' in run.out
else:
# when a pre hook fails, yadm should not run the command
assert 'version will not be run' in run.out
# when a pre hook fails, yadm should not run the post hook
assert 'HOOK:post_version' not in run.out
# repo fixture is needed to test the population of YADM_HOOK_WORK
@pytest.mark.usefixtures('ds1_repo_copy')
def test_hook_env(runner, yadm_y, paths):
"""Test hook environment"""
# test will be done with a non existent "git" passthru command
# which should exit with a failing code
cmd = 'passthrucmd'
# write the hook
hook = paths.hooks.join(f'post_{cmd}')
hook.write('#!/bin/sh\nenv\n')
hook.chmod(0o755)
run = runner(yadm_y(cmd, 'extra_args'))
# expect passthru to fail
assert run.failure
assert f"'{cmd}' is not a git command" in run.err
# verify hook environment
assert 'YADM_HOOK_EXIT=1\n' in run.out
assert f'YADM_HOOK_COMMAND={cmd}\n' in run.out
assert f'YADM_HOOK_FULL_COMMAND={cmd} extra_args\n' in run.out
assert f'YADM_HOOK_REPO={paths.repo}\n' in run.out
assert f'YADM_HOOK_WORK={paths.work}\n' in run.out
def create_hook(paths, name, code):
"""Create hook"""
hook = paths.hooks.join(name)
hook.write(
'#!/bin/sh\n'
f'echo HOOK:{name}\n'
f'exit {code}\n'
)
hook.chmod(0o755)

78
test/test_init.py Normal file
View File

@ -0,0 +1,78 @@
"""Test init"""
import pytest
@pytest.mark.parametrize(
'alt_work, repo_present, force', [
(False, False, False),
(True, False, False),
(False, True, False),
(False, True, True),
(True, True, True),
], ids=[
'simple',
'-w',
'existing repo',
'-f',
'-w & -f',
])
@pytest.mark.usefixtures('ds1_work_copy')
def test_init(
runner, yadm_y, paths, repo_config, alt_work, repo_present, force):
"""Test init
Repos should have attribs:
- 0600 permissions
- not bare
- worktree = $HOME
- showUntrackedFiles = no
- yadm.managed = true
"""
# these tests will assume this for $HOME
home = str(paths.root.mkdir('HOME'))
# ds1_work_copy comes WITH an empty repo dir present.
old_repo = paths.repo.join('old_repo')
if repo_present:
# Let's put some data in it, so we can confirm that data is gone when
# forced to be overwritten.
old_repo.write('old repo data')
assert old_repo.isfile()
else:
paths.repo.remove()
# command args
args = ['init']
if alt_work:
args.extend(['-w', paths.work])
if force:
args.append('-f')
# run init
run = runner(yadm_y(*args), env={'HOME': home})
assert run.err == ''
if repo_present and not force:
assert run.failure
assert 'repo already exists' in run.out
assert old_repo.isfile(), 'Missing original repo'
else:
assert run.success
assert 'Initialized empty shared Git repository' in run.out
if repo_present:
assert not old_repo.isfile(), 'Original repo still exists'
if alt_work:
assert repo_config('core.worktree') == paths.work
else:
assert repo_config('core.worktree') == home
# uniform repo assertions
assert oct(paths.repo.stat().mode).endswith('00'), (
'Repo is not secure')
assert repo_config('core.bare') == 'false'
assert repo_config('status.showUntrackedFiles') == 'no'
assert repo_config('yadm.managed') == 'true'

46
test/test_introspect.py Normal file
View File

@ -0,0 +1,46 @@
"""Test introspect"""
import pytest
@pytest.mark.parametrize(
'name', [
'',
'invalid',
'commands',
'configs',
'repo',
'switches',
])
def test_introspect_category(
runner, yadm_y, paths, name,
supported_commands, supported_configs, supported_switches):
"""Validate introspection category"""
if name:
run = runner(command=yadm_y('introspect', name))
else:
run = runner(command=yadm_y('introspect'))
assert run.success
assert run.err == ''
expected = []
if name == 'commands':
expected = supported_commands
elif name == 'config':
expected = supported_configs
elif name == 'switches':
expected = supported_switches
# assert values
if name in ('', 'invalid'):
assert run.out == ''
if name == 'repo':
assert run.out.rstrip() == paths.repo
# make sure every expected value is present
for value in expected:
assert value in run.out
# make sure nothing extra is present
if expected:
assert len(run.out.split()) == len(expected)

186
test/test_jinja.py Normal file
View File

@ -0,0 +1,186 @@
"""Test jinja"""
import os
import re
import pytest
import utils
@pytest.fixture(scope='module')
def envtpl_present(runner):
"""Is envtpl present and working?"""
try:
run = runner(command=['envtpl', '-h'])
if run.success:
return True
except BaseException:
pass
return False
@pytest.mark.usefixtures('ds1_copy')
def test_local_override(runner, yadm_y, paths,
tst_distro, envtpl_present):
"""Test local overrides"""
if not envtpl_present:
pytest.skip('Unable to test without envtpl.')
# define local overrides
utils.set_local(paths, 'class', 'or-class')
utils.set_local(paths, 'hostname', 'or-hostname')
utils.set_local(paths, 'os', 'or-os')
utils.set_local(paths, 'user', 'or-user')
template = (
'j2-{{ YADM_CLASS }}-'
'{{ YADM_OS }}-{{ YADM_HOSTNAME }}-'
'{{ YADM_USER }}-{{ YADM_DISTRO }}'
)
expected = f'j2-or-class-or-os-or-hostname-or-user-{tst_distro}'
utils.create_alt_files(paths, '##yadm.j2', content=template)
# run alt to trigger linking
run = runner(yadm_y('alt'))
assert run.success
assert run.err == ''
created = created_list(run.out)
# assert the proper creation has occurred
for file_path in (utils.ALT_FILE1, utils.ALT_FILE2):
source_file = file_path + '##yadm.j2'
assert paths.work.join(file_path).isfile()
lines = paths.work.join(file_path).readlines(cr=False)
assert lines[0] == source_file
assert lines[1] == expected
assert str(paths.work.join(source_file)) in created
@pytest.mark.parametrize('autoalt', [None, 'true', 'false'])
@pytest.mark.usefixtures('ds1_copy')
def test_auto_alt(runner, yadm_y, paths, autoalt, tst_sys,
envtpl_present):
"""Test setting auto-alt"""
if not envtpl_present:
pytest.skip('Unable to test without envtpl.')
# set the value of auto-alt
if autoalt:
os.system(' '.join(yadm_y('config', 'yadm.auto-alt', autoalt)))
# create file
jinja_suffix = '##yadm.j2'
utils.create_alt_files(paths, jinja_suffix, content='{{ YADM_OS }}')
# run status to possibly trigger linking
run = runner(yadm_y('status'))
assert run.success
assert run.err == ''
created = created_list(run.out)
# assert the proper creation has occurred
for file_path in (utils.ALT_FILE1, utils.ALT_FILE2):
source_file = file_path + jinja_suffix
if autoalt == 'false':
assert not paths.work.join(file_path).exists()
else:
assert paths.work.join(file_path).isfile()
lines = paths.work.join(file_path).readlines(cr=False)
assert lines[0] == source_file
assert lines[1] == tst_sys
# no created output when run via auto-alt
assert str(paths.work.join(source_file)) not in created
@pytest.mark.usefixtures('ds1_copy')
def test_jinja_envtpl_missing(runner, paths):
"""Test operation when envtpl is missing"""
script = f"""
YADM_TEST=1 source {paths.pgm}
process_global_args -Y "{paths.yadm}"
set_operating_system
configure_paths
ENVTPL_PROGRAM='envtpl_missing' main alt
"""
utils.create_alt_files(paths, '##yadm.j2')
run = runner(command=['bash'], inp=script)
assert run.success
assert run.err == ''
assert f'envtpl not available, not creating' in run.out
@pytest.mark.parametrize(
'tracked, encrypt, exclude', [
(False, False, False),
(True, False, False),
(False, True, False),
(False, True, True),
], ids=[
'untracked',
'tracked',
'encrypted',
'excluded',
])
@pytest.mark.usefixtures('ds1_copy')
def test_jinja(runner, yadm_y, paths,
tst_sys, tst_host, tst_user, tst_distro,
tracked, encrypt, exclude,
envtpl_present):
"""Test jinja processing"""
if not envtpl_present:
pytest.skip('Unable to test without envtpl.')
jinja_suffix = '##yadm.j2'
# set the class
tst_class = 'testclass'
utils.set_local(paths, 'class', tst_class)
template = (
'j2-{{ YADM_CLASS }}-'
'{{ YADM_OS }}-{{ YADM_HOSTNAME }}-'
'{{ YADM_USER }}-{{ YADM_DISTRO }}'
)
expected = (
f'j2-{tst_class}-'
f'{tst_sys}-{tst_host}-'
f'{tst_user}-{tst_distro}'
)
utils.create_alt_files(paths, jinja_suffix, content=template,
tracked=tracked, encrypt=encrypt, exclude=exclude)
# run alt to trigger linking
run = runner(yadm_y('alt'))
assert run.success
assert run.err == ''
created = created_list(run.out)
# assert the proper creation has occurred
for file_path in (utils.ALT_FILE1, utils.ALT_FILE2):
source_file = file_path + jinja_suffix
if tracked or (encrypt and not exclude):
assert paths.work.join(file_path).isfile()
lines = paths.work.join(file_path).readlines(cr=False)
assert lines[0] == source_file
assert lines[1] == expected
assert str(paths.work.join(source_file)) in created
else:
assert not paths.work.join(file_path).exists()
assert str(paths.work.join(source_file)) not in created
def created_list(output):
"""Parse output, and return list of created files"""
created = dict()
for line in output.splitlines():
match = re.match('Creating (.+) from template (.+)$', line)
if match:
created[match.group(1)] = match.group(2)
return created.values()

47
test/test_list.py Normal file
View File

@ -0,0 +1,47 @@
"""Test list"""
import os
import pytest
@pytest.mark.parametrize(
'location', [
'work',
'outside',
'subdir',
])
@pytest.mark.usefixtures('ds1_copy')
def test_list(runner, yadm_y, paths, ds1, location):
"""List tests"""
if location == 'work':
run_dir = paths.work
elif location == 'outside':
run_dir = paths.work.join('..')
elif location == 'subdir':
# first directory with tracked data
run_dir = paths.work.join(ds1.tracked_dirs[0])
with run_dir.as_cwd():
# test with '-a'
# should get all tracked files, relative to the work path
run = runner(command=yadm_y('list', '-a'))
assert run.success
assert run.err == ''
returned_files = set(run.out.splitlines())
expected_files = set([e.path for e in ds1 if e.tracked])
assert returned_files == expected_files
# test without '-a'
# should get all tracked files, relative to the work path unless in a
# subdir, then those should be a limited set of files, relative to the
# subdir
run = runner(command=yadm_y('list'))
assert run.success
assert run.err == ''
returned_files = set(run.out.splitlines())
if location == 'subdir':
basepath = os.path.basename(os.getcwd())
# only expect files within the subdir
# names should be relative to subdir
expected_files = set(
[e.path[len(basepath)+1:] for e in ds1
if e.tracked and e.path.startswith(basepath)])
assert returned_files == expected_files

111
test/test_perms.py Normal file
View File

@ -0,0 +1,111 @@
"""Test perms"""
import os
import warnings
import pytest
@pytest.mark.parametrize('autoperms', ['notest', 'unset', 'true', 'false'])
@pytest.mark.usefixtures('ds1_copy')
def test_perms(runner, yadm_y, paths, ds1, autoperms):
"""Test perms"""
# set the value of auto-perms
if autoperms != 'notest':
if autoperms != 'unset':
os.system(' '.join(yadm_y('config', 'yadm.auto-perms', autoperms)))
# privatepaths will hold all paths that should become secured
privatepaths = [paths.work.join('.ssh'), paths.work.join('.gnupg')]
privatepaths += [paths.work.join(private.path) for private in ds1.private]
# create an archive file
os.system(f'touch "{str(paths.archive)}"')
privatepaths.append(paths.archive)
# create encrypted file test data
efile1 = paths.work.join('efile1')
efile1.write('efile1')
efile2 = paths.work.join('efile2')
efile2.write('efile2')
paths.encrypt.write('efile1\nefile2\n!efile1\n')
insecurepaths = [efile1]
privatepaths.append(efile2)
# assert these paths begin unsecured
for private in privatepaths + insecurepaths:
assert not oct(private.stat().mode).endswith('00'), (
'Path started secured')
cmd = 'perms'
if autoperms != 'notest':
cmd = 'status'
run = runner(yadm_y(cmd))
assert run.success
assert run.err == ''
if cmd == 'perms':
assert run.out == ''
# these paths should be secured if processing perms
for private in privatepaths:
if '.p2' in private.basename or '.p4' in private.basename:
# Dot files within .ssh/.gnupg are not protected.
# This is a but which must be fixed
warnings.warn('Unhandled bug: private dot files', Warning)
continue
if autoperms == 'false':
assert not oct(private.stat().mode).endswith('00'), (
'Path should not be secured')
else:
assert oct(private.stat().mode).endswith('00'), (
'Path has not been secured')
# these paths should never be secured
for private in insecurepaths:
assert not oct(private.stat().mode).endswith('00'), (
'Path should not be secured')
@pytest.mark.parametrize('sshperms', [None, 'true', 'false'])
@pytest.mark.parametrize('gpgperms', [None, 'true', 'false'])
@pytest.mark.usefixtures('ds1_copy')
def test_perms_control(runner, yadm_y, paths, ds1, sshperms, gpgperms):
"""Test fine control of perms"""
# set the value of ssh-perms
if sshperms:
os.system(' '.join(yadm_y('config', 'yadm.ssh-perms', sshperms)))
# set the value of gpg-perms
if gpgperms:
os.system(' '.join(yadm_y('config', 'yadm.gpg-perms', gpgperms)))
# privatepaths will hold all paths that should become secured
privatepaths = [paths.work.join('.ssh'), paths.work.join('.gnupg')]
privatepaths += [paths.work.join(private.path) for private in ds1.private]
# assert these paths begin unsecured
for private in privatepaths:
assert not oct(private.stat().mode).endswith('00'), (
'Path started secured')
run = runner(yadm_y('perms'))
assert run.success
assert run.err == ''
assert run.out == ''
# these paths should be secured if processing perms
for private in privatepaths:
if '.p2' in private.basename or '.p4' in private.basename:
# Dot files within .ssh/.gnupg are not protected.
# This is a but which must be fixed
warnings.warn('Unhandled bug: private dot files', Warning)
continue
if (
(sshperms == 'false' and 'ssh' in str(private))
or
(gpgperms == 'false' and 'gnupg' in str(private))
):
assert not oct(private.stat().mode).endswith('00'), (
'Path should not be secured')
else:
assert oct(private.stat().mode).endswith('00'), (
'Path has not been secured')

41
test/test_syntax.py Normal file
View File

@ -0,0 +1,41 @@
"""Syntax checks"""
import os
import pytest
def test_yadm_syntax(runner, yadm):
"""Is syntactically valid"""
run = runner(command=['bash', '-n', yadm])
assert run.success
def test_shellcheck(runner, yadm, shellcheck_version):
"""Passes shellcheck"""
run = runner(command=['shellcheck', '-V'], report=False)
if f'version: {shellcheck_version}' not in run.out:
pytest.skip('Unsupported shellcheck version')
run = runner(command=['shellcheck', '-s', 'bash', yadm])
assert run.success
def test_pylint(runner, pylint_version):
"""Passes pylint"""
run = runner(command=['pylint', '--version'], report=False)
if f'pylint {pylint_version}' not in run.out:
pytest.skip('Unsupported pylint version')
pyfiles = list()
for tfile in os.listdir('test'):
if tfile.endswith('.py'):
pyfiles.append(f'test/{tfile}')
run = runner(command=['pylint'] + pyfiles)
assert run.success
def test_flake8(runner, flake8_version):
"""Passes flake8"""
run = runner(command=['flake8', '--version'], report=False)
if not run.out.startswith(flake8_version):
pytest.skip('Unsupported flake8 version')
run = runner(command=['flake8', 'test'])
assert run.success

View File

@ -0,0 +1,33 @@
"""Unit tests: bootstrap_available"""
def test_bootstrap_missing(runner, paths):
"""Test result of bootstrap_available, when bootstrap missing"""
run_test(runner, paths, False)
def test_bootstrap_no_exec(runner, paths):
"""Test result of bootstrap_available, when bootstrap not executable"""
paths.bootstrap.write('')
paths.bootstrap.chmod(0o644)
run_test(runner, paths, False)
def test_bootstrap_exec(runner, paths):
"""Test result of bootstrap_available, when bootstrap executable"""
paths.bootstrap.write('')
paths.bootstrap.chmod(0o775)
run_test(runner, paths, True)
def run_test(runner, paths, success):
"""Run bootstrap_available, and test result"""
script = f"""
YADM_TEST=1 source {paths.pgm}
YADM_BOOTSTRAP='{paths.bootstrap}'
bootstrap_available
"""
run = runner(command=['bash'], inp=script)
assert run.success == success
assert run.err == ''
assert run.out == ''

View File

@ -0,0 +1,80 @@
"""Unit tests: configure_paths"""
import pytest
ARCHIVE = 'files.gpg'
BOOTSTRAP = 'bootstrap'
CONFIG = 'config'
ENCRYPT = 'encrypt'
HOME = '/testhome'
REPO = 'repo.git'
YDIR = '.yadm'
@pytest.mark.parametrize(
'override, expect', [
(None, {}),
('-Y', {}),
('--yadm-repo', {'repo': 'YADM_REPO', 'git': 'GIT_DIR'}),
('--yadm-config', {'config': 'YADM_CONFIG'}),
('--yadm-encrypt', {'encrypt': 'YADM_ENCRYPT'}),
('--yadm-archive', {'archive': 'YADM_ARCHIVE'}),
('--yadm-bootstrap', {'bootstrap': 'YADM_BOOTSTRAP'}),
], ids=[
'default',
'override yadm dir',
'override repo',
'override config',
'override encrypt',
'override archive',
'override bootstrap',
])
def test_config(runner, paths, override, expect):
"""Test configure_paths"""
opath = 'override'
matches = match_map()
args = []
if override == '-Y':
matches = match_map('/' + opath)
if override:
args = [override, '/' + opath]
for ekey in expect.keys():
matches[ekey] = f'{expect[ekey]}="/{opath}"'
run_test(
runner, paths,
[override, opath],
['must specify a fully qualified'], 1)
run_test(runner, paths, args, matches.values(), 0)
def match_map(yadm_dir=None):
"""Create a dictionary of matches, relative to yadm_dir"""
if not yadm_dir:
yadm_dir = '/'.join([HOME, YDIR])
return {
'yadm': f'YADM_DIR="{yadm_dir}"',
'repo': f'YADM_REPO="{yadm_dir}/{REPO}"',
'config': f'YADM_CONFIG="{yadm_dir}/{CONFIG}"',
'encrypt': f'YADM_ENCRYPT="{yadm_dir}/{ENCRYPT}"',
'archive': f'YADM_ARCHIVE="{yadm_dir}/{ARCHIVE}"',
'bootstrap': f'YADM_BOOTSTRAP="{yadm_dir}/{BOOTSTRAP}"',
'git': f'GIT_DIR="{yadm_dir}/{REPO}"',
}
def run_test(runner, paths, args, expected_matches, expected_code=0):
"""Run proces global args, and run configure_paths"""
argstring = ' '.join(['"'+a+'"' for a in args])
script = f"""
YADM_TEST=1 HOME="{HOME}" source {paths.pgm}
process_global_args {argstring}
configure_paths
declare -p | grep -E '(YADM|GIT)_'
"""
run = runner(command=['bash'], inp=script)
assert run.code == expected_code
assert run.err == ''
for match in expected_matches:
assert match in run.out

View File

@ -0,0 +1,175 @@
"""Unit tests: parse_encrypt"""
import pytest
def test_not_called(runner, paths):
"""Test parse_encrypt (not called)"""
run = run_parse_encrypt(runner, paths, skip_parse=True)
assert run.success
assert run.err == ''
assert 'EIF:unparsed' in run.out, 'EIF should be unparsed'
assert 'EIF_COUNT:1' in run.out, 'Only value of EIF should be unparsed'
def test_short_circuit(runner, paths):
"""Test parse_encrypt (short-circuit)"""
run = run_parse_encrypt(runner, paths, twice=True)
assert run.success
assert run.err == ''
assert 'PARSE_ENCRYPT_SHORT=parse_encrypt() not reprocessed' in run.out, (
'parse_encrypt() should short-circuit')
@pytest.mark.parametrize(
'encrypt', [
('missing'),
('empty'),
])
def test_empty(runner, paths, encrypt):
"""Test parse_encrypt (file missing/empty)"""
# write encrypt file
if encrypt == 'missing':
assert not paths.encrypt.exists(), 'Encrypt should be missing'
else:
paths.encrypt.write('')
assert paths.encrypt.exists(), 'Encrypt should exist'
assert paths.encrypt.size() == 0, 'Encrypt should be empty'
# run parse_encrypt
run = run_parse_encrypt(runner, paths)
assert run.success
assert run.err == ''
# validate parsing result
assert 'EIF_COUNT:0' in run.out, 'EIF should be empty'
@pytest.mark.usefixtures('ds1_repo_copy')
def test_file_parse_encrypt(runner, paths):
"""Test parse_encrypt
Test an array of supported features of the encrypt configuration.
"""
edata = ''
expected = set()
# empty line
edata += '\n'
# simple comments
edata += '# a simple comment\n'
edata += ' # a comment with leading space\n'
# unreferenced directory
paths.work.join('unreferenced').mkdir()
# simple files
edata += 'simple_file\n'
edata += 'simple.file\n'
paths.work.join('simple_file').write('')
paths.work.join('simple.file').write('')
paths.work.join('simple_file2').write('')
paths.work.join('simple.file2').write('')
expected.add('simple_file')
expected.add('simple.file')
# simple files in directories
edata += 'simple_dir/simple_file\n'
paths.work.join('simple_dir/simple_file').write('', ensure=True)
paths.work.join('simple_dir/simple_file2').write('', ensure=True)
expected.add('simple_dir/simple_file')
# paths with spaces
edata += 'with space/with space\n'
paths.work.join('with space/with space').write('', ensure=True)
paths.work.join('with space/with space2').write('', ensure=True)
expected.add('with space/with space')
# hidden files
edata += '.hidden\n'
paths.work.join('.hidden').write('')
expected.add('.hidden')
# hidden files in directories
edata += '.hidden_dir/.hidden_file\n'
paths.work.join('.hidden_dir/.hidden_file').write('', ensure=True)
expected.add('.hidden_dir/.hidden_file')
# wildcards
edata += 'wild*\n'
paths.work.join('wildcard1').write('', ensure=True)
paths.work.join('wildcard2').write('', ensure=True)
expected.add('wildcard1')
expected.add('wildcard2')
edata += 'dirwild*\n'
paths.work.join('dirwildcard/file1').write('', ensure=True)
paths.work.join('dirwildcard/file2').write('', ensure=True)
expected.add('dirwildcard')
# excludes
edata += 'exclude*\n'
edata += 'ex ex/*\n'
paths.work.join('exclude_file1').write('')
paths.work.join('exclude_file2.ex').write('')
paths.work.join('exclude_file3.ex3').write('')
expected.add('exclude_file1')
expected.add('exclude_file3.ex3')
edata += '!*.ex\n'
edata += '!ex ex/*.txt\n'
paths.work.join('ex ex/file4').write('', ensure=True)
paths.work.join('ex ex/file5.txt').write('', ensure=True)
paths.work.join('ex ex/file6.text').write('', ensure=True)
expected.add('ex ex/file4')
expected.add('ex ex/file6.text')
# write encrypt file
print(f'ENCRYPT:\n---\n{edata}---\n')
paths.encrypt.write(edata)
assert paths.encrypt.isfile()
# run parse_encrypt
run = run_parse_encrypt(runner, paths)
assert run.success
assert run.err == ''
assert f'EIF_COUNT:{len(expected)}' in run.out, 'EIF count wrong'
for expected_file in expected:
assert f'EIF:{expected_file}\n' in run.out
def run_parse_encrypt(
runner, paths,
skip_parse=False,
twice=False):
"""Run parse_encrypt
A count of ENCRYPT_INCLUDE_FILES will be reported as EIF_COUNT:X. All
values of ENCRYPT_INCLUDE_FILES will be reported as individual EIF:value
lines.
"""
parse_cmd = 'parse_encrypt'
if skip_parse:
parse_cmd = ''
if twice:
parse_cmd = 'parse_encrypt; parse_encrypt'
script = f"""
YADM_TEST=1 source {paths.pgm}
YADM_ENCRYPT={paths.encrypt}
export YADM_ENCRYPT
GIT_DIR={paths.repo}
export GIT_DIR
{parse_cmd}
export ENCRYPT_INCLUDE_FILES
export PARSE_ENCRYPT_SHORT
env
echo EIF_COUNT:${{#ENCRYPT_INCLUDE_FILES[@]}}
for value in "${{ENCRYPT_INCLUDE_FILES[@]}}"; do
echo "EIF:$value"
done
"""
run = runner(command=['bash'], inp=script)
return run

View File

@ -0,0 +1,26 @@
"""Unit tests: query_distro"""
def test_lsb_release_present(runner, yadm, tst_distro):
"""Match lsb_release -si when present"""
script = f"""
YADM_TEST=1 source {yadm}
query_distro
"""
run = runner(command=['bash'], inp=script)
assert run.success
assert run.err == ''
assert run.out.rstrip() == tst_distro
def test_lsb_release_missing(runner, yadm):
"""Empty value when missing"""
script = f"""
YADM_TEST=1 source {yadm}
LSB_RELEASE_PROGRAM="missing_lsb_release"
query_distro
"""
run = runner(command=['bash'], inp=script)
assert run.success
assert run.err == ''
assert run.out.rstrip() == ''

36
test/test_unit_set_os.py Normal file
View File

@ -0,0 +1,36 @@
"""Unit tests: set_operating_system"""
import pytest
@pytest.mark.parametrize(
'proc_value, expected_os', [
('missing', 'uname'),
('has Microsoft inside', 'WSL'),
('another value', 'uname'),
], ids=[
'/proc/version missing',
'/proc/version includes MS',
'/proc/version excludes MS',
])
def test_set_operating_system(
runner, paths, tst_sys, proc_value, expected_os):
"""Run set_operating_system and test result"""
# Normally /proc/version (set in PROC_VERSION) is inspected to identify
# WSL. During testing, we will override that value.
proc_version = paths.root.join('proc_version')
if proc_value != 'missing':
proc_version.write(proc_value)
script = f"""
YADM_TEST=1 source {paths.pgm}
PROC_VERSION={proc_version}
set_operating_system
echo $OPERATING_SYSTEM
"""
run = runner(command=['bash'], inp=script)
assert run.success
assert run.err == ''
if expected_os == 'uname':
expected_os = tst_sys
assert run.out.rstrip() == expected_os

View File

@ -0,0 +1,46 @@
"""Unit tests: yadm.[git,gpg]-program"""
import os
import pytest
@pytest.mark.parametrize(
'executable, success, value, match', [
(None, True, 'program', None),
('cat', True, 'cat', None),
('badprogram', False, None, 'badprogram'),
], ids=[
'executable missing',
'valid alternative',
'invalid alternative',
])
@pytest.mark.parametrize('program', ['git', 'gpg'])
def test_x_program(
runner, yadm_y, paths, program, executable, success, value, match):
"""Set yadm.X-program, and test result of require_X"""
# set configuration
if executable:
os.system(' '.join(yadm_y(
'config', f'yadm.{program}-program', executable)))
# test require_[git,gpg]
script = f"""
YADM_TEST=1 source {paths.pgm}
YADM_CONFIG="{paths.config}"
require_{program}
echo ${program.upper()}_PROGRAM
"""
run = runner(command=['bash'], inp=script)
assert run.success == success
assert run.err == ''
# [GIT,GPG]_PROGRAM set correctly
if value == 'program':
assert run.out.rstrip() == program
elif value:
assert run.out.rstrip() == value
# error reported about bad config
if match:
assert match in run.out

35
test/test_version.py Normal file
View File

@ -0,0 +1,35 @@
"""Test version"""
import re
import pytest
@pytest.fixture(scope='module')
def expected_version(yadm):
"""
Expected semantic version number. This is taken directly out of yadm,
searching for the VERSION= string.
"""
yadm_version = re.findall(
r'VERSION=([^\n]+)',
open(yadm).read())
if yadm_version:
return yadm_version[0]
pytest.fail(f'version not found in {yadm}')
return 'not found'
def test_semantic_version(expected_version):
"""Version is semantic"""
# semantic version conforms to MAJOR.MINOR.PATCH
assert re.search(r'^\d+\.\d+\.\d+$', expected_version), (
'does not conform to MAJOR.MINOR.PATCH')
def test_reported_version(
runner, yadm_y, expected_version):
"""Report correct version"""
run = runner(command=yadm_y('version'))
assert run.success
assert run.err == ''
assert run.out == f'yadm {expected_version}\n'

59
test/utils.py Normal file
View File

@ -0,0 +1,59 @@
"""Testing Utilities
This module holds values/functions common to multiple tests.
"""
import os
ALT_FILE1 = 'test_alt'
ALT_FILE2 = 'test alt/test alt'
def set_local(paths, variable, value):
"""Set local override"""
os.system(
f'GIT_DIR={str(paths.repo)} '
f'git config --local "local.{variable}" "{value}"'
)
def create_alt_files(paths, suffix,
preserve=False, tracked=True,
encrypt=False, exclude=False,
content=None):
"""Create new files, and add to the repo
This is used for testing alternate files. In each case, a suffix is
appended to two standard file paths. Particulars of the file creation and
repo handling are dependent upon the function arguments.
"""
if not preserve:
if paths.work.join(ALT_FILE1).exists():
paths.work.join(ALT_FILE1).remove(rec=1, ignore_errors=True)
assert not paths.work.join(ALT_FILE1).exists()
if paths.work.join(ALT_FILE2).exists():
paths.work.join(ALT_FILE2).remove(rec=1, ignore_errors=True)
assert not paths.work.join(ALT_FILE2).exists()
new_file1 = paths.work.join(ALT_FILE1 + suffix)
new_file1.write(ALT_FILE1 + suffix, ensure=True)
new_file2 = paths.work.join(ALT_FILE2 + suffix)
new_file2.write(ALT_FILE2 + suffix, ensure=True)
if content:
new_file1.write('\n' + content, mode='a', ensure=True)
new_file2.write('\n' + content, mode='a', ensure=True)
assert new_file1.exists()
assert new_file2.exists()
if tracked:
for path in (new_file1, new_file2):
os.system(f'GIT_DIR={str(paths.repo)} git add "{path}"')
os.system(f'GIT_DIR={str(paths.repo)} git commit -m "Add test files"')
if encrypt:
paths.encrypt.write(f'{ALT_FILE1 + suffix}\n', mode='a')
paths.encrypt.write(f'{ALT_FILE2 + suffix}\n', mode='a')
if exclude:
paths.encrypt.write(f'!{ALT_FILE1 + suffix}\n', mode='a')
paths.encrypt.write(f'!{ALT_FILE2 + suffix}\n', mode='a')