forked from sagnik/Project_Velocity
111 lines
3.1 KiB
Python
111 lines
3.1 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
import boto3
|
|
|
|
|
|
SECURITY_GROUP_ID = os.environ["INGRESS_SECURITY_GROUP_ID"]
|
|
RULE_DESCRIPTION = os.environ.get("INGRESS_SSH_RULE_DESCRIPTION", "SSH fallback from origin network")
|
|
PORT = int(os.environ.get("INGRESS_SSH_PORT", "22"))
|
|
STATE_FILE = Path(os.environ.get("INGRESS_IP_STATE_FILE", "/var/lib/desineuron-ingress-ip-sync/current_ip.txt"))
|
|
|
|
|
|
def get_public_ip() -> str:
|
|
with urllib.request.urlopen("https://api.ipify.org", timeout=15) as response:
|
|
return response.read().decode("utf-8").strip()
|
|
|
|
|
|
def get_security_group():
|
|
ec2 = boto3.client("ec2", region_name=os.environ.get("AWS_REGION", "us-east-1"))
|
|
response = ec2.describe_security_groups(GroupIds=[SECURITY_GROUP_ID])
|
|
return ec2, response["SecurityGroups"][0]
|
|
|
|
|
|
def find_existing_ssh_rules(ip_permissions):
|
|
matches = []
|
|
for permission in ip_permissions:
|
|
if permission.get("IpProtocol") != "tcp":
|
|
continue
|
|
if permission.get("FromPort") != PORT or permission.get("ToPort") != PORT:
|
|
continue
|
|
for ip_range in permission.get("IpRanges", []):
|
|
if ip_range.get("Description") == RULE_DESCRIPTION:
|
|
matches.append(ip_range["CidrIp"])
|
|
return matches
|
|
|
|
|
|
def revoke_old_rules(ec2, cidrs):
|
|
for cidr in cidrs:
|
|
ec2.revoke_security_group_ingress(
|
|
GroupId=SECURITY_GROUP_ID,
|
|
IpPermissions=[
|
|
{
|
|
"IpProtocol": "tcp",
|
|
"FromPort": PORT,
|
|
"ToPort": PORT,
|
|
"IpRanges": [{"CidrIp": cidr}],
|
|
}
|
|
],
|
|
)
|
|
|
|
|
|
def authorize_new_rule(ec2, cidr):
|
|
ec2.authorize_security_group_ingress(
|
|
GroupId=SECURITY_GROUP_ID,
|
|
IpPermissions=[
|
|
{
|
|
"IpProtocol": "tcp",
|
|
"FromPort": PORT,
|
|
"ToPort": PORT,
|
|
"IpRanges": [{"CidrIp": cidr, "Description": RULE_DESCRIPTION}],
|
|
}
|
|
],
|
|
)
|
|
|
|
|
|
def write_state(ip: str):
|
|
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
STATE_FILE.write_text(ip + "\n", encoding="utf-8")
|
|
|
|
|
|
def main() -> int:
|
|
public_ip = get_public_ip()
|
|
desired_cidr = f"{public_ip}/32"
|
|
|
|
ec2, group = get_security_group()
|
|
existing_rules = find_existing_ssh_rules(group["IpPermissions"])
|
|
|
|
if existing_rules == [desired_cidr]:
|
|
write_state(public_ip)
|
|
print(json.dumps({"status": "noop", "public_ip": public_ip, "cidr": desired_cidr}))
|
|
return 0
|
|
|
|
if existing_rules:
|
|
revoke_old_rules(ec2, existing_rules)
|
|
|
|
authorize_new_rule(ec2, desired_cidr)
|
|
write_state(public_ip)
|
|
print(
|
|
json.dumps(
|
|
{
|
|
"status": "updated",
|
|
"public_ip": public_ip,
|
|
"cidr": desired_cidr,
|
|
"replaced": existing_rules,
|
|
}
|
|
)
|
|
)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
raise SystemExit(main())
|
|
except Exception as exc:
|
|
print(json.dumps({"status": "error", "error": str(exc)}), file=sys.stderr)
|
|
raise
|