drive-ops/docs/setup_cosmos_modules.eduid.example

301 lines
11 KiB
Plaintext
Raw Normal View History

2023-02-02 17:32:18 +00:00
#!/usr/bin/env python3
#
# This script is responsible for creating/updating /etc/puppet/cosmos-modules.conf.
#
# If this script exits without creating that file, a default list of modules will be
# selected (by post-tasks.d/010cosmos-modules, the script that invokes this script).
#
# NOTES ABOUT THE IMPLEMENTATION:
#
# - Avoid any third party modules. We want this script to be re-usable in all ops-repos.
# - To make merging easier, try to keep all local alterations in the local_* functions.
# - Format with black and isort. Line width 120.
# - You probably ONLY want to change things in the local_get_modules_hook() function.
#
import argparse
import csv
import json
import logging
import logging.handlers
import os
import re
import socket
import sys
from pathlib import Path
from typing import Dict, NewType, Optional, cast
from pkg_resources import parse_version
# Module-level logger. This is only a placeholder: _setup_logging() rebinds the global
# to a logger named after the running script.
logger = logging.getLogger(__name__)  # will be overwritten by _setup_logging()
# Set up types for data that is passed around in functions in this script.
# Need to use typing.Dict (not the builtin dict) here: these module-level aliases are not
# removed by strip-hints, and subscripting the builtin dict doesn't work on Ubuntu <= 20.04.
#
# Arguments: the parsed command line (an argparse.Namespace with "debug" and "filename").
Arguments = NewType("Arguments", argparse.Namespace)
# OSInfo: variables read from /etc/os-release, with the variable names lower-cased.
OSInfo = Dict[str, str]
# HostInfo: "hostname", "fqdn" and "domainname", plus whatever local_host_info_hook() adds.
HostInfo = Dict[str, Optional[str]]
# Modules: module name -> {"repo": ..., "upgrade": ..., "tag": ...}.
Modules = Dict[str, Dict[str, str]]
def parse_args() -> Arguments:
    """
    Build the command line parser for this script and run it.

    Two options are understood: --debug (a flag, off by default) and --filename
    (the file to write, /etc/puppet/cosmos-modules.conf by default).

    :return: The parsed command line arguments
    """
    # Options are declared as data and registered in this order, which is also
    # the order they appear in the --help output.
    option_table = (
        ("--debug", {"dest": "debug", "action": "store_true", "default": False, "help": "Enable debug operation"}),
        (
            "--filename",
            {
                "dest": "filename",
                "type": str,
                "default": "/etc/puppet/cosmos-modules.conf",
                "help": "Filename to write to",
            },
        ),
    )
    cli = argparse.ArgumentParser(
        description="Setup cosmos-modules.conf",
        add_help=True,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    namespace = cli.parse_args()
    return cast(Arguments, namespace)
def get_os_info() -> OSInfo:
    """Collect information about the running OS (distro, release etc.)

    The variables found in /etc/os-release (if that file exists) are stored under
    lower-cased names, then handed to local_os_info_hook() which may adjust them.
    The final result is logged at debug level and returned.
    """
    release_file = "/etc/os-release"
    collected: OSInfo = {}
    if Path(release_file).exists():
        for var_name, var_value in _parse_bash_vars(release_file).items():
            collected[var_name.lower()] = var_value
    res = local_os_info_hook(collected)
    logger.debug(f"OS info:\n{json.dumps(res, sort_keys=True, indent=4)}")
    return res
def get_host_info() -> HostInfo:
    """Load info about the current host (hostname, fqdn, domain name etc.)

    The domain name is the part of the FQDN that follows "<hostname>.". If the FQDN
    does not start with the hostname (for example when the resolver returns an
    unrelated name such as "localhost.localdomain"), the domain name is set to an
    empty string instead of an arbitrary slice of the FQDN.

    If the lookup fails with OSError, an empty dict is passed to the local hook.

    :return: The host information after local_host_info_hook() has processed it
    """
    host_info: HostInfo = {}
    try:
        fqdn = socket.getfqdn()
        hostname = socket.gethostname()
    except OSError as exc:
        # Best effort: carry on with an empty host_info and let the local hook decide.
        logger.debug(f"Could not look up hostname/FQDN: {exc}")
    else:
        _prefix = hostname + "."
        # Host names are case-insensitive, so compare them that way before slicing.
        if fqdn.lower().startswith(_prefix.lower()):
            _domainname = fqdn[len(_prefix) :]
        else:
            _domainname = ""
        host_info = {
            "domainname": _domainname,
            "fqdn": fqdn,
            "hostname": hostname,
        }
    res = local_host_info_hook(host_info)
    logger.debug(f"Host info: {json.dumps(res, sort_keys=True, indent=4)}")
    return res
def _parse_bash_vars(path: str) -> Dict[str, str]:
    """
    Parses a bash script and returns a dictionary representing the
    variables declared in that script.

    Only simple NAME=value lines are recognised. Values enclosed in double quotes
    have the quotes removed. A value may itself contain "=" characters; everything
    after the first "=" belongs to the value.

    Source: https://dev.to/htv2012/how-to-parse-bash-variables-b4f

    :param path: The path to the bash script
    :return: Variables as a dictionary
    """
    with open(path, encoding="utf-8") as stream:
        contents = stream.read().strip()
    var_declarations = re.findall(r"^[a-zA-Z0-9_]+=.*$", contents, flags=re.MULTILINE)
    bash_vars: Dict[str, str] = {}
    for row in csv.reader(var_declarations, delimiter="="):
        if not row:
            continue
        # The csv reader splits on every unquoted "=", so glue the value back together
        # rather than failing on rows that have more than two fields.
        name, *value_parts = row
        bash_vars[name] = "=".join(value_parts)
    return bash_vars
def get_modules(os_info: OSInfo, host_info: HostInfo) -> Modules:
    """Load the list of default modules.

    This is more or less an inventory of all the modules we have. If you don't want
    to use all modules in your OPS repo, you can filter them in the local hook.

    If you want to use a different tag for a module on a specific host/os, you can
    do that in the local hook as well.

    The UFW module is dropped on Ubuntu >= 22.04. The release comparison is done on
    the numeric components of VERSION_ID using only the standard library, so this
    function does not depend on pkg_resources.

    :param os_info: OS information; the "name" and "version_id" keys are used here
    :param host_info: Host information, passed on to the local hook
    :return: The modules as returned by local_get_modules_hook()
    :raises ValueError: If a line in the built-in table doesn't have exactly four columns
    """
    default_modules = """
    # name repo upgrade tag
    apparmor https://github.com/SUNET/puppet-apparmor.git yes sunet-2*
    apt https://github.com/SUNET/puppetlabs-apt.git yes sunet-2*
    augeas https://github.com/SUNET/puppet-augeas.git yes sunet-2*
    bastion https://github.com/SUNET/puppet-bastion.git yes sunet-2*
    concat https://github.com/SUNET/puppetlabs-concat.git yes sunet-2*
    cosmos https://github.com/SUNET/puppet-cosmos.git yes sunet-2*
    dhcp https://github.com/SUNET/puppetlabs-dhcp.git yes sunet_dev-2*
    docker https://github.com/SUNET/garethr-docker.git yes sunet-2*
    hiera-gpg https://github.com/SUNET/hiera-gpg.git yes sunet-2*
    munin https://github.com/SUNET/ssm-munin.git yes sunet-2*
    nagioscfg https://github.com/SUNET/puppet-nagioscfg.git yes sunet-2*
    network https://github.com/SUNET/attachmentgenie-network.git yes sunet-2*
    pound https://github.com/SUNET/puppet-pound.git yes sunet-2*
    pyff https://github.com/samlbits/puppet-pyff.git yes puppet-pyff-*
    python https://github.com/SUNET/puppet-python.git yes sunet-2*
    stdlib https://github.com/SUNET/puppetlabs-stdlib.git yes sunet-2*
    sunet https://github.com/SUNET/puppet-sunet.git yes sunet-2*
    sysctl https://github.com/SUNET/puppet-sysctl.git yes sunet-2*
    ufw https://github.com/SUNET/puppet-module-ufw.git yes sunet-2*
    varnish https://github.com/samlbits/puppet-varnish.git yes puppet-varnish-*
    vcsrepo https://github.com/SUNET/puppetlabs-vcsrepo.git yes sunet-2*
    xinetd https://github.com/SUNET/puppetlabs-xinetd.git yes sunet-2*
    """
    modules: Modules = {}
    for line in default_modules.splitlines():
        entry = line.strip()
        if not entry or entry.startswith("#"):
            continue
        try:
            _name, _url, _upgrade, _tag = entry.split()
        except ValueError:
            logger.error(f"Failed to parse line: {repr(line)}")
            raise
        modules[_name] = {
            "repo": _url,
            "upgrade": _upgrade,
            "tag": _tag,
        }

    # Remove the UFW module on Ubuntu >= 22.04 (nftables is used there instead)
    if os_info.get("name") == "Ubuntu":
        ver = os_info.get("version_id")
        # "22.04" -> [22, 4]; lists of ints compare component by component.
        release = [int(part) for part in re.findall(r"\d+", ver)] if ver else []
        if not release:
            logger.debug("Unknown Ubuntu module version, keeping UFW module")
        elif release >= [22, 4]:
            logger.debug("Removing UFW module for Ubuntu >= 22.04")
            modules.pop("ufw", None)
        else:
            logger.debug("Keeping UFW module for Ubuntu < 22.04")

    return local_get_modules_hook(os_info, host_info, modules)
def local_os_info_hook(os_info: OSInfo) -> OSInfo:
    """Per-repository customisation point for the OS information.

    This OPS repo makes no adjustments, so the dictionary is handed back as-is.
    """
    # Start local changes in this repository
    # End local changes
    return os_info
def local_host_info_hook(host_info: HostInfo) -> HostInfo:
    """Per-repository customisation point for the host information.

    An eduID hostname of the form <function>-<site>-<number> is split into its
    parts: "function" and "site" are stored in host_info, and "environment" is
    set to "staging" when the number is exactly "1". Hostnames that don't follow
    this pattern (or a missing hostname) leave host_info untouched.

    The passed-in dictionary is modified in place and returned.
    """
    # Start local changes in this repository
    # Regular expression to tease apart an eduID hostname
    eduid_hostname = re.compile(
        r"""^
        (\w+) # function ('idp', 'apps', ...)
        -
        (\w+) # site ('tug', 'sthb', ...)
        -
        (\d+) # 1 for staging, 3 for production
        """,
        re.VERBOSE,
    )
    name = host_info.get("hostname")
    parsed = eduid_hostname.match(name) if name else None
    if parsed is not None:
        host_info["function"] = parsed.group(1)
        host_info["site"] = parsed.group(2)
        if parsed.group(3) == "1":
            host_info["environment"] = "staging"
    # End local changes
    return host_info
def local_get_modules_hook(os_info: OSInfo, host_info: HostInfo, modules: Modules) -> Modules:
    """Per-repository customisation point for the set of modules.

    Keeps only the modules eduID uses, then picks the tags: puppet-sunet follows
    "eduid-stable-2*" (or "eduid_dev-2*" on staging hosts), and on staging hosts
    munin follows "sunet_dev-2*".

    :param os_info: OS information (not used by the eduID rules)
    :param host_info: Host information; "environment" selects the staging tags
    :param modules: The default modules to choose from
    :return: A new dictionary holding the selected modules
    """
    # Start local changes in this repository
    wanted = frozenset(
        (
            "apparmor",
            "apt",
            "augeas",
            "bastion",
            "concat",
            "docker",
            "munin",
            "stdlib",
            "sunet",
            "ufw",
        )
    )
    # Only keep the modules eduID actually uses
    selected: Modules = {}
    for module_name, module_spec in modules.items():
        if module_name in wanted:
            selected[module_name] = module_spec
    modules = selected
    logger.debug(f"Adding modules: {json.dumps(modules, sort_keys=True, indent=4)}")

    is_staging = host_info.get("environment") == "staging"
    # Use eduID tag for puppet-sunet
    modules["sunet"]["tag"] = "eduid_dev-2*" if is_staging else "eduid-stable-2*"
    # use sunet_dev-2* for some modules in staging
    if is_staging:
        for dev_module in ("munin",):
            if dev_module in modules:
                modules[dev_module]["tag"] = "sunet_dev-2*"
    # End local changes
    return modules
def update_cosmos_modules(filename: str, modules: Modules) -> None:
    """Create/update the cosmos-modules.conf file.

    The wanted content is rendered first. If the file already has exactly that
    content, nothing is written, so the timestamp of the file at least indicates
    when the content was last updated. Otherwise the content is written to a
    sibling file with the suffix ".tmp", which is then renamed over the target.

    :param filename: Path of the file to create/update
    :param modules: The modules to list in the file, one line per module sorted by name
    """
    header = "# This file is automatically generated by the setup_cosmos_modules script.\n# Do not edit it manually.\n"
    rows = [f"{k:15} {v['repo']:55} {v['upgrade']:5} {v['tag']}\n" for k, v in sorted(modules.items())]
    content = header + "".join(rows)

    target = Path(filename)
    if target.exists() and target.read_text() == content:
        logger.debug(f"{filename} is up to date")
        return

    # Write to a temporary file and rename it into place
    scratch = target.with_suffix(".tmp")
    scratch.write_text(content)
    scratch.rename(target)
    logger.debug(f"Updated {filename}")
def _setup_logging(my_name: str, args: Arguments):
    """Configure logging to stderr and rebind the module-level logger.

    The log level is DEBUG when args.debug is set and INFO otherwise. When stderr
    is not a TTY and debugging is off, the handlers on the root logger are limited
    to ERROR and above.

    :param my_name: Name to give the module-level logger
    :param args: Parsed command line arguments; only "debug" is used
    """
    global logger

    wanted_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(
        level=wanted_level,
        stream=sys.stderr,
        format="{asctime} | {levelname:7} | {message}",
        style="{",
    )
    logger = logging.getLogger(my_name)

    if args.debug:
        logger.setLevel(logging.DEBUG)
    elif not sys.stderr.isatty():
        # If stderr is not a TTY, change the log level of the StreamHandler (stream = sys.stderr above) to ERROR
        for root_handler in logging.getLogger("").handlers:
            root_handler.setLevel(logging.ERROR)
def main(my_name: str, args: Arguments) -> bool:
    """Run the whole job: set up logging, work out the modules, write the file.

    :param my_name: Name of the running script, used as the logger name
    :param args: Parsed command line arguments
    :return: Always True; failures surface as exceptions
    """
    _setup_logging(my_name, args)
    # Arguments are evaluated left to right: OS info is gathered before host info.
    wanted_modules = get_modules(get_os_info(), get_host_info())
    update_cosmos_modules(args.filename, wanted_modules)
    return True
if __name__ == "__main__":
    # Script entry point: exit status 0 when main() reports success, 1 otherwise.
    my_name = os.path.basename(sys.argv[0])
    args = parse_args()
    sys.exit(0 if main(my_name, args=args) else 1)