mirror of
1
0
Fork 0
This commit is contained in:
Kallen Ceryeei Celeste 2023-07-28 21:10:13 +03:00 committed by GitHub
commit aed346d631
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 274 additions and 3 deletions

View File

@ -181,6 +181,7 @@ mapped to extended configuration dictionaries.
| `create` | When true, create parent directories to the link as needed. (default: false) |
| `relink` | Removes the old target if it's a symlink (default: false) |
| `force` | Force removes the old target, file or folder, and forces a new link (default: false) |
| `backup-root` | In case of `force` or `relink` backup the old target to `backup-root` (default: no backup) |
| `relative` | Use a relative path to the source when creating the symlink (default: false, absolute links) |
| `canonicalize` | Resolve any symbolic links encountered in the source to symlink to the canonical path (default: true, real paths) |
| `if` | Execute this in your `$SHELL` and only link if it is successful. |

View File

@ -2,6 +2,7 @@ import glob
import os
import shutil
import sys
import datetime
from ..plugin import Plugin
from ..util import shell_command
@ -32,6 +33,7 @@ class Link(Plugin):
canonical_path = defaults.get("canonicalize", defaults.get("canonicalize-path", True))
force = defaults.get("force", False)
relink = defaults.get("relink", False)
backup_root = defaults.get("backup-root", None)
create = defaults.get("create", False)
use_glob = defaults.get("glob", False)
base_prefix = defaults.get("prefix", "")
@ -47,6 +49,7 @@ class Link(Plugin):
)
force = source.get("force", force)
relink = source.get("relink", relink)
backup_root = source.get("backup-root", backup_root)
create = source.get("create", create)
use_glob = source.get("glob", use_glob)
base_prefix = source.get("prefix", base_prefix)
@ -84,6 +87,7 @@ class Link(Plugin):
relative,
canonical_path,
force,
backup_root,
)
success &= self._link(
glob_full_item,
@ -106,7 +110,9 @@ class Link(Plugin):
self._log.warning("Nonexistent source %s -> %s" % (destination, path))
continue
if force or relink:
success &= self._delete(path, destination, relative, canonical_path, force)
success &= self._delete(
path, destination, relative, canonical_path, force, backup_root
)
success &= self._link(path, destination, relative, canonical_path, ignore_missing)
if success:
self._log.info("All links have been set up")
@ -120,6 +126,48 @@ class Link(Plugin):
self._log.debug("Test '%s' returned false" % command)
return ret == 0
def _backup(self, link_name, backup_root):
if not backup_root:
return False
success = True
source = os.path.expanduser(link_name)
timestamp = datetime.datetime.now().strftime(".%Y%m%dT%H%M%S.%f")
backup_root = os.path.expandvars(os.path.expanduser(backup_root))
backup_root = os.path.normpath(os.path.join(self._context.base_directory(), backup_root))
destination, ext = os.path.splitext(
os.path.normpath(os.path.splitdrive(source)[1]).strip("/")
)
destination = os.path.join(backup_root, destination + timestamp + ext)
success &= self._create(destination)
if os.path.islink(source):
try:
shutil.copy2(source, destination, follow_symlinks=False)
except OSError:
self._log.warning("Failed to backup symlink %s to %s" % (source, destination))
success = False
else:
self._log.lowinfo("Performed backup of symlink %s to %s" % (source, destination))
elif os.path.isdir(source):
try:
shutil.copytree(source, destination, symlinks=True)
except OSError:
self._log.warning("Failed to backup directory %s to %s" % (source, destination))
success = False
else:
self._log.lowinfo("performed backup of directory %s to %s" % (source, destination))
elif os.path.isfile(source):
try:
shutil.copy2(source, destination)
except OSError:
self._log.warning("Failed to backup file %s to %s" % (source, destination))
success = False
else:
self._log.lowinfo("Performed backup of file %s to %s" % (source, destination))
else:
self._log.warning("Failed to backup irregular file %s" % source)
success = False
return success
def _default_source(self, destination, source):
if source is None:
basename = os.path.basename(destination)
@ -203,7 +251,7 @@ class Link(Plugin):
self._log.lowinfo("Creating directory %s" % parent)
return success
def _delete(self, source, path, relative, canonical_path, force):
def _delete(self, source, path, relative, canonical_path, force, backup_root):
success = True
source = os.path.join(self._context.base_directory(canonical_path=canonical_path), source)
fullpath = os.path.abspath(os.path.expanduser(path))
@ -213,8 +261,13 @@ class Link(Plugin):
self._exists(path) and not self._is_link(path)
):
removed = False
if backup_root:
backed_up = self._backup(path, backup_root)
try:
if os.path.islink(fullpath):
if backup_root and not backed_up:
self._log.warning("Skipping removal of %s due to failed backup" % path)
success = False
elif os.path.islink(fullpath):
os.unlink(fullpath)
removed = True
elif force:

