#!/usr/bin/env python3
# pylint: disable=invalid-name
# pylint: enable=invalid-name
"""Tool for taking and releasing fleetlock locks, used by run-cosmos if fleetlock is configured"""

#
# You need a config file in "configparser" format with a section for the
# lock group you are using, so if the file describes two lock groups where one
# is called "fl-test1" and the other "fl-test2" then example contents would
# look like this:
# ===
# [fl-test1]
# server = https://fleetlock-server1.example.com
# password = mysecret1
#
# [fl-test2]
# server = https://fleetlock-server2.example.com
# password = mysecret2
# ===
#
# The password needs to match an acl configured for the lock group in the
# knubbis-fleetlock service.
#
# When modifying this code please make sure it is passed through the following
# tools:
# ===
# isort
# black
# pylint
# mypy --strict
# ===

import argparse
import configparser
import os.path
import platform
import signal
import sys
import time
from types import FrameType
from typing import Optional, Union

import requests


class TimeoutException(Exception):
    """Exception raised when we hit tool timeout"""


def timeout_handler(signum: int, frame: Optional[FrameType]) -> None:
    """Signal handler that aborts the tool when it takes too long to run

    It is installed for SIGALRM by main() and always raises TimeoutException;
    the signal number and stack frame arguments are not used.
    """
    raise TimeoutException(f"{os.path.basename(sys.argv[0])} hit --timeout limit")


def do_fleetlock_request(
    config: configparser.ConfigParser, args: argparse.Namespace, operation: str
) -> bool:
    """Perform fleetlock request based on given operation and return true if it succeeded

    The request is retried once per second for as long as it fails in a way
    that is not an "unauthorized" response. There is no retry limit in this
    function: the caller is expected to have armed a SIGALRM timer whose
    handler raises TimeoutException.

    Args:
        config: parsed config containing a section named after args.lock_group
            with the "server" and "password" settings.
        args: parsed command line arguments; lock_group, lock_id, verbose and
            request_timeout are read.
        operation: either "lock" or "unlock".

    Returns:
        True if the server answered with status OK, False if it answered
        with status unauthorized.

    Raises:
        ValueError: if operation is not "lock" or "unlock".
        TimeoutException: if the SIGALRM handler fires while we are running.
    """
    fleetlock_paths = {
        "lock": "/v1/pre-reboot",
        "unlock": "/v1/steady-state",
    }
    try:
        fleetlock_path = fleetlock_paths[operation]
    except KeyError:
        raise ValueError(f"unsupported operation: {operation}") from None

    lock_group_config = config[args.lock_group]

    # Tolerate a trailing slash in the configured server URL so we do not end
    # up requesting "//v1/..."
    url = lock_group_config["server"].rstrip("/") + fleetlock_path

    fleetlock_data = {
        "client_params": {
            "group": args.lock_group,
            "id": args.lock_id,
        },
    }

    fleetlock_headers = {
        "fleet-lock-protocol": "true",
    }

    # Log the request-id header from responses so we can track requests in
    # the knubbis-fleetlock logs more easily
    request_id_key = "request-id"

    retry_sleep_delay = 1

    # Loop forever: we depend on the SIGALRM timeout to raise an error if it
    # takes too long
    while True:
        if args.verbose:
            print(f"{operation} POST at url {url}")

        try:
            resp = requests.post(
                url,
                headers=fleetlock_headers,
                json=fleetlock_data,
                timeout=args.request_timeout,
                auth=("", lock_group_config["password"]),
            )
        except TimeoutException:
            # This means our global timer is up, no more time to retry
            raise
        except Exception as exc:  # pylint: disable=broad-exception-caught
            print(f"POST request failed: {exc}")
            time.sleep(retry_sleep_delay)
            continue

        # Always take the ID from the current response (None if the header is
        # missing) so we never report a stale ID from an earlier attempt
        request_id = resp.headers.get(request_id_key)

        if resp.status_code == requests.codes.ok:  # pylint: disable=no-member
            if args.verbose:
                print(
                    f"successful {operation} request for lock ID '{args.lock_id}'",
                    f"in lock group '{args.lock_group}' ({request_id_key}: {request_id})",
                )
            return True

        # If the request is unauthorized this means we probably either try to
        # use a lock group that does not exist, or we are using the wrong
        # credentials and in either case we can give up immediately
        if resp.status_code == requests.codes.unauthorized:  # pylint: disable=no-member
            print(
                f"{operation} request unauthorized: incorrect lock group name '{args.lock_group}'",
                f"or wrong credentials? ({request_id_key}: {request_id})",
            )
            return False

        # If the request failed in some other way we expect a JSON formatted
        # response message. Undecodable bytes are replaced rather than allowed
        # to crash the tool, since the body is only used for logging.
        response_body = resp.content.decode("utf-8", errors="replace").rstrip()
        print(
            f"{operation} request failed:"
            + " "
            + response_body
            + " "
            + f"({request_id_key}: {request_id})"
        )
        time.sleep(retry_sleep_delay)


