diff --git a/test/test_encryption.py b/test/test_encryption.py index 3d10786..ec3b330 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -2,6 +2,7 @@ import os import pipes +import time import pytest KEY_FILE = 'test/test_key' @@ -13,26 +14,50 @@ PASSPHRASE = 'ExamplePassword' pytestmark = pytest.mark.usefixtures('config_git') -def add_asymmetric_key(): +def add_asymmetric_key(runner, gnupg): """Add asymmetric key""" - os.system(f'gpg --import {pipes.quote(KEY_FILE)}') - os.system(f'gpg --import-ownertrust < {pipes.quote(KEY_TRUST)}') + env = os.environ.copy() + env['GNUPGHOME'] = gnupg.home + runner( + ['gpg', '--import', pipes.quote(KEY_FILE)], + env=env, + shell=True, + ) + runner( + ['gpg', '--import-ownertrust', '<', pipes.quote(KEY_TRUST)], + env=env, + shell=True, + ) -def remove_asymmetric_key(): +def remove_asymmetric_key(runner, gnupg): """Remove asymmetric key""" - os.system( - f'gpg --batch --yes ' - f'--delete-secret-keys {pipes.quote(KEY_FINGERPRINT)}') - os.system(f'gpg --batch --yes --delete-key {pipes.quote(KEY_FINGERPRINT)}') + env = os.environ.copy() + env['GNUPGHOME'] = gnupg.home + runner( + [ + 'gpg', '--batch', '--yes', + '--delete-secret-keys', pipes.quote(KEY_FINGERPRINT) + ], + env=env, + shell=True, + ) + runner( + [ + 'gpg', '--batch', '--yes', + '--delete-key', pipes.quote(KEY_FINGERPRINT) + ], + env=env, + shell=True, + ) @pytest.fixture -def asymmetric_key(): +def asymmetric_key(runner, gnupg): """Fixture for asymmetric key, removed in teardown""" - add_asymmetric_key() + add_asymmetric_key(runner, gnupg) yield KEY_NAME - remove_asymmetric_key() + remove_asymmetric_key(runner, gnupg) @pytest.fixture @@ -95,7 +120,7 @@ def encrypt_targets(yadm_y, paths): @pytest.fixture(scope='session') -def decrypt_targets(tmpdir_factory, runner): +def decrypt_targets(tmpdir_factory, runner, gnupg): """Fixture for setting data to decrypt This fixture: @@ -117,17 +142,21 @@ def decrypt_targets(tmpdir_factory, runner): tmpdir.join('subdir/decrypt3').write('subdir/decrypt3') expected.append('subdir/decrypt3') + gnupg.pw(PASSPHRASE) + env = os.environ.copy() + env['GNUPGHOME'] = gnupg.home run = runner( ['tar', 'cvf', '-'] + expected + ['|', 'gpg', '--batch', '--yes', '-c'] + - ['--passphrase', pipes.quote(PASSPHRASE)] + ['--output', pipes.quote(str(symmetric))], cwd=tmpdir, + env=env, shell=True) assert run.success - add_asymmetric_key() + gnupg.pw('') + add_asymmetric_key(runner, gnupg) run = runner( ['tar', 'cvf', '-'] + expected + @@ -135,9 +164,10 @@ def decrypt_targets(tmpdir_factory, runner): ['-r', pipes.quote(KEY_NAME)] + ['--output', pipes.quote(str(asymmetric))], cwd=tmpdir, + env=env, shell=True) assert run.success - remove_asymmetric_key() + remove_asymmetric_key(runner, gnupg) return { 'asymmetric': asymmetric, @@ -147,8 +177,8 @@ def decrypt_targets(tmpdir_factory, runner): @pytest.mark.parametrize( - 'mismatched_phrase', [False, True], - ids=['matching_phrase', 'mismatched_phrase']) + 'bad_phrase', [False, True], + ids=['good_phrase', 'bad_phrase']) @pytest.mark.parametrize( 'missing_encrypt', [False, True], ids=['encrypt_exists', 'encrypt_missing']) @@ -157,25 +187,25 @@ def decrypt_targets(tmpdir_factory, runner): ids=['clean', 'overwrite']) def test_symmetric_encrypt( runner, yadm_y, paths, encrypt_targets, - overwrite, missing_encrypt, mismatched_phrase): + gnupg, bad_phrase, overwrite, missing_encrypt): """Test symmetric encryption""" if missing_encrypt: paths.encrypt.remove() - matched_phrase = PASSPHRASE - if mismatched_phrase: - matched_phrase = 'mismatched' + if bad_phrase: + gnupg.pw('') + else: + gnupg.pw(PASSPHRASE) if overwrite: paths.archive.write('existing archive') - run = runner(yadm_y('encrypt'), expect=[ - ('passphrase:', PASSPHRASE), - ('passphrase:', matched_phrase), - ]) + env = os.environ.copy() + env['GNUPGHOME'] = gnupg.home + run = runner(yadm_y('encrypt'), env=env) - if missing_encrypt or mismatched_phrase: + if missing_encrypt or bad_phrase: assert run.failure else: assert run.success @@ -183,15 +213,16 @@ def test_symmetric_encrypt( if missing_encrypt: assert 'does not exist' in run.out - elif mismatched_phrase: - assert 'invalid passphrase' in run.out + elif bad_phrase: + assert 'Invalid passphrase' in run.err else: - assert encrypted_data_valid(runner, paths.archive, encrypt_targets) + assert encrypted_data_valid( + runner, gnupg, paths.archive, encrypt_targets) @pytest.mark.parametrize( - 'wrong_phrase', [False, True], - ids=['correct_phrase', 'wrong_phrase']) + 'bad_phrase', [False, True], + ids=['good_phrase', 'bad_phrase']) @pytest.mark.parametrize( 'archive_exists', [True, False], ids=['archive_exists', 'archive_missing']) @@ -199,16 +230,18 @@ def test_symmetric_encrypt( 'dolist', [False, True], ids=['decrypt', 'list']) def test_symmetric_decrypt( - runner, yadm_y, paths, decrypt_targets, - dolist, archive_exists, wrong_phrase): + runner, yadm_y, paths, decrypt_targets, gnupg, + dolist, archive_exists, bad_phrase): """Test decryption""" # init empty yadm repo os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f'))) - phrase = PASSPHRASE - if wrong_phrase: - phrase = 'wrong-phrase' + if bad_phrase: + gnupg.pw('') + time.sleep(1) # allow gpg-agent cache to expire + else: + gnupg.pw(PASSPHRASE) if archive_exists: decrypt_targets['symmetric'].copy(paths.archive) @@ -216,15 +249,18 @@ def test_symmetric_decrypt( # to test overwriting paths.work.join('decrypt1').write('pre-existing file') + env = os.environ.copy() + env['GNUPGHOME'] = gnupg.home + args = [] if dolist: args.append('-l') - run = runner(yadm_y('decrypt') + args, expect=[('passphrase:', phrase)]) + run = runner(yadm_y('decrypt') + args, env=env) - if archive_exists and not wrong_phrase: + if archive_exists and not bad_phrase: assert run.success - assert run.err == '' + assert 'encrypted with 1 passphrase' in run.err if dolist: for filename in decrypt_targets['expected']: if filename != 'decrypt1': # this one should exist @@ -248,7 +284,7 @@ def test_symmetric_decrypt( 'overwrite', [False, True], ids=['clean', 'overwrite']) def test_asymmetric_encrypt( - runner, yadm_y, paths, encrypt_targets, + runner, yadm_y, paths, encrypt_targets, gnupg, overwrite, key_exists, ask): """Test asymmetric encryption""" @@ -264,13 +300,17 @@ def test_asymmetric_encrypt( paths.archive.write('existing archive') if not key_exists: - remove_asymmetric_key() + remove_asymmetric_key(runner, gnupg) - run = runner(yadm_y('encrypt'), expect=expect) + env = os.environ.copy() + env['GNUPGHOME'] = gnupg.home + + run = runner(yadm_y('encrypt'), env=env, expect=expect) if key_exists: assert run.success - assert encrypted_data_valid(runner, paths.archive, encrypt_targets) + assert encrypted_data_valid( + runner, gnupg, paths.archive, encrypt_targets) else: assert run.failure assert 'Unable to write' in run.out @@ -287,7 +327,7 @@ def test_asymmetric_encrypt( 'dolist', [False, True], ids=['decrypt', 'list']) def test_asymmetric_decrypt( - runner, yadm_y, paths, decrypt_targets, + runner, yadm_y, paths, decrypt_targets, gnupg, dolist, key_exists): """Test decryption""" @@ -300,13 +340,15 @@ def test_asymmetric_decrypt( paths.work.join('decrypt1').write('pre-existing file') if not key_exists: - remove_asymmetric_key() + remove_asymmetric_key(runner, gnupg) args = [] if dolist: args.append('-l') - run = runner(yadm_y('decrypt') + args) + env = os.environ.copy() + env['GNUPGHOME'] = gnupg.home + run = runner(yadm_y('decrypt') + args, env=env) if key_exists: assert run.success @@ -327,7 +369,8 @@ def test_asymmetric_decrypt( 'untracked', [False, 'y', 'n'], ids=['tracked', 'untracked_answer_y', 'untracked_answer_n']) -def test_offer_to_add(runner, yadm_y, paths, encrypt_targets, untracked): +def test_offer_to_add( + runner, yadm_y, paths, encrypt_targets, gnupg, untracked): """Test offer to add encrypted archive All the other encryption tests use an archive outside of the work tree. @@ -336,10 +379,12 @@ def test_offer_to_add(runner, yadm_y, paths, encrypt_targets, untracked): """ worktree_archive = paths.work.join('worktree-archive.tar.gpg') - expect = [ - ('passphrase:', PASSPHRASE), - ('passphrase:', PASSPHRASE), - ] + + expect = [] + + gnupg.pw(PASSPHRASE) + env = os.environ.copy() + env['GNUPGHOME'] = gnupg.home if untracked: expect.append(('add it now', untracked)) @@ -349,12 +394,14 @@ def test_offer_to_add(runner, yadm_y, paths, encrypt_targets, untracked): run = runner( yadm_y('encrypt', '--yadm-archive', str(worktree_archive)), + env=env, expect=expect ) assert run.success assert run.err == '' - assert encrypted_data_valid(runner, worktree_archive, encrypt_targets) + assert encrypted_data_valid( + runner, gnupg, worktree_archive, encrypt_targets) run = runner( yadm_y('status', '--porcelain', '-uall', str(worktree_archive))) @@ -372,22 +419,20 @@ def test_offer_to_add(runner, yadm_y, paths, encrypt_targets, untracked): assert f'AM {worktree_archive.basename}' in run.out -def test_encrypt_added_to_exclude(runner, yadm_y, paths): +@pytest.mark.usefixtures('ds1_copy') +def test_encrypt_added_to_exclude(runner, yadm_y, paths, gnupg): """Confirm that .config/yadm/encrypt is added to exclude""" - expect = [ - ('passphrase:', PASSPHRASE), - ('passphrase:', PASSPHRASE), - ] + gnupg.pw(PASSPHRASE) + env = os.environ.copy() + env['GNUPGHOME'] = gnupg.home exclude_file = paths.repo.join('info/exclude') paths.encrypt.write('test-encrypt-data\n') + paths.work.join('test-encrypt-data').write('') exclude_file.write('original-data', ensure=True) - run = runner( - yadm_y('encrypt'), - expect=expect, - ) + run = runner(yadm_y('encrypt'), env=env) assert 'test-encrypt-data' in paths.repo.join('info/exclude').read() assert 'original-data' in paths.repo.join('info/exclude').read() @@ -395,14 +440,16 @@ def test_encrypt_added_to_exclude(runner, yadm_y, paths): assert run.err == '' -def encrypted_data_valid(runner, encrypted, expected): +def encrypted_data_valid(runner, gnupg, encrypted, expected): """Verify encrypted data matches expectations""" + gnupg.pw(PASSPHRASE) + env = os.environ.copy() + env['GNUPGHOME'] = gnupg.home run = runner([ 'gpg', - '--passphrase', pipes.quote(PASSPHRASE), '-d', pipes.quote(str(encrypted)), '2>/dev/null', - '|', 'tar', 't'], shell=True, report=False) + '|', 'tar', 't'], env=env, shell=True, report=False) file_count = 0 for filename in run.out.splitlines(): if filename.endswith('/'):