#!/usr/bin/env python3 import json import os import sys import urllib.request from pathlib import Path import boto3 SECURITY_GROUP_ID = os.environ["INGRESS_SECURITY_GROUP_ID"] RULE_DESCRIPTION = os.environ.get("INGRESS_SSH_RULE_DESCRIPTION", "SSH fallback from origin network") PORT = int(os.environ.get("INGRESS_SSH_PORT", "22")) STATE_FILE = Path(os.environ.get("INGRESS_IP_STATE_FILE", "/var/lib/desineuron-ingress-ip-sync/current_ip.txt")) def get_public_ip() -> str: with urllib.request.urlopen("https://api.ipify.org", timeout=15) as response: return response.read().decode("utf-8").strip() def get_security_group(): ec2 = boto3.client("ec2", region_name=os.environ.get("AWS_REGION", "us-east-1")) response = ec2.describe_security_groups(GroupIds=[SECURITY_GROUP_ID]) return ec2, response["SecurityGroups"][0] def find_existing_ssh_rules(ip_permissions): matches = [] for permission in ip_permissions: if permission.get("IpProtocol") != "tcp": continue if permission.get("FromPort") != PORT or permission.get("ToPort") != PORT: continue for ip_range in permission.get("IpRanges", []): if ip_range.get("Description") == RULE_DESCRIPTION: matches.append(ip_range["CidrIp"]) return matches def revoke_old_rules(ec2, cidrs): for cidr in cidrs: ec2.revoke_security_group_ingress( GroupId=SECURITY_GROUP_ID, IpPermissions=[ { "IpProtocol": "tcp", "FromPort": PORT, "ToPort": PORT, "IpRanges": [{"CidrIp": cidr}], } ], ) def authorize_new_rule(ec2, cidr): ec2.authorize_security_group_ingress( GroupId=SECURITY_GROUP_ID, IpPermissions=[ { "IpProtocol": "tcp", "FromPort": PORT, "ToPort": PORT, "IpRanges": [{"CidrIp": cidr, "Description": RULE_DESCRIPTION}], } ], ) def write_state(ip: str): STATE_FILE.parent.mkdir(parents=True, exist_ok=True) STATE_FILE.write_text(ip + "\n", encoding="utf-8") def main() -> int: public_ip = get_public_ip() desired_cidr = f"{public_ip}/32" ec2, group = get_security_group() existing_rules = find_existing_ssh_rules(group["IpPermissions"]) if existing_rules == [desired_cidr]: write_state(public_ip) print(json.dumps({"status": "noop", "public_ip": public_ip, "cidr": desired_cidr})) return 0 if existing_rules: revoke_old_rules(ec2, existing_rules) authorize_new_rule(ec2, desired_cidr) write_state(public_ip) print( json.dumps( { "status": "updated", "public_ip": public_ip, "cidr": desired_cidr, "replaced": existing_rules, } ) ) return 0 if __name__ == "__main__": try: raise SystemExit(main()) except Exception as exc: print(json.dumps({"status": "error", "error": str(exc)}), file=sys.stderr) raise