View File

@ -175,6 +175,149 @@ def test_link_force_overwrite_symlink(home, dotfiles, run_dotbot):
assert os.path.isfile(os.path.join(home, ".dir", "f"))
def test_link_force_overwrite_source(home, dotfiles, run_dotbot):
"""Verify force overwrites a symlinked directory."""
os.symlink(home, os.path.join(home, ".link"))
with open(os.path.join(home, "file"), "w") as file:
file.write("")
os.mkdir(os.path.join(home, "dir"))
dotfiles.write("dir/f")
config = [
{
"link": {
"~/.link": {"path": "dir", "force": True},
"~/file": {"path": "dir", "force": True},
"~/dir": {"path": "dir", "force": True},
}
}
]
dotfiles.write_config(config)
run_dotbot()
assert os.path.isfile(os.path.join(home, ".link", "f"))
assert os.path.isfile(os.path.join(home, "file", "f"))
assert os.path.isfile(os.path.join(home, "dir", "f"))
def test_backup_mirrors_absolute_path(home, dotfiles, run_dotbot):
"""Verify backup path from backup dir is same as original path from root."""
os.symlink(home, os.path.join(home, ".f"))
dotfiles.write("f")
backup_root = os.path.join(dotfiles.directory, "backup")
config = [{"link": {"~/.f": {"path": "f", "force": True, "backup-root": backup_root}}}]
dotfiles.write_config(config)
run_dotbot()
assert os.path.isdir(os.path.join(backup_root, home[1:]))
def test_link_force_backups_source(home, dotfiles, run_dotbot):
"""Verify force backups a symlinked directory."""
os.symlink(home, os.path.join(home, ".link"))
with open(os.path.join(home, "file"), "w") as file:
file.write("apple")
os.mkdir(os.path.join(home, "dir"))
dotfiles.write("dir/f", "peach")
backup_root = os.path.join(dotfiles.directory, "backup")
config = [
{
"link": {
"~/.link": {"path": "dir", "force": True, "backup-root": backup_root},
"~/file": {"path": "dir", "force": True, "backup-root": backup_root},
"~/dir": {"path": "dir", "force": True, "backup-root": backup_root},
}
}
]
dotfiles.write_config(config)
run_dotbot()
backup_home = os.path.join(backup_root, home[1:])
dir_items = os.listdir(backup_home)
test_items = (".link", "file", "dir")
backuped = {
test_item: dir_item
for dir_item in dir_items
for test_item in test_items
if test_item in dir_item
}
assert set(backuped.keys()) == set(test_items)
assert os.path.islink(os.path.join(backup_home, backuped[".link"]))
assert home in os.readlink(os.path.join(backup_home, backuped[".link"]))
assert os.path.isfile(os.path.join(backup_home, backuped["file"]))
with open(os.path.join(backup_home, backuped["file"])) as file:
assert file.read() == "apple"
assert os.path.isdir(os.path.join(backup_home, backuped["dir"]))
for link in test_items:
with open(os.path.join(home, link, "f")) as file:
assert file.read() == "peach"
def test_link_force_aborts_on_failed_backup(home, dotfiles, run_dotbot):
"""Verify force is aborted if backup fails."""
os.symlink(home, os.path.join(home, ".link"))
with open(os.path.join(home, "file"), "w") as file:
file.write("apple")
os.mkdir(os.path.join(home, "dir"))
dotfiles.write("backup")
dotfiles.write("dir/f")
backup_root = os.path.join(dotfiles.directory, "backup")
config = [
{
"link": {
"~/.link": {"path": "dir", "force": True, "backup-root": backup_root},
"~/file": {"path": "dir", "force": True, "backup-root": backup_root},
"~/dir": {"path": "dir", "force": True, "backup-root": backup_root},
}
}
]
dotfiles.write_config(config)
with pytest.raises(SystemExit):
run_dotbot()
assert os.path.islink(os.path.join(home, ".link"))
assert home in os.readlink(os.path.join(home, ".link"))
assert os.path.isfile(os.path.join(home, "file"))
with open(os.path.join(home, "file")) as file:
assert file.read() == "apple"
assert os.path.isdir(os.path.join(home, "dir"))
def test_no_backup_when_no_force_relink(home, dotfiles, run_dotbot):
"""Verify backup is a no-op without an accompanying force or relink."""
os.symlink(home, os.path.join(home, ".link"))
with open(os.path.join(home, "file"), "w") as file:
file.write("apple")
os.mkdir(os.path.join(home, "dir"))
dotfiles.write("dir/f")
backup_root = os.path.join(dotfiles.directory, "backup")
config = [
{
"link": {
"~/.link": {"path": "dir", "backup-root": backup_root},
"~/file": {"path": "dir", "backup-root": backup_root},
"~/dir": {"path": "dir", "backup-root": backup_root},
"~/.new": {"path": "dir", "backup-root": backup_root},
}
}
]
dotfiles.write_config(config)
with pytest.raises(SystemExit):
run_dotbot()
assert not os.path.exists(backup_root)
def test_link_glob_1(home, dotfiles, run_dotbot):
"""Verify globbing works."""
@ -909,6 +1052,80 @@ def test_link_relink_overwrite_symlink(home, dotfiles, run_dotbot):
assert file.read() == "apple"
def test_link_relink_backups_symlink(home, dotfiles, run_dotbot):
"""Verify relink backups symlinks."""
dotfiles.write("f", "apple")
with open(os.path.join(home, "f"), "w") as file:
file.write("grape")
os.symlink(os.path.join(home, "f"), os.path.join(home, ".f"))
backup_root = os.path.join(dotfiles.directory, "backup")
dotfiles.write_config(
[{"link": {"~/.f": {"path": "f", "relink": True, "backup-root": backup_root}}}]
)
run_dotbot()
backup_home = os.path.join(backup_root, home[1:])
assert os.path.isdir(backup_home)
dir_items = os.listdir(backup_home)
assert len(dir_items) == 1
backuped = os.path.join(backup_home, dir_items[0])
assert os.path.islink(backuped)
with open(backuped, "r") as file:
assert file.read() == "grape"
with open(os.path.join(home, ".f"), "r") as file:
assert file.read() == "apple"
def test_link_relink_aborts_on_failed_backup(home, dotfiles, run_dotbot):
"""Verify relink is aborted if backup fails."""
dotfiles.write("f", "apple")
with open(os.path.join(home, "f"), "w") as file:
file.write("grape")
os.symlink(os.path.join(home, "f"), os.path.join(home, ".f"))
dotfiles.write("backup")
backup_root = os.path.join(dotfiles.directory, "backup")
dotfiles.write_config(
[{"link": {"~/.f": {"path": "f", "relink": True, "backup-root": backup_root}}}]
)
with pytest.raises(SystemExit):
run_dotbot()
with open(os.path.join(home, ".f"), "r") as file:
assert file.read() == "grape"
def test_backup_is_sequentially_differetiated(home, dotfiles, run_dotbot):
"""Verify that backups do not overwrite each other and get sequentially different names."""
with open(os.path.join(home, "f"), "w") as file:
file.write("grape")
os.symlink(os.path.join(home, "f"), os.path.join(home, ".f"))
dotfiles.write("f", "apple")
dotfiles.write("dir/f", "peach")
backup_root = os.path.join(dotfiles.directory, "backup")
dotfiles.write_config(
[
{"link": {"~/.f": {"path": "f", "relink": True, "backup-root": backup_root}}},
{"link": {"~/.f": {"path": "dir/f", "relink": True, "backup-root": backup_root}}},
]
)
run_dotbot()
backup_home = os.path.join(backup_root, home[1:])
dir_items = sorted(os.listdir(backup_home))
with open(os.path.join(backup_home, dir_items[0])) as file:
assert file.read() == "grape"
with open(os.path.join(backup_home, dir_items[1])) as file:
assert file.read() == "apple"
with open(os.path.join(home, ".f"), "r") as file:
assert file.read() == "peach"
def test_link_relink_relative_leaves_file(home, dotfiles, run_dotbot):
"""Verify relink relative does not incorrectly relink file."""