def read_config(args: argparse.Namespace) -> Union[configparser.ConfigParser, None]:
    """Read lock group specific settings from config file

    Args:
        args: parsed command line arguments; config (path to the config file)
            and lock_group (name of the required section) are read.

    Returns:
        The parsed config, or None (after printing the reason) if the file
        could not be read or parsed, the lock group section is missing, or the
        section lacks a required setting.
    """
    config = configparser.ConfigParser()
    try:
        with open(args.config, encoding="utf-8") as config_fileobj:
            config.read_file(config_fileobj)
    except (OSError, UnicodeDecodeError, configparser.Error) as exc:
        print(f"unable to read config file '{args.config}': {exc}")
        return None

    if args.lock_group not in config:
        print(f"missing required config section for lock group '{args.lock_group}'")
        return None

    # A tuple (rather than a set) keeps the order of the messages stable
    required_settings = (
        "server",
        "password",
    )

    missing_settings = [
        setting
        for setting in required_settings
        if setting not in config[args.lock_group]
    ]

    # Report every missing setting before giving up so they can all be fixed
    # in one go
    for setting in missing_settings:
        print(f"missing required setting '{setting}' in lock group '{args.lock_group}'")

    if missing_settings:
        return None

    return config


def main() -> None:
    """Starting point of the program

    Parses the command line, reads the config and performs the requested lock
    or unlock operation. Exits with status 0 if the operation succeeded and
    1 otherwise (bad config, unauthorized, or --timeout reached).
    """
    # How long to wait per HTTP request to fleetlock service
    default_request_timeout = 5

    # How long to wait before giving up and exiting the tool with a failure
    default_timeout = 60

    default_config_file = "/etc/sunet-fleetlock/sunet-fleetlock.conf"

    parser = argparse.ArgumentParser(description="Take and release fleetlock lock.")
    parser.add_argument("--verbose", help="print more information", action="store_true")
    parser.add_argument(
        "--config",
        help=f"the conf file to read (default: {default_config_file})",
        default=default_config_file,
    )
    parser.add_argument(
        "--lock-group", required=True, help="the group to take a lock in"
    )
    parser.add_argument(
        "--lock-id",
        help=f"the lock ID to use in the group (default: {platform.node()})",
        default=platform.node(),
    )
    parser.add_argument(
        "--timeout",
        type=int,
        help=f"how many seconds before giving up and exiting tool (default: {default_timeout}s)",
        default=default_timeout,
    )
    # "--request-timeout" matches the hyphenated style of the other options,
    # "--request_timeout" is kept as an alias for existing callers
    parser.add_argument(
        "--request-timeout",
        "--request_timeout",
        dest="request_timeout",
        type=int,
        help=f"individual fleetlock HTTP request timeout (default: {default_request_timeout}s)",
        default=default_request_timeout,
    )

    action_group = parser.add_mutually_exclusive_group(required=True)
    action_group.add_argument("--lock", action="store_true", help="lock a reboot slot")
    action_group.add_argument(
        "--unlock", action="store_true", help="unlock a reboot slot"
    )

    args = parser.parse_args()

    config = read_config(args)

    if config is None:
        sys.exit(1)

    # The action group is required and mutually exclusive, so exactly one of
    # --lock and --unlock is set at this point
    operation = "lock" if args.lock else "unlock"

    # Give up if tool has been running for more than --timeout seconds:
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(args.timeout)

    succeeded = False
    try:
        succeeded = do_fleetlock_request(config, args, operation)
    except TimeoutException as exc:
        print(exc)
    finally:
        # The request is over one way or another: cancel the pending alarm
        signal.alarm(0)

    sys.exit(0 if succeeded else 1)


if __name__ == "__main__":
    main()