#!/usr/bin/env python3
# pylint:disable=invalid-name
# pylint:enable=invalid-name
"""
netplan does not have network namespace support so configure by hand

Tools used before committing code:
black sunet-l4lb-namespace
isort sunet-l4lb-namespace
pylint sunet-l4lb-namespace
mypy --strict sunet-l4lb-namespace
"""

import ipaddress
import json
import os
import shlex
import subprocess
import sys
from typing import Any

# Keys used in the per-interface configuration for each address family
IPV4_KEY = "ipv4"
IPV6_KEY = "ipv6"


def run_command(cmd: str) -> subprocess.CompletedProcess[str]:
    """Execute subprocess command

    The command line is echoed to stdout, split with shlex (no shell is
    involved) and run with stdout/stderr captured as UTF-8 text.

    Returns the CompletedProcess on success. If the command exits with a
    non-zero status, or cannot be started at all, an error is printed to
    stderr and the program exits with status 1.
    """
    print(f"{cmd}")
    args = shlex.split(cmd)
    try:
        proc = subprocess.run(args, capture_output=True, check=True, encoding="utf-8")
    except subprocess.CalledProcessError as exc:
        stderr_str = exc.stderr.rstrip()
        print(
            f"command failed: cmd='{cmd}', rc={exc.returncode}, stderr='{stderr_str}'",
            file=sys.stderr,
        )
        sys.exit(1)
    except OSError as exc:
        # Raised when the executable is missing or not runnable; report it the
        # same way as a failing command instead of dying with a traceback.
        print(
            f"command could not be executed: cmd='{cmd}', error='{exc}'",
            file=sys.stderr,
        )
        sys.exit(1)

    return proc


def load_json_list(text: str) -> list[dict[str, Any]]:
    """Parse the JSON array printed by an "ip -j ..." command

    Blank output is returned as an empty list: an empty string is not valid
    JSON and json.loads() would raise on it. NOTE(review): "ip -j netns list"
    appears to print nothing at all (rather than "[]") when no namespace has
    ever been created on the host -- confirm against the installed iproute2.
    """
    if not text.strip():
        return []
    parsed: list[dict[str, Any]] = json.loads(text)
    return parsed


def canonical_cidr(cidr: str) -> str:
    """Return an interface address as canonical "address/prefixlen" text

    ipaddress compresses and lower-cases IPv6 addresses, so configured values
    written as e.g. "2001:0DB8::1/64" compare equal to the compressed form
    found in the JSON output of ip. ip_interface() (rather than ip_network())
    is used because host bits are expected to be set.

    Raises ValueError if cidr is not a valid IPv4 or IPv6 interface address.
    """
    return ipaddress.ip_interface(cidr).with_prefixlen


def configure_interfaces(
    namespace: str, if_data: dict[str, dict[str, list[str]]]
) -> None:
    """Configure interfaces

    namespace is the name of an existing network namespace. if_data maps
    interface names to a dict that may hold an "ipv4" and/or "ipv6" list of
    "address/prefixlen" strings.

    For every interface in if_data:
    * if it is not yet present in the namespace it is created there (names
      starting with "dummy") or moved there from the current namespace (all
      other names), and then brought up
    * configured addresses that are missing on the interface are added
    * global scope addresses on the interface that are not configured for
      their address family are removed

    Raises ValueError if a configured address cannot be parsed. Failing ip
    commands terminate the program via run_command().
    """
    ip_ns = f"ip netns exec {namespace}"

    proc = run_command(f"{ip_ns} ip -j addr show")
    present_ifs = {interface["ifname"] for interface in load_json_list(proc.stdout)}

    for if_name, data in if_data.items():
        if if_name not in present_ifs:
            if if_name.startswith("dummy"):
                run_command(f"{ip_ns} ip link add {if_name} type dummy")
            else:
                run_command(f"ip link set {if_name} netns {namespace}")
            run_command(f"{ip_ns} ip link set {if_name} up")

        proc = run_command(f"{ip_ns} ip -j addr show dev {if_name}")
        addr_infos = load_json_list(proc.stdout)[0]["addr_info"]

        # Both sides of every comparison below are canonicalised, so the
        # spelling used in the config file does not matter.
        current = {
            canonical_cidr(f"{info['local']}/{info['prefixlen']}")
            for info in addr_infos
        }
        # Wanted addresses per IP version. dict.fromkeys() drops duplicates
        # while keeping the configured order, so an address listed twice is
        # only added once.
        wanted = {
            4: list(dict.fromkeys(canonical_cidr(c) for c in data.get(IPV4_KEY, []))),
            6: list(dict.fromkeys(canonical_cidr(c) for c in data.get(IPV6_KEY, []))),
        }

        # Add missing addresses from config (IPv4 first, then IPv6)
        for family_cidrs in wanted.values():
            for cidr in family_cidrs:
                if cidr not in current:
                    run_command(f"{ip_ns} ip addr add {cidr} dev {if_name}")

        # Remove no longer configured addresses
        for info in addr_infos:
            # Ignore addresses like fe80
            if info["scope"] != "global":
                continue

            reported = ipaddress.ip_interface(f"{info['local']}/{info['prefixlen']}")
            cidr = reported.with_prefixlen
            # An address is only kept if it is listed under the key matching
            # its own address family.
            if cidr not in wanted[reported.version]:
                run_command(f"{ip_ns} ip addr del {cidr} dev {if_name}")


