Replace Permissive Security Group Rules with Evidence-Based CIDR Blocks using sg-tightener
Replacing overly permissive security group rules becomes straightforward when actual traffic data drives the decisions. sg-tightener analyzes real network flow logs to identify which specific IP addresses genuinely communicate with your resources, then replaces broad CIDR blocks with precise, evidence-based rules that reflect observed behavior rather than assumptions, meaningfully reducing attack surface without disrupting legitimate traffic.
Andrew Baker, Group CIO, Capitec Bank
There is a category of AWS security debt that accumulates quietly over years and rarely gets cleaned up. A datacenter connects to AWS over Direct Connect or a VPN. The network team provisions the connection and someone writes a security group rule that permits the entire datacenter subnet, say a 10.x.x.x/16, to reach every resource in the AWS account. It is pragmatic at the time. It works. Nothing breaks. And then that rule sits there, untouched, through dozens of deployments, account restructures, and team changes, until the organisation’s security posture is effectively built on a foundation of “the datacenter can reach everything.”
This post introduces sg-tightener, an open source tool that fixes this problem without guesswork. Rather than asking engineers to manually audit security groups and guess at what CIDR ranges are legitimate, it reads your VPC flow logs, observes what IP addresses have actually been talking to your resources over the past three months, calculates the tightest CIDR blocks that cover those addresses without exceeding AWS security group rule limits, and replaces your broad permissive rules with those evidence-based blocks. It is an extension of CloudToRepo, the existing open source tool for reverse-engineering AWS infrastructure into Terraform.
1. The Problem with Datacenter-Sized CIDR Blocks
When a rule says 10.0.0.0/16 is allowed to reach your RDS cluster, what it actually says is that any of 65,536 IP addresses can attempt a connection. In a large enterprise datacenter that number is not hypothetical. There are workstations, test servers, build agents, decommissioned machines with stale DNS records, vendor systems, and lateral movement paths that you did not design and may not fully know about.
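To make that exposure concrete, here is a small sketch using Python's standard ipaddress module. The two networks are illustrative assumptions, not values taken from any real environment.

```python
import ipaddress

# Hypothetical ranges for illustration; substitute the CIDRs from your own rules.
datacenter = ipaddress.ip_network("10.0.0.0/16")
app_tier = ipaddress.ip_network("10.0.12.0/28")


def exposure(network):
    """Number of source addresses that a rule for this network admits."""
    return network.num_addresses


print(exposure(datacenter))  # 65536
print(exposure(app_tier))    # 16
```

Moving from the /16 to a /28 that covers only the application tier shrinks the set of permitted sources by a factor of 4,096.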
The right mental model is not “we trust the datacenter network” but rather “we trust the specific systems in the datacenter that have a business reason to reach this resource.” A payment processing API should hear from the application servers that call it, not from every host on the network. An RDS instance should hear from the application tier, not from every Windows machine in the /16.
The reason this does not get fixed is not that engineers disagree with this principle. It is that fixing it manually is genuinely hard. You need to know, for every resource in your account, which source IPs have legitimately connected to it, then write rules specific enough to cover those IPs without accidentally locking out something important. That is a forensic exercise that takes weeks if you do it properly, and it touches every security group in the account. sg-tightener automates the forensic part entirely.
2. How sg-tightener Works
The tool operates in four modes that you run in sequence, following a workflow that should feel familiar if you have used Terraform.
Analyse mode reads VPC flow logs for a specified time window (default 90 days) and builds a complete picture of every private source IP address that has made an accepted connection to any resource in your account. It checks that flow logs are enabled for every VPC and that they have been running for at least as long as the analysis window before proceeding. It stores the resulting IP list locally as a JSON file that you review before anything else happens.
Plan mode takes the reviewed and approved IP list, runs the CIDR collapsing algorithm, and produces a complete changeset showing exactly what would be removed and what would replace it. It prints a headline summary telling you how many permissive rules you are removing, across how many security groups, spanning how many VPCs, and how many replacement rules will be added. It writes a plan file to disk. It makes no changes to AWS.
Apply mode takes the plan file, prints the full summary again, and requires you to type “apply” before touching anything. Before making any modifying API call it writes a complete JSON backup of every security group it will change. It then executes the changes via direct AWS API calls.
Revert mode takes a backup file produced by an apply run and restores every security group to its exact pre-apply state, requiring typed confirmation before proceeding.
A separate diagnose script is for use after events like a DR failover or a new service going live, when you expect REJECT entries in your flow logs to indicate IPs that should have been permitted but were not. It surfaces those IPs, lets you review them, merges them into the approved IP list, and optionally re-applies the updated rules across all affected security groups.
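Both analyse mode and the diagnose script hinge on the action field of each flow log record: ACCEPT for the former, REJECT for the latter. The sketch below parses one record in the default (version 2) flow log format, where action is the thirteenth field and log status the fourteenth. The sample line and the helper name parse_flow_record are made up for illustration; custom log formats order their fields differently.

```python
# Field order of the default (version 2) VPC flow log format.
FIELDS = [
    "version", "account_id", "interface_id", "srcaddr", "dstaddr",
    "srcport", "dstport", "protocol", "packets", "bytes",
    "start", "end", "action", "log_status",
]


def parse_flow_record(line):
    """Split a default-format record into a dict, or return None if too short."""
    parts = line.split()
    if len(parts) < len(FIELDS):
        return None
    return dict(zip(FIELDS, parts))


# Invented sample record, not real traffic.
sample = (
    "2 123456789012 eni-0abc1234 10.20.30.40 10.100.1.5 "
    "49152 5432 6 10 840 1700000000 1700000060 ACCEPT OK"
)
record = parse_flow_record(sample)
print(record["srcaddr"], record["action"], record["log_status"])
```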
3. The CIDR Collapsing Algorithm
AWS security groups have a default quota of 60 inbound rules per group, and raising it requires a quota increase request. If your account has been accessed by 200 distinct source IPs over three months, you cannot write 200 /32 rules. You also cannot write a /16 rule that covers all of them, because that reintroduces the permissiveness you are trying to eliminate.
The algorithm sg-tightener uses works as follows. First it clusters the observed IPs using a prefix tree approach, identifying groups of addresses that are numerically close. For each cluster it calculates the smallest CIDR prefix whose address space contains all observed IPs in the cluster. If that prefix introduces gaps (addresses within the block that were never observed), it checks whether the gap percentage is within the user-specified tolerance. The default tolerance is 20 percent: a CIDR block is acceptable if at most 20 percent of its address space consists of IPs that were not observed in the flow logs. You can adjust this with the --gap-tolerance parameter.
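As a worked example of the gap test, the sketch below computes the unobserved fraction of a candidate block using only the standard library. The observed addresses are invented, and gap_fraction is a hypothetical helper written for this illustration rather than the tool's own function.

```python
import ipaddress


def gap_fraction(cidr, observed):
    """Fraction of the block's addresses that never appeared in the logs."""
    net = ipaddress.ip_network(cidr)
    hits = sum(1 for ip in set(observed) if ipaddress.ip_address(ip) in net)
    return (net.num_addresses - hits) / net.num_addresses


# Fourteen invented source addresses: 10.1.2.0 through 10.1.2.13.
observed = [f"10.1.2.{i}" for i in range(14)]

print(gap_fraction("10.1.2.0/28", observed))  # 0.125
print(gap_fraction("10.1.2.0/27", observed))  # 0.5625
```

At the default tolerance of 0.20 the /28 is acceptable, because 2 of its 16 addresses were unobserved, while the /27 is rejected, because 18 of its 32 addresses were.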
When the rule count would still exceed 60 after CIDR collapsing, the algorithm also merges port ranges on the same CIDR entries. Adjacent port ranges are merged first, then wider gaps, until the rule count fits within the limit. The tool warns you explicitly when port merging occurs so you can review whether the resulting broader port range is acceptable for your environment.
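The first step of that merging, joining adjacent or overlapping port ranges, can be sketched as follows. The port values are invented, merge_adjacent is a hypothetical name, and the sketch deliberately stops short of the later step that bridges wider gaps.

```python
def merge_adjacent(ranges):
    """Join (from_port, to_port) pairs that touch or overlap."""
    merged = []
    for start, end in sorted(set(ranges)):
        if merged and start <= merged[-1][1] + 1:
            # Extends or sits directly after the previous range: absorb it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


ports = [(8080, 8081), (443, 443), (8082, 8090), (444, 445)]
print(merge_adjacent(ports))  # [(443, 445), (8080, 8090)]
```

Four rules per CIDR become two, at the cost of nothing here because the ranges were contiguous. Bridging real gaps is where the broader exposure comes from, which is why the tool warns when it happens.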
If neither CIDR collapsing nor port merging can bring the count within the limit at the current gap tolerance, the tool widens the tolerance in 5 percent steps, warning at each step, until it either fits or reaches 95 percent, at which point it warns you that a security group quota increase request may be needed.
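The widening loop can be pictured as a fixed schedule of tolerances that is walked until the rule count fits. The sketch below is an assumption-laden illustration: toy_rule_count stands in for a real collapse pass, and both function names are hypothetical.

```python
def tolerance_schedule(start_pct=20, step_pct=5, ceiling_pct=95):
    """Yield gap tolerances from start to ceiling as fractions.

    Integer percentages avoid floating point drift from repeated addition.
    """
    pct = start_pct
    while pct <= ceiling_pct:
        yield pct / 100
        pct += step_pct


def first_fitting_tolerance(rule_count_at, limit=60):
    """Return the first tolerance whose rule count fits, or None if none does."""
    for tolerance in tolerance_schedule():
        if rule_count_at(tolerance) <= limit:
            return tolerance
    return None


# Toy stand-in for a real collapse: 200 rules shrinking linearly with tolerance.
def toy_rule_count(tolerance):
    return 200 - round(tolerance * 200)


print(first_fitting_tolerance(toy_rule_count))
```

A return value of None corresponds to the case in the text where even 95 percent tolerance does not fit and a quota increase is the remaining option.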
4. What the Tool Does and Does Not Touch
sg-tightener only modifies rules whose source CIDR is a large RFC 1918 private block. It targets rules where the source is within 10.0.0.0/8, 172.16.0.0/12, or 192.168.0.0/16, and where the prefix length is shorter than a configurable threshold (default /24, meaning any block larger than a /24 is considered permissive).
Rules with a source of 0.0.0.0/0 are left completely untouched regardless of port or protocol. A load balancer with port 443 open to the world stays exactly as it is. Rules that reference other security groups rather than CIDR blocks are also left untouched. Rules that are already appropriately scoped at /24 or tighter are not modified.
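The selection rule described above can be sketched with the standard library. This illustrates the rule as stated in the text rather than reproducing the tool's code; the function name is_target and its threshold argument are assumptions made for the example.

```python
import ipaddress

RFC1918 = [
    ipaddress.ip_network(c)
    for c in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
]


def is_target(cidr, threshold=24):
    """True if the CIDR is a private block wider than the threshold prefix."""
    if cidr in ("0.0.0.0/0", "::/0"):
        return False  # world-open rules are left alone
    net = ipaddress.ip_network(cidr, strict=False)
    if net.version != 4:
        return False
    inside_private = any(net.subnet_of(block) for block in RFC1918)
    return inside_private and net.prefixlen < threshold


for cidr in ("10.0.0.0/16", "0.0.0.0/0", "10.1.2.0/24", "8.8.8.0/20"):
    print(cidr, is_target(cidr))
```

Only the first entry is selected: the /16 is private and wider than /24, while the others are world-open, already tight enough, or not private.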
Network ACLs are explicitly out of scope for the tightening workflow. The OU risk report (section 9) does scan and flag permissive NACL rules alongside security group rules, which gives you visibility across both layers. NACL tightening is a planned phase two. The reason for the separation is that NACLs are stateless, subnet-scoped, and have a lower default rule limit of 20 entries, which means the blast radius of a misconfigured change is larger and the collapsing algorithm has less room to work. The evidence-based approach is equally valid for NACLs but warrants its own careful implementation rather than being bolted onto the security group workflow.
5. Installation and Prerequisites
cat > install.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
python3 -m venv .venv
source .venv/bin/activate
pip install boto3 pandas openpyxl netaddr
echo "sg-tightener dependencies installed."
EOF
chmod +x install.sh
./install.sh
The IAM role used to run sg-tightener needs the following permissions. Analyse and plan modes are entirely read-only. Apply and revert additionally need the two write permissions shown below.
cat > sg_tightener_policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadOnly",
      "Effect": "Allow",
      "Action": [
        "ec2:DescribeSecurityGroups",
        "ec2:DescribeVpcs",
        "ec2:DescribeFlowLogs",
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams",
        "logs:FilterLogEvents",
        "logs:GetLogEvents",
        "s3:GetObject",
        "s3:ListBucket",
        "organizations:ListChildren",
        "organizations:DescribeAccount",
        "organizations:ListAccounts",
        "sts:AssumeRole"
      ],
      "Resource": "*"
    },
    {
      "Sid": "WriteSecurityGroups",
      "Effect": "Allow",
      "Action": [
        "ec2:AuthorizeSecurityGroupIngress",
        "ec2:RevokeSecurityGroupIngress"
      ],
      "Resource": "*"
    }
  ]
}
EOF
chmod 600 sg_tightener_policy.json
6. Main Tool: sg_tightener.py
cat > sg_tightener.py << 'EOF'
#!/usr/bin/env python3
"""
sg-tightener: Replace permissive RFC 1918 security group rules with
evidence-based CIDR blocks derived from VPC flow log analysis.

An extension of CloudToRepo (cloudtorepo.com).

Modes:
    analyse -- Read flow logs, build IP list, write approved_ips.json
    plan    -- Take approved IP list, produce changeset diff and plan file
    apply   -- Execute a plan file, write backup, modify security groups
    revert  -- Restore security groups from a backup file

Usage:
    python sg_tightener.py analyse --region af-south-1 --days 90
    python sg_tightener.py plan --region af-south-1 --approved-ips approved_ips.json
    python sg_tightener.py apply --region af-south-1 --plan sg_plan_<timestamp>.json
    python sg_tightener.py revert --region af-south-1 --backup sg_backup_<timestamp>.json

Prerequisites:
    pip install boto3 pandas openpyxl netaddr
"""
import boto3
import json
import sys
import argparse
import logging
import ipaddress
from datetime import datetime, timezone, timedelta
from typing import Optional
from concurrent.futures import ThreadPoolExecutor, as_completed

try:
    from netaddr import IPNetwork, IPAddress, cidr_merge
    NETADDR_AVAILABLE = True
except ImportError:
    NETADDR_AVAILABLE = False
    print("ERROR: netaddr is required. Run: pip install netaddr", file=sys.stderr)
    sys.exit(1)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
log = logging.getLogger(__name__)

MAX_SG_RULES = 60
DEFAULT_GAP_TOLERANCE = 0.20
DEFAULT_DAYS = 90
PERMISSIVE_PREFIX_LEN = 24
PRIVATE_RANGES = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def is_private(ip_str: str) -> bool:
    try:
        addr = ipaddress.ip_address(ip_str)
        return any(addr in net for net in PRIVATE_RANGES)
    except ValueError:
        return False


def is_permissive_cidr(cidr_str: str) -> bool:
    if not cidr_str or cidr_str in ("0.0.0.0/0", "::/0"):
        return False
    try:
        net = ipaddress.ip_network(cidr_str, strict=False)
        if not any(net.overlaps(p) for p in PRIVATE_RANGES):
            return False
        return net.prefixlen < PERMISSIVE_PREFIX_LEN
    except ValueError:
        return False


def timestamp_str() -> str:
    return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

# ---------------------------------------------------------------------------
# CIDR collapsing
# ---------------------------------------------------------------------------
def collapse_ips_to_cidrs(
    ip_list: list,
    gap_tolerance: float = DEFAULT_GAP_TOLERANCE,
    max_rules: int = MAX_SG_RULES
) -> list:
    if not ip_list:
        return []
    addrs = sorted(set(
        ipaddress.ip_address(ip) for ip in ip_list if is_private(ip)
    ))
    if not addrs:
        return []
    # Keep the observed addresses so every merge pass measures gaps against
    # flow log evidence rather than against blocks that were already widened.
    observed_ips = {str(a) for a in addrs}
    networks = list(cidr_merge([IPNetwork(f"{str(a)}/32") for a in addrs]))
    effective_tolerance = gap_tolerance
    while len(networks) > max_rules:
        merged = _merge_pass(networks, effective_tolerance, observed_ips)
        if len(merged) == len(networks):
            effective_tolerance += 0.05
            if effective_tolerance > 0.95:
                log.warning(
                    "Could not reduce CIDR list to %d rules even at 95%% gap tolerance. "
                    "Resulting rule count: %d. Consider requesting a quota increase.",
                    max_rules, len(networks)
                )
                break
            log.warning(
                "CIDR count %d exceeds limit %d at gap tolerance %.0f%%. "
                "Widening to %.0f%%.",
                len(merged), max_rules,
                (effective_tolerance - 0.05) * 100,
                effective_tolerance * 100
            )
        networks = merged
    if effective_tolerance > gap_tolerance:
        log.warning(
            "Final gap tolerance used: %.0f%% (requested: %.0f%%). "
            "Review the output CIDR list carefully.",
            effective_tolerance * 100, gap_tolerance * 100
        )
    return [str(n.cidr) for n in networks]


def _gap_percent(network: IPNetwork, observed_ips: set) -> float:
    # Fraction of the block's address space that was never observed.
    total = network.size
    hits = sum(1 for ip in observed_ips if IPAddress(ip) in network)
    return (total - hits) / total


def _merge_pass(networks: list, tolerance: float, observed_ips: set) -> list:
    # supernet()[0] returns the /0 network, so no merge could ever pass the
    # gap test. Use the smallest CIDR spanning each neighbouring pair instead.
    from netaddr import spanning_cidr
    if len(networks) <= 1:
        return networks
    merged = []
    skip = set()
    networks = sorted(networks)
    for i, net in enumerate(networks):
        if i in skip:
            continue
        if i + 1 < len(networks):
            candidate = spanning_cidr([net, networks[i + 1]])
            if _gap_percent(candidate, observed_ips) <= tolerance:
                merged.append(candidate)
                skip.add(i + 1)
                continue
        merged.append(net)
    # cidr_merge drops any network now contained in a wider merged block.
    return list(cidr_merge(merged))

# ---------------------------------------------------------------------------
# Port range merging
# ---------------------------------------------------------------------------
def merge_port_ranges(rules: list, target_count: int) -> list:
    if len(rules) <= target_count:
        return rules
    log.warning(
        "Rule count %d exceeds target %d after CIDR collapsing. "
        "Merging port ranges to reduce rule count.",
        len(rules), target_count
    )
    from collections import defaultdict
    grouped = defaultdict(list)
    for rule in rules:
        key = (rule["cidr"], rule["protocol"])
        grouped[key].append((rule["from_port"], rule["to_port"]))
    merged_rules = []
    for (cidr, protocol), port_pairs in grouped.items():
        if protocol in ("-1", "all"):
            merged_rules.append({
                "cidr": cidr, "protocol": protocol,
                "from_port": 0, "to_port": 65535
            })
            continue
        sorted_pairs = sorted(set(port_pairs))
        merged_pairs = []
        current_start, current_end = sorted_pairs[0]
        for start, end in sorted_pairs[1:]:
            if start <= current_end + 1:
                current_end = max(current_end, end)
            else:
                merged_pairs.append((current_start, current_end))
                current_start, current_end = start, end
        merged_pairs.append((current_start, current_end))
        for from_port, to_port in merged_pairs:
            merged_rules.append({
                "cidr": cidr, "protocol": protocol,
                "from_port": from_port, "to_port": to_port
            })
    if len(merged_rules) > target_count:
        log.warning(
            "After port merging, rule count is still %d (target %d). "
            "Consider a security group rule quota increase via AWS Support.",
            len(merged_rules), target_count
        )
    return merged_rules

# ---------------------------------------------------------------------------
# Flow log reading
# ---------------------------------------------------------------------------
def check_flow_logs_enabled(ec2_client, vpc_ids: list, required_days: int) -> dict:
    result = {}
    fl_resp = ec2_client.describe_flow_logs(
        Filter=[{"Name": "resource-id", "Values": vpc_ids}]
    )
    for fl in fl_resp.get("FlowLogs", []):
        resource_id = fl.get("ResourceId", "")
        result[resource_id] = fl
    missing = [v for v in vpc_ids if v not in result]
    if missing:
        raise RuntimeError(
            f"VPC flow logs are NOT enabled for: {missing}.\n"
            f"Please enable VPC flow logs for these VPCs and wait {required_days} days "
            f"before running sg-tightener analyse."
        )
    for vpc_id, fl in result.items():
        creation = fl.get("CreationTime")
        if creation:
            age_days = (
                datetime.now(timezone.utc) - creation.replace(tzinfo=timezone.utc)
            ).days
            if age_days < required_days:
                raise RuntimeError(
                    f"Flow logs for {vpc_id} have only been enabled for {age_days} day(s). "
                    f"The analysis window requires {required_days} days of logs. "
                    f"Wait {required_days - age_days} more day(s) before re-running, "
                    f"or reduce --days to {age_days} to analyse the available window "
                    f"(note: a shorter window may miss infrequently-used sources)."
                )
    return result

def read_flow_logs_cloudwatch(
    logs_client, log_group_name: str,
    start_time: datetime, end_time: datetime
) -> list:
    source_ips = set()
    start_ms = int(start_time.timestamp() * 1000)
    end_ms = int(end_time.timestamp() * 1000)
    paginator = logs_client.get_paginator("filter_log_events")
    log.info("Reading flow logs from CloudWatch group: %s", log_group_name)
    for page in paginator.paginate(
        logGroupName=log_group_name,
        startTime=start_ms,
        endTime=end_ms,
        filterPattern=(
            "[version, account, intf, srcaddr, dstaddr, srcport, dstport, "
            "protocol, packets, bytes, start, end, action=\"ACCEPT\", log_status]"
        )
    ):
        for event in page.get("events", []):
            parts = event.get("message", "").split()
            # Default format: index 12 is action, index 13 is log_status.
            if len(parts) >= 14 and parts[12] == "ACCEPT":
                src_ip = parts[3]
                if src_ip and src_ip != "-" and is_private(src_ip):
                    source_ips.add(src_ip)
    log.info("Collected %d unique private source IPs.", len(source_ips))
    return list(source_ips)


def read_flow_logs_s3(
    s3_client, bucket: str, prefix: str,
    start_time: datetime, end_time: datetime
) -> list:
    import gzip
    source_ips = set()
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            last_modified = obj["LastModified"].replace(tzinfo=timezone.utc)
            if not (start_time <= last_modified <= end_time):
                continue
            try:
                resp = s3_client.get_object(Bucket=bucket, Key=key)
                body = resp["Body"].read()
                if key.endswith(".gz"):
                    body = gzip.decompress(body)
                for line in body.decode("utf-8", errors="replace").splitlines():
                    parts = line.split()
                    # Default format: index 12 is action, index 13 is log_status.
                    if len(parts) >= 14 and parts[12] == "ACCEPT":
                        src_ip = parts[3]
                        if src_ip and src_ip != "-" and is_private(src_ip):
                            source_ips.add(src_ip)
            except Exception as e:
                log.warning("Could not read S3 object %s: %s", key, e)
    log.info("Collected %d unique private source IPs from S3.", len(source_ips))
    return list(source_ips)


def _parse_s3_arn(arn: str):
    if arn.startswith("arn:aws:s3:::"):
        rest = arn[len("arn:aws:s3:::"):]
        parts = rest.split("/", 1)
        return parts[0], parts[1] if len(parts) > 1 else ""
    return arn, ""

# ---------------------------------------------------------------------------
# Security group helpers
# ---------------------------------------------------------------------------
def describe_all_security_groups(ec2_client) -> list:
    groups = []
    paginator = ec2_client.get_paginator("describe_security_groups")
    for page in paginator.paginate():
        groups.extend(page.get("SecurityGroups", []))
    log.info("Found %d security groups.", len(groups))
    return groups


def find_permissive_rules(sg: dict) -> list:
    permissive = []
    for perm in sg.get("IpPermissions", []):
        for iprange in perm.get("IpRanges", []):
            cidr = iprange.get("CidrIp", "")
            if is_permissive_cidr(cidr):
                permissive.append({"permission": perm, "cidr": cidr})
    return permissive


def build_replacement_rules(
    permissive_rules: list,
    collapsed_cidrs: list,
    gap_tolerance: float
) -> list:
    flat_rules = []
    for entry in permissive_rules:
        perm = entry["permission"]
        protocol = perm.get("IpProtocol", "-1")
        from_port = perm.get("FromPort", 0)
        to_port = perm.get("ToPort", 65535)
        for cidr in collapsed_cidrs:
            flat_rules.append({
                "cidr": cidr,
                "protocol": protocol,
                "from_port": from_port,
                "to_port": to_port
            })
    return merge_port_ranges(flat_rules, MAX_SG_RULES)


def backup_security_groups(security_groups: list, backup_path: str):
    with open(backup_path, "w") as f:
        json.dump(security_groups, f, indent=2, default=str)
    log.info("Backup written to: %s", backup_path)

# ---------------------------------------------------------------------------
# Plan summary printing
# ---------------------------------------------------------------------------
def print_plan_summary(plan: dict):
    sg_count = len(plan["changes"])
    rule_removals = sum(len(c["rules_to_remove"]) for c in plan["changes"])
    rule_additions = sum(len(c["replacement_rules"]) for c in plan["changes"])
    vpc_ids = set(c["vpc_id"] for c in plan["changes"])
    port_merges = sum(1 for c in plan["changes"] if c.get("port_merge_required"))
    print()
    print("=" * 64)
    print(" sg-tightener PLAN SUMMARY")
    print("=" * 64)
    print(f" Security groups to modify : {sg_count}")
    print(f" VPCs affected : {len(vpc_ids)}")
    print(f" Permissive rules removed : {rule_removals}")
    print(f" Replacement rules added : {rule_additions}")
    if port_merges:
        print(f" Security groups needing port-range merging : {port_merges} [review carefully]")
    print("=" * 64)
    print()
    for change in plan["changes"]:
        print(f" {change['sg_id']} ({change['sg_name']}) VPC: {change['vpc_id']}")
        for r in change["rules_to_remove"]:
            proto = r["permission"].get("IpProtocol", "?")
            fp = r["permission"].get("FromPort", "all")
            tp = r["permission"].get("ToPort", "all")
            print(f" REMOVE {r['cidr']:22s} proto={proto} ports={fp}-{tp}")
        for r in change["replacement_rules"]:
            print(f" ADD {r['cidr']:22s} proto={r['protocol']} ports={r['from_port']}-{r['to_port']}")
        print()

# ---------------------------------------------------------------------------
# Mode: analyse
# ---------------------------------------------------------------------------
def run_analyse(args):
    session = boto3.Session(region_name=args.region)
    ec2 = session.client("ec2")
    logs_cl = session.client("logs")
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=args.days)
    log.info("Analysis window: %s to %s (%d days)",
             start_time.date(), end_time.date(), args.days)
    vpcs = [v["VpcId"] for v in ec2.describe_vpcs()["Vpcs"]]
    log.info("Found %d VPC(s): %s", len(vpcs), vpcs)
    try:
        fl_map = check_flow_logs_enabled(ec2, vpcs, args.days)
    except RuntimeError as e:
        log.error(str(e))
        sys.exit(1)
    all_ips = set()
    for vpc_id, fl in fl_map.items():
        log_group = fl.get("LogGroupName")
        if log_group:
            ips = read_flow_logs_cloudwatch(logs_cl, log_group, start_time, end_time)
            all_ips.update(ips)
        elif fl.get("LogDestinationType") == "s3":
            s3 = session.client("s3")
            bucket, prefix = _parse_s3_arn(fl.get("LogDestination", ""))
            ips = read_flow_logs_s3(s3, bucket, prefix, start_time, end_time)
            all_ips.update(ips)
    sorted_ips = sorted(all_ips, key=lambda x: ipaddress.ip_address(x))
    output = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "analysis_days": args.days,
        "region": args.region,
        "total_unique_ips": len(sorted_ips),
        "source_ips": sorted_ips
    }
    out_path = args.output or f"approved_ips_{args.region}_{timestamp_str()}.json"
    with open(out_path, "w") as f:
        json.dump(output, f, indent=2)
    print(f"\nFound {len(sorted_ips)} unique private source IPs over {args.days} days.")
    print(f"IP list written to: {out_path}")
    print(f"Review the list, then run:")
    print(f" python sg_tightener.py plan --approved-ips {out_path} --region {args.region}")

# ---------------------------------------------------------------------------
# Mode: plan
# ---------------------------------------------------------------------------
def run_plan(args):
    with open(args.approved_ips) as f:
        ip_data = json.load(f)
    source_ips = ip_data.get("source_ips", [])
    log.info("Loaded %d approved source IPs.", len(source_ips))
    collapsed_cidrs = collapse_ips_to_cidrs(
        source_ips,
        gap_tolerance=args.gap_tolerance,
        max_rules=MAX_SG_RULES
    )
    log.info("Collapsed to %d CIDR block(s) at %.0f%% gap tolerance.",
             len(collapsed_cidrs), args.gap_tolerance * 100)
    session = boto3.Session(region_name=args.region)
    ec2 = session.client("ec2")
    all_groups = describe_all_security_groups(ec2)
    # Build VPC lookup
    vpc_map = {
        sg["GroupId"]: sg.get("VpcId", "")
        for sg in all_groups
    }
    changes = []
    for sg in all_groups:
        permissive = find_permissive_rules(sg)
        if not permissive:
            continue
        replacement_rules = build_replacement_rules(
            permissive, collapsed_cidrs, args.gap_tolerance
        )
        port_merge_required = (
            len(replacement_rules) < len(permissive) * len(collapsed_cidrs)
        )
        changes.append({
            "sg_id": sg["GroupId"],
            "sg_name": sg.get("GroupName", ""),
            "vpc_id": sg.get("VpcId", ""),
            "rules_to_remove": permissive,
            "replacement_rules": replacement_rules,
            "port_merge_required": port_merge_required
        })
    plan = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "region": args.region,
        "approved_ips": args.approved_ips,
        "gap_tolerance": args.gap_tolerance,
        "collapsed_cidrs": collapsed_cidrs,
        "changes": changes
    }
    print_plan_summary(plan)
    plan_path = args.output or f"sg_plan_{args.region}_{timestamp_str()}.json"
    with open(plan_path, "w") as f:
        json.dump(plan, f, indent=2, default=str)
    log.info("Plan written to: %s", plan_path)
    print(f"Plan saved to: {plan_path}")
    print(f"To apply: python sg_tightener.py apply --plan {plan_path} --region {args.region}")

# ---------------------------------------------------------------------------
# Mode: apply
# ---------------------------------------------------------------------------
def run_apply(args):
    with open(args.plan) as f:
        plan = json.load(f)
    changes = plan.get("changes", [])
    if not changes:
        print("Plan contains no changes. Nothing to do.")
        return
    print_plan_summary(plan)
    if not args.yes:
        confirm = input("Type 'apply' to proceed, or anything else to abort: ").strip()
        if confirm != "apply":
            print("Aborted.")
            sys.exit(0)
    session = boto3.Session(region_name=args.region)
    ec2 = session.client("ec2")
    # Backup all security groups that will be touched
    sg_ids_to_backup = [c["sg_id"] for c in changes]
    current_groups = []
    for sg_id in sg_ids_to_backup:
        try:
            resp = ec2.describe_security_groups(GroupIds=[sg_id])
            current_groups.extend(resp.get("SecurityGroups", []))
        except Exception as e:
            log.error("Could not describe %s for backup: %s", sg_id, e)
    backup_path = f"sg_backup_{args.region}_{timestamp_str()}.json"
    backup_security_groups(current_groups, backup_path)
    print(f"\nBackup saved to: {backup_path}")
    print(f"To revert: python sg_tightener.py revert --backup {backup_path} --region {args.region}\n")
    modified_count = 0
    for change in changes:
        sg_id = change["sg_id"]
        rules_to_remove = change["rules_to_remove"]
        replacement_rules = change["replacement_rules"]
        print(f" Modifying {sg_id} ({change['sg_name']})...")
        # Revoke permissive rules
        revoke_permissions = []
        for entry in rules_to_remove:
            perm = dict(entry["permission"])
            perm["IpRanges"] = [{"CidrIp": entry["cidr"]}]
            perm.pop("Ipv6Ranges", None)
            perm.pop("PrefixListIds", None)
            perm.pop("UserIdGroupPairs", None)
            revoke_permissions.append(perm)
        try:
            ec2.revoke_security_group_ingress(
                GroupId=sg_id,
                IpPermissions=revoke_permissions
            )
        except Exception as e:
            log.error("Failed to revoke rules from %s: %s", sg_id, e)
            log.error("Halting. Run revert to restore from backup.")
            sys.exit(1)
        # Authorize replacement rules
        new_permissions = []
        for rule in replacement_rules:
            perm = {
                "IpProtocol": rule["protocol"],
                "IpRanges": [{"CidrIp": rule["cidr"],
                              "Description": "sg-tightener managed"}]
            }
            if rule["protocol"] not in ("-1", "all"):
                perm["FromPort"] = rule["from_port"]
                perm["ToPort"] = rule["to_port"]
            new_permissions.append(perm)
        try:
            ec2.authorize_security_group_ingress(
                GroupId=sg_id,
                IpPermissions=new_permissions
            )
            modified_count += 1
        except Exception as e:
            log.error("Failed to add replacement rules to %s: %s", sg_id, e)
            log.error("Partial apply. Run revert to restore from backup.")
            sys.exit(1)
    print(f"\nModified {modified_count} security group(s).")
    print(f"Backup: {backup_path}")

# ---------------------------------------------------------------------------
# Mode: revert
# ---------------------------------------------------------------------------
def run_revert(args):
    with open(args.backup) as f:
        backed_up_groups = json.load(f)
    log.info("Loaded backup with %d security group(s) from %s",
             len(backed_up_groups), args.backup)
    print(f"\nThis will restore {len(backed_up_groups)} security group(s) "
          f"from backup: {args.backup}")
    if not args.yes:
        confirm = input("Type 'revert' to proceed, or anything else to abort: ").strip()
        if confirm != "revert":
            print("Aborted.")
            sys.exit(0)
    session = boto3.Session(region_name=args.region)
    ec2 = session.client("ec2")
    for sg in backed_up_groups:
        sg_id = sg["GroupId"]
        original_permissions = sg.get("IpPermissions", [])
        print(f" Restoring {sg_id} ({sg.get('GroupName', '')})...")
        try:
            current = ec2.describe_security_groups(GroupIds=[sg_id])
            current_perms = current["SecurityGroups"][0].get("IpPermissions", [])
        except Exception as e:
            log.error("Could not describe %s: %s", sg_id, e)
            continue
        if current_perms:
            try:
                ec2.revoke_security_group_ingress(
                    GroupId=sg_id,
                    IpPermissions=current_perms
                )
            except Exception as e:
                log.error("Could not revoke current rules from %s: %s", sg_id, e)
                continue
        if original_permissions:
            try:
                ec2.authorize_security_group_ingress(
                    GroupId=sg_id,
                    IpPermissions=original_permissions
                )
                log.info("Restored %s.", sg_id)
            except Exception as e:
                log.error("Could not restore rules to %s: %s", sg_id, e)
    print("\nRevert complete.")

# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
def parse_args():
    parser = argparse.ArgumentParser(
        description="sg-tightener: Replace permissive security group rules with "
                    "evidence-based CIDR blocks from VPC flow log analysis."
    )
    sub = parser.add_subparsers(dest="mode", required=True)
    p_analyse = sub.add_parser("analyse", help="Read flow logs and build approved IP list")
    p_analyse.add_argument("--region", required=True)
    p_analyse.add_argument("--days", type=int, default=DEFAULT_DAYS)
    p_analyse.add_argument("--output", help="Output JSON path (default: auto-named)")
    p_plan = sub.add_parser("plan", help="Produce changeset from approved IP list")
    p_plan.add_argument("--region", required=True)
    p_plan.add_argument("--approved-ips", required=True)
    p_plan.add_argument("--gap-tolerance", type=float, default=DEFAULT_GAP_TOLERANCE)
    p_plan.add_argument("--output", help="Plan output path (default: auto-named)")
    p_apply = sub.add_parser("apply", help="Execute a plan file")
    p_apply.add_argument("--region", required=True)
    p_apply.add_argument("--plan", required=True)
    p_apply.add_argument("--yes", action="store_true")
    p_revert = sub.add_parser("revert", help="Restore from backup")
    p_revert.add_argument("--region", required=True)
    p_revert.add_argument("--backup", required=True)
    p_revert.add_argument("--yes", action="store_true")
    return parser.parse_args()


def main():
    args = parse_args()
    dispatch = {
        "analyse": run_analyse,
        "plan": run_plan,
        "apply": run_apply,
        "revert": run_revert,
    }
    dispatch[args.mode](args)


if __name__ == "__main__":
    main()
EOF
chmod +x sg_tightener.py
7. Diagnose Script: sg_diagnose.py
Run this after a DR event, a failover, or any time a new service goes live and you start seeing connection failures. It reads REJECT entries from your flow logs over a configurable lookback window, identifies private source IPs not already covered by any existing security group rule, lets you review them, merges them into the approved IP list, and optionally runs a new plan and applies it immediately.
cat > sg_diagnose.py << 'EOF'
#!/usr/bin/env python3
"""
sg-diagnose: Surface rejected private source IPs from VPC flow logs and
add them to the account-level approved IP list, then re-apply to all
affected security groups.
Run after a DR event, failover, or new service deployment when you expect
connection failures from IPs not yet in your ruleset.
Usage:
python sg_diagnose.py --region af-south-1 --hours 4
python sg_diagnose.py --region af-south-1 --hours 4 --apply
Prerequisites:
pip install boto3 netaddr
"""
import boto3
import json
import sys
import argparse
import logging
import ipaddress
from datetime import datetime, timezone, timedelta
from sg_tightener import (
is_private,
collapse_ips_to_cidrs,
describe_all_security_groups,
find_permissive_rules,
build_replacement_rules,
backup_security_groups,
timestamp_str,
print_plan_summary,
DEFAULT_GAP_TOLERANCE,
MAX_SG_RULES
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
log = logging.getLogger(__name__)
def read_rejected_ips_cloudwatch(
    logs_client, log_group_name: str,
    start_time: datetime, end_time: datetime
) -> list:
    """Return (srcaddr, dstport) tuples for REJECT records from private sources."""
    rejected = []
    start_ms = int(start_time.timestamp() * 1000)
    end_ms = int(end_time.timestamp() * 1000)
    paginator = logs_client.get_paginator("filter_log_events")
    log.info("Scanning REJECT entries in: %s", log_group_name)
    for page in paginator.paginate(
        logGroupName=log_group_name,
        startTime=start_ms,
        endTime=end_ms,
        filterPattern=(
            "[version, account, intf, srcaddr, dstaddr, srcport, dstport, "
            "protocol, packets, bytes, start, end, action=\"REJECT\", log_status]"
        )
    ):
        for event in page.get("events", []):
            parts = event.get("message", "").split()
            # Default flow log format: action is index 12, log_status is index 13.
            if len(parts) >= 14 and parts[12] == "REJECT":
                src_ip = parts[3]
                dst_port = parts[6]
                if src_ip and src_ip != "-" and is_private(src_ip):
                    rejected.append((src_ip, dst_port))
    return rejected


def ip_covered_by_existing_rules(ip: str, security_groups: list) -> bool:
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False
    for sg in security_groups:
        for perm in sg.get("IpPermissions", []):
            for iprange in perm.get("IpRanges", []):
                cidr = iprange.get("CidrIp", "")
                try:
                    if addr in ipaddress.ip_network(cidr, strict=False):
                        return True
                except ValueError:
                    continue
    return False


def parse_args():
    parser = argparse.ArgumentParser(
        description="sg-diagnose: Find rejected private IPs in flow logs "
                    "and add them to the approved ruleset."
    )
    parser.add_argument("--region", required=True)
    parser.add_argument("--hours", type=int, default=4)
    parser.add_argument("--approved-ips",
                        help="Existing approved_ips JSON to merge into")
    parser.add_argument("--gap-tolerance", type=float, default=DEFAULT_GAP_TOLERANCE)
    parser.add_argument("--apply", action="store_true",
                        help="Apply updated rules after review")
    parser.add_argument("--yes", action="store_true")
    return parser.parse_args()


def main():
    args = parse_args()
    session = boto3.Session(region_name=args.region)
    ec2 = session.client("ec2")
    logs_cl = session.client("logs")

    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=args.hours)
    log.info("Scanning REJECT entries from %s to %s (%d hours)",
             start_time.strftime("%H:%M"), end_time.strftime("%H:%M"), args.hours)

    vpcs = [v["VpcId"] for v in ec2.describe_vpcs()["Vpcs"]]
    fl_map = ec2.describe_flow_logs(
        Filter=[{"Name": "resource-id", "Values": vpcs}]
    ).get("FlowLogs", [])
    if not fl_map:
        log.error("No VPC flow logs found. Enable flow logs first.")
        sys.exit(1)

    all_rejected = []
    for fl in fl_map:
        log_group = fl.get("LogGroupName")
        if log_group:
            all_rejected.extend(
                read_rejected_ips_cloudwatch(logs_cl, log_group, start_time, end_time)
            )
    if not all_rejected:
        print(f"\nNo REJECT entries found for private IPs in the last {args.hours} hour(s).")
        return

    all_groups = describe_all_security_groups(ec2)
    unique_new = sorted(
        set(
            ip for ip, _ in all_rejected
            if not ip_covered_by_existing_rules(ip, all_groups)
        ),
        key=lambda x: ipaddress.ip_address(x)
    )
    print(f"\nFound {len(all_rejected)} REJECT entries from private IPs.")
    print(f"{len(unique_new)} IP(s) are not covered by any existing security group rule:\n")
    for ip in unique_new:
        ports = sorted(set(p for i, p in all_rejected if i == ip))
        print(f" {ip:20s} destination ports: {', '.join(ports)}")
    if not unique_new:
        # Security groups have no rule ordering, so point at the likely causes instead.
        print("\nAll rejected IPs are already covered by some rule. "
              "Check that the rule is on the right security group and matches "
              "the port and protocol, or check network ACLs.")
        return

    existing_ips = []
    if args.approved_ips:
        try:
            with open(args.approved_ips) as f:
                existing_ips = json.load(f).get("source_ips", [])
        except FileNotFoundError:
            log.warning("Approved IPs file not found. Starting fresh.")
    merged_ips = sorted(
        set(existing_ips) | set(unique_new),
        key=lambda x: ipaddress.ip_address(x)
    )
    out_path = args.approved_ips or f"approved_ips_{args.region}_{timestamp_str()}.json"
    with open(out_path, "w") as f:
        json.dump({
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "region": args.region,
            "total_unique_ips": len(merged_ips),
            "source_ips": merged_ips,
            "diagnose_additions": unique_new
        }, f, indent=2)
    print(f"\nUpdated approved IP list ({len(merged_ips)} total IPs) written to: {out_path}")

    if not args.apply:
        print(f"\nTo apply the updated rules, run:")
        print(f" python sg_tightener.py plan --approved-ips {out_path} --region {args.region}")
        print(f" python sg_tightener.py apply --plan <plan_file> --region {args.region}")
        return

    collapsed_cidrs = collapse_ips_to_cidrs(merged_ips, gap_tolerance=args.gap_tolerance)
    log.info("Collapsed to %d CIDR block(s).", len(collapsed_cidrs))
    groups_to_modify = [
        (sg, find_permissive_rules(sg))
        for sg in all_groups
        if find_permissive_rules(sg)
    ]
    if not groups_to_modify:
        print("\nNo permissive rules found to update.")
        return

    changes = []
    for sg, permissive in groups_to_modify:
        replacement_rules = build_replacement_rules(
            permissive, collapsed_cidrs, args.gap_tolerance
        )
        changes.append({
            "sg_id": sg["GroupId"],
            "sg_name": sg.get("GroupName", ""),
            "vpc_id": sg.get("VpcId", ""),
            "rules_to_remove": permissive,
            "replacement_rules": replacement_rules,
            "port_merge_required": False
        })
    plan = {"changes": changes, "region": args.region}
    print_plan_summary(plan)

    # Always take a backup before touching any rule so revert has something to restore.
    backup_path = f"sg_backup_diagnose_{args.region}_{timestamp_str()}.json"
    backup_security_groups([sg for sg, _ in groups_to_modify], backup_path)
    print(f"Backup saved to: {backup_path}")

    if not args.yes:
        confirm = input("Type 'apply' to proceed: ").strip()
        if confirm != "apply":
            print("Aborted. Updated approved_ips file has been saved.")
            return

    session2 = boto3.Session(region_name=args.region)
    ec2_w = session2.client("ec2")
    for sg, permissive in groups_to_modify:
        replacement_rules = build_replacement_rules(
            permissive, collapsed_cidrs, args.gap_tolerance
        )
        # Revoke only the permissive CIDR entries, leaving other sources untouched.
        revoke_perms = []
        for entry in permissive:
            perm = dict(entry["permission"])
            perm["IpRanges"] = [{"CidrIp": entry["cidr"]}]
            perm.pop("Ipv6Ranges", None)
            perm.pop("PrefixListIds", None)
            perm.pop("UserIdGroupPairs", None)
            revoke_perms.append(perm)
        ec2_w.revoke_security_group_ingress(
            GroupId=sg["GroupId"], IpPermissions=revoke_perms
        )
        new_perms = []
        for rule in replacement_rules:
            p = {
                "IpProtocol": rule["protocol"],
                "IpRanges": [{"CidrIp": rule["cidr"],
                              "Description": "sg-tightener managed"}]
            }
            if rule["protocol"] not in ("-1", "all"):
                p["FromPort"] = rule["from_port"]
                p["ToPort"] = rule["to_port"]
            new_perms.append(p)
        ec2_w.authorize_security_group_ingress(
            GroupId=sg["GroupId"], IpPermissions=new_perms
        )
        log.info("Updated %s.", sg["GroupId"])

    print(f"\nUpdated {len(groups_to_modify)} security group(s).")
    print(f"To revert: python sg_tightener.py revert --backup {backup_path} "
          f"--region {args.region} --yes")


if __name__ == "__main__":
    main()
EOF
chmod +x sg_diagnose.py
8. OU Risk Report: sg_ou_report.py
This script scans an entire AWS Organisation or a specified account list and produces a risk-ranked report showing which accounts have the most permissive security group and NACL rules. You specify the largest block you are comfortable with, expressed as a prefix length (--max-prefix-len, default /24), and the report flags every inbound RFC 1918 rule whose block is broader than that, meaning any rule with a shorter prefix. Accounts are sorted by number of findings, highest first, so you know immediately where to focus remediation effort. NACLs appear alongside security groups in the findings, clearly labelled, so you have full visibility across both enforcement layers even though the tightening workflow only operates on security groups.
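As a worked example of how the threshold turns into a severity, the sketch below applies the same gap-based bands that classify_severity uses in the script that follows, with the default /24 threshold. The sample CIDRs are made up for illustration.

```python
import ipaddress


def classify_severity(prefix_len: int, threshold: int) -> str:
    """Bands mirror the report script: the wider the block, the higher the severity."""
    gap = threshold - prefix_len
    if gap >= 16:
        return "CRITICAL"
    if gap >= 8:
        return "HIGH"
    if gap >= 4:
        return "MEDIUM"
    return "LOW"


THRESHOLD = 24  # default --max-prefix-len
# Illustrative CIDRs only; none of these come from a real account.
for cidr in ("10.0.0.0/8", "10.20.0.0/16", "10.20.0.0/20", "10.20.30.0/23"):
    net = ipaddress.ip_network(cidr)
    print(f"{cidr:16s} {net.num_addresses:>10d} addresses  "
          f"{classify_severity(net.prefixlen, THRESHOLD)}")
```

With a /24 threshold, a /8 is CRITICAL (gap of 16 bits), a /16 is HIGH, a /20 is MEDIUM, and a /23 is LOW. Raising the threshold to /26 shifts every band by two bits, so a /18 would then be reported as HIGH rather than MEDIUM.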
cat > sg_ou_report.py << 'EOF'
#!/usr/bin/env python3
"""
sg-ou-report: Risk-rank AWS accounts by security group and NACL permissiveness.
Scans all security groups and network ACLs across an OU or account list and
reports inbound rules with RFC 1918 CIDR blocks larger than the threshold.
Accounts are sorted from most to least permissive.
NACLs are reported but not modified. Security group tightening is handled
by sg_tightener.py.
Usage:
python sg_ou_report.py --ou-id ou-xxxx-xxxxxxxx --regions af-south-1 eu-west-1
python sg_ou_report.py --accounts 123456789012 234567890123 --regions af-south-1
python sg_ou_report.py --ou-id ou-xxxx-xxxxxxxx --max-prefix-len 26
Prerequisites:
pip install boto3 pandas openpyxl
"""
import boto3
import csv
import sys
import argparse
import logging
import ipaddress
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from typing import Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
log = logging.getLogger(__name__)
PRIVATE_RANGES = [
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("172.16.0.0/12"),
ipaddress.ip_network("192.168.0.0/16"),
]
DEFAULT_MAX_PREFIX_LEN = 24
DEFAULT_ROLE_NAME = "OrganizationAccountAccessRole"


def classify_severity(prefix_len: int, threshold: int) -> str:
    # gap = how many bits broader the rule is than the accepted threshold
    gap = threshold - prefix_len
    if gap >= 16:
        return "CRITICAL"
    elif gap >= 8:
        return "HIGH"
    elif gap >= 4:
        return "MEDIUM"
    return "LOW"


@dataclass
class SGFinding:
    account_id: str
    account_name: str
    region: str
    resource_type: str  # SecurityGroup or NACL
    resource_id: str
    resource_name: str
    vpc_id: str
    rule_protocol: str
    rule_from_port: str
    rule_to_port: str
    cidr: str
    prefix_len: int
    severity: str
    recommendation: str


def list_accounts_in_ou(ou_id: str) -> list:
    org = boto3.client("organizations")
    accounts = []

    def recurse(parent_id):
        paginator = org.get_paginator("list_children")
        for page in paginator.paginate(ParentId=parent_id, ChildType="ACCOUNT"):
            for child in page["Children"]:
                try:
                    acc = org.describe_account(AccountId=child["Id"])["Account"]
                    if acc["Status"] == "ACTIVE":
                        accounts.append({"id": acc["Id"], "name": acc["Name"]})
                except Exception as e:
                    log.warning("Could not describe account %s: %s", child["Id"], e)
        # Descend into nested OUs so the whole subtree is covered.
        for page in paginator.paginate(ParentId=parent_id, ChildType="ORGANIZATIONAL_UNIT"):
            for child in page["Children"]:
                recurse(child["Id"])

    recurse(ou_id)
    return accounts


def get_session(account_id: str, role_name: Optional[str]) -> boto3.Session:
    if not role_name:
        return boto3.Session()
    sts = boto3.client("sts")
    role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
    creds = sts.assume_role(
        RoleArn=role_arn, RoleSessionName="SGPermissivenessAudit"
    )["Credentials"]
    return boto3.Session(
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"]
    )


def scan_security_groups(
    ec2, account_id: str, account_name: str,
    region: str, max_prefix_len: int
) -> list:
    findings = []
    paginator = ec2.get_paginator("describe_security_groups")
    for page in paginator.paginate():
        for sg in page.get("SecurityGroups", []):
            sg_id = sg["GroupId"]
            sg_name = sg.get("GroupName", "")
            vpc_id = sg.get("VpcId", "")
            for perm in sg.get("IpPermissions", []):
                protocol = perm.get("IpProtocol", "-1")
                from_port = str(perm.get("FromPort", "all"))
                to_port = str(perm.get("ToPort", "all"))
                for iprange in perm.get("IpRanges", []):
                    cidr = iprange.get("CidrIp", "")
                    # Public exposure (0.0.0.0/0) is out of scope for this report.
                    if not cidr or cidr == "0.0.0.0/0":
                        continue
                    try:
                        net = ipaddress.ip_network(cidr, strict=False)
                    except ValueError:
                        continue
                    if not any(net.overlaps(p) for p in PRIVATE_RANGES):
                        continue
                    # Only blocks broader than the threshold are findings.
                    if net.prefixlen >= max_prefix_len:
                        continue
                    severity = classify_severity(net.prefixlen, max_prefix_len)
                    findings.append(SGFinding(
                        account_id=account_id,
                        account_name=account_name,
                        region=region,
                        resource_type="SecurityGroup",
                        resource_id=sg_id,
                        resource_name=sg_name,
                        vpc_id=vpc_id,
                        rule_protocol=protocol,
                        rule_from_port=from_port,
                        rule_to_port=to_port,
                        cidr=cidr,
                        prefix_len=net.prefixlen,
                        severity=severity,
                        recommendation=(
                            f"Replace /{net.prefixlen} with tightest covering CIDR "
                            f"from flow log analysis. Run sg-tightener analyse then plan."
                        )
                    ))
    return findings


def scan_nacls(
    ec2, account_id: str, account_name: str,
    region: str, max_prefix_len: int
) -> list:
    findings = []
    paginator = ec2.get_paginator("describe_network_acls")
    for page in paginator.paginate():
        for nacl in page.get("NetworkAcls", []):
            nacl_id = nacl["NetworkAclId"]
            nacl_name = next(
                (t["Value"] for t in nacl.get("Tags", []) if t["Key"] == "Name"), nacl_id
            )
            vpc_id = nacl.get("VpcId", "")
            for entry in nacl.get("Entries", []):
                # Inbound only, allow rules only (RuleAction = allow)
                if entry.get("Egress", True):
                    continue
                if entry.get("RuleAction", "") != "allow":
                    continue
                cidr = entry.get("CidrBlock", "")
                if not cidr or cidr == "0.0.0.0/0":
                    continue
                try:
                    net = ipaddress.ip_network(cidr, strict=False)
                except ValueError:
                    continue
                if not any(net.overlaps(p) for p in PRIVATE_RANGES):
                    continue
                if net.prefixlen >= max_prefix_len:
                    continue
                protocol = str(entry.get("Protocol", "-1"))
                port_range = entry.get("PortRange", {})
                from_port = str(port_range.get("From", "all"))
                to_port = str(port_range.get("To", "all"))
                severity = classify_severity(net.prefixlen, max_prefix_len)
                findings.append(SGFinding(
                    account_id=account_id,
                    account_name=account_name,
                    region=region,
                    resource_type="NACL",
                    resource_id=nacl_id,
                    resource_name=nacl_name,
                    vpc_id=vpc_id,
                    rule_protocol=protocol,
                    rule_from_port=from_port,
                    rule_to_port=to_port,
                    cidr=cidr,
                    prefix_len=net.prefixlen,
                    severity=severity,
                    recommendation=(
                        f"NACL /{net.prefixlen} rule is overly permissive. "
                        f"Review manually. NACL tightening is not automated by sg-tightener."
                    )
                ))
    return findings


def scan_account(
    account: dict, role_name: Optional[str],
    regions: list, max_prefix_len: int
) -> list:
    account_id = account["id"]
    account_name = account["name"]
    findings = []
    log.info("Scanning account %s (%s)", account_id, account_name)
    try:
        session = get_session(account_id, role_name)
    except Exception as e:
        log.error("Cannot assume role in %s: %s", account_id, e)
        return []
    for region in regions:
        try:
            ec2 = session.client("ec2", region_name=region)
            findings.extend(scan_security_groups(ec2, account_id, account_name, region, max_prefix_len))
            findings.extend(scan_nacls(ec2, account_id, account_name, region, max_prefix_len))
        except Exception as e:
            log.error("Error scanning %s / %s: %s", account_id, region, e)
    log.info("Account %s: %d finding(s).", account_id, len(findings))
    return findings


def write_csv(findings: list, path: str):
    if not findings:
        return
    fieldnames = list(asdict(findings[0]).keys())
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for finding in findings:
            writer.writerow(asdict(finding))
    log.info("CSV written: %s", path)


def write_excel(findings: list, path: str):
    if not PANDAS_AVAILABLE:
        log.warning("pandas/openpyxl not available. Skipping Excel output.")
        return
    if not findings:
        return
    from openpyxl.styles import PatternFill
    rows = [asdict(f) for f in findings]
    df = pd.DataFrame(rows)
    severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
    df["_sort"] = df["severity"].map(severity_order).fillna(99)
    df = df.sort_values(
        ["_sort", "account_id", "resource_type", "prefix_len"],
        ascending=[True, True, True, True]
    ).drop(columns=["_sort"])
    colour_map = {
        "CRITICAL": "FF4444",
        "HIGH": "FF8800",
        "MEDIUM": "FFD700",
        "LOW": "90EE90",
    }
    with pd.ExcelWriter(path, engine="openpyxl") as writer:
        # Sheet 1: every finding, with the severity cell colour-coded.
        df.to_excel(writer, index=False, sheet_name="Permissive Rules")
        ws = writer.sheets["Permissive Rules"]
        sev_col = list(df.columns).index("severity") + 1
        for row_idx, row in enumerate(df.itertuples(index=False), start=2):
            colour = colour_map.get(row.severity, "FFFFFF")
            ws.cell(row=row_idx, column=sev_col).fill = PatternFill(
                start_color=colour, end_color=colour, fill_type="solid"
            )
        # Sheet 2: per-account totals, highest finding count first.
        acct_summary = (
            df.groupby(["account_id", "account_name"])
            .agg(
                total_findings=("resource_id", "count"),
                sg_findings=("resource_type", lambda x: (x == "SecurityGroup").sum()),
                nacl_findings=("resource_type", lambda x: (x == "NACL").sum()),
                critical=("severity", lambda x: (x == "CRITICAL").sum()),
                high=("severity", lambda x: (x == "HIGH").sum()),
                medium=("severity", lambda x: (x == "MEDIUM").sum()),
                low=("severity", lambda x: (x == "LOW").sum()),
                most_permissive_prefix=("prefix_len", "min")
            )
            .reset_index()
            .sort_values("total_findings", ascending=False)
        )
        acct_summary.to_excel(writer, index=False, sheet_name="Account Risk Ranking")
    log.info("Excel written: %s", path)


def print_summary(findings: list, max_prefix_len: int):
    if not findings:
        print("\nNo permissive rules found.")
        return
    from collections import defaultdict
    by_account = defaultdict(list)
    for f in findings:
        by_account[(f.account_id, f.account_name)].append(f)
    sorted_accounts = sorted(
        by_account.items(), key=lambda x: len(x[1]), reverse=True
    )
    print("\n" + "=" * 76)
    print(f" SECURITY GROUP + NACL PERMISSIVENESS REPORT (threshold: /{max_prefix_len})")
    print("=" * 76)
    print(f" {'ACCOUNT ID':15s} {'ACCOUNT NAME':28s} {'SG':5s} {'NACL':5s} {'TOTAL':6s} WORST")
    print(f" {'-'*15} {'-'*28} {'-'*5} {'-'*5} {'-'*6} {'-'*14}")
    for (account_id, account_name), account_findings in sorted_accounts:
        sg_count = sum(1 for f in account_findings if f.resource_type == "SecurityGroup")
        nacl_count = sum(1 for f in account_findings if f.resource_type == "NACL")
        worst_pfx = min(f.prefix_len for f in account_findings)
        worst_sev = max(
            account_findings,
            key=lambda x: {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}.get(x.severity, 0)
        ).severity
        print(f" {account_id:15s} {account_name[:28]:28s} "
              f"{sg_count:5d} {nacl_count:5d} {len(account_findings):6d} "
              f"/{worst_pfx} ({worst_sev})")
    sevs = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
    for f in findings:
        sevs[f.severity] += 1
    sg_total = sum(1 for f in findings if f.resource_type == "SecurityGroup")
    nacl_total = sum(1 for f in findings if f.resource_type == "NACL")
    print()
    print(f" Accounts with findings : {len(by_account)}")
    print(f" Security group violations : {sg_total}")
    print(f" NACL violations (report only): {nacl_total}")
    print(f" Total violations : {len(findings)}")
    for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
        print(f" {sev:10s}: {sevs[sev]:4d}")
    print("=" * 76)


def parse_args():
    parser = argparse.ArgumentParser(
        description="sg-ou-report: Risk-rank accounts by SG and NACL permissiveness."
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--ou-id", help="AWS Organizations OU ID")
    group.add_argument("--accounts", nargs="+")
    parser.add_argument("--role-name", default=DEFAULT_ROLE_NAME)
    parser.add_argument("--regions", nargs="+",
                        default=["af-south-1", "eu-west-1", "us-east-1"])
    parser.add_argument("--max-prefix-len", type=int, default=DEFAULT_MAX_PREFIX_LEN)
    parser.add_argument("--workers", type=int, default=5)
    parser.add_argument("--output-prefix", default="sg_permissiveness_report")
    return parser.parse_args()


def main():
    args = parse_args()
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    if args.ou_id:
        log.info("Discovering accounts in OU: %s", args.ou_id)
        accounts = list_accounts_in_ou(args.ou_id)
        log.info("Found %d active account(s).", len(accounts))
    else:
        accounts = [{"id": a, "name": a} for a in args.accounts]
    all_findings = []
    # Scan accounts in parallel; one failing account does not stop the others.
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = {
            executor.submit(
                scan_account, acc, args.role_name, args.regions, args.max_prefix_len
            ): acc
            for acc in accounts
        }
        for future in as_completed(futures):
            acc = futures[future]
            try:
                all_findings.extend(future.result())
            except Exception as e:
                log.error("Account %s failed: %s", acc["id"], e)
    print_summary(all_findings, args.max_prefix_len)
    csv_path = f"{args.output_prefix}_{timestamp}.csv"
    xlsx_path = f"{args.output_prefix}_{timestamp}.xlsx"
    write_csv(all_findings, csv_path)
    write_excel(all_findings, xlsx_path)
    log.info("Report complete.")
    # Non-zero exit code when any CRITICAL finding exists, for use in pipelines.
    return 1 if any(f.severity == "CRITICAL" for f in all_findings) else 0


if __name__ == "__main__":
    sys.exit(main())
EOF
chmod +x sg_ou_report.py
9. Example Workflow
The typical sequence for remediating a single account looks like this.
Step 1: Run the OU risk report to identify your highest-risk accounts.
cat > run_ou_report.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_ou_report.py \
--ou-id ou-xxxx-xxxxxxxx \
--role-name YourAuditRole \
--regions af-south-1 eu-west-1 \
--max-prefix-len 24 \
--workers 8
EOF
chmod +x run_ou_report.sh
Step 2: For the highest-risk account, run analyse to build the IP list.
cat > run_analyse.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_tightener.py analyse \
--region af-south-1 \
--days 90
EOF
chmod +x run_analyse.sh
Step 3: Review the approved IP list, then run plan to see the full changeset.
cat > run_plan.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_tightener.py plan \
--region af-south-1 \
--approved-ips "approved_ips_af-south-1_<timestamp>.json" \
--gap-tolerance 0.20
EOF
chmod +x run_plan.sh
Step 4: When satisfied with the plan output, apply the changes.
cat > run_apply.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_tightener.py apply \
--region af-south-1 \
--plan "sg_plan_af-south-1_<timestamp>.json"
EOF
chmod +x run_apply.sh
Step 5: After a DR event or new service deployment, run diagnose.
cat > run_diagnose.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_diagnose.py \
--region af-south-1 \
--hours 4 \
--approved-ips "approved_ips_af-south-1_<timestamp>.json" \
--apply
EOF
chmod +x run_diagnose.sh
Step 6: If something breaks, revert immediately.
cat > run_revert.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_tightener.py revert \
--region af-south-1 \
--backup "sg_backup_af-south-1_<timestamp>.json"
EOF
chmod +x run_revert.sh
10. Relationship to CloudToRepo
sg-tightener lives under the CloudToRepo project at cloudtorepo.com as a security extension. CloudToRepo’s core purpose is to reverse-engineer existing AWS infrastructure into Terraform so you can understand and version-control what you have. sg-tightener extends that philosophy in the security direction: rather than accepting that your security groups are an undocumented product of historical decisions, it gives you an evidence-based, auditable, repeatable way to understand and tighten them.
The two tools are complementary. CloudToRepo tells you what your infrastructure looks like. sg-tightener tells you whether your security group rules are defensible given what has actually been accessing your account. Used together they form the foundation of a security posture that can be explained, reviewed, and continuously improved.
11. What This Does Not Do
sg-tightener does not audit public internet exposure. Rules with a source of 0.0.0.0/0 are left exactly as they are. It does not evaluate whether specific services should be reachable at all, only whether the source CIDR for existing private rules is unnecessarily broad. It does not manage egress rules. Network ACL tightening is reported but not automated: the OU report surfaces permissive NACL rules and labels them clearly, but the plan, apply, and revert workflow is security-group-only. NACL tightening carries a higher subnet-level blast radius and warrants its own dedicated implementation.
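The scope boundary described above can be sketched as a single predicate over an ingress source CIDR. This is a hedged illustration that follows the filter used by the OU report. The helper name in_scope is hypothetical, and the actual selection logic in sg_tightener.py may differ.

```python
import ipaddress

# RFC 1918 private ranges, as used by the OU report.
PRIVATE_RANGES = [
    ipaddress.ip_network(c)
    for c in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
]


def in_scope(cidr: str, max_prefix_len: int = 24) -> bool:
    """Hypothetical helper: is this ingress source a tightening candidate?"""
    if cidr == "0.0.0.0/0":
        return False  # public exposure is deliberately left alone
    try:
        net = ipaddress.ip_network(cidr, strict=False)
    except ValueError:
        return False  # unparseable source, nothing to do
    if not any(net.overlaps(p) for p in PRIVATE_RANGES):
        return False  # not a private source range
    return net.prefixlen < max_prefix_len  # broader than the threshold


# Illustrative sources only.
for source in ("10.0.0.0/16", "0.0.0.0/0", "10.1.2.0/24", "203.0.113.0/24"):
    print(f"{source:16s} in scope: {in_scope(source)}")
```

Only the first source is in scope: it is private and broader than /24. The others are public, already at the threshold, or outside RFC 1918 space.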
What the tool does is convert one specific, pervasive class of security debt into a defensible, evidence-based configuration without requiring weeks of manual forensic work. For most enterprise AWS accounts that have grown organically over several years, that single improvement reduces the lateral movement surface area substantially.
sg-tightener is an open source extension of CloudToRepo. Contributions welcome.
Andrew Baker · andrewbaker.ninja · Group CIO, Capitec Bank