Files
2026-04-12 02:02:58 +05:30

111 lines
3.1 KiB
Python

#!/usr/bin/env python3
import json
import os
import sys
import urllib.request
from pathlib import Path
import boto3
SECURITY_GROUP_ID = os.environ["INGRESS_SECURITY_GROUP_ID"]
RULE_DESCRIPTION = os.environ.get("INGRESS_SSH_RULE_DESCRIPTION", "SSH fallback from origin network")
PORT = int(os.environ.get("INGRESS_SSH_PORT", "22"))
STATE_FILE = Path(os.environ.get("INGRESS_IP_STATE_FILE", "/var/lib/desineuron-ingress-ip-sync/current_ip.txt"))
def get_public_ip() -> str:
with urllib.request.urlopen("https://api.ipify.org", timeout=15) as response:
return response.read().decode("utf-8").strip()
def get_security_group():
ec2 = boto3.client("ec2", region_name=os.environ.get("AWS_REGION", "us-east-1"))
response = ec2.describe_security_groups(GroupIds=[SECURITY_GROUP_ID])
return ec2, response["SecurityGroups"][0]
def find_existing_ssh_rules(ip_permissions):
matches = []
for permission in ip_permissions:
if permission.get("IpProtocol") != "tcp":
continue
if permission.get("FromPort") != PORT or permission.get("ToPort") != PORT:
continue
for ip_range in permission.get("IpRanges", []):
if ip_range.get("Description") == RULE_DESCRIPTION:
matches.append(ip_range["CidrIp"])
return matches
def revoke_old_rules(ec2, cidrs):
for cidr in cidrs:
ec2.revoke_security_group_ingress(
GroupId=SECURITY_GROUP_ID,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": PORT,
"ToPort": PORT,
"IpRanges": [{"CidrIp": cidr}],
}
],
)
def authorize_new_rule(ec2, cidr):
ec2.authorize_security_group_ingress(
GroupId=SECURITY_GROUP_ID,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": PORT,
"ToPort": PORT,
"IpRanges": [{"CidrIp": cidr, "Description": RULE_DESCRIPTION}],
}
],
)
def write_state(ip: str):
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
STATE_FILE.write_text(ip + "\n", encoding="utf-8")
def main() -> int:
public_ip = get_public_ip()
desired_cidr = f"{public_ip}/32"
ec2, group = get_security_group()
existing_rules = find_existing_ssh_rules(group["IpPermissions"])
if existing_rules == [desired_cidr]:
write_state(public_ip)
print(json.dumps({"status": "noop", "public_ip": public_ip, "cidr": desired_cidr}))
return 0
if existing_rules:
revoke_old_rules(ec2, existing_rules)
authorize_new_rule(ec2, desired_cidr)
write_state(public_ip)
print(
json.dumps(
{
"status": "updated",
"public_ip": public_ip,
"cidr": desired_cidr,
"replaced": existing_rules,
}
)
)
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
print(json.dumps({"status": "error", "error": str(exc)}), file=sys.stderr)
raise