def setup_namespaces(netns_data: dict[str, dict[str, dict[str, list[str]]]]) -> None:
    """Setup network namespaces

    netns_data maps namespace names to the interface configuration accepted
    by configure_interfaces(). Namespaces that do not exist yet are created
    and get their loopback interface brought up. For every namespace the nft
    ruleset file is loaded when it exists (a warning is printed otherwise)
    and the interfaces are then configured.
    """
    proc = run_command("ip -j netns list")
    existing_netns = {netns["name"] for netns in load_json_list(proc.stdout)}

    for namespace, if_data in netns_data.items():
        if namespace not in existing_netns:
            run_command(f"ip netns add {namespace}")

            # Make localhost available
            run_command(f"ip netns exec {namespace} ip link set lo up")

        # (Re)load the nft ruleset for the given namespace
        nft_ruleset = f"/opt/sunet-cdn/l4lb/conf/nft-{namespace}.conf"
        if os.path.isfile(nft_ruleset):
            run_command(f"ip netns exec {namespace} nft -f {nft_ruleset}")
        else:
            print(
                f"WARNING: no nft ruleset found for namespace '{namespace}' ({nft_ruleset}), the namespace will not be firewalled"  # pylint: disable=line-too-long
            )

        configure_interfaces(namespace, if_data)


def main() -> None:
    """Starting point of the program

    Reads the namespace configuration files that exist, merges them per
    namespace and applies the result with setup_namespaces().
    """

    # JSON file format:
    # {
    #     "namespace1": {
    #         "interface1": {
    #             "ipv4": [
    #                 "192.168.10.1/31"
    #             ],
    #             "ipv6": [
    #                 "2001:db8:1337:74::1/127"
    #             ]
    #         },
    #         "interface2": {
    #             "ipv4": [
    #                 "192.168.10.3/31"
    #             ],
    #             "ipv6": [
    #                 "2001:db8:1338:75::1/127"
    #             ]
    #         }
    #     }
    # }
    input_files = [
        "/opt/sunet-cdn/l4lb/conf/netns-base.json",
        "/opt/sunet-cdn/l4lb/conf/netns-sunet-cdn-agent.json",
    ]

    merged_netns_data: dict[str, dict[str, dict[str, list[str]]]] = {}

    for input_file in input_files:
        # Only the file read is guarded; a missing file is not an error.
        try:
            with open(input_file, encoding="utf-8") as handle:
                netns_data = json.load(handle)
        except FileNotFoundError:
            print(f"skipping nonexistant file '{input_file}'")
            continue

        # Combine interface config from multiple files belonging to the same
        # namespace. An interface defined in several files takes its whole
        # configuration from the last file that mentions it.
        for ns, ns_data in netns_data.items():
            merged_netns_data.setdefault(ns, {}).update(ns_data)

    setup_namespaces(merged_netns_data)


if __name__ == "__main__":
    main()