Added keepassxc-unlocker script

This commit is contained in:
Eric Renfro 2024-11-25 13:48:06 -05:00
parent 9c8f16e6eb
commit 91cbe4d5a1
Signed by: psi-jack
SSH key fingerprint: SHA256:1TKB8Z257L8EHK8GWNxKgMhD8a+FAR+f+j3nnlcuNVM

383
keepassxc-unlocker Executable file
View file

@ -0,0 +1,383 @@
#!/usr/bin/python3
import sys
import dbus
import dbus.mainloop.glib
from gi.repository import GLib
import logging
import keyring
import getpass
import configparser
import os
import psutil
import threading
import time
# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# Global variable to track the last known lock state
last_state = None
keepassxc_running = False
# Global variable for config file
config_file = os.path.join(
os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config')),
'keepassxc-unlockerrc'
)
def get_process_by_name(name):
"""
Finds a process by its name and returns its Process object.
Args:
name: The name of the process to search for.
Returns:
A psutil.Process object representing the found process,
or None if not found.
"""
for process in psutil.process_iter():
try:
if process.name() == name:
return process
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass # Ignore non-existent processes
return None
def on_signal_received(is_active):
"""
Callback for handling the ActiveChanged signal.
:param is_active: Boolean indicating session lock status (True = locked, False = unlocked).
"""
global last_state
if last_state == is_active:
# Ignore redundant signals
return
if keepassxc_running == False:
last_state = False
return
last_state = is_active
if is_active:
logging.info("Session is locked.")
# Add your custom code for session lock here
else:
logging.info("Session is unlocked.")
# Add your custom code for session unlock here
unlock()
return
def monitor_process(process_name):
global keepassxc_running
process_pid = 0
logging.info(f"Monitoring process: {process_name}...")
while True:
current_process = get_process_by_name(process_name)
if current_process:
if process_pid != current_process.pid:
process_pid = current_process.pid
logging.info(f"KeePassXC process found with PID: {process_pid}")
keepassxc_running = True
unlock()
else:
logging.warning(f"KeePassXC process not found")
process_pid = 0
keepassxc_running = False
time.sleep(5) # Adjust the interval as needed
def watch():
"""
Watches for screensaver lock/unlock events and logs them.
"""
config = load_config()
process_name = config['monitor']['process']
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
session_bus = dbus.SessionBus()
interfaces_to_monitor = [
("org.gnome.ScreenSaver", "/org/gnome/ScreenSaver"), # GNOME
("org.freedesktop.ScreenSaver", "/org/freedesktop/ScreenSaver"), # KDE, XFCE
]
found_interface = False
for interface, path in interfaces_to_monitor:
try:
proxy = session_bus.get_object(interface, path)
iface = dbus.Interface(proxy, dbus_interface=interface)
session_bus.add_signal_receiver(
on_signal_received,
dbus_interface=interface,
signal_name="ActiveChanged",
)
logging.info(f"Subscribed to {interface} at {path}")
found_interface = True
except dbus.exceptions.DBusException as e:
pass
#logging.warning(f"Could not subscribe to {interface} at {path}: {e}")
if not found_interface:
logging.error("No suitable interfaces were found. Exiting...")
return
# Start the process monitoring thread
monitor_thread = threading.Thread(target=monitor_process, args=(process_name,))
monitor_thread.daemon = True
monitor_thread.start()
logging.info("Listening for screensaver events...")
loop = GLib.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
logging.info("Exiting...")
loop.quit()
def load_config():
"""
Loads configuration from the specified file.
:return: A configparser object containing the loaded configuration.
"""
config = configparser.ConfigParser()
config.optionxform = str
try:
config.read(config_file)
if not config.has_section('databases'):
config.add_section('databases')
if not config.has_section('monitor'):
config.add_section('monitor')
config['monitor']['process'] = "keepassxc"
config['monitor']['service'] = "keepassxc-unlocker"
if not config.has_option('monitor', 'process'):
config['monitor']['process'] = "keepassxc"
if not config.has_option('monitor', 'service'):
config['monitor']['service'] = "keepassxc-unlocker"
return config
except FileNotFoundError:
logging.warning(f"Config file not found: {config_file}")
return None
def save_config(config):
"""
Saves configuration to the specified file.
:param config_file: Path to the configuration file.
"""
try:
with open(config_file, 'w') as f:
config.write(f)
#logging.info(f"Wrote configuration file: {config_file}")
except Exception as e:
logging.error(f"Failed to save to configuration: {e} {config_file}")
def unlock_database(database, password, interactive=True):
"""
Sends a DBus message to KeePassXC to unlock a database with a password.
:param database: Path to the KeePassXC database file.
:param password: Password for the database.
"""
try:
session_bus = dbus.SessionBus()
# Get the proxy object for KeePassXC
keepassxc_object = session_bus.get_object('org.keepassxc.KeePassXC.MainWindow', '/keepassxc')
# Define the method signature ('ss' for two strings)
signature = 'ss'
# Call the method 'openDatabase' with two string arguments
keepassxc_object.openDatabase(database, password, signature=signature)
if interactive:
print(f"Successfully sent request to unlock database: {database}")
else:
logging.info(f"Successfully sent request to unlock database: {database}")
except dbus.exceptions.DBusException as e:
if interactive:
print(f"Failed to send DBus message to KeePassXC: {e}")
else:
logging.error(f"Failed to send DBus message to KeePassXC: {e}")
def unlock():
"""
Loops through all password entries in the keyring under the 'keepassxc-unlocker' service,
treating the username as the database path and the password as the database password.
"""
try:
config = load_config()
if config is None:
logging.warning("No configuration found, skipping unlock.")
return
service_name = config['monitor']['service']
except Exception as e:
#print(f"Failed to load configuration: {e}")
logging.error(f"Failed to load configuration: {e}")
for database, enabled in config['databases'].items():
if enabled.lower() != "enabled":
continue
try:
# This method should return one credential at a time, so we handle it accordingly.
credentials = keyring.get_credential(service_name, database)
if credentials is None:
logging.warning("No credentials found in the keyring for KeePassXC unlocker.")
return
if credentials.username != database:
logging.info(f"Skipping due to mismatch: {database} vs {credentials.username}")
continue
# Since `get_credential` only returns a single `SimpleCredential`, we access it directly
#database_path = str(credentials.username) # Username is the database path
password = str(credentials.password) # Password is the associated password
if not password:
logging.warning(f"No password found for database: {database}")
return
logging.info(f"Attempting to unlock database: {database}")
unlock_database(database, password, False)
except Exception as e:
logging.error(f"Failed to unlock databases: {e}")
def add(database, config_file=None):
"""
Adds a new entry to the 'keepassxc-unlocker' service in the keyring, prompting for the password.
:param database: Path to the KeePassXC database file.
:param config_file: Path to the configuration file (defaults to XDG_CONFIG_HOME/keepassxc-unlockerrc)
"""
try:
config = load_config()
service_name = config['monitor']['service']
except Exception as e:
print(f"Failed to load configuration: {e}")
#logging.error(f"Failed to load configuration: {e}")
try:
#password = getpass.getpass(f"Enter password for database '{database}': ")
password = getpass.getpass("Password: ")
keyring.set_password(service_name, database, password)
print(f"Added entry to keyring for database: {database}")
#logging.info(f"Added entry to keyring for database: {database}")
except KeyboardInterrupt:
try:
sys.exit(130)
except SystemExit:
os._exit(130)
except Exception as e:
print(f"Failed to add entry to keyring: {e}")
#logging.error(f"Failed to add entry to keyring: {e}")
try:
config['databases'][database] = 'enabled'
save_config(config)
except Exception as e:
print(f"Failed to save configuration: {e}")
#logging.error(f"Failed to save to configuration: {e} {config_file}")
def remove(database):
"""
Removes an entry for a database from the keyring and the configuration file.
Args:
database: Path to the KeePassXC database file.
"""
try:
config = load_config()
service_name = config['monitor']['service']
except Exception as e:
print(f"Failed to load configuration: {e}")
#logging.error(f"Failed to load configuration: {e}")
#config = load_config()
#service_name = config['monitor']['service']
#try:
# config = load_config()
# if config is None:
# logging.warning("No configuration found, skipping unlock.")
# return
#
# service_name = config['monitor']['service']
#
#except Exception as e:
# print(f"Failed to load configuration: {e}")
# #logging.error(f"Failed to load configuration: {e}")
try:
# Try to delete the entry from the keyring
keyring.delete_password(service_name, database)
#logging.info(f"Removed entry from keyring for database: {database}")
print("Removed entry from keyring")
except keyring.errors.PasswordDeleteError:
#logging.warning(f"No entry found in keyring for database: {database}")
print("No entry found in keyring for database")
# Remove the entry from the configuration file
if config is None:
return
elif database in config['databases']:
del config['databases'][database]
save_config(config)
print("Removed database from configuration")
#logging.info(f"Removed database configuration: {database}")
def help():
"""
Prints the help message.
"""
print("Usage: python script.py <command> [arguments]")
print("Commands:")
print(" add <db> - Add an entry to the keyring")
print(" remove <db> - Remove an entry from the keyring")
print(" unlock - Unlock all KeePassXC databases from keyring")
print(" watch - Monitor screensaver lock/unlock events")
print(" help - Show this help message")
def main():
"""
Main entry point for the script.
"""
if len(sys.argv) < 2:
help()
sys.exit(1)
command = sys.argv[1]
if command == "add" and len(sys.argv) == 3:
database = sys.argv[2]
add(database)
elif command == "remove" and len(sys.argv) == 3:
database = sys.argv[2]
remove(database)
elif command == "unlock":
unlock()
elif command == "watch":
watch()
#elif command == "unlock-database" and len(sys.argv) == 4:
# database = sys.argv[2]
# password = sys.argv[3]
# unlock_database(database, password)
else:
print(f"Unknown or invalid command: {command}")
help()
sys.exit(1)
if __name__ == "__main__":
main()