0995f38b06
I signed all commits first
591 lines
16 KiB
Python
591 lines
16 KiB
Python
"""Global tests configuration and fixtures"""
|
|
|
|
import collections
|
|
import contextlib
|
|
import copy
|
|
import distutils.dir_util # pylint: disable=no-name-in-module,import-error
|
|
import os
|
|
import platform
|
|
import pwd
|
|
from subprocess import Popen, PIPE
|
|
import py
|
|
import pytest
|
|
|
|
|
|
def pytest_addoption(parser):
|
|
"""Add options to pytest"""
|
|
parser.addoption(
|
|
"--force-linters",
|
|
action="store_true",
|
|
default=False,
|
|
help="Run linters regardless of installed versions",
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def shellcheck_version():
|
|
"""Version of shellcheck supported"""
|
|
return '0.4.6'
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def pylint_version():
|
|
"""Version of pylint supported"""
|
|
return '2.4.1'
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def flake8_version():
|
|
"""Version of flake8 supported"""
|
|
return '3.7.8'
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def yamllint_version():
|
|
"""Version of yamllint supported"""
|
|
return '1.17.0'
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def tst_user():
|
|
"""Test session's user id"""
|
|
return pwd.getpwuid(os.getuid()).pw_name
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def tst_host():
|
|
"""Test session's short hostname value"""
|
|
return platform.node().split('.')[0]
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def tst_distro(runner):
|
|
"""Test session's distro"""
|
|
distro = ''
|
|
with contextlib.suppress(Exception):
|
|
run = runner(command=['lsb_release', '-si'], report=False)
|
|
distro = run.out.strip()
|
|
return distro
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def tst_sys():
|
|
"""Test session's uname value"""
|
|
return platform.system()
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def supported_commands():
|
|
"""List of supported commands
|
|
|
|
This list should be updated every time yadm learns a new command.
|
|
"""
|
|
return [
|
|
'alt',
|
|
'bootstrap',
|
|
'clean',
|
|
'clone',
|
|
'config',
|
|
'decrypt',
|
|
'encrypt',
|
|
'enter',
|
|
'git-crypt',
|
|
'gitconfig',
|
|
'help',
|
|
'init',
|
|
'introspect',
|
|
'list',
|
|
'perms',
|
|
'transcrypt',
|
|
'upgrade',
|
|
'version',
|
|
]
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def supported_configs():
|
|
"""List of supported config options
|
|
|
|
This list should be updated every time yadm learns a new config.
|
|
"""
|
|
return [
|
|
'local.class',
|
|
'local.hostname',
|
|
'local.os',
|
|
'local.user',
|
|
'yadm.alt-copy',
|
|
'yadm.auto-alt',
|
|
'yadm.auto-exclude',
|
|
'yadm.auto-perms',
|
|
'yadm.auto-private-dirs',
|
|
'yadm.cipher',
|
|
'yadm.git-program',
|
|
'yadm.gpg-perms',
|
|
'yadm.gpg-program',
|
|
'yadm.gpg-recipient',
|
|
'yadm.openssl-ciphername',
|
|
'yadm.openssl-program',
|
|
'yadm.ssh-perms',
|
|
]
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def supported_switches():
|
|
"""List of supported switches
|
|
|
|
This list should be updated every time yadm learns a new switch.
|
|
"""
|
|
return [
|
|
'--yadm-archive',
|
|
'--yadm-bootstrap',
|
|
'--yadm-config',
|
|
'--yadm-dir',
|
|
'--yadm-encrypt',
|
|
'--yadm-repo',
|
|
'-Y',
|
|
]
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def supported_local_configs(supported_configs):
|
|
"""List of supported local config options"""
|
|
return [c for c in supported_configs if c.startswith('local.')]
|
|
|
|
|
|
class Runner():
|
|
"""Class for running commands
|
|
|
|
Within yadm tests, this object should be used when running commands that
|
|
require:
|
|
|
|
* Acting on the status code
|
|
* Parsing the output of the command
|
|
* Passing input to the command
|
|
|
|
Other instances of simply running commands should use os.system().
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
command,
|
|
inp=None,
|
|
shell=False,
|
|
cwd=None,
|
|
env=None,
|
|
expect=None,
|
|
report=True):
|
|
if shell:
|
|
self.command = ' '.join([str(cmd) for cmd in command])
|
|
else:
|
|
self.command = command
|
|
self.inp = inp
|
|
self.wrap(expect)
|
|
process = Popen(
|
|
self.command,
|
|
stdin=PIPE,
|
|
stdout=PIPE,
|
|
stderr=PIPE,
|
|
shell=shell,
|
|
cwd=cwd,
|
|
env=env,
|
|
)
|
|
input_bytes = self.inp
|
|
if self.inp:
|
|
input_bytes = self.inp.encode()
|
|
(out_bstream, err_bstream) = process.communicate(input=input_bytes)
|
|
self.out = out_bstream.decode()
|
|
self.err = err_bstream.decode()
|
|
self.code = process.wait()
|
|
self.success = self.code == 0
|
|
self.failure = self.code != 0
|
|
if report:
|
|
self.report()
|
|
|
|
def __repr__(self):
|
|
return f'Runner({self.command})'
|
|
|
|
def report(self):
|
|
"""Print code/stdout/stderr"""
|
|
print(f'{self}')
|
|
print(f' RUN: code:{self.code}')
|
|
if self.inp:
|
|
print(f' RUN: input:\n{self.inp}')
|
|
print(f' RUN: stdout:\n{self.out}')
|
|
print(f' RUN: stderr:\n{self.err}')
|
|
|
|
def wrap(self, expect):
|
|
"""Wrap command with expect"""
|
|
if not expect:
|
|
return
|
|
cmdline = ' '.join([f'"{w}"' for w in self.command])
|
|
expect_script = f'set timeout 2\nspawn {cmdline}\n'
|
|
for question, answer in expect:
|
|
expect_script += (
|
|
'expect {\n'
|
|
f'"{question}" {{send "{answer}\\r"}}\n'
|
|
'timeout {close;exit 128}\n'
|
|
'}\n')
|
|
expect_script += (
|
|
'expect eof\n'
|
|
'foreach {pid spawnid os_error_flag value} [wait] break\n'
|
|
'exit $value')
|
|
self.inp = expect_script
|
|
print(f'EXPECT:{expect_script}')
|
|
self.command = ['expect']
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def runner():
|
|
"""Class for running commands"""
|
|
return Runner
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def config_git():
|
|
"""Configure global git configuration, if missing"""
|
|
os.system(
|
|
'git config user.name || '
|
|
'git config --global user.name "test"')
|
|
os.system(
|
|
'git config user.email || '
|
|
'git config --global user.email "test@test.test"')
|
|
|
|
|
|
@pytest.fixture()
|
|
def repo_config(runner, paths):
|
|
"""Function to query a yadm repo configuration value"""
|
|
|
|
def query_func(key):
|
|
"""Query a yadm repo configuration value"""
|
|
run = runner(
|
|
command=('git', 'config', '--local', key),
|
|
env={'GIT_DIR': paths.repo},
|
|
report=False,
|
|
)
|
|
return run.out.rstrip()
|
|
|
|
return query_func
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def yadm():
|
|
"""Path to yadm program to be tested"""
|
|
full_path = os.path.realpath('yadm')
|
|
assert os.path.isfile(full_path), "yadm program file isn't present"
|
|
return full_path
|
|
|
|
|
|
@pytest.fixture()
|
|
def paths(tmpdir, yadm):
|
|
"""Function scoped test paths"""
|
|
dir_root = tmpdir.mkdir('root')
|
|
dir_work = dir_root.mkdir('work')
|
|
dir_yadm = dir_root.mkdir('yadm')
|
|
dir_repo = dir_yadm.mkdir('repo.git')
|
|
dir_hooks = dir_yadm.mkdir('hooks')
|
|
dir_remote = dir_root.mkdir('remote')
|
|
file_archive = dir_yadm.join('files.gpg')
|
|
file_bootstrap = dir_yadm.join('bootstrap')
|
|
file_config = dir_yadm.join('config')
|
|
file_encrypt = dir_yadm.join('encrypt')
|
|
paths = collections.namedtuple(
|
|
'Paths', [
|
|
'pgm',
|
|
'root',
|
|
'work',
|
|
'yadm',
|
|
'repo',
|
|
'hooks',
|
|
'remote',
|
|
'archive',
|
|
'bootstrap',
|
|
'config',
|
|
'encrypt',
|
|
])
|
|
return paths(
|
|
yadm,
|
|
dir_root,
|
|
dir_work,
|
|
dir_yadm,
|
|
dir_repo,
|
|
dir_hooks,
|
|
dir_remote,
|
|
file_archive,
|
|
file_bootstrap,
|
|
file_config,
|
|
file_encrypt,
|
|
)
|
|
|
|
|
|
@pytest.fixture()
|
|
def yadm_y(paths):
|
|
"""Generate custom command_list function"""
|
|
def command_list(*args):
|
|
"""Produce params for running yadm with -Y"""
|
|
return [paths.pgm, '-Y', str(paths.yadm)] + list(args)
|
|
return command_list
|
|
|
|
|
|
class DataFile():
|
|
"""Datafile object"""
|
|
|
|
def __init__(self, path, tracked=True, private=False):
|
|
self.__path = path
|
|
self.__parent = None
|
|
self.__tracked = tracked
|
|
self.__private = private
|
|
|
|
@property
|
|
def path(self):
|
|
"""Path property"""
|
|
return self.__path
|
|
|
|
@property
|
|
def relative(self):
|
|
"""Relative path property"""
|
|
if self.__parent:
|
|
return self.__parent.join(self.path)
|
|
raise BaseException('Unable to provide relative path, no parent')
|
|
|
|
@property
|
|
def tracked(self):
|
|
"""Tracked property"""
|
|
return self.__tracked
|
|
|
|
@property
|
|
def private(self):
|
|
"""Private property"""
|
|
return self.__private
|
|
|
|
def relative_to(self, parent):
|
|
"""Update all relative paths to this py.path"""
|
|
self.__parent = parent
|
|
|
|
|
|
class DataSet():
|
|
"""Dataset object"""
|
|
|
|
def __init__(self):
|
|
self.__files = list()
|
|
self.__dirs = list()
|
|
self.__tracked_dirs = list()
|
|
self.__private_dirs = list()
|
|
self.__relpath = None
|
|
|
|
def __repr__(self):
|
|
return (
|
|
f'[DS with {len(self)} files; '
|
|
f'{len(self.tracked)} tracked, '
|
|
f'{len(self.private)} private]'
|
|
)
|
|
|
|
def __iter__(self):
|
|
return iter(self.__files)
|
|
|
|
def __len__(self):
|
|
return len(self.__files)
|
|
|
|
def __contains__(self, datafile):
|
|
if [f for f in self.__files if f.path == datafile]:
|
|
return True
|
|
if datafile in self.__files:
|
|
return True
|
|
return False
|
|
|
|
@property
|
|
def files(self):
|
|
"""List of DataFiles in DataSet"""
|
|
return list(self.__files)
|
|
|
|
@property
|
|
def tracked(self):
|
|
"""List of tracked DataFiles in DataSet"""
|
|
return [f for f in self.__files if f.tracked]
|
|
|
|
@property
|
|
def private(self):
|
|
"""List of private DataFiles in DataSet"""
|
|
return [f for f in self.__files if f.private]
|
|
|
|
@property
|
|
def dirs(self):
|
|
"""List of directories in DataSet"""
|
|
return list(self.__dirs)
|
|
|
|
@property
|
|
def plain_dirs(self):
|
|
"""List of directories in DataSet not starting with '.'"""
|
|
return [d for d in self.dirs if not d.startswith('.')]
|
|
|
|
@property
|
|
def hidden_dirs(self):
|
|
"""List of directories in DataSet starting with '.'"""
|
|
return [d for d in self.dirs if d.startswith('.')]
|
|
|
|
@property
|
|
def tracked_dirs(self):
|
|
"""List of directories in DataSet not starting with '.'"""
|
|
return [d for d in self.__tracked_dirs if not d.startswith('.')]
|
|
|
|
@property
|
|
def private_dirs(self):
|
|
"""List of directories in DataSet considered 'private'"""
|
|
return list(self.__private_dirs)
|
|
|
|
def add_file(self, path, tracked=True, private=False):
|
|
"""Add file to data set"""
|
|
if path not in self:
|
|
datafile = DataFile(path, tracked, private)
|
|
if self.__relpath:
|
|
datafile.relative_to(self.__relpath)
|
|
self.__files.append(datafile)
|
|
|
|
dname = os.path.dirname(path)
|
|
if dname and dname not in self.__dirs:
|
|
self.__dirs.append(dname)
|
|
if tracked:
|
|
self.__tracked_dirs.append(dname)
|
|
if private:
|
|
self.__private_dirs.append(dname)
|
|
|
|
def relative_to(self, relpath):
|
|
"""Update all relative paths to this py.path"""
|
|
self.__relpath = relpath
|
|
for datafile in self.files:
|
|
datafile.relative_to(self.__relpath)
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def ds1_dset(tst_sys):
|
|
"""Meta-data for dataset one files"""
|
|
dset = DataSet()
|
|
dset.add_file('t1')
|
|
dset.add_file('d1/t2')
|
|
dset.add_file(f'test_alt_copy##os.{tst_sys}')
|
|
dset.add_file('u1', tracked=False)
|
|
dset.add_file('d2/u2', tracked=False)
|
|
dset.add_file('.ssh/p1', tracked=False, private=True)
|
|
dset.add_file('.ssh/.p2', tracked=False, private=True)
|
|
dset.add_file('.gnupg/p3', tracked=False, private=True)
|
|
dset.add_file('.gnupg/.p4', tracked=False, private=True)
|
|
return dset
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def ds1_data(tmpdir_factory, config_git, ds1_dset, runner):
|
|
"""A set of test data, worktree & repo"""
|
|
# pylint: disable=unused-argument
|
|
# This is ignored because
|
|
# @pytest.mark.usefixtures('config_git')
|
|
# cannot be applied to another fixture.
|
|
|
|
data = tmpdir_factory.mktemp('ds1')
|
|
|
|
work = data.mkdir('work')
|
|
for datafile in ds1_dset:
|
|
work.join(datafile.path).write(datafile.path, ensure=True)
|
|
|
|
repo = data.mkdir('repo.git')
|
|
env = os.environ.copy()
|
|
env['GIT_DIR'] = str(repo)
|
|
runner(
|
|
command=['git', 'init', '--shared=0600', '--bare', str(repo)],
|
|
report=False)
|
|
runner(
|
|
command=['git', 'config', 'core.bare', 'false'],
|
|
env=env,
|
|
report=False)
|
|
runner(
|
|
command=['git', 'config', 'status.showUntrackedFiles', 'no'],
|
|
env=env,
|
|
report=False)
|
|
runner(
|
|
command=['git', 'config', 'yadm.managed', 'true'],
|
|
env=env,
|
|
report=False)
|
|
runner(
|
|
command=['git', 'config', 'core.worktree', str(work)],
|
|
env=env,
|
|
report=False)
|
|
runner(
|
|
command=['git', 'add'] +
|
|
[str(work.join(f.path)) for f in ds1_dset if f.tracked],
|
|
env=env)
|
|
runner(
|
|
command=['git', 'commit', '--allow-empty', '-m', 'Initial commit'],
|
|
env=env,
|
|
report=False)
|
|
|
|
data = collections.namedtuple('Data', ['work', 'repo'])
|
|
return data(work, repo)
|
|
|
|
|
|
@pytest.fixture()
|
|
def ds1_work_copy(ds1_data, paths):
|
|
"""Function scoped copy of ds1_data.work"""
|
|
distutils.dir_util.copy_tree( # pylint: disable=no-member
|
|
str(ds1_data.work), str(paths.work))
|
|
|
|
|
|
@pytest.fixture()
|
|
def ds1_repo_copy(runner, ds1_data, paths):
|
|
"""Function scoped copy of ds1_data.repo"""
|
|
distutils.dir_util.copy_tree( # pylint: disable=no-member
|
|
str(ds1_data.repo), str(paths.repo))
|
|
env = os.environ.copy()
|
|
env['GIT_DIR'] = str(paths.repo)
|
|
runner(
|
|
command=['git', 'config', 'core.worktree', str(paths.work)],
|
|
env=env,
|
|
report=False)
|
|
|
|
|
|
@pytest.fixture()
|
|
def ds1_copy(ds1_work_copy, ds1_repo_copy):
|
|
"""Function scoped copy of ds1_data"""
|
|
# pylint: disable=unused-argument
|
|
# This is ignored because
|
|
# @pytest.mark.usefixtures('ds1_work_copy', 'ds1_repo_copy')
|
|
# cannot be applied to another fixture.
|
|
return None
|
|
|
|
|
|
@pytest.fixture()
|
|
def ds1(ds1_work_copy, paths, ds1_dset):
|
|
"""Function scoped ds1_dset w/paths"""
|
|
# pylint: disable=unused-argument
|
|
# This is ignored because
|
|
# @pytest.mark.usefixtures('ds1_copy')
|
|
# cannot be applied to another fixture.
|
|
dscopy = copy.deepcopy(ds1_dset)
|
|
dscopy.relative_to(copy.deepcopy(paths.work))
|
|
return dscopy
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def gnupg(tmpdir_factory, runner):
|
|
"""Location of GNUPGHOME"""
|
|
|
|
def register_gpg_password(password):
|
|
"""Publish a new GPG mock password"""
|
|
py.path.local('/tmp/mock-password').write(password)
|
|
|
|
home = tmpdir_factory.mktemp('gnupghome')
|
|
home.chmod(0o700)
|
|
conf = home.join('gpg.conf')
|
|
conf.write('no-secmem-warning\n')
|
|
conf.chmod(0o600)
|
|
agentconf = home.join('gpg-agent.conf')
|
|
agentconf.write(
|
|
f'pinentry-program {os.path.abspath("test/pinentry-mock")}\n'
|
|
'max-cache-ttl 0\n'
|
|
)
|
|
agentconf.chmod(0o600)
|
|
data = collections.namedtuple('GNUPG', ['home', 'pw'])
|
|
env = os.environ.copy()
|
|
env['GNUPGHOME'] = home
|
|
|
|
# this pre-populates std files in the GNUPGHOME
|
|
runner(['gpg', '-k'], env=env)
|
|
|
|
return data(home, register_gpg_password)
|