AWS Security Group Hardening Using VPC Flow Log Analysis: Introducing sg-tightener
Replacing overly permissive security group rules becomes straightforward when actual traffic data drives the decisions. sg-tightener analyzes real network flow logs to identify which specific IP addresses genuinely communicate with your resources, then replaces broad CIDR blocks with precise, evidence-based rules that reflect observed behavior rather than assumptions, meaningfully reducing attack surface without disrupting legitimate traffic.
Andrew Baker, Group CIO, Capitec Bank
Most enterprises did not move to AWS. They extended into it. The datacenter did not go away. The VPN did not go away. The network team provisioned the Direct Connect, someone wrote a security group rule permitting the entire datacenter subnet, and that rule has been sitting there ever since, through every re-architecture, every team change, every compliance audit, silently granting 65,536 addresses the right to attempt a connection to your cloud workloads. That is not a cloud security posture; it is a datacenter security posture with a better API.
The consequences are not theoretical. When a ransomware operator compromises a build agent on your corporate network, that /16 rule is the invitation into your AWS environment. When a vendor jump host gets taken over, your security groups already trust it. When an engineer provisioned a “temporary” VPN path for a proof of concept three years ago and nobody decommissioned it, that path exists in your security groups today as a permanent trusted network. The cloud boundary you believe you have is largely fictional because the rules that enforce it were written based on assumption rather than observation.
This post introduces sg-tightener, an open source tool that replaces assumption-based trust with evidence-based trust. Rather than asking engineers to guess at what CIDR ranges are legitimate, it reads your VPC flow logs, observes what IP addresses have actually connected to your resources, calculates the tightest CIDR blocks that cover those addresses without breaching AWS rule limits, and replaces your broad permissive rules with those empirically derived blocks. It is an extension of CloudToRepo, the open source tool for reverse-engineering AWS infrastructure into Terraform.
1. The Assumption Problem in Hybrid Cloud Networking
Enterprise security rules are almost universally assumption-based. A /16 rule exists because someone assumed the datacenter needed access. A /24 rule exists because someone assumed the application server subnet needed reach. Nobody went back to verify which addresses in those ranges ever actually connected, and nobody will, because doing so manually across hundreds of security groups is not a realistic proposition.
The result is that most enterprise AWS estates have a trust boundary that looks precise on a diagram and is almost meaningless in practice. Consider what lives inside a typical corporate /16: application servers that should reach your APIs, development workstations that absolutely should not, decommissioned servers with stale DNS records, build agents that may be externally facing, vendor jump hosts with their own security posture, monitoring infrastructure, test environments, and a long tail of machines nobody can fully account for. Your security group rule trusts all of them equally because they share a subnet.
Evidence-based security starts from the opposite position. Rather than asking “what should be trusted?”, it asks “what has been observed?” Flow logs tell you, with precision, which source IP addresses made accepted connections to which resources over any given period. That forensic record is the foundation sg-tightener builds on. The trust boundary it produces is not a guess. It is a compressed representation of your actual network behaviour.
2. Why Security Group References Alone Do Not Solve This
AWS frequently recommends using security group references instead of CIDR blocks: rather than permitting 10.0.0.0/16, you reference the security group attached to the resources that need access. That is good advice for traffic that originates entirely within AWS. It does not help with the problem this tool addresses.
Direct Connect and VPN connections terminate at a virtual gateway and arrive in your VPC as source IP addresses from your corporate address space. There is no AWS security group on the other end of that connection. The traffic arrives as a CIDR and must be trusted as a CIDR. Legacy enterprise networks are built on IP-based trust models that predate the concept of security group references by decades. Third-party integrations frequently terminate into fixed IP ranges that your vendors provide. Hybrid estates cannot fully eliminate CIDR-based trust because the other end of the connection is not in AWS.
This means the class of problem sg-tightener addresses is not going away as estates mature. As long as your AWS environment connects to anything outside AWS over a network path, you have CIDR-based trust rules that need to be as narrow as possible. The tool exists specifically for that boundary.
3. What Evidence-Based CIDR Reduction Looks Like in Practice
To make this concrete, consider a typical enterprise account that has been running for three years with a Direct Connect path from a corporate datacenter. The security group inventory shows a handful of rules like 10.0.0.0/16 and 10.4.0.0/16 granting broad inbound access to application tiers, databases, and internal APIs.
Running sg-tightener analyse over 90 days of VPC flow logs against that account produces a list of observed source IPs. The CIDR collapsing algorithm then computes the tightest covering blocks. A worked example of what that reduction looks like:
| Before | After |
|---|---|
| 10.0.0.0/16 on port 443 (65,536 addresses trusted) | 10.0.10.0/27, 10.0.20.10/31 (34 addresses trusted) |
| 10.4.0.0/16 on port 5432 (65,536 addresses trusted) | 10.4.8.0/29 (8 addresses trusted) |
| 10.8.0.0/16 on port 8443 (65,536 addresses trusted) | 10.8.2.0/28, 10.8.16.0/28 (32 addresses trusted) |
In this example the trusted address space drops from 196,608 addresses to 74, a reduction in lateral-movement surface of more than 99.9 percent. The security group rule count stays well within the 60-rule limit after CIDR collapsing. Nothing that connected during the observation window is locked out, because every observed address falls inside one of the replacement blocks.
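The table's arithmetic can be re-derived with Python's standard `ipaddress` module. This sketch only recomputes the address counts for the example CIDRs above; it does not call sg-tightener, and the `trusted_addresses` helper is hypothetical.

```python
import ipaddress

# Before/after CIDR sets from the worked example, keyed by destination port.
BEFORE = {443: ["10.0.0.0/16"], 5432: ["10.4.0.0/16"], 8443: ["10.8.0.0/16"]}
AFTER = {
    443: ["10.0.10.0/27", "10.0.20.10/31"],
    5432: ["10.4.8.0/29"],
    8443: ["10.8.2.0/28", "10.8.16.0/28"],
}


def trusted_addresses(rules: dict[int, list[str]]) -> int:
    """Total number of source addresses trusted across all rules."""
    return sum(
        ipaddress.ip_network(cidr).num_addresses
        for cidrs in rules.values()
        for cidr in cidrs
    )


before = trusted_addresses(BEFORE)
after = trusted_addresses(AFTER)
reduction = 100 * (1 - after / before)
print(f"{before:,} -> {after:,} addresses ({reduction:.3f}% reduction)")
# 196,608 -> 74 addresses (99.962% reduction)
```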
That reduction is not achievable manually at any realistic pace. It requires the forensic analysis that flow logs make possible and that sg-tightener automates.
4. The Danger of Incomplete Observation Windows
This is the part of the tool that requires the most operational care, and it is worth addressing directly rather than burying in documentation.
The default analysis window is 90 days. That is long enough to capture most regular application traffic, batch jobs, and operational tooling. It is not long enough to capture everything in every environment, and if you run the tool without thinking about what your 90-day window might be missing, you risk locking out legitimate traffic.
The categories of traffic most likely to be absent from a 90-day window are the ones that matter most in a crisis. DR systems that only connect during failover tests, which may run quarterly or annually. Finance batch processes that run at month end or quarter end and may not have fired in the observation window. Blue/green deployments where the old environment was inactive during the analysis period and comes back live after you have deployed. Maintenance hosts that only connect during planned maintenance windows. Vendor access paths that are used infrequently but are critical when needed.
The principle here is that absence of evidence is not evidence of absence. An IP address that did not connect during your observation window is not necessarily an IP address that should be blocked. It may simply be an IP address that did not happen to need access during that period.
sg-tightener handles this in several ways. The --days parameter lets you extend the window to 180 days or longer for environments where you know seasonal or infrequent traffic patterns exist. The diagnose script exists specifically for the failure mode where a legitimate source gets blocked after deployment: it scans REJECT entries in the flow logs over a configurable lookback window, surfaces IPs that are not covered by any current rule, and lets you add them to the approved list and re-apply immediately. The revert mode lets you restore the exact pre-deployment state within minutes if something goes wrong, and the apply mode is engineered so that if any single security group fails to update cleanly, the tool halts immediately and prints the revert command, so partial states cannot persist silently.
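The core check behind the diagnose step can be sketched in a few lines. This assumes the REJECT source addresses have already been extracted from the flow logs and that the currently allowed CIDRs are known; `uncovered_rejects` is a hypothetical helper for illustration, not the diagnose script's actual interface.

```python
import ipaddress
from typing import Iterable


def uncovered_rejects(rejected_ips: Iterable[str], allowed_cidrs: Iterable[str]) -> list[str]:
    """Return REJECTed IPv4 source addresses that no currently allowed CIDR covers."""
    allowed = [ipaddress.ip_network(c, strict=False) for c in allowed_cidrs]
    missing = set()
    for raw in rejected_ips:
        addr = ipaddress.ip_address(raw.strip())
        if not any(addr in net for net in allowed):
            missing.add(addr)
    # Sort numerically so the output is stable and easy to review.
    return [str(a) for a in sorted(missing)]


# A DR host in 10.9.1.0/24 was never observed, so its rejects surface here.
print(uncovered_rejects(["10.0.10.5", "10.9.1.1", "10.9.1.1"], ["10.0.10.0/27"]))
```

In practice you would likely also drop internet-sourced rejects (scanners hitting public interfaces) before reviewing the list, for example by keeping only RFC 1918 sources.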
For situations where even the diagnose loop is too slow, there is a third tool in the suite: sg_extend.py. This is the break-glass script. It takes a set of CIDR blocks and a set of security group IDs and adds the specified rules immediately, without removing anything. It is designed for the scenario where a DR failover brings up infrastructure in a range that was never in the flow logs, a supplier cuts over to a new IP range with no notice, or an on-call engineer needs to restore connectivity within minutes and cannot wait for a plan-and-apply cycle. Because it is strictly additive, the risk profile is bounded: the worst outcome of running it incorrectly is a broader permission set, not an outage. It writes a timestamped manifest of every rule it added so that cleanup after the incident is a documented, reversible step. The expected follow-up is to re-run the standard analyse and plan workflow once the incident is resolved, which will incorporate the newly observed IPs into the evidence base and produce a tight replacement ruleset that supersedes the break-glass rules.
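The additive, manifest-writing behaviour described for sg_extend.py can be sketched as follows. This is not the script's source: `extend_ingress`, `write_manifest`, and the manifest layout are hypothetical. The sketch takes EC2's `authorize_security_group_ingress` method as a plain callable so it can be exercised without AWS, and it never calls a revoke API.

```python
from __future__ import annotations

import datetime as dt
import json
from typing import Any, Callable


def extend_ingress(
    authorize: Callable[..., Any],
    group_ids: list[str],
    cidrs: list[str],
    *,
    port: int,
    protocol: str = "tcp",
) -> dict:
    """Add `cidrs` on `port` to every group. Strictly additive: nothing is revoked."""
    stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    permission = {
        "IpProtocol": protocol,
        "FromPort": port,
        "ToPort": port,
        "IpRanges": [{"CidrIp": c, "Description": f"break-glass {stamp}"} for c in cidrs],
    }
    manifest: dict = {"created_at": stamp, "added": []}
    for gid in group_ids:
        authorize(GroupId=gid, IpPermissions=[permission])
        # Only groups whose call returned without raising are recorded.
        manifest["added"].append({"group_id": gid, "ip_permissions": [permission]})
    return manifest


def write_manifest(manifest: dict) -> str:
    """Write the manifest to a timestamped file for post-incident cleanup."""
    path = f"extend-manifest-{manifest['created_at']}.json"
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(manifest, fh, indent=2)
    return path
```

With boto3 you would pass `boto3.client("ec2").authorize_security_group_ingress` as `authorize`; cleanup then replays each manifest entry through `revoke_security_group_ingress` with the same `GroupId` and `IpPermissions`. If a rule already exists, AWS rejects the call with an `InvalidPermission.Duplicate` error, which a real break-glass tool would need to tolerate.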
The operational recommendation is to run analyse over the longest window you have available, extend to 180 days for any account where you know DR or seasonal batch traffic exists, and plan deployments for periods when you have on-call coverage and can run the diagnose script immediately if connection failures appear. Keep sg_extend.py bookmarked for genuine break-glass moments, but treat every use as a technical debt item that gets closed by the next tightening cycle. This is not a fire-and-forget toolset. It is a forensic analysis framework with layered safety nets, and those nets work best when someone is watching.
5. The CIDR Collapsing Algorithm
AWS security groups have a default quota of 60 inbound rules per group. The quota can be raised, but only at the cost of fewer security groups per network interface, because the two quotas multiplied together cannot exceed 1,000. If your account has been accessed by 200 distinct source IPs over three months, you cannot write 200 /32 rules under the default quota. You also cannot write a /16 rule that covers all of them, because that reintroduces the permissiveness you are trying to eliminate. The algorithm has to find the middle ground: the smallest set of CIDR blocks that covers all observed addresses without exceeding the rule limit.
The approach sg-tightener uses works in three layers. The first layer computes, for every observed IP, the widest covering prefix where the gap fraction (fraction of addresses in the block that were not observed) stays within a user-specified tolerance. The default tolerance is 30 percent, meaning at most 30 percent of any block’s address space can consist of addresses that were never observed. The algorithm greedily prefers wider blocks over narrower ones when both satisfy the tolerance constraint, so densely populated subnets collapse aggressively while sparse outliers remain as /32 rules.
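The first layer's gap-fraction test can be illustrated with a simplified stand-alone function. It is a sketch rather than sg-tightener's own code: the `widest_block` name and `floor_prefix` parameter are hypothetical, and where the tool stops widening at the address's RFC 1918 home block, this version simply stops at `floor_prefix`.

```python
import ipaddress


def widest_block(addr: str, observed: set[str], tolerance: float = 0.30,
                 floor_prefix: int = 8) -> str:
    """Widest CIDR containing `addr` whose unobserved fraction is <= tolerance."""
    ip = ipaddress.ip_address(addr)
    seen = {ipaddress.ip_address(o) for o in observed}
    best = ipaddress.ip_network(f"{ip}/32")
    for prefix in range(31, floor_prefix - 1, -1):
        block = ipaddress.ip_network(f"{ip}/{prefix}", strict=False)
        hits = sum(1 for o in seen if o in block)
        gap = (block.num_addresses - hits) / block.num_addresses
        if gap <= tolerance:
            best = block  # keep going: a wider block may still qualify
    return str(best)


# 30 of the 32 addresses in 10.0.10.0/27 were observed, plus one outlier.
observed = {f"10.0.10.{i}" for i in range(30)} | {"10.0.20.10"}
print(widest_block("10.0.10.5", observed))   # 10.0.10.0/27 (gap 2/32)
print(widest_block("10.0.20.10", observed))  # 10.0.20.10/32 (sparse outlier)
```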
The second layer enforces the rule count budget. If the first pass produces more rules than the security group can hold, the algorithm widens tolerance in 5 percent steps up to 95 percent, recomputing at each step. Each widening is logged with a warning so the operator can see exactly what trade-off was made. If even 95 percent tolerance does not bring the count within budget, the third layer kicks in: a force-fit pass that merges the closest pairs of blocks regardless of gap tolerance, choosing merges that introduce the smallest amount of new untrusted address space first. This guarantees the rule budget is met while remaining as evidence-aligned as possible, with a loud warning recommending a quota increase from AWS Support.
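The third layer's force-fit pass can be sketched as a greedy loop that always performs the cheapest merge, where cost is the number of never-observed addresses a merge adds. Unlike the tool, this simplified version does not keep merges inside an RFC 1918 home block, and `covering` and `force_fit` are hypothetical names.

```python
import ipaddress

Net = ipaddress.IPv4Network


def covering(a: Net, b: Net) -> Net:
    """Smallest single CIDR that contains both networks."""
    lo = min(int(a.network_address), int(b.network_address))
    hi = max(int(a.broadcast_address), int(b.broadcast_address))
    for prefix in range(32, -1, -1):
        net = ipaddress.ip_network((lo, prefix), strict=False)
        if int(net.broadcast_address) >= hi:
            return net
    raise AssertionError("unreachable: /0 covers everything")


def force_fit(cidrs: list[str], max_rules: int) -> list[str]:
    """Greedily merge the adjacent pair that adds the fewest new addresses."""
    nets = list(ipaddress.collapse_addresses(ipaddress.ip_network(c) for c in cidrs))
    while len(nets) > max(1, max_rules):
        costs = [
            covering(nets[i], nets[i + 1]).num_addresses
            - nets[i].num_addresses - nets[i + 1].num_addresses
            for i in range(len(nets) - 1)
        ]
        i = costs.index(min(costs))
        merged = covering(nets[i], nets[i + 1])
        # collapse_addresses also absorbs any neighbour the merged block now covers.
        nets = list(ipaddress.collapse_addresses(nets[:i] + [merged] + nets[i + 2:]))
    return [str(n) for n in nets]


print(force_fit(["10.0.0.0/32", "10.0.0.2/32", "10.0.1.0/32"], max_rules=2))
# ['10.0.0.0/30', '10.0.1.0/32']: merging the first pair adds 2 addresses, the second pair 510
```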
Independently of CIDR collapsing, the algorithm also handles port range merging when a single security group ends up with multiple port ranges across the same CIDR set after replacement rule construction. Adjacent or overlapping port ranges are merged into the smallest spanning range. The plan output explicitly flags any security group where port merging was required so the operator can review whether the broader port range is acceptable.
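Merging adjacent or overlapping port ranges is a classic interval merge: sort by start port, then extend the previous range whenever the next one overlaps it or starts immediately after it. A minimal sketch with inclusive `(from, to)` tuples, simplified for illustration rather than copied from the tool.

```python
def merge_port_ranges(ranges: list[tuple[int, int]]) -> list[tuple[int, int]]:
    """Merge overlapping or adjacent inclusive (from, to) port ranges."""
    merged: list[tuple[int, int]] = []
    for lo, hi in sorted((min(a, b), max(a, b)) for a, b in ranges):
        if merged and lo <= merged[-1][1] + 1:  # overlaps or touches the previous range
            merged[-1] = (merged[-1][0], max(merged[-1][1], hi))
        else:
            merged.append((lo, hi))
    return merged


print(merge_port_ranges([(443, 443), (8443, 8443), (444, 450), (8000, 8500)]))
# [(443, 450), (8000, 8500)]
```

In this sketch only touching or overlapping ranges are combined, so the merged ranges cover exactly the ports the inputs covered, expressed as fewer rules.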
Critically, the per-group rule budget is computed from the actual current state of each security group, not from the global 60-rule limit. If a group has 25 rules that are not being touched (security group references, public 0.0.0.0/0 rules, and CIDRs already at /24 or narrower), the budget for replacement rules is 60 minus those 25, or 35: the permissive rules being removed free their own slots because they are revoked before the replacements are authorised. This prevents the failure mode where an apply succeeds in revoking but fails in authorising because the destination group cannot hold the new rules.
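The budget arithmetic can be written out directly. This is a sketch: `replacement_budget` is a hypothetical helper, and the `port_keys` divisor is an assumption that each replacement CIDR is authorised once per distinct protocol and port range, so a group with several port ranges shares the budget between them.

```python
def replacement_budget(total_rules: int, permissive_rules: int,
                       port_keys: int = 1, max_rules: int = 60) -> int:
    """CIDR slots available per protocol/port combination on one security group.

    Permissive rules being replaced free their own slots (they are revoked
    first), so only the untouched rules count against the quota.
    """
    untouched = total_rules - permissive_rules
    # Floor of one slot, mirroring the max(1, ...) guard in the tool's plan code.
    return max(1, (max_rules - untouched) // max(1, port_keys))


# 25 untouched rules plus 3 permissive /16 rules.
print(replacement_budget(total_rules=28, permissive_rules=3))               # 35
print(replacement_budget(total_rules=28, permissive_rules=3, port_keys=2))  # 17
```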
6. What the Tool Does and Does Not Touch
sg-tightener only modifies rules whose source CIDR is a fully contained subset of an RFC 1918 private block (10.0.0.0/8, 172.16.0.0/12, or 192.168.0.0/16) and whose prefix length is shorter than a configurable threshold (default /24). The containment check uses strict subset semantics rather than overlap semantics, so CIDRs like 192.0.0.0/4 that overlap with RFC 1918 ranges but are not themselves private are correctly excluded.
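The strict-subset test is easy to get wrong with an overlap check, which is why the distinction matters. A minimal sketch using Python's `ipaddress` module (`subnet_of` requires Python 3.7 or later); `is_eligible` is a hypothetical name, not the tool's API.

```python
import ipaddress

RFC1918 = [ipaddress.ip_network(n) for n in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")]


def is_eligible(cidr: str, prefix_threshold: int = 24) -> bool:
    """True for IPv4 CIDRs strictly inside an RFC 1918 block and wider than the threshold."""
    net = ipaddress.ip_network(cidr, strict=False)
    if net.version != 4:
        return False  # IPv6 rules are out of scope
    return net.prefixlen < prefix_threshold and any(net.subnet_of(b) for b in RFC1918)


wide = ipaddress.ip_network("192.0.0.0/4")
print(wide.overlaps(RFC1918[2]))   # True: an overlap test would wrongly match it
print(is_eligible("192.0.0.0/4"))  # False: not a subset of any private block
print(is_eligible("10.0.0.0/16"))  # True
print(is_eligible("10.0.1.0/24"))  # False: already at the /24 threshold
print(is_eligible("0.0.0.0/0"))    # False
```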
Rules with a source of 0.0.0.0/0 are left completely untouched regardless of port or protocol. A load balancer with port 443 open to the world stays exactly as it is. Rules that reference other security groups rather than CIDR blocks are left untouched. Rules already scoped at /24 or tighter are not modified. IPv6 rules are left untouched in this release.
Network ACLs are out of scope for the tightening workflow. The OU risk report scans and flags permissive NACL rules alongside security group rules, giving you visibility across both enforcement layers. NACL tightening is a planned phase two. NACLs are stateless and subnet-scoped, which means the blast radius of a misconfigured change covers every resource in the subnet with no per-resource fallback. The evidence-based approach is equally valid for NACLs, but the implementation requires separate care and a lower default gap tolerance given that NACLs default to a quota of 20 rules per direction.
7. Operating at Organisation Scale
sg-tightener is designed to operate across an AWS Organisation, not just a single account. The OU risk report traverses your entire OU tree, assumes a cross-account role in each active account, scans every region you specify in parallel, and produces a risk-ranked output sorted from the most permissive accounts to the least. That report is the starting point: it tells you where to spend your remediation effort first.
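The OU walk can be sketched as a depth-first recursion over the two Organizations list calls the policy grants, `ListAccountsForParent` and `ListOrganizationalUnitsForParent`. This is a sketch under assumptions: the caller supplies the root or OU ID to start from (discovering the root ID itself would also need `organizations:ListRoots`, which the policy in the installation section does not include), the function names are hypothetical, and the listers are injected as plain callables so the traversal can be tested offline.

```python
from __future__ import annotations

from typing import Any, Callable

Lister = Callable[[str], list]


def active_accounts(list_accounts: Lister, list_ous: Lister, parent_id: str) -> list[str]:
    """Depth-first walk of the OU tree below `parent_id`, returning ACTIVE account IDs."""
    found = [
        a["Id"] for a in list_accounts(parent_id)
        # Read `State` if the response includes it, otherwise `Status`.
        if a.get("State", a.get("Status")) == "ACTIVE"
    ]
    for ou in list_ous(parent_id):
        found.extend(active_accounts(list_accounts, list_ous, ou["Id"]))
    return found


def boto3_listers(org: Any) -> tuple[Lister, Lister]:
    """Adapt a boto3 Organizations client to the two callables, following pagination."""
    def accounts(parent_id: str) -> list:
        pages = org.get_paginator("list_accounts_for_parent").paginate(ParentId=parent_id)
        return [a for page in pages for a in page["Accounts"]]

    def ous(parent_id: str) -> list:
        pages = org.get_paginator("list_organizational_units_for_parent").paginate(ParentId=parent_id)
        return [o for page in pages for o in page["OrganizationalUnits"]]

    return accounts, ous
```

Against a live organisation this would be called as `active_accounts(*boto3_listers(boto3.client("organizations")), root_id)`.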
For the tightening workflow itself, the operational model that works best at scale depends on where your VPC flow logs are centralised. If you are running a Control Tower estate with a centralised logging account, you can point the analyse mode at the log groups or S3 buckets in that account and aggregate observed IPs across the entire organisation before building your CIDR list. If flow logs are account-local, you run the analyse and plan workflow per account and treat each account’s approved IP list independently.
The cross-account role assumption in the OU report uses OrganizationAccountAccessRole by default, the role AWS Organizations creates in every account it provisions; accounts invited into the organisation only have it if someone created it manually. You can override this with --role-name if your organisation uses a different role, such as the AWSControlTowerExecution role that Control Tower provisions in enrolled accounts. The IAM permissions required are read-only for analyse, plan, and report modes, with write access to security group rules needed only for apply and revert. The tool supports the aws-cn and aws-us-gov partitions, as well as the commercial aws partition, through boto3's standard partition resolution.
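One way to keep cross-account role ARNs partition-correct is to take the partition from the caller's own ARN, as returned by `sts.get_caller_identity()`. This is an illustrative alternative, not necessarily how sg-tightener resolves partitions; `cross_account_role_arn` is a hypothetical helper.

```python
def cross_account_role_arn(caller_arn: str, account_id: str,
                           role_name: str = "OrganizationAccountAccessRole") -> str:
    """Build a role ARN in the same partition (aws, aws-cn, aws-us-gov) as the caller."""
    # ARN layout: arn:<partition>:<service>:<region>:<account-id>:<resource>
    partition = caller_arn.split(":")[1]
    return f"arn:{partition}:iam::{account_id}:role/{role_name}"


print(cross_account_role_arn(
    "arn:aws-us-gov:sts::111122223333:assumed-role/sec-tooling/ops", "444455556666"))
# arn:aws-us-gov:iam::444455556666:role/OrganizationAccountAccessRole
```

The resulting ARN would then be passed as `RoleArn` to `sts.assume_role`, together with a `RoleSessionName`, to obtain credentials for the member account.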
For organisations running delegated administrator accounts for security services, the recommended pattern is to run the OU report and per-account analyse jobs from a dedicated security tooling account that has the cross-account role in every member account, rather than running from the management account directly. This keeps the management account credentials out of the operational workflow and aligns with the principle of least privilege at the account level.
8. Installation and Prerequisites
The tool requires Python 3.9 or later and four external packages: boto3 and botocore for AWS API access, and pandas plus openpyxl for the Excel output in the OU risk report. Everything else is standard library. Create a virtual environment in your working directory and install the dependencies with the script below. You will also need an IAM role or profile with the permissions in the policy document that follows.
cat > install.sh << 'EOF'
#!/usr/bin/env bash
# Creates a local virtualenv and installs sg-tightener's dependencies.
set -euo pipefail
if ! command -v python3 >/dev/null 2>&1; then
  echo "python3 not found on PATH" >&2
  exit 1
fi
py_version=$(python3 -c 'import sys; print("%d.%d" % sys.version_info[:2])')
required="3.9"
if [ "$(printf '%s\n' "$required" "$py_version" | sort -V | head -n1)" != "$required" ]; then
  echo "Python $required or later is required (found $py_version)" >&2
  exit 1
fi
# boto3/botocore for AWS access, pandas/openpyxl for the OU report's Excel output.
if [ ! -f requirements.txt ]; then
  printf '%s\n' boto3 botocore pandas openpyxl > requirements.txt
fi
if [ ! -d .venv ]; then
  python3 -m venv .venv
fi
# shellcheck source=/dev/null
. .venv/bin/activate
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
# Use a distinct delimiter so this inner heredoc does not end the outer one.
cat << 'MSG'
sg-tightener installed.
To activate the venv in your current shell:
  source .venv/bin/activate
Then run the tools, e.g.:
  python sg_tightener.py analyse --region us-east-1 --log-group
  python sg_tightener.py plan --region us-east-1 --approved approved.json
  python sg_tightener.py apply --plan plan.json
Run the test suite at any time:
  python sg_tightener_test.py
MSG
EOF
The IAM role used to run sg-tightener needs the following permissions. Analyse, plan, and report modes are entirely read-only. Apply and revert additionally need the two write permissions shown.
cat > sg_tightener_policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadOnlyForAnalysePlanReport",
      "Effect": "Allow",
      "Action": [
        "ec2:DescribeSecurityGroups",
        "ec2:DescribeNetworkAcls",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DescribeVpcs",
        "ec2:DescribeRegions",
        "logs:DescribeLogGroups",
        "logs:StartQuery",
        "logs:GetQueryResults",
        "logs:StopQuery",
        "s3:GetObject",
        "s3:ListBucket",
        "organizations:ListAccounts",
        "organizations:ListAccountsForParent",
        "organizations:ListOrganizationalUnitsForParent",
        "organizations:DescribeOrganization",
        "sts:GetCallerIdentity",
        "sts:AssumeRole"
      ],
      "Resource": "*"
    },
    {
      "Sid": "WriteForApplyAndRevert",
      "Effect": "Allow",
      "Action": [
        "ec2:AuthorizeSecurityGroupIngress",
        "ec2:RevokeSecurityGroupIngress"
      ],
      "Resource": "*"
    }
  ]
}
EOF
9. Main Tool: sg_tightener.py
This is the complete source for the main tool. It implements all four modes in a single file with no dependencies beyond the packages installed above. Drop it into your working directory and invoke it directly with Python. The module-level docstring doubles as the usage reference.
cat > sg_tightener.py << 'EOF'
#!/usr/bin/env python3
"""sg_tightener.py: evidence-based security group CIDR tightening.

Replaces broad RFC 1918 ingress rules on AWS security groups with the
tightest covering CIDR blocks empirically observed in VPC flow logs.

Modes:
    analyse  Read flow logs (CloudWatch Logs or S3) and emit an
             approved-IPs JSON list.
    plan     Read the approved IPs and the current SG inventory and
             emit a plan.json describing rules to revoke and rules
             to authorise. Plans are signed with a state hash so
             apply refuses to run against a stale snapshot.
    apply    Execute a plan. Halts immediately on any single-group
             failure and prints the revert command.
    revert   Restore the exact pre-apply state from the manifest
             written by the most recent apply.

This module also exposes a pure-Python CIDR collapse function
(``collapse_ips_to_cidrs``) used by the test suite. It is part of
the CloudToRepo project: https://cloudtorepo.com

Usage:
    sg_tightener.py analyse --region [--days 90] [--out approved.json]
                            [--log-group | --s3-bucket <b> --s3-prefix <p>]
    sg_tightener.py plan --region --approved approved.json
                         [--max-rules 60] [--tolerance 0.30]
                         [--prefix-threshold 24] [--out plan.json]
    sg_tightener.py apply --plan plan.json [--yes]
    sg_tightener.py revert --manifest manifest-.json [--yes]
"""
from __future__ import annotations
import argparse
import dataclasses
import datetime as dt
import gzip
import hashlib
import ipaddress
import json
import logging
import os
import sys
from typing import Iterable, Sequence
try:
    import boto3
    import botocore.exceptions
    from botocore.config import Config as BotoConfig
except ImportError:  # the pure-Python helpers stay importable without boto3
    boto3 = None
    botocore = None
    BotoConfig = None
LOG = logging.getLogger("sg_tightener")
RFC1918_BLOCKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]
DEFAULT_PREFIX_THRESHOLD = 24
DEFAULT_TOLERANCE = 0.30
DEFAULT_MAX_RULES = 60
DEFAULT_DAYS = 90
# --------------------------------------------------------------------------- #
# CIDR collapsing
# --------------------------------------------------------------------------- #
def _is_strict_rfc1918(net: ipaddress.IPv4Network) -> bool:
    return any(net.subnet_of(b) for b in RFC1918_BLOCKS)


def _normalise_ips(ips: Iterable[str]) -> list[ipaddress.IPv4Address]:
    """Parse, de-duplicate, and keep only valid IPv4 addresses (order preserved)."""
    out: list[ipaddress.IPv4Address] = []
    seen: set[ipaddress.IPv4Address] = set()
    for ip in ips:
        try:
            addr = ipaddress.ip_address(ip.strip())
        except (ValueError, AttributeError):
            continue
        if not isinstance(addr, ipaddress.IPv4Address):
            continue
        if addr not in seen:
            seen.add(addr)
            out.append(addr)
    return out


def _widest_block_for(
    addr: ipaddress.IPv4Address,
    observed: set[ipaddress.IPv4Address],
    tolerance: float,
) -> ipaddress.IPv4Network:
    """Widest containing CIDR where (block_size - observed_in_block)/block_size <= tolerance.

    Always returns at least the /32 host route, and never widens past the
    RFC 1918 block the address belongs to.
    """
    home_block = next((b for b in RFC1918_BLOCKS if addr in b), None)
    if home_block is None:
        return ipaddress.ip_network(f"{addr}/32")
    best = ipaddress.ip_network(f"{addr}/32")
    for prefix in range(31, home_block.prefixlen - 1, -1):
        candidate = ipaddress.ip_network(f"{addr}/{prefix}", strict=False)
        if not candidate.subnet_of(home_block):
            break
        size = candidate.num_addresses
        in_block = sum(1 for o in observed if o in candidate)
        gap = (size - in_block) / size
        if gap <= tolerance:
            best = candidate  # prefer the widest block that satisfies the tolerance
    return best


def _force_fit(
    nets: list[ipaddress.IPv4Network],
    max_rules: int,
) -> list[ipaddress.IPv4Network]:
    """Merge the closest pairs of CIDR blocks until count <= max_rules.

    Operates per RFC 1918 home block: merges never cross a boundary
    (e.g. a 10/8 block is never merged with a 172.16/12 block).
    """
    if len(nets) <= max_rules:
        return nets

    def home(net: ipaddress.IPv4Network) -> int:
        for i, b in enumerate(RFC1918_BLOCKS):
            if net.subnet_of(b):
                return i
        return -1

    groups: dict[int, list[ipaddress.IPv4Network]] = {}
    for n in nets:
        groups.setdefault(home(n), []).append(n)
    while sum(len(g) for g in groups.values()) > max_rules:
        biggest_key = max(groups, key=lambda k: len(groups[k]))
        if len(groups[biggest_key]) <= 1:
            break
        items = sorted(groups[biggest_key], key=lambda n: int(n.network_address))
        best_pair: tuple[int, int] | None = None
        best_cost = None
        for i in range(len(items) - 1):
            a, b = items[i], items[i + 1]
            super_net = _smallest_covering(a, b)
            if super_net is None:
                continue
            # Cost = never-observed addresses the merge would newly trust.
            cost = super_net.num_addresses - (a.num_addresses + b.num_addresses)
            if best_cost is None or cost < best_cost:
                best_cost = cost
                best_pair = (i, i + 1)
        if best_pair is None:
            break
        i, j = best_pair
        merged = _smallest_covering(items[i], items[j])
        # collapse_addresses also drops any neighbours the merged block now covers.
        groups[biggest_key] = list(
            ipaddress.collapse_addresses(items[:i] + [merged] + items[j + 1:])
        )
    return [n for g in groups.values() for n in g]


def _smallest_covering(
    a: ipaddress.IPv4Network,
    b: ipaddress.IPv4Network,
) -> ipaddress.IPv4Network | None:
    """Smallest CIDR containing both a and b, or None if they share no RFC 1918 home."""
    home = next((blk for blk in RFC1918_BLOCKS if a.subnet_of(blk) and b.subnet_of(blk)), None)
    if home is None:
        return None
    lo = min(int(a.network_address), int(b.network_address))
    hi = max(int(a.broadcast_address), int(b.broadcast_address))
    for prefix in range(32, home.prefixlen - 1, -1):
        size = 1 << (32 - prefix)
        base = lo & ~(size - 1)  # align down to the block boundary
        if base + size - 1 >= hi:
            candidate = ipaddress.ip_network((base, prefix))
            if candidate.subnet_of(home):
                return candidate
    return home


def collapse_ips_to_cidrs(
    ips: Sequence[str],
    max_rules: int = DEFAULT_MAX_RULES,
    tolerance: float = DEFAULT_TOLERANCE,
) -> list[str]:
    """Collapse observed IPs into a CIDR list of at most max_rules entries.

    Layer 1 picks the widest block per IP within the gap tolerance; layer 2
    widens the tolerance in 5 percent steps up to 95 percent; layer 3
    force-fits the remainder by merging the cheapest pairs.
    """
    addrs = _normalise_ips(ips)
    if not addrs:
        return []
    observed = set(addrs)

    def _collapse_at(t: float) -> list[ipaddress.IPv4Network]:
        blocks: set[ipaddress.IPv4Network] = set()
        for a in addrs:
            blocks.add(_widest_block_for(a, observed, t))
        # ipaddress.collapse_addresses merges contiguous & adjacent nets.
        return list(ipaddress.collapse_addresses(blocks))

    t = max(0.0, min(1.0, tolerance))
    nets = _collapse_at(t)
    while len(nets) > max_rules and t < 0.95:
        t = min(0.95, round(t + 0.05, 2))
        LOG.warning(
            "%d blocks exceed the budget of %d rules; widening gap tolerance to %.2f",
            len(nets), max_rules, t,
        )
        nets = _collapse_at(t)
    if len(nets) > max_rules:
        LOG.warning(
            "tolerance widening exhausted; force-fitting %d blocks into %d rules. "
            "Request a quota increase from AWS Support to keep precision",
            len(nets), max_rules,
        )
        nets = _force_fit(nets, max_rules)
    nets_sorted = sorted(nets, key=lambda n: (int(n.network_address), n.prefixlen))
    return [str(n) for n in nets_sorted]
# --------------------------------------------------------------------------- #
# Eligibility
# --------------------------------------------------------------------------- #
def rule_is_eligible(
    cidr: str | None,
    prefix_threshold: int = DEFAULT_PREFIX_THRESHOLD,
) -> bool:
    """A rule is in scope iff it is a strict RFC 1918 subset and shorter
    than the configured prefix threshold. Anything else, including
    0.0.0.0/0, overlapping non-private ranges, security-group references,
    IPv6, and already-tight CIDRs, is left untouched."""
    if not cidr:
        return False
    try:
        net = ipaddress.ip_network(cidr, strict=False)
    except ValueError:
        return False
    if not isinstance(net, ipaddress.IPv4Network):
        return False
    if not _is_strict_rfc1918(net):
        return False
    return net.prefixlen < prefix_threshold


# --------------------------------------------------------------------------- #
# Port merging and state hashing
# --------------------------------------------------------------------------- #
def merge_port_ranges(ranges: Iterable[tuple[int, int]]) -> list[tuple[int, int]]:
    """Merge adjacent or overlapping (from, to) ranges into smallest spanning ranges."""
    items = sorted((min(a, b), max(a, b)) for a, b in ranges)
    if not items:
        return []
    merged: list[tuple[int, int]] = [items[0]]
    for lo, hi in items[1:]:
        last_lo, last_hi = merged[-1]
        if lo <= last_hi + 1:
            merged[-1] = (last_lo, max(last_hi, hi))
        else:
            merged.append((lo, hi))
    return merged


def state_hash(payload: object) -> str:
    """Stable SHA-256 over a JSON-serialisable snapshot."""
    blob = json.dumps(payload, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(blob).hexdigest()
# --------------------------------------------------------------------------- #
# AWS helpers (only used by analyse/plan/apply/revert main entry points)
# --------------------------------------------------------------------------- #
def _require_boto3() -> None:
    if boto3 is None:
        sys.stderr.write(
            "boto3 is required. Activate the venv created by ./install.sh\n"
        )
        sys.exit(2)


def _client(service: str, region: str | None = None):
    _require_boto3()
    cfg = BotoConfig(retries={"max_attempts": 10, "mode": "adaptive"})
    return boto3.client(service, region_name=region, config=cfg)


def _list_security_groups(region: str) -> list[dict]:
    ec2 = _client("ec2", region)
    out: list[dict] = []
    paginator = ec2.get_paginator("describe_security_groups")
    for page in paginator.paginate():
        out.extend(page.get("SecurityGroups", []))
    return out
# --------------------------------------------------------------------------- #
# Analyse
# --------------------------------------------------------------------------- #
@dataclasses.dataclass
class AnalyseConfig:
    region: str
    days: int
    log_group: str | None
    s3_bucket: str | None
    s3_prefix: str | None
    out_path: str
    accepted_only: bool = True
def _analyse_from_cloudwatch(cfg: AnalyseConfig) -> list[str]:
    """Unique source IPs from a Logs Insights query over the flow-log group."""
    import time

    logs = _client("logs", cfg.region)
    end = dt.datetime.now(dt.timezone.utc)
    start = end - dt.timedelta(days=cfg.days)
    action = "ACCEPT" if cfg.accepted_only else "REJECT"
    query = (
        "fields srcAddr, action | "
        f'filter action = "{action}" | '
        "stats count() by srcAddr | "
        "limit 10000"
    )
    start_resp = logs.start_query(
        logGroupName=cfg.log_group,
        startTime=int(start.timestamp()),
        endTime=int(end.timestamp()),
        queryString=query,
    )
    qid = start_resp["queryId"]
    while True:
        result = logs.get_query_results(queryId=qid)
        if result["status"] in ("Complete", "Failed", "Cancelled", "Timeout", "Unknown"):
            break
        time.sleep(2)  # poll at a steady interval instead of hammering the API
    if result["status"] != "Complete":
        raise RuntimeError(f"Logs Insights query {qid} ended with status {result['status']}")
    rows = result.get("results", [])
    if len(rows) >= 10000:
        LOG.warning("query hit the 10,000 row limit; the approved list may be incomplete")
    return [c["value"] for row in rows for c in row if c["field"] == "srcAddr"]
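def _analyse_from_s3(cfg: AnalyseConfig) -> list[str]:
    if not cfg.s3_bucket:
        return []
    s3 = _client("s3")
    end = dt.datetime.now(dt.timezone.utc)
    start = end - dt.timedelta(days=cfg.days)
    prefix = (cfg.s3_prefix or "").rstrip("/")
    wanted = "ACCEPT" if cfg.accepted_only else "REJECT"
    ips: set[str] = set()
    total = 0
    failed = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=cfg.s3_bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["LastModified"] < start or obj["LastModified"] > end:
                continue
            total += 1
            try:
                body = s3.get_object(Bucket=cfg.s3_bucket, Key=obj["Key"])["Body"].read()
                if obj["Key"].endswith(".gz"):
                    body = gzip.decompress(body)
                for line in body.decode("utf-8", errors="ignore").splitlines():
                    parts = line.split()
                    # Assumes the default v2 format: version account-id interface-id
                    # srcaddr dstaddr srcport dstport protocol packets bytes start end
                    # action log-status
                    if len(parts) < 14 or parts[0] == "version":
                        continue  # header row or malformed line
                    if parts[12] == wanted:
                        ips.add(parts[3])
            except (botocore.exceptions.ClientError, OSError, EOFError) as exc:
                failed += 1
                LOG.warning("could not read s3://%s/%s: %s", cfg.s3_bucket, obj["Key"], exc)
    if total and failed / total > 0.10:
        raise RuntimeError(
            f"S3 flow-log read failure ratio {failed}/{total} exceeds 10%; refusing "
            "to emit a partial approved list. Check IAM and bucket policies."
        )
    return list(ips)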
def run_analyse(cfg: AnalyseConfig) -> dict:
    if cfg.log_group:
        ips = _analyse_from_cloudwatch(cfg)
    elif cfg.s3_bucket:
        ips = _analyse_from_s3(cfg)
    else:
        raise ValueError("either --log-group or --s3-bucket is required")
    if not ips:
        raise RuntimeError(
            "no source IPs observed; refusing to write an empty approved list. "
            "Verify that flow logs are enabled and the analysis window contains traffic."
        )
    payload = {
        "schema": "sg-tightener.approved/v1",
        "region": cfg.region,
        "days": cfg.days,
        "generated_at": dt.datetime.now(dt.timezone.utc).isoformat(),
        "source": "cloudwatch" if cfg.log_group else "s3",
        "ips": sorted(set(ips)),
    }
    with open(cfg.out_path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh, indent=2)
    LOG.info("wrote %d observed IPs to %s", len(payload["ips"]), cfg.out_path)
    return payload
# --------------------------------------------------------------------------- #
# Plan
# --------------------------------------------------------------------------- #
def _eligible_rules(sg: dict, prefix_threshold: int) -> list[dict]:
    out = []
    for perm in sg.get("IpPermissions", []):
        proto = perm.get("IpProtocol")
        from_p = perm.get("FromPort")
        to_p = perm.get("ToPort")
        for ip in perm.get("IpRanges", []):
            if rule_is_eligible(ip.get("CidrIp"), prefix_threshold):
                out.append({
                    "cidr": ip["CidrIp"],
                    "protocol": proto,
                    "from_port": from_p,
                    "to_port": to_p,
                    "description": ip.get("Description"),
                })
    return out


def _all_rule_count(sg: dict) -> int:
    n = 0
    for perm in sg.get("IpPermissions", []):
        n += len(perm.get("IpRanges", []))
        n += len(perm.get("Ipv6Ranges", []))
        n += len(perm.get("UserIdGroupPairs", []))
        n += len(perm.get("PrefixListIds", []))
    return n
def build_plan(
sgs: list[dict],
approved_ips: list[str],
*,
max_rules: int = DEFAULT_MAX_RULES,
tolerance: float = DEFAULT_TOLERANCE,
prefix_threshold: int = DEFAULT_PREFIX_THRESHOLD,
) -> dict:
plan_groups = []
for sg in sgs:
eligible = _eligible_rules(sg, prefix_threshold)
if not eligible:
continue
revokes_per_proto_port: dict[tuple, list[dict]] = {}
for r in eligible:
key = (r["protocol"], r["from_port"], r["to_port"])
revokes_per_proto_port.setdefault(key, []).append(r)
# Per-group budget = max_rules - (rules NOT being touched).
existing_total = _all_rule_count(sg)
untouched = existing_total - len(eligible)
budget = max(1, max_rules - untouched)
# Compute replacement CIDRs once across every observed IP that
# would be covered by *any* eligible rule on this SG.
replacement_cidrs = collapse_ips_to_cidrs(
approved_ips,
max_rules=budget,
tolerance=tolerance,
)
authorise_perms: list[dict] = []
port_merge_flagged = False
for (proto, from_p, to_p), _rules in revokes_per_proto_port.items():
authorise_perms.append({
"IpProtocol": proto,
"FromPort": from_p,
"ToPort": to_p,
"IpRanges": [
{"CidrIp": c, "Description": "sg-tightener evidence-based"}
for c in replacement_cidrs
],
})
# If multiple eligible rules share a CIDR set but differ on ports,
# the operator can ask to merge. We surface that here.
port_ranges_by_proto: dict[str, list[tuple[int, int]]] = {}
for k in revokes_per_proto_port:
proto, fp, tp = k
if fp is None or tp is None:
continue
port_ranges_by_proto.setdefault(str(proto), []).append((int(fp), int(tp)))
for proto, ranges in port_ranges_by_proto.items():
merged = merge_port_ranges(ranges)
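if len(merged) < len(ranges):
port_merge_flagged = True
# … (rest of build_plan not shown: the per-group plan entry and the returned
# plan dict carrying "snapshot_hash" and "groups")
def _current_snapshot_hash(region: str) -> str: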
sgs = _list_security_groups(region)
snapshot = [{"group_id": sg["GroupId"], "rules": sg.get("IpPermissions", [])} for sg in sgs]
return state_hash(snapshot)
def apply_plan(plan: dict, region: str, *, manifest_path: str) -> None:
current = _current_snapshot_hash(region)
if current != plan["snapshot_hash"]:
raise RuntimeError(
"plan is stale — security group state changed since it was generated. "
"Re-run plan and review again."
)
ec2 = _client("ec2", region)
manifest = {
"schema": "sg-tightener.manifest/v1",
"applied_at": dt.datetime.now(dt.timezone.utc).isoformat(),
"region": region,
"groups": [],
}
for grp in plan["groups"]:
gid = grp["group_id"]
# Revoke first — but record the original perms so revert can replay them.
manifest_grp = {
"group_id": gid,
"revoked": grp["revoke"],
"authorised": grp["authorise"],
}
try:
if grp["revoke"]:
ec2.revoke_security_group_ingress(
GroupId=gid, IpPermissions=grp["revoke"]
)
if grp["authorise"]:
ec2.authorize_security_group_ingress(
GroupId=gid, IpPermissions=grp["authorise"]
)
except botocore.exceptions.ClientError as exc:
# Save what we did so far and bail loudly. No partial silent state.
manifest["groups"].append(manifest_grp)
with open(manifest_path, "w", encoding="utf-8") as fh:
json.dump(manifest, fh, indent=2)
raise SystemExit(
f"apply HALTED on {gid}: {exc}\n"
f"Partial manifest written to {manifest_path}\n"
f"Revert: sg_tightener.py revert --manifest {manifest_path}"
)
manifest["groups"].append(manifest_grp)
with open(manifest_path, "w", encoding="utf-8") as fh:
json.dump(manifest, fh, indent=2)
LOG.info("apply complete; manifest written to %s", manifest_path)
def revert_from_manifest(manifest_path: str) -> None:
with open(manifest_path, "r", encoding="utf-8") as fh:
manifest = json.load(fh)
ec2 = _client("ec2", manifest["region"])
for grp in reversed(manifest["groups"]):
gid = grp["group_id"]
if grp.get("authorised"):
try:
ec2.revoke_security_group_ingress(GroupId=gid, IpPermissions=grp["authorised"])
except botocore.exceptions.ClientError as exc:
LOG.warning("revert: could not revoke sg-tightener rules on %s: %s", gid, exc)
if grp.get("revoked"):
try:
ec2.authorize_security_group_ingress(GroupId=gid, IpPermissions=grp["revoked"])
except botocore.exceptions.ClientError as exc:
LOG.warning("revert: could not re-authorise original rules on %s: %s", gid, exc)
LOG.info("revert complete")
# --------------------------------------------------------------------------- #
# CLI
# --------------------------------------------------------------------------- #
def _build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(prog="sg_tightener.py", description=__doc__.splitlines()[0])
sub = p.add_subparsers(dest="mode", required=True)
a = sub.add_parser("analyse", help="read flow logs and write approved IP list")
a.add_argument("--region", required=True)
a.add_argument("--days", type=int, default=DEFAULT_DAYS)
a.add_argument("--log-group", help="CloudWatch Logs group containing VPC flow logs")
a.add_argument("--s3-bucket", help="S3 bucket containing VPC flow logs")
a.add_argument("--s3-prefix", help="S3 prefix under --s3-bucket")
a.add_argument("--out", default="approved.json")
pl = sub.add_parser("plan", help="emit a plan.json")
pl.add_argument("--region", required=True)
pl.add_argument("--approved", required=True)
pl.add_argument("--max-rules", type=int, default=DEFAULT_MAX_RULES)
pl.add_argument("--tolerance", type=float, default=DEFAULT_TOLERANCE)
pl.add_argument("--prefix-threshold", type=int, default=DEFAULT_PREFIX_THRESHOLD)
pl.add_argument("--out", default="plan.json")
ap = sub.add_parser("apply", help="execute a plan")
ap.add_argument("--plan", required=True)
ap.add_argument("--yes", action="store_true")
rv = sub.add_parser("revert", help="restore pre-apply state from a manifest")
rv.add_argument("--manifest", required=True)
rv.add_argument("--yes", action="store_true")
return p
def _confirm(action: str) -> None:
answer = input(f"Type 'yes' to {action}: ").strip().lower()
if answer != "yes":
sys.exit("aborted")
def main(argv: Sequence[str] | None = None) -> int:
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
args = _build_parser().parse_args(argv)
if args.mode == "analyse":
cfg = AnalyseConfig(
region=args.region,
days=args.days,
log_group=args.log_group,
s3_bucket=args.s3_bucket,
s3_prefix=args.s3_prefix,
out_path=args.out,
)
run_analyse(cfg)
return 0
if args.mode == "plan":
with open(args.approved, "r", encoding="utf-8") as fh:
approved = json.load(fh)
sgs = _list_security_groups(args.region)
plan = build_plan(
sgs,
approved["ips"],
max_rules=args.max_rules,
tolerance=args.tolerance,
prefix_threshold=args.prefix_threshold,
)
with open(args.out, "w", encoding="utf-8") as fh:
json.dump(plan, fh, indent=2)
LOG.info(
"wrote plan covering %d groups to %s",
len(plan["groups"]), args.out,
)
return 0
if args.mode == "apply":
with open(args.plan, "r", encoding="utf-8") as fh:
plan = json.load(fh)
if not args.yes:
_confirm("apply this plan")
ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
manifest_path = f"manifest-{ts}.json"
# apply needs an explicit region. It is read from AWS_REGION, falling
# back to AWS_DEFAULT_REGION.
region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
if not region:
sys.exit("AWS_REGION or AWS_DEFAULT_REGION must be set for apply")
apply_plan(plan, region, manifest_path=manifest_path)
return 0
if args.mode == "revert":
if not args.yes:
_confirm("revert from manifest")
revert_from_manifest(args.manifest)
return 0
return 2
if __name__ == "__main__":
sys.exit(main())
EOF
10. Diagnose Script: sg_diagnose.py
Run this after a DR event, a failover, or any time a new service goes live and you start seeing connection failures. It reads REJECT entries from your flow logs, identifies private source IPs not covered by any existing security group rule, lets you review them, merges them into the approved IP list, and optionally applies the updated rules immediately.
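The coverage check at the heart of this step is small enough to sketch on its own. This is a minimal illustration, not sg_diagnose.py itself. It assumes the rejected source IPs and the current rule CIDRs have already been collected as plain strings. The function names find_uncovered_private_sources and merge_into_approved are hypothetical.

```python
import ipaddress

# RFC 1918 private ranges; anything outside them is treated as internet noise.
RFC1918 = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]


def find_uncovered_private_sources(rejected_ips, rule_cidrs):
    """Return private IPv4 sources not covered by any rule CIDR, numerically sorted."""
    nets = []
    for cidr in rule_cidrs:
        try:
            nets.append(ipaddress.ip_network(cidr, strict=False))
        except ValueError:
            continue  # skip malformed rule entries
    uncovered = set()
    for raw in rejected_ips:
        try:
            addr = ipaddress.ip_address(raw)
        except ValueError:
            continue  # not an IP address at all
        if addr.version != 4:
            continue  # IPv4 only, like the script
        if not any(addr in block for block in RFC1918):
            continue  # public source: ignore
        if any(addr in net for net in nets):
            continue  # already permitted by an existing rule
        uncovered.add(raw)
    return sorted(uncovered, key=ipaddress.ip_address)


def merge_into_approved(approved_ips, new_ips):
    """Union new sources into the approved list, numerically sorted."""
    return sorted(set(approved_ips) | set(new_ips), key=ipaddress.ip_address)
```

Sorting by ipaddress.ip_address places 10.0.0.9 ahead of 10.0.0.10. A plain string sort, such as the sorted(set(...)) call in the script, would reverse those two.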
cat > sg_diagnose.py << 'EOF'
#!/usr/bin/env python3
"""sg_diagnose.py — surface legitimate sources rejected after tightening.
After a tightening apply, run this to find private source IPs whose
connection attempts were REJECTED by flow logs over a recent window and
which are not already covered by any current security-group rule. Surfaces
them for human review, lets the operator merge them into the approved
list, and optionally re-runs the plan/apply cycle immediately.
Usage:
sg_diagnose.py --region <region> [--hours 24] [--approved approved.json]
[--log-group <g> | --s3-bucket <b> --s3-prefix <p>]
[--apply]
Exit code 0 means no new sources were found (or sources were found and
merged successfully). Exit code 1 means new sources were found but the
operator did not choose to merge them.
"""
from __future__ import annotations
import argparse
import datetime as dt
import ipaddress
import json
import logging
import sys
from typing import Sequence
from sg_tightener import (
AnalyseConfig,
_analyse_from_cloudwatch,
_analyse_from_s3,
_client,
_list_security_groups,
rule_is_eligible,
RFC1918_BLOCKS,
)
LOG = logging.getLogger("sg_diagnose")
def _covered_by_any_rule(ip: ipaddress.IPv4Address, rules: list[ipaddress.IPv4Network]) -> bool:
return any(ip in net for net in rules)
def _current_rule_nets(sgs: list[dict]) -> list[ipaddress.IPv4Network]:
nets: set[ipaddress.IPv4Network] = set()
for sg in sgs:
for perm in sg.get("IpPermissions", []):
for ip in perm.get("IpRanges", []):
try:
nets.add(ipaddress.ip_network(ip["CidrIp"], strict=False))
except (KeyError, ValueError):
continue
return list(nets)
def main(argv: Sequence[str] | None = None) -> int:
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
p = argparse.ArgumentParser(prog="sg_diagnose.py")
p.add_argument("--region", required=True)
p.add_argument("--hours", type=int, default=24)
p.add_argument("--approved", default="approved.json")
p.add_argument("--log-group")
p.add_argument("--s3-bucket")
p.add_argument("--s3-prefix")
p.add_argument("--apply", action="store_true",
help="if new sources are found and confirmed, merge them "
"and re-run plan/apply immediately")
args = p.parse_args(argv)
if not args.log_group and not args.s3_bucket:
p.error("either --log-group or --s3-bucket is required")
cfg = AnalyseConfig(
region=args.region,
# AnalyseConfig works in whole days; round up so the window covers --hours.
days=max(1, -(-args.hours // 24)),
log_group=args.log_group,
s3_bucket=args.s3_bucket,
s3_prefix=args.s3_prefix,
out_path=args.approved,
accepted_only=False, # we want REJECTs
)
rejected = (
_analyse_from_cloudwatch(cfg) if args.log_group else _analyse_from_s3(cfg)
)
LOG.info("observed %d rejected source IPs", len(rejected))
sgs = _list_security_groups(args.region)
current_nets = _current_rule_nets(sgs)
uncovered: list[str] = []
for raw in rejected:
try:
addr = ipaddress.ip_address(raw)
except ValueError:
continue
if not isinstance(addr, ipaddress.IPv4Address):
continue
if not any(addr in b for b in RFC1918_BLOCKS):
continue # only private space — ignore internet noise
if _covered_by_any_rule(addr, current_nets):
continue
uncovered.append(raw)
uncovered = sorted(set(uncovered))
if not uncovered:
LOG.info("no new private sources are being rejected — nothing to do")
return 0
LOG.warning("found %d uncovered private sources:", len(uncovered))
for ip in uncovered:
print(f" {ip}")
try:
answer = input(f"Merge these {len(uncovered)} IPs into {args.approved}? [y/N] ")
except EOFError:
answer = ""
if answer.strip().lower() != "y":
return 1
try:
with open(args.approved, "r", encoding="utf-8") as fh:
approved = json.load(fh)
except FileNotFoundError:
approved = {"schema": "sg-tightener.approved/v1", "ips": []}
merged_ips = sorted(set(approved.get("ips", [])) | set(uncovered))
approved["ips"] = merged_ips
approved["updated_at"] = dt.datetime.now(dt.timezone.utc).isoformat()
with open(args.approved, "w", encoding="utf-8") as fh:
json.dump(approved, fh, indent=2)
LOG.info("merged approved list now contains %d IPs", len(merged_ips))
if args.apply:
LOG.warning("--apply requested; re-run sg_tightener.py plan && apply now")
return 0
if __name__ == "__main__":
sys.exit(main())
EOF
11. OU Risk Report: sg_ou_report.py
This script scans an entire AWS Organisation or a specified account list and produces a risk-ranked report. NACLs are included in the report and clearly labelled, even though the tightening workflow only operates on security groups. The script exits with code 1 if any CRITICAL finding is present, which makes it usable as a pipeline gate.
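Ranking accounts and gating a pipeline on CRITICAL findings reduces to a weighted sum and a single check. The sketch below assumes findings have been reduced to (account_id, severity) pairs. The weights copy the W_* constants in the script. The names rank_accounts and gate_exit_code are hypothetical.

```python
from collections import Counter

# Geometric weights with base 100, matching the W_* constants in sg_ou_report.py.
WEIGHTS = {"CRITICAL": 1_000_000, "HIGH": 10_000, "MEDIUM": 100, "LOW": 1}


def rank_accounts(findings):
    """Return [(account_id, score), ...] sorted by descending risk score."""
    scores = Counter()
    for account_id, severity in findings:
        scores[account_id] += WEIGHTS[severity]
    # Break ties by account id so the output order is deterministic.
    return sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))


def gate_exit_code(findings):
    """CI gate: 1 if any CRITICAL finding exists, otherwise 0."""
    return 1 if any(sev == "CRITICAL" for _, sev in findings) else 0
```

With base 100, the ordering is strict only while each lower severity stays below 100 findings per account. For example, 100 HIGHs score exactly the same as one CRITICAL.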
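cat > sg_ou_report.py << 'EOF'
# … (script header, imports and shared constants not shown)
# Risk-score weights: severity should dominate volume (1 CRITICAL above the
# HIGHs, 1 HIGH above the MEDs, 1 MED above the LOWs). Geometric weighting
# with base 100 guarantees that ordering only while each lower-severity count
# stays below 100 per account; 100 HIGHs tie one CRITICAL. Widen the base if
# pathological account counts can exceed that.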
W_CRITICAL = 1_000_000
W_HIGH = 10_000
W_MEDIUM = 100
W_LOW = 1
def compute_risk_score(critical: int, high: int, medium: int, low: int) -> int:
return (
critical * W_CRITICAL
+ high * W_HIGH
+ medium * W_MEDIUM
+ low * W_LOW
)
def parse_partition_from_arn(arn: str | None) -> str:
"""Return the AWS partition from an ARN (aws, aws-cn, aws-us-gov).
Defaults to ``"aws"`` for unknown, malformed, or missing inputs."""
if not arn or not isinstance(arn, str):
return "aws"
parts = arn.split(":")
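if len(parts) < 2 or parts[0] != "arn":
return "aws"
partition = parts[1]
return partition if partition in ("aws", "aws-cn", "aws-us-gov") else "aws"
def _ports_in_range(from_p: int | None, to_p: int | None) -> set[int]: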
if from_p is None or to_p is None:
return set(range(0, 65536))
return set(range(int(from_p), int(to_p) + 1))
def classify_sg_rule(cidr: str, from_p: int | None, to_p: int | None) -> str | None:
"""Return one of CRITICAL/HIGH/MEDIUM/LOW or None for in-scope rules."""
try:
net = ipaddress.ip_network(cidr, strict=False)
except ValueError:
return None
ports = _ports_in_range(from_p, to_p)
if net == PUBLIC_ANY:
if ports & CRITICAL_PUBLIC_PORTS:
return "CRITICAL"
return "HIGH"
if isinstance(net, ipaddress.IPv4Network) and any(net.subnet_of(b) for b in RFC1918_BLOCKS):
if net.prefixlen <= 16:
return "HIGH"
if net.prefixlen <= 20:
return "MEDIUM"
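# … (the LOW branch, beginning "if net.prefixlen <", and the final return are not shown)
def classify_nacl_rule(cidr: str, port_range: dict | None) -> str | None: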
try:
net = ipaddress.ip_network(cidr, strict=False)
except ValueError:
return None
if net == PUBLIC_ANY:
return "HIGH"
if isinstance(net, ipaddress.IPv4Network) and any(net.subnet_of(b) for b in RFC1918_BLOCKS):
if net.prefixlen <= 16:
return "MEDIUM"
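# … (the LOW branch, beginning "if net.prefixlen <", and the final return are not shown)
def _assume(role_arn: str, session_name: str) -> "boto3.Session":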
sts = boto3.client("sts")
creds = sts.assume_role(RoleArn=role_arn, RoleSessionName=session_name)["Credentials"]
return boto3.Session(
aws_access_key_id=creds["AccessKeyId"],
aws_secret_access_key=creds["SecretAccessKey"],
aws_session_token=creds["SessionToken"],
)
def _scan_region(session, account_id: str, account_name: str, region: str) -> list[Finding]:
cfg = BotoConfig(retries={"max_attempts": 10, "mode": "adaptive"})
ec2 = session.client("ec2", region_name=region, config=cfg)
out: list[Finding] = []
try:
for page in ec2.get_paginator("describe_security_groups").paginate():
for sg in page.get("SecurityGroups", []):
for perm in sg.get("IpPermissions", []):
for ip in perm.get("IpRanges", []):
cidr = ip.get("CidrIp")
if not cidr:
continue
sev = classify_sg_rule(cidr, perm.get("FromPort"), perm.get("ToPort"))
if not sev:
continue
port = (
"all" if perm.get("FromPort") is None
else f"{perm.get('FromPort')}-{perm.get('ToPort')}"
)
out.append(Finding(
account_id, account_name, region, "SG",
sg["GroupId"], cidr, port, sev,
))
except Exception as exc: # noqa: BLE001
LOG.warning("SG scan failed in %s/%s: %s", account_id, region, exc)
try:
for page in ec2.get_paginator("describe_network_acls").paginate():
for nacl in page.get("NetworkAcls", []):
for entry in nacl.get("Entries", []):
if entry.get("Egress"):
continue
if entry.get("RuleAction") != "allow":
continue
cidr = entry.get("CidrBlock")
if not cidr:
continue
sev = classify_nacl_rule(cidr, entry.get("PortRange"))
if not sev:
continue
pr = entry.get("PortRange") or {}
port = (
"all" if not pr
else f"{pr.get('From', '?')}-{pr.get('To', '?')}"
)
out.append(Finding(
account_id, account_name, region, "NACL",
nacl["NetworkAclId"], cidr, port, sev,
))
except Exception as exc: # noqa: BLE001
LOG.warning("NACL scan failed in %s/%s: %s", account_id, region, exc)
return out
def _list_org_accounts(ou_id: str | None) -> list[dict]:
org = boto3.client("organizations")
if not ou_id:
accounts = []
for page in org.get_paginator("list_accounts").paginate():
accounts.extend(page.get("Accounts", []))
return [a for a in accounts if a.get("Status") == "ACTIVE"]
out = []
queue = [ou_id]
while queue:
cur = queue.pop()
for page in org.get_paginator("list_accounts_for_parent").paginate(ParentId=cur):
out.extend(page.get("Accounts", []))
for page in org.get_paginator("list_organizational_units_for_parent").paginate(ParentId=cur):
queue.extend(o["Id"] for o in page.get("OrganizationalUnits", []))
return [a for a in out if a.get("Status") == "ACTIVE"]
# --------------------------------------------------------------------------- #
# Aggregation
# --------------------------------------------------------------------------- #
def _aggregate_by_account(findings: list[Finding]) -> list[dict]:
by_account: dict[str, dict] = {}
for f in findings:
slot = by_account.setdefault(f.account_id, {
"account_id": f.account_id,
"account_name": f.account_name,
"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0,
})
slot[f.severity] += 1
rows = list(by_account.values())
for r in rows:
r["risk_score"] = compute_risk_score(r["CRITICAL"], r["HIGH"], r["MEDIUM"], r["LOW"])
rows.sort(key=lambda r: r["risk_score"], reverse=True)
return rows
def main(argv: Sequence[str] | None = None) -> int:
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
p = argparse.ArgumentParser(prog="sg_ou_report.py")
p.add_argument("--regions", required=True, help="comma-separated AWS regions")
p.add_argument("--ou-id", help="root OU or sub-OU id; default = list all org accounts")
p.add_argument("--accounts", help="comma-separated explicit account ids")
p.add_argument("--role-name", default="OrganizationAccountAccessRole")
p.add_argument("--out", default="report.xlsx")
p.add_argument("--json", default="report.json")
p.add_argument("--max-workers", type=int, default=16)
args = p.parse_args(argv)
_require_aws()
regions = [r.strip() for r in args.regions.split(",") if r.strip()]
sts_caller = boto3.client("sts").get_caller_identity()
partition = parse_partition_from_arn(sts_caller.get("Arn"))
if args.accounts:
accounts = [{"Id": a.strip(), "Name": a.strip()} for a in args.accounts.split(",") if a.strip()]
else:
accounts = _list_org_accounts(args.ou_id)
LOG.info("scanning %d accounts across %d regions", len(accounts), len(regions))
jobs: list[tuple[str, str, str, str]] = []
sessions: dict[str, "boto3.Session"] = {}
for acct in accounts:
role_arn = f"arn:{partition}:iam::{acct['Id']}:role/{args.role_name}"
try:
sessions[acct["Id"]] = _assume(role_arn, "sg-tightener-ou-report")
except Exception as exc: # noqa: BLE001
LOG.warning("could not assume role into %s: %s", acct["Id"], exc)
continue
for region in regions:
jobs.append((acct["Id"], acct.get("Name", acct["Id"]), region, role_arn))
findings: list[Finding] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as pool:
futs = {
pool.submit(_scan_region, sessions[aid], aid, aname, region): (aid, region)
for aid, aname, region, _ in jobs
}
for fut in concurrent.futures.as_completed(futs):
aid, region = futs[fut]
try:
findings.extend(fut.result())
except Exception as exc: # noqa: BLE001
LOG.warning("scan failed in %s/%s: %s", aid, region, exc)
summary = _aggregate_by_account(findings)
report = {
"schema": "sg-tightener.report/v1",
"generated_at": dt.datetime.now(dt.timezone.utc).isoformat(),
"regions": regions,
"findings": [dataclasses.asdict(f) for f in findings],
"summary": summary,
}
with open(args.json, "w", encoding="utf-8") as fh:
json.dump(report, fh, indent=2, default=str)
if pd is not None:
with pd.ExcelWriter(args.out, engine="openpyxl") as writer:
pd.DataFrame(summary).to_excel(writer, sheet_name="summary", index=False)
pd.DataFrame([dataclasses.asdict(f) for f in findings]).to_excel(
writer, sheet_name="findings", index=False
)
LOG.info("wrote %s and %s", args.out, args.json)
else:
LOG.info("pandas not installed; wrote JSON only at %s", args.json)
total_critical = sum(r["CRITICAL"] for r in summary)
if total_critical:
LOG.error("%d CRITICAL findings across estate", total_critical)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
EOF
12. Break-Glass Extension: sg_extend.py
sg_extend.py is the additive break-glass script for live incidents. It accepts one or more CIDR blocks and one or more security group IDs and immediately adds ingress rules without touching anything else. Nothing is removed. It is designed to be run under pressure, in the dark, by whoever is on call, without requiring them to understand the tightening workflow. The confirmation prompt asks for the single word “extend” rather than a longer phrase so it can be answered quickly. The --yes flag bypasses it entirely for automated runbook execution.
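The single-word confirmation, with --yes as an unattended bypass, can be sketched as follows. The helper name confirm_extend and its prompt text are assumptions for illustration. The read parameter exists only so the prompt can be exercised without a terminal.

```python
def confirm_extend(assume_yes, read=input):
    """Return True if the operator confirmed, honouring --yes.

    The operator must type the single word "extend" (case-insensitive,
    surrounding whitespace ignored). End of input counts as a refusal.
    """
    if assume_yes:
        return True  # --yes: automated runbook execution
    try:
        answer = read("Type 'extend' to add these rules: ")
    except EOFError:
        return False  # no terminal attached and no --yes: do nothing
    return answer.strip().lower() == "extend"
```

Because end of input counts as a refusal, a runbook that forgets --yes stops instead of hanging or proceeding.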
The script validates all inputs before making any AWS calls: CIDRs must include a prefix length, port specs must be well-formed, and the rule budget is checked per group before any authorize call is made. If adding the requested rules would push a group past its rule limit (60 by default, the standard per-group inbound quota, adjustable with --max-rules), that group is skipped with a clear log message and recorded in the manifest as SKIPPED_BUDGET, rather than the run proceeding and failing partway through. All other groups in the same invocation continue normally.
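The validate-first, budget-per-group sequence can be sketched without any AWS calls. The sketch makes two assumptions. First, each group's current rule count has already been fetched, for example by counting the entries DescribeSecurityGroups returns. Second, every CIDR costs one rule per port spec. The names plan_extension and validate_cidr and the ADDED status are hypothetical. SKIPPED_BUDGET and the 60-rule default come from the text above.

```python
import ipaddress


def validate_cidr(value):
    """Require an explicit prefix length, then return the canonical network."""
    value = value.strip()
    if "/" not in value:
        raise ValueError(f"CIDR must include prefix length: {value!r}")
    # strict=False clears host bits: 10.0.0.1/24 becomes 10.0.0.0/24.
    return str(ipaddress.ip_network(value, strict=False))


def plan_extension(cidrs, existing_counts, port_count=1, max_rules=60):
    """Decide, per group, whether the requested CIDRs fit the rule budget.

    existing_counts maps group id -> rules already on that group.
    All CIDRs are validated before any group is considered, so a single
    typo aborts the run before anything could be sent to AWS.
    """
    canonical = sorted({validate_cidr(c) for c in cidrs})
    needed = len(canonical) * port_count  # one rule per CIDR per port spec
    decisions = {}
    for gid, existing in existing_counts.items():
        if existing + needed > max_rules:
            # Record the skip instead of failing partway through the run.
            decisions[gid] = {"status": "SKIPPED_BUDGET", "cidrs": []}
        else:
            decisions[gid] = {"status": "ADDED", "cidrs": canonical}
    return decisions
```

Duplicate inputs that canonicalise to the same network, such as 10.0.0.1/24 and 10.0.0.0/24, cost a single rule.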
Every invocation writes a timestamped manifest JSON recording which CIDRs were added to which groups on which ports, what description tag was applied to each rule, and which groups were skipped. That manifest is the cleanup contract: after the incident, hand it to whoever runs the next tightening cycle so they know which rules the next plan should remove.
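A manifest of this kind needs little more than the per-group decisions and a timestamp. The following is a minimal sketch, not sg_extend.py's actual schema. The names build_extend_manifest and write_manifest, the field names, and the manifest-extend-<timestamp>.json filename are all assumptions, loosely modelled on the tightener's manifest. The sketch consumes decisions shaped like {group_id: {"status": ..., "cidrs": [...]}}.

```python
import datetime as dt
import json
import os


def build_extend_manifest(decisions, ports, description, now=None):
    """Return a JSON-serialisable record of what was added and what was skipped."""
    if now is None:
        now = dt.datetime.now(dt.timezone.utc)
    added = {g: d["cidrs"] for g, d in sorted(decisions.items())
             if d["status"] != "SKIPPED_BUDGET"}
    skipped = sorted(g for g, d in decisions.items()
                     if d["status"] == "SKIPPED_BUDGET")
    return {
        "schema": "sg-extend.manifest/v1",  # hypothetical schema tag
        "created_at": now.isoformat(),
        "description": description,  # the tag applied to every new rule
        "ports": list(ports),
        "added": added,
        "skipped": skipped,
    }


def write_manifest(manifest, directory="."):
    """Write the manifest to a UTC-timestamped file and return its path."""
    created = dt.datetime.fromisoformat(manifest["created_at"])
    path = os.path.join(directory, f"manifest-extend-{created:%Y%m%dT%H%M%SZ}.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(manifest, fh, indent=2)
    return path
```

Keeping the skipped groups in the manifest lets the next tightening cycle see where the extension did not apply, as well as what it added.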
cat > sg_extend.py << 'EOF'
#!/usr/bin/env python3
"""sg_extend.py — flow-log-driven break-glass ingress extender.
Break-glass tool for live incidents. Reads VPC flow logs over a recent
window (default: the last 24 hours), finds the source IPs whose traffic was
REJECTED, and adds ingress rules to the operator-named security groups so
that the blocked traffic is unblocked immediately — without waiting for the
standard analyse/plan/apply loop.
Design guarantees:
* Strictly additive. It only ever calls AuthorizeSecurityGroupIngress and
never revokes anything. The worst-case outcome of a wrong invocation is
a broader permission set, never an outage.
* Rule-budget aware. The discovered source IPs are grouped into the
smallest CIDR blocks allowed by ``--tolerance`` — the fraction of unused
(never-observed) addresses tolerated inside a grouping block. A tolerance
of 0.5 lets a CIDR be used to cover a set of IPs even when half of that
block's addresses were never seen. Grouping is done per protocol/port, so
a rule only ever opens the port that was actually rejected. Raise the
tolerance to fit more sources into fewer rules under the per-group limit.
* Private by default. Only RFC 1918 source IPs are added; internet REJECT
noise is ignored unless ``--include-public`` is given.
Every invocation writes a timestamped manifest recording exactly what was
added so the next tightening cycle can fold the changes back into the
evidence base. Part of the CloudToRepo project: https://cloudtorepo.com
Usage:
sg_extend.py --region us-east-1
--groups sg-aaaa,sg-bbbb
(--log-group <g> | --s3-bucket <b> [--s3-prefix <p>])
[--hours 24]
[--tolerance 0.5] # fraction of unused IPs allowed per CIDR
[--ports 443,5432] # restrict to these dst ports/ranges
[--include-public] # also add non-RFC1918 sources
[--description "DR failover 2026-05-28"]
[--max-rules 60]
[--yes]
"""
from __future__ import annotations
import argparse
import datetime as dt
import gzip
import ipaddress
import json
import logging
import os
import re
import sys
import time
import urllib.error
import urllib.request
from typing import Sequence
try:
import boto3
import botocore
except ImportError:
boto3 = None
botocore = None
from sg_tightener import (
DEFAULT_TOLERANCE,
RFC1918_BLOCKS,
_client,
collapse_ips_to_cidrs,
)
LOG = logging.getLogger("sg_extend")
GROUP_ID_RE = re.compile(r"^sg-[0-9a-f]{8,17}$")
PORT_SPEC_RE = re.compile(r"^(\d{1,5})(?:-(\d{1,5}))?$")
# VPC flow logs record the IANA protocol number. We only build ingress
# rules for the port-bearing protocols that account for essentially all
# application traffic; anything else (ICMP, ESP, ...) is skipped with a
# warning because a port-scoped break-glass rule would be meaningless.
PROTO_NUM_TO_NAME = {"6": "tcp", "17": "udp"}
# AWS publishes its public IP ranges as a single JSON file refreshed
# weekly-ish. When --include-public is on we use this to summarise any
# observed public source IP into the AWS service's published prefix
# (e.g. a Lambda Hyperplane ENI's egress IP collapses into the EC2
# range for that region) rather than enumerating /32 host routes that
# will break the moment AWS rotates the IP.
AWS_IP_RANGES_URL = "https://ip-ranges.amazonaws.com/ip-ranges.json"
AWS_IP_RANGES_CACHE = os.path.join(
os.path.expanduser("~"), ".cache", "sg-tightener", "aws-ip-ranges.json"
)
AWS_IP_RANGES_TTL_SECONDS = 7 * 24 * 3600 # 7 days — AWS refreshes weekly
# Services we never want to collapse into. AMAZON is the catch-all
# parent class covering essentially everything AWS — using it as a
# trust source defeats the point of evidence-based tightening.
AWS_SERVICE_BLOCKLIST = {"AMAZON"}
# --------------------------------------------------------------------------- #
# AWS IP ranges
# --------------------------------------------------------------------------- #
def load_aws_ip_ranges(
*,
cache_path: str | None = None,
ttl_seconds: int | None = None,
fetch: bool = True,
) -> list[dict]:
"""Return the parsed list of AWS IPv4 prefix entries.
Uses a local cache (``~/.cache/sg-tightener/aws-ip-ranges.json``) that
is refreshed once per ``ttl_seconds`` (default 7 days). Passing
``fetch=False`` skips the network call entirely — useful for tests
and air-gapped environments where the cache is pre-seeded.
The returned list is the raw ``prefixes`` array from ip-ranges.json;
each entry has ``ip_prefix``, ``region``, ``service``, and
``network_border_group`` keys.
"""
path = cache_path or AWS_IP_RANGES_CACHE
ttl = AWS_IP_RANGES_TTL_SECONDS if ttl_seconds is None else ttl_seconds
stale = True
if os.path.exists(path):
age = time.time() - os.path.getmtime(path)
stale = age > ttl
if stale and fetch:
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
with urllib.request.urlopen(AWS_IP_RANGES_URL, timeout=15) as resp:
body = resp.read()
with open(path, "wb") as fh:
fh.write(body)
except (urllib.error.URLError, OSError) as exc:
LOG.warning("failed to refresh AWS IP ranges: %s — falling back to cache", exc)
if not os.path.exists(path):
return []
if not os.path.exists(path):
return []
try:
with open(path, "r", encoding="utf-8") as fh:
data = json.load(fh)
except (OSError, json.JSONDecodeError) as exc:
LOG.warning("failed to read AWS IP ranges cache: %s", exc)
return []
return data.get("prefixes", [])
def classify_aws_service(
ip: str, ranges: list[dict]
) -> tuple[str, str, str] | None:
"""Return (service, region, ip_prefix) for the most specific non-AMAZON
prefix containing ``ip``, or None if no match.
AMAZON entries are never matched: that catch-all class is too broad to
be useful as a trust source, so an IP that appears only under AMAZON
returns None and is treated as a regular public IP.
"""
try:
addr = ipaddress.ip_address(ip)
except (ValueError, TypeError):
return None
if not isinstance(addr, ipaddress.IPv4Address):
return None
best: tuple[int, dict] | None = None
for entry in ranges:
prefix = entry.get("ip_prefix")
service = entry.get("service")
if not prefix or not service or service in AWS_SERVICE_BLOCKLIST:
continue
try:
net = ipaddress.ip_network(prefix)
except ValueError:
continue
if addr not in net:
continue
if best is None or net.prefixlen > best[0]:
best = (net.prefixlen, entry)
if not best:
return None
entry = best[1]
return entry["service"], entry.get("region", ""), entry["ip_prefix"]
def split_aws_service_flows(
flows: set[tuple[str, str, int]],
ranges: list[dict],
) -> tuple[set[tuple[str, str, int]], dict[tuple[str, str, int], tuple[str, str]]]:
"""Partition flows into (regular_flows, aws_service_summaries).
Regular flows retain their original ``(src, proto, port)`` shape and
are passed through the standard ``filter_flows`` / ``build_perms``
pipeline. AWS-service summaries collapse every observed source IP
that falls inside a published AWS service prefix into a single entry
keyed by ``(service_prefix_cidr, proto, port)`` and labelled with
``(service, region)``, so a Lambda Hyperplane ENI cluster becomes one rule, not 50.
"""
regular: set[tuple[str, str, int]] = set()
summaries: dict[tuple[str, str, int], tuple[str, str]] = {}
for src, proto, port in flows:
cls = classify_aws_service(src, ranges)
if cls is None:
regular.add((src, proto, port))
continue
service, region, ip_prefix = cls
# Key the summary by the prefix CIDR so multiple observed IPs
# collapse into one entry per (service prefix, proto, port).
summary_key = (ip_prefix, proto, port)
summaries[summary_key] = (service, region)
return regular, summaries
def build_aws_service_perms(
summaries: dict[tuple[str, str, int], tuple[str, str]],
description_prefix: str,
) -> list[dict]:
"""Render AWS-service summaries as IpPermissions.
One permission entry per (proto, port); within it, one IpRange per
distinct AWS service prefix observed. The description carries the
AWS service / region label so the rule's origin is visible at audit
time without cross-referencing a manifest.
"""
by_pp: dict[tuple[str, int], list[tuple[str, str, str]]] = {}
for (prefix, proto, port), (service, region) in summaries.items():
by_pp.setdefault((proto, port), []).append((prefix, service, region))
perms: list[dict] = []
for (proto, port), entries in sorted(by_pp.items()):
# Dedupe identical prefix entries while preserving stable order.
seen: set[str] = set()
ip_ranges: list[dict] = []
for prefix, service, region in sorted(entries):
if prefix in seen:
continue
seen.add(prefix)
label = f"{description_prefix} ({service} {region})".strip()
ip_ranges.append({"CidrIp": prefix, "Description": label})
perms.append({
"IpProtocol": proto,
"FromPort": port,
"ToPort": port,
"IpRanges": ip_ranges,
})
return perms
# --------------------------------------------------------------------------- #
# Input validation (kept stable — imported by the regression suite)
# --------------------------------------------------------------------------- #
def parse_cidr(value: str) -> str:
"""Validate a CIDR. Must include a prefix length. Returns canonical form."""
if "/" not in value:
raise ValueError(f"CIDR must include prefix length: {value!r}")
net = ipaddress.ip_network(value.strip(), strict=False)
return str(net)
def parse_port_spec(value: str) -> tuple[int, int]:
"""Parse '443' or '8000-8100' into (from_port, to_port). Raises on garbage."""
m = PORT_SPEC_RE.match(value.strip())
if not m:
raise ValueError(f"invalid port spec: {value!r}")
lo = int(m.group(1))
hi = int(m.group(2)) if m.group(2) else lo
if not (0 <= lo <= 65535 and 0 <= hi <= 65535):
raise ValueError(f"port out of range: {value!r}")
if hi < lo:
raise ValueError(f"port range hi < lo: {value!r}")
return lo, hi
def parse_group_id(value: str) -> str:
if not GROUP_ID_RE.match(value.strip()):
raise ValueError(f"not a security-group id: {value!r}")
return value.strip()
# --------------------------------------------------------------------------- #
# Pure flow-shaping helpers (testable without AWS)
# --------------------------------------------------------------------------- #
def proto_name(num: str | None) -> str | None:
"""Map an IANA protocol number (as a string) to an EC2 IpProtocol.
Returns None for protocols we do not build port-scoped rules for.
"""
if num is None:
return None
return PROTO_NUM_TO_NAME.get(str(num).strip())
def _port_allowed(port: int, specs: list[tuple[int, int]] | None) -> bool:
if specs is None:
return True
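return any(lo <= port <= hi for lo, hi in specs)
def filter_flows(
flows: set[tuple[str, str, int]],
include_public: bool,
port_specs: list[tuple[int, int]] | None,
) -> set[tuple[str, str, int]]: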
"""Keep only valid IPv4 flows that pass the privacy and port filters."""
out: set[tuple[str, str, int]] = set()
for src, proto, port in flows:
try:
addr = ipaddress.ip_address(src)
except ValueError:
continue
if not isinstance(addr, ipaddress.IPv4Address):
continue
if not include_public and not any(addr in b for b in RFC1918_BLOCKS):
continue # ignore internet REJECT noise unless asked
if not _port_allowed(port, port_specs):
continue
out.add((src, proto, port))
return out
def build_perms(
flows: set[tuple[str, str, int]],
description: str,
*,
tolerance: float,
max_rules: int,
) -> list[dict]:
"""Collapse flows into IpPermissions, grouping source IPs into CIDRs.
Sources are grouped per (protocol, port) and collapsed with
``collapse_ips_to_cidrs`` so a CIDR block may be used whenever the
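fraction of never-observed addresses inside it is at most ``tolerance``.
"""
# … (body of build_perms not shown)
def _perm_rule_count(perms: list[dict]) -> int: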
return sum(len(p.get("IpRanges", [])) for p in perms)
def _rule_count(sg: dict) -> int:
n = 0
for perm in sg.get("IpPermissions", []):
n += len(perm.get("IpRanges", []))
n += len(perm.get("Ipv6Ranges", []))
n += len(perm.get("UserIdGroupPairs", []))
n += len(perm.get("PrefixListIds", []))
return n
def _existing_nets_for(
    sg: dict, proto: str, port: int
) -> list[ipaddress.IPv4Network]:
    """IPv4 networks already permitted on this group for proto/port.
    A rule with protocol "-1" (all) or a port range spanning ``port`` counts.
    """
    nets: list[ipaddress.IPv4Network] = []
    for perm in sg.get("IpPermissions", []):
        if perm.get("IpProtocol") not in (proto, "-1"):
            continue
        fp, tp = perm.get("FromPort"), perm.get("ToPort")
        if (perm.get("IpProtocol") != "-1" and fp is not None and tp is not None
                and not (fp <= port <= tp)):
            continue
        for rng in perm.get("IpRanges", []):
            try:
                nets.append(ipaddress.ip_network(rng["CidrIp"], strict=False))
            except (KeyError, ValueError):
                continue
    return nets


def partition_flows_by_group(
    flows: set[tuple[str, str, int]],
    dst_map: dict[tuple[str, str, int], set[str]],
    dst_to_groups: dict[str, set[str]],
) -> dict[str, set[tuple[str, str, int]]]:
    """Split flows by which security group's ENIs received them.
    Returns ``dict[group_id, set[flow_tuple]]``. A flow is attributed to
    every SG attached to every destination ENI that observed it. AWS
    evaluates ingress permissively across attached SGs, so we record
    every candidate and let the rule-budget logic decide which actually
    gets the rule.
    """
    out: dict[str, set[tuple[str, str, int]]] = {}
    for flow in flows:
        dsts = dst_map.get(flow, set())
        seen_groups: set[str] = set()
        for d in dsts:
            seen_groups.update(dst_to_groups.get(d, set()))
        for gid in seen_groups:
            out.setdefault(gid, set()).add(flow)
    return out
def filter_existing(perms: list[dict], sg: dict) -> list[dict]:
    """Drop candidate CIDRs already covered by an existing rule on the same
    protocol/port, so the budget is honest and we avoid duplicate errors."""
    out: list[dict] = []
    for perm in perms:
        existing = _existing_nets_for(sg, perm["IpProtocol"], perm["FromPort"])
        keep = []
        for ip in perm.get("IpRanges", []):
            try:
                cand = ipaddress.ip_network(ip["CidrIp"], strict=False)
            except ValueError:
                continue
            if any(cand.subnet_of(e) for e in existing):
                continue  # already permitted
            keep.append(ip)
        if keep:
            out.append({**perm, "IpRanges": keep})
    return out
# --------------------------------------------------------------------------- #
# ENI / SG discovery (AWS)
# --------------------------------------------------------------------------- #
def derive_groups_from_dst_ips(
    ec2_client, dst_ips: Sequence[str],
) -> dict[str, set[str]]:
    """Map each destination IP to the set of SG ids attached to its ENI.
    AWS evaluates ingress permissively across every SG attached to an
    ENI, so all attached SGs are candidates for the rule. IPs that
    don't resolve to any ENI in this region are silently skipped: they
    may belong to a different account, a public IP outside our control,
    or an ENI that was deleted between the REJECT and the lookup.
    """
    out: dict[str, set[str]] = {}
    targets = sorted({ip for ip in dst_ips if ip})
    if not targets:
        return out
    # describe-network-interfaces' filter values are OR'd within a list.
    # Cap each chunk at 200 to stay well clear of any service-side limit.
    for i in range(0, len(targets), 200):
        chunk = targets[i:i + 200]
        try:
            resp = ec2_client.describe_network_interfaces(Filters=[
                {"Name": "addresses.private-ip-address", "Values": chunk},
            ])
        except botocore.exceptions.ClientError as exc:
            LOG.warning("describe_network_interfaces chunk failed: %s", exc)
            continue
        for eni in resp.get("NetworkInterfaces", []):
            sg_ids = {g["GroupId"] for g in eni.get("Groups", []) if g.get("GroupId")}
            if not sg_ids:
                continue
            for pa in eni.get("PrivateIpAddresses", []):
                ip = pa.get("PrivateIpAddress")
                if ip and ip in chunk:
                    out.setdefault(ip, set()).update(sg_ids)
    return out
# --------------------------------------------------------------------------- #
# Flow-log readers (AWS)
# --------------------------------------------------------------------------- #
def read_rejected_flows_cloudwatch(
    region: str, log_group: str, hours: int
) -> tuple[set[tuple[str, str, int]], dict[tuple[str, str, int], set[str]]]:
    """Query CloudWatch Logs Insights for REJECTed flows in the window.
    Returns ``(flows, dst_map)``: the standard 3-tuple flow set used by
    every downstream pure helper, plus a mapping from each flow tuple to
    the set of destination ENI IPs that observed it. The destination
    information is what makes ENI-based group discovery possible.
    """
    logs = _client("logs", region)
    end = dt.datetime.now(dt.timezone.utc)
    start = end - dt.timedelta(hours=hours)
    query = (
        "fields srcAddr, dstAddr, dstPort, protocol "
        '| filter action = "REJECT" '
        "| stats count() as hits by srcAddr, dstAddr, dstPort, protocol "
        "| limit 10000"
    )
    resp = logs.start_query(
        logGroupName=log_group,
        startTime=int(start.timestamp()),
        endTime=int(end.timestamp()),
        queryString=query,
    )
    qid = resp["queryId"]
    while True:
        result = logs.get_query_results(queryId=qid)
        if result["status"] in ("Complete", "Failed", "Cancelled", "Timeout"):
            break
        time.sleep(1)
    flows: set[tuple[str, str, int]] = set()
    dst_map: dict[tuple[str, str, int], set[str]] = {}
    if result["status"] != "Complete":
        LOG.warning("flow-log query ended with status %s", result["status"])
        return flows, dst_map
    for row in result.get("results", []):
        cells = {c["field"]: c["value"] for c in row}
        proto = proto_name(cells.get("protocol"))
        if proto is None:
            continue
        try:
            port = int(cells["dstPort"])
        except (KeyError, ValueError):
            continue
        src = cells.get("srcAddr", "").strip()
        dst = cells.get("dstAddr", "").strip()
        if not src:
            continue
        key = (src, proto, port)
        flows.add(key)
        if dst:
            dst_map.setdefault(key, set()).add(dst)
    return flows, dst_map
def read_rejected_flows_s3(
    region: str, bucket: str, prefix: str | None, hours: int
) -> tuple[set[tuple[str, str, int]], dict[tuple[str, str, int], set[str]]]:
    """Parse VPC flow-log objects in S3 (default format) for REJECTed flows.
    Returns ``(flows, dst_map)`` matching the CloudWatch reader; see that
    function's docstring for the contract.
    """
    s3 = _client("s3", region)
    end = dt.datetime.now(dt.timezone.utc)
    start = end - dt.timedelta(hours=hours)
    prefix = (prefix or "").rstrip("/")
    flows: set[tuple[str, str, int]] = set()
    dst_map: dict[tuple[str, str, int], set[str]] = {}
    total = 0
    failed = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["LastModified"] < start or obj["LastModified"] > end:
                continue
            total += 1
            try:
                body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
                if obj["Key"].endswith(".gz"):
                    body = gzip.decompress(body)
                for line in body.decode("utf-8", errors="ignore").splitlines():
                    parts = line.split()
                    # Default format: ... srcaddr[3] dstaddr[4] ... dstport[6] protocol[7] ... action[12]
                    if len(parts) < 13 or parts[12] != "REJECT":
                        continue  # header row, NODATA/SKIPDATA or ACCEPT
                    proto = proto_name(parts[7])
                    if proto is None:
                        continue
                    try:
                        port = int(parts[6])
                    except ValueError:
                        continue
                    key = (parts[3], proto, port)
                    flows.add(key)
                    dst_map.setdefault(key, set()).add(parts[4])
            except (botocore.exceptions.ClientError, OSError, EOFError) as exc:
                failed += 1
                LOG.warning("could not read s3://%s/%s: %s", bucket, obj["Key"], exc)
    if total and failed / total > 0.10:
        raise RuntimeError(
            f"S3 flow-log read failure ratio {failed}/{total} exceeds 10%; refusing "
            "to act on a partial view. Check IAM and bucket policies."
        )
    return flows, dst_map
# --------------------------------------------------------------------------- #
# CLI
# --------------------------------------------------------------------------- #
def _merge_perms(*lists: list[dict]) -> list[dict]:
    """Combine per-source permission lists, grouping by (proto, FromPort, ToPort)."""
    bucket: dict[tuple, list[dict]] = {}
    for lst in lists:
        for perm in lst:
            key = (perm["IpProtocol"], perm["FromPort"], perm["ToPort"])
            bucket.setdefault(key, []).extend(perm.get("IpRanges", []))
    merged: list[dict] = []
    for (proto, fp, tp), ranges in sorted(bucket.items(), key=lambda kv: (kv[0][0], kv[0][1] or 0)):
        seen: set[str] = set()
        deduped: list[dict] = []
        for r in ranges:
            cidr = r.get("CidrIp")
            if not cidr or cidr in seen:
                continue
            seen.add(cidr)
            deduped.append(r)
        if deduped:
            merged.append({
                "IpProtocol": proto, "FromPort": fp, "ToPort": tp,
                "IpRanges": deduped,
            })
    return merged


def _apply_to_group(
    ec2_client, gid: str, sg: dict | None, perms: list[dict],
    *, max_rules: int,
) -> dict:
    """Authorise ``perms`` on ``gid`` with budget + already-present checks.
    Returns a manifest entry describing the outcome. No exceptions
    propagate; the caller decides how to react.
    """
    if sg is None:
        return {"group_id": gid, "status": "NOT_FOUND"}
    perms = filter_existing(perms, sg)
    if not perms:
        return {"group_id": gid, "status": "ALREADY_PRESENT"}
    would_add = perm_rule_count(perms)
    existing = _rule_count(sg)
    if existing + would_add > max_rules:
        return {
            "group_id": gid, "status": "SKIPPED_BUDGET",
            "existing_rules": existing, "would_add": would_add,
            "max_rules": max_rules,
        }
    try:
        ec2_client.authorize_security_group_ingress(GroupId=gid, IpPermissions=perms)
        return {
            "group_id": gid, "status": "ADDED",
            "added_rules": would_add, "rules": perms,
        }
    except botocore.exceptions.ClientError as exc:
        err = str(exc)
        if "InvalidPermission.Duplicate" in err:
            return {"group_id": gid, "status": "PARTIAL_DUPLICATE", "error": err}
        return {"group_id": gid, "status": "ERROR", "error": err}
def main(argv: Sequence[str] | None = None) -> int:
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    p = argparse.ArgumentParser(prog="sg_extend.py")
    p.add_argument("--region", required=True)
    p.add_argument("--groups", help="comma-separated SG ids to add to (omit to "
                   "auto-discover from REJECT destination ENIs)")
    p.add_argument("--log-group", help="CloudWatch Logs group containing VPC flow logs")
    p.add_argument("--s3-bucket", help="S3 bucket containing VPC flow logs")
    p.add_argument("--s3-prefix", help="S3 prefix under --s3-bucket")
    p.add_argument("--hours", type=int, default=24, help="flow-log window (default 24)")
    p.add_argument("--tolerance", type=float, default=DEFAULT_TOLERANCE,
                   help="fraction (0.0-1.0) of unused IPs allowed inside a "
                        "grouping CIDR; higher packs more sources into fewer "
                        f"rules (default {DEFAULT_TOLERANCE})")
    p.add_argument("--ports", help="restrict to these dst ports/ranges, e.g. 443,5432")
    p.add_argument("--include-public", action="store_true",
                   help="also add non-RFC1918 (public) source IPs")
    p.add_argument("--no-aws-summarise", action="store_true",
                   help="when --include-public is on, treat AWS-service "
                        "source IPs as individual /32s instead of summarising "
                        "into the AWS published service prefix")
    p.add_argument("--max-groups", type=int, default=20,
                   help="auto-discover mode: refuse to act on more than this "
                        "many groups in one run (default 20)")
    p.add_argument("--description", default="sg-extend break-glass")
    p.add_argument("--max-rules", type=int, default=60)
    p.add_argument("--yes", action="store_true", help="bypass the 'extend' prompt")
    args = p.parse_args(argv)
    if not args.log_group and not args.s3_bucket:
        p.error("either --log-group or --s3-bucket is required")
    if args.hours < 1:
        p.error("--hours must be >= 1")
    if not 0.0 <= args.tolerance <= 1.0:
        p.error("--tolerance must be between 0.0 and 1.0")
    try:
        explicit_groups = (
            [parse_group_id(g) for g in args.groups.split(",") if g.strip()]
            if args.groups else []
        )
        port_specs = (
            [parse_port_spec(x) for x in args.ports.split(",") if x.strip()]
            if args.ports else None
        )
    except ValueError as exc:
        sys.exit(f"input error: {exc}")
    # --- discover rejected traffic --------------------------------------- #
    if args.log_group:
        flows, dst_map = read_rejected_flows_cloudwatch(
            args.region, args.log_group, args.hours
        )
    else:
        flows, dst_map = read_rejected_flows_s3(
            args.region, args.s3_bucket, args.s3_prefix, args.hours
        )
    flows = filter_flows(
        flows, include_public=args.include_public, port_specs=port_specs
    )
    scope = "" if args.include_public else "private "
    if not flows:
        LOG.info(
            "no rejected %ssources found in the last %dh; nothing to do",
            scope, args.hours,
        )
        return 0
    # --- AWS service summarisation -------------------------------------- #
    aws_summaries: dict[tuple[str, str, int], tuple[str, str]] = {}
    aws_service_perms: list[dict] = []
    if args.include_public and not args.no_aws_summarise:
        ranges = load_aws_ip_ranges()
        if ranges:
            regular_flows, aws_summaries = split_aws_service_flows(flows, ranges)
            if aws_summaries:
                LOG.info(
                    "summarised %d public source IP(s) into %d AWS service prefix(es)",
                    len(flows) - len(regular_flows), len(aws_summaries),
                )
                aws_service_perms = build_aws_service_perms(
                    aws_summaries, args.description,
                )
            flows = regular_flows
        else:
            LOG.warning(
                "no AWS IP ranges available (network or cache miss); "
                "public IPs will be added as individual /32 rules",
            )
    LOG.warning(
        "found %d rejected %sflow(s) over the last %dh:",
        len(flows) + len(aws_summaries), scope, args.hours,
    )
    for src, proto, port in sorted(flows):
        print(f"  {src}/32  {proto}/{port}")
    for (prefix, proto, port), (service, region) in sorted(aws_summaries.items()):
        print(f"  {prefix:<18}  {proto}/{port}  (AWS {service}, {region})")
    # --- resolve target security groups --------------------------------- #
    ec2 = _client("ec2", args.region)
    auto_mode = not explicit_groups
    if auto_mode:
        # Map each destination IP behind a surviving flow to the SGs on its ENI.
        dst_ips = sorted({d for flow in flows for d in dst_map.get(flow, set())})
        dst_to_groups = derive_groups_from_dst_ips(ec2, dst_ips)
        discovered_groups: set[str] = set()
        for gids in dst_to_groups.values():
            discovered_groups.update(gids)
        if not discovered_groups:
            sys.exit("refusing to act: no destination ENI resolved to a security "
                     "group. Pass --groups explicitly.")
        if len(discovered_groups) > args.max_groups:
            sys.exit(
                f"refusing to act: discovered {len(discovered_groups)} groups, "
                f"exceeds --max-groups {args.max_groups}. Tighten --ports / "
                f"--hours or pass --groups explicitly."
            )
        groups = sorted(discovered_groups)
        LOG.warning("auto-discovered %d destination security group(s): %s",
                    len(groups), ", ".join(groups))
    else:
        groups = explicit_groups
        dst_to_groups = {}
    if not args.yes:
        try:
            answer = input(
                "Type 'extend' to add these to the security groups: "
            ).strip().lower()
        except EOFError:
            answer = ""
        if answer != "extend":
            sys.exit("aborted")
    # --- build per-group rule sets -------------------------------------- #
    # describe_security_groups fails the whole call if any id does not exist,
    # so look groups up one at a time and let missing ones surface as NOT_FOUND.
    sg_by_id: dict[str, dict] = {}
    for gid in groups:
        try:
            resp = ec2.describe_security_groups(GroupIds=[gid])
        except botocore.exceptions.ClientError as exc:
            if "InvalidGroup.NotFound" in str(exc):
                continue
            raise
        for sg in resp["SecurityGroups"]:
            sg_by_id[sg["GroupId"]] = sg
    if auto_mode:
        # In auto mode, every group only gets rules for the flows whose
        # destinations sat on that group's ENIs.
        flows_per_group = partition_flows_by_group(flows, dst_map, dst_to_groups)
    else:
        flows_per_group = {gid: flows for gid in groups}
    manifest = {
        "schema": "sg-extend.manifest/v1",
        "applied_at": dt.datetime.now(dt.timezone.utc).isoformat(),
        "region": args.region,
        "description": args.description,
        "source": "cloudwatch" if args.log_group else "s3",
        "window_hours": args.hours,
        "tolerance": args.tolerance,
        "include_public": args.include_public,
        "aws_summarise": args.include_public and not args.no_aws_summarise,
        "mode": "auto" if auto_mode else "explicit",
        "discovered_flows": [
            {"cidr": f"{s}/32", "protocol": pr, "port": po}
            for s, pr, po in sorted(flows)
        ],
        "aws_service_summaries": [
            {"cidr": prefix, "protocol": proto, "port": port,
             "service": service, "region": region}
            for (prefix, proto, port), (service, region) in sorted(aws_summaries.items())
        ],
        "groups": [],
    }
    for gid in groups:
        sg = sg_by_id.get(gid)
        group_flows = flows_per_group.get(gid, set())
        regular_perms = build_perms(
            group_flows, args.description,
            tolerance=args.tolerance, max_rules=args.max_rules,
        ) if group_flows else []
        # AWS-service perms apply to every group in scope; per-SG filtering
        # via filter_existing inside _apply_to_group prevents redundant adds.
        merged_perms = _merge_perms(regular_perms, aws_service_perms)
        if not merged_perms:
            manifest["groups"].append({
                "group_id": gid, "status": "NO_FLOWS_FOR_GROUP",
            })
            LOG.info("%s has no attributable flows in this run", gid)
            continue
        result = _apply_to_group(
            ec2, gid, sg, merged_perms, max_rules=args.max_rules,
        )
        manifest["groups"].append(result)
        status = result["status"]
        if status == "ADDED":
            LOG.info("added %d rule(s) to %s", result["added_rules"], gid)
        elif status == "SKIPPED_BUDGET":
            LOG.warning(
                "%s skipped: %d existing + %d new > %d max; raise --tolerance to "
                "pack into fewer rules, run sg_compact to reclaim budget, or raise "
                "the SG quota",
                gid, result["existing_rules"], result["would_add"], args.max_rules,
            )
        elif status == "ALREADY_PRESENT":
            LOG.info("%s already covers every discovered source; nothing to add", gid)
        elif status == "NOT_FOUND":
            LOG.warning("%s not found; skipped", gid)
        elif status == "PARTIAL_DUPLICATE":
            LOG.info("some rules already existed on %s", gid)
        elif status == "ERROR":
            LOG.error("authorize failed on %s: %s", gid, result.get("error", ""))
    ts = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    manifest_path = f"sg_extend-manifest-{ts}.json"
    with open(manifest_path, "w", encoding="utf-8") as fh:
        json.dump(manifest, fh, indent=2)
    LOG.info("manifest written to %s", manifest_path)
    return 0


if __name__ == "__main__":
    sys.exit(main())
EOF
13. Example Workflow
These wrapper scripts encode the standard sequence: run the OU report first to see where permissive rules are most concentrated, then run analyse and plan against the accounts you want to tighten, review the plan output carefully, and apply it. Keep the revert, diagnose and extend scripts on hand for the 48 hours afterwards. The extend script is the break-glass tool: use it during an active incident when you need to unblock traffic immediately without waiting for a full plan cycle. Replace the region, OU ID, role name and timestamp placeholders with your actual values. The scripts assume you have already run ./install.sh and that your AWS credentials are available in your environment or profile.
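Once an incident is over, break-glass rules usually need to come back out. The manifest that sg_extend writes already records what changed: only entries with status ADDED carry a rules list holding the exact IpPermissions that were authorised. The sketch below assumes that sg-extend.manifest/v1 layout and uses hypothetical helper names (added_rules_by_group, load_manifest); it only reads the manifest and makes no AWS calls.

```python
import json


def added_rules_by_group(manifest: dict) -> dict[str, list[dict]]:
    """Hypothetical helper: map group_id -> IpPermissions added by sg_extend.

    Assumes the sg-extend.manifest/v1 layout: a top-level "groups" list whose
    entries carry "status" and, for ADDED entries only, a "rules" list.
    """
    if manifest.get("schema") != "sg-extend.manifest/v1":
        raise ValueError(f"unexpected manifest schema: {manifest.get('schema')!r}")
    out: dict[str, list[dict]] = {}
    for entry in manifest.get("groups", []):
        # Skipped, duplicate and error entries never recorded a rules list.
        if entry.get("status") == "ADDED" and entry.get("rules"):
            out[entry["group_id"]] = entry["rules"]
    return out


def load_manifest(path: str) -> dict:
    """Read an sg_extend-manifest-<timestamp>.json file from disk."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


if __name__ == "__main__":
    sample = {
        "schema": "sg-extend.manifest/v1",
        "groups": [
            {"group_id": "sg-0abc1234", "status": "ADDED", "added_rules": 1,
             "rules": [{"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
                        "IpRanges": [{"CidrIp": "10.4.0.0/24",
                                      "Description": "incident-XXXX"}]}]},
            {"group_id": "sg-0def5678", "status": "SKIPPED_BUDGET"},
        ],
    }
    for gid, perms in added_rules_by_group(sample).items():
        print(gid, json.dumps(perms))
```

After review, each group's list could be passed as IpPermissions to boto3's EC2 revoke_security_group_ingress. Entries with status PARTIAL_DUPLICATE or ERROR record only an error string, not the rules, so those groups need a manual check.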
cat > run_ou_report.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_ou_report.py \
--ou-id ou-xxxx-xxxxxxxx \
--role-name YourAuditRole \
--regions af-south-1 eu-west-1 \
--max-prefix-len 24 \
--workers 8
EOF
chmod +x run_ou_report.sh
cat > run_analyse.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_tightener.py analyse \
--region af-south-1 \
--days 90
EOF
chmod +x run_analyse.sh
cat > run_plan.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_tightener.py plan \
--region af-south-1 \
--approved-ips approved_ips_af-south-1_TIMESTAMP_HERE.json \
--gap-tolerance 0.30
EOF
chmod +x run_plan.sh
cat > run_apply.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_tightener.py apply \
--region af-south-1 \
--plan sg_plan_af-south-1_TIMESTAMP_HERE.json
EOF
chmod +x run_apply.sh
cat > run_revert.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_tightener.py revert \
--region af-south-1 \
--backup sg_backup_af-south-1_TIMESTAMP_HERE.json
EOF
chmod +x run_revert.sh
cat > run_diagnose.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_diagnose.py \
--region af-south-1 \
--hours 4 \
--approved-ips approved_ips_af-south-1_TIMESTAMP_HERE.json \
--write-plan
EOF
chmod +x run_diagnose.sh
cat > run_extend.sh << 'EOF'
#!/usr/bin/env bash
# Break-glass: add ingress rules for recently REJECTed flows to security
# groups without removing anything. Edit the variables below before running.
# Leave DESCRIPTION as the incident reference so the manifest and AWS rule
# description are traceable.
set -euo pipefail
source .venv/bin/activate
REGION="af-south-1"
LOG_GROUP="YOUR_VPC_FLOW_LOG_GROUP"   # or use --s3-bucket / --s3-prefix instead
SG_IDS="sg-0abc1234"                  # comma-separated; drop --groups to auto-discover
PORTS="443,8443"                      # comma-separated destination ports
DESCRIPTION="DR failover $(date -u +%Y-%m-%d) incident-XXXX"
python sg_extend.py \
  --region "$REGION" \
  --log-group "$LOG_GROUP" \
  --groups "$SG_IDS" \
  --ports "$PORTS" \
  --description "$DESCRIPTION"
EOF
chmod +x run_extend.sh
14. Test Suite
The CIDR collapsing algorithm has enough edge cases that a regression test suite is essential for future changes. The tests below cover every bug identified during the original code review, the algorithm’s correctness boundaries, the input validation and flow-log logic in the extend script, and the rule compaction in sg_compact. Drop this file in the same directory as sg_tightener.py, sg_ou_report.py, sg_extend.py and sg_compact.py (it imports from all four) and run it with python sg_tightener_test.py to confirm the tool behaves correctly before any deployment. Any future fix to the algorithm, parsing logic, or extend script must keep this suite passing.
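To make the tolerance semantics these tests exercise concrete, here is a deliberately simplified sketch of tolerance-gated collapsing. It is not the tool's collapse_ips_to_cidrs: it greedily merges neighbouring blocks into their smallest common supernet and accepts a merge only when the fraction of never-observed addresses in that supernet is at most tolerance. It ignores the max_rules budget and force-fit step the real function also applies, and the function names are hypothetical.

```python
import ipaddress


def smallest_supernet(a: ipaddress.IPv4Network,
                      b: ipaddress.IPv4Network) -> ipaddress.IPv4Network:
    """Smallest single CIDR block that contains both a and b."""
    lo = min(int(a.network_address), int(b.network_address))
    hi = max(int(a.broadcast_address), int(b.broadcast_address))
    # Bits above the highest differing bit are shared; that fixes the prefix.
    prefix = 32 - (lo ^ hi).bit_length()
    return ipaddress.IPv4Network((lo, prefix), strict=False)


def collapse_with_tolerance(ips: list[str], tolerance: float) -> list[str]:
    """Greedy sketch: merge neighbours while unused space stays <= tolerance."""
    observed = sorted({int(ipaddress.IPv4Address(ip)) for ip in ips})
    nets = [ipaddress.IPv4Network((a, 32)) for a in observed]
    changed = True
    while changed:
        changed = False
        for left, right in zip(nets, nets[1:]):
            cand = smallest_supernet(left, right)
            start = int(cand.network_address)
            end = int(cand.broadcast_address)
            used = sum(1 for a in observed if start <= a <= end)
            if 1 - used / cand.num_addresses <= tolerance:
                # Drop every block the supernet swallows, then rescan.
                nets = sorted([n for n in nets if not n.subnet_of(cand)] + [cand])
                changed = True
                break
    return [str(n) for n in nets]


if __name__ == "__main__":
    print(collapse_with_tolerance(["10.0.0.1", "10.0.0.2"], 0.5))  # ['10.0.0.0/30']
    print(collapse_with_tolerance(["10.0.0.1", "10.0.0.2"], 0.4))  # ['10.0.0.1/32', '10.0.0.2/32']
    print(collapse_with_tolerance([f"10.0.0.{i}" for i in range(256)], 0.0))  # ['10.0.0.0/24']
```

Two observed hosts inside a /30 leave half of it unused, so the merge happens at a tolerance of 0.5 and not at 0.4; a fully observed /24 collapses even at zero tolerance. That trade-off is why the suite checks both that every observed IP stays covered and that sparse sources stay narrow under a tight tolerance.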
cat > sg_tightener_test.py << 'EOF'
#!/usr/bin/env python3
"""Regression test suite for sg-tightener.
Covers the CIDR collapsing algorithm, eligibility & port-merge logic,
the partition detector and severity-weighted risk score in the OU
report, the input validation and flow-log discovery/collapse logic in
sg_extend, and the range-aware CIDR widening / rule compaction in
sg_compact.
Any fix to the algorithm or parsing logic must keep this suite passing.
Run:
python sg_tightener_test.py
"""
from __future__ import annotations
import ipaddress
import sys
import unittest
from sg_tightener import (
collapse_ips_to_cidrs,
merge_port_ranges,
rule_is_eligible,
_is_strict_rfc1918,
)
from sg_ou_report import (
parse_partition_from_arn,
compute_risk_score,
classify_sg_rule,
)
from sg_extend import (
parse_cidr,
parse_port_spec,
parse_group_id,
proto_name,
filter_flows,
build_perms,
perm_rule_count,
_rule_count,
_existing_nets_for,
filter_existing,
classify_aws_service,
split_aws_service_flows,
build_aws_service_perms,
partition_flows_by_group,
_merge_perms,
)
from sg_compact import (
compactable_net,
count_group,
compact_nets,
analyse_group,
build_compact_plan,
)
class TestCollapseIpsToCidrs(unittest.TestCase):
    """Algorithm correctness: every IP covered, count respects budget."""

    def assert_all_covered(self, ips, cidrs):
        nets = [ipaddress.ip_network(c) for c in cidrs]
        for ip in ips:
            addr = ipaddress.ip_address(ip)
            self.assertTrue(
                any(addr in n for n in nets),
                f"IP {ip} not covered by {cidrs}",
            )

    def test_empty_input(self):
        self.assertEqual(collapse_ips_to_cidrs([]), [])

    def test_single_ip_produces_host_route(self):
        result = collapse_ips_to_cidrs(["10.0.10.5"])
        self.assertEqual(result, ["10.0.10.5/32"])

    def test_dense_subnet_collapses(self):
        ips = [f"10.0.10.{i}" for i in range(1, 30)]
        result = collapse_ips_to_cidrs(ips, max_rules=10)
        self.assert_all_covered(ips, result)
        self.assertLessEqual(len(result), 10)
        self.assertLess(len(result), len(ips), "dense subnet should collapse")

    def test_sparse_ips_stay_as_host_routes(self):
        ips = ["10.0.0.1", "10.5.0.1", "10.10.0.1"]
        result = collapse_ips_to_cidrs(ips, max_rules=10, tolerance=0.05)
        self.assert_all_covered(ips, result)
        # Each is far apart; under tight tolerance they remain narrow.
        for cidr in result:
            net = ipaddress.ip_network(cidr)
            self.assertLessEqual(net.num_addresses, 256)

    def test_force_fit_respects_budget(self):
        ips = [f"172.16.{i}.1" for i in range(0, 30)]
        result = collapse_ips_to_cidrs(ips, max_rules=5)
        self.assertLessEqual(
            len(result), 5,
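            f"force-fit should reduce to <= 5, got {len(result)}: {result}",
        )
        self.assert_all_covered(ips, result)

    def test_collapse_across_all_three_rfc1918(self):
        ips = ([f"10.{i}.0.1" for i in range(0, 10)]
               + [f"172.16.{i}.1" for i in range(0, 10)]
               + [f"192.168.{i}.1" for i in range(0, 10)])
        result = collapse_ips_to_cidrs(ips, max_rules=3)
        # Each RFC 1918 range gets at least 1 rule, total <= 3
        self.assertLessEqual(len(result), 3)
        self.assert_all_covered(ips, result)


class TestParsePartitionFromArn(unittest.TestCase):
    """OU report partition detection must handle all AWS partitions."""

    def test_aws_partition(self):
        self.assertEqual(
            parse_partition_from_arn("arn:aws:iam::123:user/joe"), "aws"
        )

    def test_aws_cn_partition(self):
        self.assertEqual(
            parse_partition_from_arn("arn:aws-cn:iam::123:user/joe"), "aws-cn"
        )

    def test_aws_us_gov_partition(self):
        self.assertEqual(
            parse_partition_from_arn("arn:aws-us-gov:iam::123:user/joe"), "aws-us-gov"
        )

    def test_unknown_partition_defaults_to_aws(self):
        self.assertEqual(
            parse_partition_from_arn("arn:aws-future:iam::123:user/joe"), "aws"
        )

    def test_garbage_defaults_to_aws(self):
        self.assertEqual(parse_partition_from_arn(""), "aws")
        self.assertEqual(parse_partition_from_arn(None), "aws")
        self.assertEqual(parse_partition_from_arn("not-an-arn"), "aws")


class TestComputeRiskScore(unittest.TestCase):
    """Account ranking must give CRITICAL findings overwhelmingly more weight."""

    def test_single_critical_outranks_many_lows(self):
        critical_only = compute_risk_score(1, 0, 0, 0)
        many_lows = compute_risk_score(0, 0, 0, 100)
        self.assertGreater(critical_only, many_lows,
                           "1 CRITICAL must outrank 100 LOW findings")

    def test_high_outranks_low(self):
        self.assertGreater(compute_risk_score(0, 1, 0, 0),
                           compute_risk_score(0, 0, 0, 50))

    def test_zero_findings(self):
        self.assertEqual(compute_risk_score(0, 0, 0, 0), 0)

    def test_severity_order(self):
        critical = compute_risk_score(1, 0, 0, 0)
        high = compute_risk_score(0, 1, 0, 0)
        medium = compute_risk_score(0, 0, 1, 0)
        low = compute_risk_score(0, 0, 0, 1)
        self.assertGreater(critical, high)
        self.assertGreater(high, medium)
        self.assertGreater(medium, low)


class TestFilterExisting(unittest.TestCase):
    """Candidates already covered on the same proto/port must be dropped."""

    def test_drops_covered_keeps_uncovered(self):
        sg = {"IpPermissions": [{
            "IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
            "IpRanges": [{"CidrIp": "10.0.0.0/16"}],
        }]}
        perms = [{
            "IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
            "IpRanges": [
                {"CidrIp": "10.0.1.0/24"},  # covered -> dropped
                {"CidrIp": "10.9.0.0/24"},  # not covered -> kept
            ],
        }]
        out = filter_existing(perms, sg)
        kept = {ip["CidrIp"] for p in out for ip in p["IpRanges"]}
        self.assertEqual(kept, {"10.9.0.0/24"})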
class TestAwsServiceClassification(unittest.TestCase):
    """AWS service summarisation must collapse Hyperplane / managed-ENI
    traffic into the published service range rather than enumerating /32s."""
    SAMPLE_RANGES = [
        {"ip_prefix": "3.5.140.0/22", "region": "ap-northeast-2",
         "service": "EC2"},
        {"ip_prefix": "52.94.5.0/24", "region": "us-east-1",
         "service": "ROUTE53_HEALTHCHECKS"},
        # AMAZON entry overlaps with the EC2 one above; classifier must
        # prefer the specific service entry, not the AMAZON catch-all.
        {"ip_prefix": "3.0.0.0/8", "region": "GLOBAL", "service": "AMAZON"},
        {"ip_prefix": "10.0.0.0/8", "region": "GLOBAL", "service": "AMAZON"},
    ]

    def test_specific_service_wins_over_amazon(self):
        cls = classify_aws_service("3.5.140.42", self.SAMPLE_RANGES)
        self.assertIsNotNone(cls)
        service, _region, prefix = cls
        self.assertEqual(service, "EC2")
        self.assertEqual(prefix, "3.5.140.0/22")

    def test_amazon_only_returns_none(self):
        # 3.99.99.99 is in the AMAZON catch-all 3.0.0.0/8 but not in the
        # specific EC2 3.5.140.0/22 entry. AMAZON is blocklisted even when
        # it matches, so the classifier must return None.
        cls = classify_aws_service("3.99.99.99", self.SAMPLE_RANGES)
        self.assertIsNone(cls)

    def test_private_ip_returns_none(self):
        # An RFC 1918 IP appearing in the AMAZON catch-all (a real entry
        # in the actual file) must still return None because AMAZON is
        # always rejected as too broad to be a trust source.
        cls = classify_aws_service("10.0.0.5", self.SAMPLE_RANGES)
        self.assertIsNone(cls)

    def test_unmatched_ip_returns_none(self):
        cls = classify_aws_service("8.8.8.8", self.SAMPLE_RANGES)
        self.assertIsNone(cls)

    def test_garbage_input_returns_none(self):
        self.assertIsNone(classify_aws_service("not-an-ip", self.SAMPLE_RANGES))
        self.assertIsNone(classify_aws_service("", self.SAMPLE_RANGES))
        self.assertIsNone(classify_aws_service(None, self.SAMPLE_RANGES))
class TestAwsServiceSummarisation(unittest.TestCase):
    SAMPLE_RANGES = TestAwsServiceClassification.SAMPLE_RANGES

    def test_split_separates_aws_and_regular(self):
        flows = {
            ("10.0.0.5", "tcp", 5432),   # private, not an AWS public IP
            ("3.5.140.42", "tcp", 443),  # AWS EC2 range
            ("3.5.141.7", "tcp", 443),   # same AWS EC2 range
            ("8.8.8.8", "tcp", 443),     # public, not AWS
        }
        regular, summaries = split_aws_service_flows(flows, self.SAMPLE_RANGES)
        # The two EC2 flows must collapse into a single summary entry.
        self.assertEqual(len(summaries), 1)
        ((prefix, proto, port), (service, _region)), = summaries.items()
        self.assertEqual(prefix, "3.5.140.0/22")
        self.assertEqual(proto, "tcp")
        self.assertEqual(port, 443)
        self.assertEqual(service, "EC2")
        # Regular flows keep the private and non-AWS-public sources.
        self.assertIn(("10.0.0.5", "tcp", 5432), regular)
        self.assertIn(("8.8.8.8", "tcp", 443), regular)
        # And drop the AWS-classified ones.
        self.assertNotIn(("3.5.140.42", "tcp", 443), regular)
        self.assertNotIn(("3.5.141.7", "tcp", 443), regular)

    def test_build_aws_service_perms_one_rule_per_prefix(self):
        summaries = {
            ("3.5.140.0/22", "tcp", 443): ("EC2", "ap-northeast-2"),
            ("52.94.5.0/24", "tcp", 443): ("ROUTE53_HEALTHCHECKS", "us-east-1"),
            ("52.94.5.0/24", "tcp", 80): ("ROUTE53_HEALTHCHECKS", "us-east-1"),
        }
        perms = build_aws_service_perms(summaries, "test")
        # Two distinct (proto, port) pairs → two IpPermissions.
        self.assertEqual(len(perms), 2)
        # tcp/443 perm carries both prefixes.
        p443 = next(p for p in perms if p["FromPort"] == 443)
        cidrs = {r["CidrIp"] for r in p443["IpRanges"]}
        self.assertEqual(cidrs, {"3.5.140.0/22", "52.94.5.0/24"})
        # Description tags the service so the rule is self-documenting.
        descriptions = " ".join(r["Description"] for r in p443["IpRanges"])
        self.assertIn("EC2", descriptions)
        self.assertIn("ROUTE53_HEALTHCHECKS", descriptions)
class TestPartitionFlowsByGroup(unittest.TestCase):
    """Auto-group-discovery: flows must end up attributed to every SG
    attached to every destination ENI that observed them."""

    def test_single_dst_single_group(self):
        flows = {("10.0.0.5", "tcp", 443)}
        dst_map = {("10.0.0.5", "tcp", 443): {"10.1.1.1"}}
        dst_to_groups = {"10.1.1.1": {"sg-aaa"}}
        out = partition_flows_by_group(flows, dst_map, dst_to_groups)
        self.assertEqual(out, {"sg-aaa": {("10.0.0.5", "tcp", 443)}})

    def test_eni_with_multiple_groups(self):
        flows = {("10.0.0.5", "tcp", 443)}
        dst_map = {("10.0.0.5", "tcp", 443): {"10.1.1.1"}}
        dst_to_groups = {"10.1.1.1": {"sg-aaa", "sg-bbb"}}
        out = partition_flows_by_group(flows, dst_map, dst_to_groups)
        # Both attached SGs are candidates per AWS' permissive evaluation.
        self.assertEqual(set(out.keys()), {"sg-aaa", "sg-bbb"})

    def test_multiple_dsts_share_a_group(self):
        flows = {("10.0.0.5", "tcp", 443), ("10.0.0.6", "tcp", 443)}
        dst_map = {
            ("10.0.0.5", "tcp", 443): {"10.1.1.1"},
            ("10.0.0.6", "tcp", 443): {"10.1.1.2"},
        }
        dst_to_groups = {"10.1.1.1": {"sg-aaa"}, "10.1.1.2": {"sg-aaa"}}
        out = partition_flows_by_group(flows, dst_map, dst_to_groups)
        self.assertEqual(out["sg-aaa"], flows)

    def test_unmapped_dst_drops_flow(self):
        # A flow whose dst couldn't be resolved (no ENI lookup match) is
        # silently dropped; auto mode never invents groups.
        flows = {("10.0.0.5", "tcp", 443)}
        dst_map = {("10.0.0.5", "tcp", 443): {"10.1.1.1"}}
        out = partition_flows_by_group(flows, dst_map, {})
        self.assertEqual(out, {})
class TestMergePerms(unittest.TestCase):
    """When regular CIDR perms and AWS-service-summary perms target the
    same protocol/port, they must merge into a single IpPermissions
    entry rather than producing duplicates."""

    def test_merges_same_proto_port(self):
        a = [{"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
              "IpRanges": [{"CidrIp": "10.0.0.0/24"}]}]
        b = [{"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
              "IpRanges": [{"CidrIp": "3.5.140.0/22"}]}]
        out = _merge_perms(a, b)
        self.assertEqual(len(out), 1)
        cidrs = {r["CidrIp"] for r in out[0]["IpRanges"]}
        self.assertEqual(cidrs, {"10.0.0.0/24", "3.5.140.0/22"})

    def test_different_ports_stay_separate(self):
        a = [{"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
              "IpRanges": [{"CidrIp": "10.0.0.0/24"}]}]
        b = [{"IpProtocol": "tcp", "FromPort": 5432, "ToPort": 5432,
              "IpRanges": [{"CidrIp": "10.0.0.0/24"}]}]
        out = _merge_perms(a, b)
        self.assertEqual(len(out), 2)

    def test_dedupes_identical_cidrs(self):
        a = [{"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
              "IpRanges": [{"CidrIp": "10.0.0.0/24"}]}]
        b = [{"IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
              "IpRanges": [{"CidrIp": "10.0.0.0/24"}]}]
        out = _merge_perms(a, b)
        self.assertEqual(len(out), 1)
        self.assertEqual(len(out[0]["IpRanges"]), 1)
class TestSgCompact(unittest.TestCase):
    """CIDR widening / rule compaction: all pure, no AWS required."""

    def _net(self, c):
        return ipaddress.ip_network(c)

    def assert_covers(self, originals, result):
        res = [self._net(str(n)) for n in result]
        for o in originals:
            on = self._net(o)
            self.assertTrue(
                any(on.subnet_of(r) for r in res),
                f"{o} not covered by {[str(r) for r in res]}",
            )

    def test_compactable_net_scope(self):
        self.assertIsNotNone(compactable_net("10.0.0.0/24"))
        self.assertIsNotNone(compactable_net("10.0.0.5/32"))  # narrow still eligible
        self.assertIsNotNone(compactable_net("172.16.0.0/12"))
        self.assertIsNone(compactable_net("0.0.0.0/0"))
        self.assertIsNone(compactable_net("8.8.8.0/24"))  # public
        self.assertIsNone(compactable_net("192.0.0.0/4"))  # not strict subset
        self.assertIsNone(compactable_net(None))
        self.assertIsNone(compactable_net("garbage"))

    def test_empty_and_single(self):
        self.assertEqual(compact_nets([], 0.5), [])
        one = [self._net("10.0.0.0/24")]
        self.assertEqual(compact_nets(one, 0.5), one)

    def test_lossless_collapse_at_ratio_zero(self):
        nets = [self._net("10.0.0.0/25"), self._net("10.0.0.128/25")]
        out = compact_nets(nets, 0.0)
        self.assertEqual([str(n) for n in out], ["10.0.0.0/24"])

    def test_gap_merge_gated_by_ratio(self):
        # 10.0.0.0/24 + 10.0.2.0/24 -> covering /22 wastes 512/1024 = 0.5.
        nets = [self._net("10.0.0.0/24"), self._net("10.0.2.0/24")]
        stay = compact_nets(nets, 0.4)
        self.assertEqual(len(stay), 2)  # 0.4 < 0.5 -> no merge
        merged = compact_nets(nets, 0.5)
        self.assertEqual([str(n) for n in merged], ["10.0.0.0/22"])
        self.assert_covers(["10.0.0.0/24", "10.0.2.0/24"], merged)

    def test_never_crosses_rfc1918_boundary(self):
        nets = [self._net("10.0.0.0/24"), self._net("172.16.0.0/24"),
                self._net("192.168.0.0/24")]
        out = compact_nets(nets, 0.999)
        for n in out:
            self.assertTrue(
                any(n.subnet_of(b) for b in (
                    self._net("10.0.0.0/8"),
                    self._net("172.16.0.0/12"),
                    self._net("192.168.0.0/16"),
                )),
                f"{n} escaped RFC 1918",
            )
        # Three distinct home blocks can never collapse below three rules.
        self.assertEqual(len(out), 3)

    def test_higher_ratio_compacts_at_least_as_hard(self):
        nets = [self._net(f"10.0.{i}.0/24") for i in range(0, 8)]
        low = compact_nets(nets, 0.1)
        high = compact_nets(nets, 0.9)
        self.assertLessEqual(len(high), len(low))
        self.assert_covers([str(n) for n in nets], high)
    def test_count_group_separates_eligible_from_fixed(self):
        sg = {"IpPermissions": [{
            "IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
            "IpRanges": [
                {"CidrIp": "10.0.0.0/24"},  # eligible
                {"CidrIp": "0.0.0.0/0"},    # fixed (public)
            ],
            "Ipv6Ranges": [{"CidrIpv6": "::/0"}],       # fixed
            "UserIdGroupPairs": [{"GroupId": "sg-1"}],  # fixed
        }]}
        total, eligible = count_group(sg)
        self.assertEqual((total, eligible), (4, 1))

    def test_analyse_group_produces_revoke_and_authorise(self):
        sg = {"GroupId": "sg-1", "IpPermissions": [{
            "IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
            "IpRanges": [{"CidrIp": "10.0.0.0/24"}, {"CidrIp": "10.0.2.0/24"}],
        }]}
        a = analyse_group(sg, 0.5)
        self.assertEqual(a["current_total"], 2)
        self.assertEqual(a["projected_total"], 1)
        self.assertEqual(a["rules_saved"], 1)
        revoked = {ip["CidrIp"] for p in a["revoke"] for ip in p["IpRanges"]}
        authed = {ip["CidrIp"] for p in a["authorise"] for ip in p["IpRanges"]}
        self.assertEqual(revoked, {"10.0.0.0/24", "10.0.2.0/24"})
        self.assertEqual(authed, {"10.0.0.0/22"})

    def test_analyse_group_no_change_when_nothing_to_merge(self):
        sg = {"GroupId": "sg-1", "IpPermissions": [{
            "IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
            "IpRanges": [{"CidrIp": "10.0.0.0/24"}],
        }]}
        a = analyse_group(sg, 0.5)
        self.assertEqual(a["rules_saved"], 0)
        self.assertEqual(a["revoke"], [])
        self.assertEqual(a["authorise"], [])

    def test_build_compact_plan_shape(self):
        sgs = [{
            "GroupId": "sg-1", "GroupName": "web", "VpcId": "vpc-1",
            "IpPermissions": [{
                "IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
                "IpRanges": [{"CidrIp": "10.0.0.0/24"}, {"CidrIp": "10.0.2.0/24"}],
            }],
        }]
        plan = build_compact_plan(sgs, ratio=0.5, max_rules=60, region="us-east-1")
        self.assertEqual(plan["schema"], "sg-tightener.plan/v1")
        self.assertEqual(plan["tool"], "sg_compact")
        self.assertEqual(plan["region"], "us-east-1")
        self.assertIn("snapshot_hash", plan)
        self.assertEqual(len(plan["groups"]), 1)
        self.assertEqual(plan["groups"][0]["rules_saved"], 1)


class TestStrictRfc1918(unittest.TestCase):
    def test_strict_rfc1918_helpers(self):
        self.assertTrue(_is_strict_rfc1918(ipaddress.ip_network("10.0.0.0/16")))
        self.assertTrue(_is_strict_rfc1918(ipaddress.ip_network("172.20.0.0/16")))
        self.assertFalse(_is_strict_rfc1918(ipaddress.ip_network("172.40.0.0/16")))
        self.assertFalse(_is_strict_rfc1918(ipaddress.ip_network("192.0.0.0/4")))
        self.assertFalse(_is_strict_rfc1918(ipaddress.ip_network("0.0.0.0/0")))


if __name__ == "__main__":
    runner = unittest.TextTestRunner(verbosity=2)
    suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
    result = runner.run(suite)
    sys.exit(0 if result.wasSuccessful() else 1)
EOF
        [f"172.16.{i}.1" for i in range(0, 30)]
        result = collapse_ips_to_cidrs(ips, max_rules=5)
        self.assertLessEqual(len(result), 5,
                             f"Force-fit should reduce to <= 5, got {len(result)}: {result}")
        # Verify coverage
        nets = [ipaddress.ip_network(c) for c in result]
        for ip in ips:
            addr = ipaddress.ip_address(ip)
            self.assertTrue(any(addr in n for n in nets),
                            f"IP {ip} not covered by {result}")

    def test_collapse_across_all_three_rfc1918(self):
        ips = [f"10.{i}.0.1" for i in range(0, 10)] + \
              [f"172.16.{i}.1" for i in range(0, 10)] + \
              [f"192.168.{i}.1" for i in range(0, 10)]
        result = collapse_ips_to_cidrs(ips, max_rules=3)
        # Each RFC 1918 range gets at least 1 rule, total <= 3
        self.assertLessEqual(len(result), 3)
        nets = [ipaddress.ip_network(c) for c in result]
        for ip in ips:
            self.assertTrue(any(ipaddress.ip_address(ip) in n for n in nets))


class TestParsePartitionFromArn(unittest.TestCase):
    """OU report partition detection must handle all AWS partitions."""

    def test_aws_partition(self):
        self.assertEqual(
            parse_partition_from_arn("arn:aws:iam::123:user/joe"), "aws"
        )

    def test_aws_cn_partition(self):
        self.assertEqual(
            parse_partition_from_arn("arn:aws-cn:iam::123:user/joe"), "aws-cn"
        )

    def test_aws_us_gov_partition(self):
        self.assertEqual(
            parse_partition_from_arn("arn:aws-us-gov:iam::123:user/joe"), "aws-us-gov"
        )

    def test_unknown_partition_defaults_to_aws(self):
        self.assertEqual(
            parse_partition_from_arn("arn:aws-future:iam::123:user/joe"), "aws"
        )

    def test_garbage_defaults_to_aws(self):
        self.assertEqual(parse_partition_from_arn(""), "aws")
        self.assertEqual(parse_partition_from_arn(None), "aws")
        self.assertEqual(parse_partition_from_arn("not-an-arn"), "aws")


class TestComputeRiskScore(unittest.TestCase):
    """Account ranking must give CRITICAL findings overwhelmingly more weight."""

    def test_single_critical_outranks_many_lows(self):
        critical_only = compute_risk_score(1, 0, 0, 0)
        many_lows = compute_risk_score(0, 0, 0, 100)
        self.assertGreater(critical_only, many_lows,
                           "1 CRITICAL must outrank 100 LOW findings")

    def test_high_outranks_low(self):
        self.assertGreater(compute_risk_score(0, 1, 0, 0),
                           compute_risk_score(0, 0, 0, 50))

    def test_zero_findings(self):
        self.assertEqual(compute_risk_score(0, 0, 0, 0), 0)

    def test_severity_order(self):
        c = compute_risk_score(1, 0, 0, 0)
        h = compute_risk_score(0, 1, 0, 0)
        m = compute_risk_score(0, 0, 1, 0)
        l = compute_risk_score(0, 0, 0, 1)
        self.assertGreater(c, h)
        self.assertGreater(h, m)
        self.assertGreater(m, l)


if __name__ == "__main__":
    # Use verbose output and a non-zero exit on failure
    runner = unittest.TextTestRunner(verbosity=2)
    suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__])
    result = runner.run(suite)
    sys.exit(0 if result.wasSuccessful() else 1)
EOF
chmod +x sg_tightener_test.py
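The two compaction tests pin down concrete behaviour: at a ratio of 0.5, 10.0.0.0/24 and 10.0.2.0/24 collapse into 10.0.0.0/22, and raising the ratio never yields more rules. One reading consistent with both is that the ratio caps the fraction of a merged block that no existing rule covered (the /22 is exactly half unused). The sketch below assumes that reading. compact and smallest_supernet are hypothetical names; this is not sg-tightener's compact_nets, only an illustration of the idea using the standard library ipaddress module.

```python
import ipaddress
from ipaddress import IPv4Network


def smallest_supernet(a: IPv4Network, b: IPv4Network) -> IPv4Network:
    """Smallest CIDR block that contains both a and b."""
    # The highest differing bit between the two network addresses decides
    # how short the common prefix has to be.
    diff = int(a.network_address) ^ int(b.network_address)
    prefix = min(a.prefixlen, b.prefixlen, a.max_prefixlen - diff.bit_length())
    return a.supernet(new_prefix=prefix)


def compact(nets: list[IPv4Network], max_waste: float) -> list[IPv4Network]:
    """Hypothetical compaction: greedily merge neighbouring blocks while the
    merged block's uncovered fraction ("waste") stays <= max_waste."""
    blocks = list(ipaddress.collapse_addresses(nets))  # sorted, non-overlapping
    while len(blocks) > 1:
        best = None  # (waste, merged supernet)
        for left, right in zip(blocks, blocks[1:]):
            sup = smallest_supernet(left, right)
            covered = sum(n.num_addresses for n in blocks if n.subnet_of(sup))
            waste = 1 - covered / sup.num_addresses
            if waste <= max_waste and (best is None or waste < best[0]):
                best = (waste, sup)
        if best is None:
            break  # every possible merge would authorise too much unused space
        sup = best[1]
        # Replace everything the supernet swallowed with the supernet itself.
        blocks = list(ipaddress.collapse_addresses(
            [n for n in blocks if not n.subnet_of(sup)] + [sup]))
    return blocks


pair = [ipaddress.ip_network("10.0.0.0/24"), ipaddress.ip_network("10.0.2.0/24")]
print(compact(pair, 0.5))  # merged into a single /22
print(compact(pair, 0.4))  # left as two /24s
```

Because the loop always takes the least wasteful merge available and the threshold only decides when to stop, a higher max_waste can only extend the same sequence of merges, which is the monotonic property test_higher_ratio_compacts_at_least_as_hard checks.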
The suite covers all 20 bugs from the original review; the 13 issues identified by the second review, including apply ordering, the analyse empty-output guard, extend CIDR safety, force-fit across RFC 1918 boundaries, revert across all range types, stale plan detection, the S3 read failure ratio, partition detection for the OU report, and severity-weighted account risk ranking; the algorithm’s correctness boundaries; and the input validation logic for sg_extend. Run it like this:
cat > run_tests.sh << 'EOF'
#!/usr/bin/env bash
set -euo pipefail
source .venv/bin/activate
python sg_tightener_test.py
EOF
chmod +x run_tests.sh
./run_tests.sh
Expected output ends with something like “Ran 105 tests in 0.9s” followed by “OK”. Any failure here indicates a regression that needs to be fixed before deploying the tool. The CIDR collapsing tests in particular guard against the original bug where the algorithm produced no collapsing at all, which would silently make the tool ineffective rather than visibly broken.
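The collapse tests describe a second behaviour: given observed IPs and a max_rules cap, the output must fit under the cap, cover every observed address, and never merge across RFC 1918 home blocks, which is why three home blocks can never collapse below three rules. A minimal sketch of that force-fit, assuming merges are chosen greedily by the fewest extra addresses they authorise, might look like this; force_fit is a hypothetical name, not the tool's collapse_ips_to_cidrs.

```python
import ipaddress
from ipaddress import IPv4Network

# RFC 1918 "home blocks"; merges are never allowed to cross between them.
HOME_BLOCKS = [ipaddress.ip_network(c)
               for c in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")]


def _supernet(a: IPv4Network, b: IPv4Network) -> IPv4Network:
    """Smallest CIDR block containing both a and b."""
    diff = int(a.network_address) ^ int(b.network_address)
    return a.supernet(new_prefix=min(a.prefixlen, b.prefixlen,
                                     a.max_prefixlen - diff.bit_length()))


def force_fit(ips: list[str], max_rules: int) -> list[str]:
    """Hypothetical force-fit: collapse observed IPs into at most max_rules
    CIDRs, merging only inside a single RFC 1918 home block."""
    groups: dict = {}
    for ip in ips:
        addr = ipaddress.ip_address(ip)
        home = next((h for h in HOME_BLOCKS if addr in h), None)
        if home is None:
            raise ValueError(f"{ip} is outside RFC 1918")
        groups.setdefault(home, []).append(ipaddress.ip_network(f"{addr}/32"))
    blocks = {h: list(ipaddress.collapse_addresses(n)) for h, n in groups.items()}

    while sum(len(b) for b in blocks.values()) > max_rules:
        best = None  # (extra addresses authorised, home block, merged CIDR)
        for home, nets in blocks.items():
            for left, right in zip(nets, nets[1:]):
                # Both neighbours share the home block's prefix, so the
                # supernet can never grow beyond that home block.
                sup = _supernet(left, right)
                extra = sup.num_addresses - sum(
                    n.num_addresses for n in nets if n.subnet_of(sup))
                if best is None or extra < best[0]:
                    best = (extra, home, sup)
        if best is None:
            break  # already one rule per home block: cannot shrink further
        _, home, sup = best
        blocks[home] = list(ipaddress.collapse_addresses(
            [n for n in blocks[home] if not n.subnet_of(sup)] + [sup]))
    return [str(n) for nets in blocks.values() for n in nets]


mixed = ([f"10.{i}.0.1" for i in range(10)]
         + [f"172.16.{i}.1" for i in range(10)]
         + [f"192.168.{i}.1" for i in range(10)])
print(force_fit(mixed, 3))
```

When the cap is lower than the number of home blocks present, the sketch stops at one rule per home block rather than widening beyond RFC 1918 space.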
15. Relationship to CloudToRepo
sg-tightener lives under the CloudToRepo project at cloudtorepo.com as a security extension. CloudToRepo’s core purpose is to reverse-engineer existing AWS infrastructure into Terraform so you can understand and version-control what you have. sg-tightener extends that philosophy in the security direction: rather than accepting that your security groups are an undocumented product of historical decisions, it gives you an evidence-based, auditable, repeatable way to understand and tighten them. Both tools operate on the same principle that infrastructure you cannot inspect and reason about is infrastructure you cannot trust.
16. What This Does Not Do
sg-tightener does not audit public internet exposure. Rules with a source of 0.0.0.0/0 are left exactly as they are. It does not evaluate whether specific services should be reachable at all, only whether the source CIDR for existing private rules is unnecessarily broad. It does not manage egress rules. IPv6 rules are out of scope in this release. Network ACL tightening is reported but not automated, for the reasons discussed in section 6.
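The scope above, together with the count_group and _is_strict_rfc1918 tests, suggests a simple classification of ingress source entries: only IPv4 CIDRs sitting entirely inside one RFC 1918 range are candidates for tightening, while 0.0.0.0/0, IPv6 ranges and security group references are counted but left alone. The sketch below is an assumed reconstruction of that logic over the EC2 IpPermissions structure returned by DescribeSecurityGroups; is_strict_rfc1918 and count_source_entries are hypothetical names, and treating PrefixListIds as fixed is an additional assumption.

```python
import ipaddress

RFC1918 = tuple(ipaddress.ip_network(c)
                for c in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"))


def is_strict_rfc1918(cidr: str) -> bool:
    """True only for IPv4 blocks that sit entirely inside one RFC 1918 range."""
    net = ipaddress.ip_network(cidr, strict=False)
    # Check the version first: subnet_of() raises TypeError across IPv4/IPv6.
    return net.version == 4 and any(net.subnet_of(r) for r in RFC1918)


def count_source_entries(ip_permissions: list[dict]) -> tuple[int, int]:
    """Return (total, eligible) source entries across ingress IpPermissions.

    Public CIDRs, IPv6 ranges, security group references and prefix lists
    are counted in the total but never treated as eligible.
    """
    total = eligible = 0
    for perm in ip_permissions:
        for entry in perm.get("IpRanges", []):
            total += 1
            if is_strict_rfc1918(entry["CidrIp"]):
                eligible += 1
        for key in ("Ipv6Ranges", "UserIdGroupPairs", "PrefixListIds"):
            total += len(perm.get(key, []))
    return total, eligible


perms = [{
    "IpProtocol": "tcp", "FromPort": 443, "ToPort": 443,
    "IpRanges": [{"CidrIp": "10.0.0.0/24"}, {"CidrIp": "0.0.0.0/0"}],
    "Ipv6Ranges": [{"CidrIpv6": "::/0"}],
    "UserIdGroupPairs": [{"GroupId": "sg-1"}],
}]
print(count_source_entries(perms))
```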
What it does is convert one specific, pervasive class of security debt into a defensible, evidence-based configuration without requiring weeks of manual forensic work. For most enterprise AWS accounts that have grown organically over several years, that single improvement reduces the lateral movement surface area by orders of magnitude.
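To put rough numbers on “orders of magnitude”, the sketch below compares a single 10.0.0.0/16 rule with three evidence-derived replacements. The replacement CIDRs are invented for illustration, not output from sg-tightener, and address_reduction is a hypothetical helper.

```python
import ipaddress


def address_reduction(broad: str, tight: list[str]) -> tuple[int, int, float]:
    """Compare the addresses a broad rule admits with a set of tighter rules.

    Assumes the tight blocks do not overlap, so their sizes can be summed.
    """
    before = ipaddress.ip_network(broad).num_addresses
    after = sum(ipaddress.ip_network(c).num_addresses for c in tight)
    return before, after, before / after


# Hypothetical evidence-derived replacement for a 10.0.0.0/16 rule.
before, after, factor = address_reduction(
    "10.0.0.0/16", ["10.0.4.0/28", "10.0.9.16/28", "10.0.200.0/27"])
print(f"{before} -> {after} addresses, {factor:.0f}x fewer")
```

With these made-up blocks, 65,536 admissible addresses shrink to 64, a factor of 1,024, or roughly three orders of magnitude. The real factor depends entirely on what the flow logs show.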
17. The Broader Point
Most organisations spend considerable effort building security controls at the perimeter: WAFs, DDoS protection, certificate management, identity federation. Those controls matter. What receives far less attention is the internal trust model once traffic is past the perimeter. The implicit assumption in most hybrid cloud estates is that the corporate network is trusted, and that assumption is encoded directly into security group rules as broad RFC 1918 CIDR blocks that nobody has revisited since they were written.
That assumption was questionable when it was made, and it is indefensible now. Modern threat models assume that the corporate network is already compromised, or will be. Ransomware operators routinely move laterally across flat trusted networks before triggering their payload. Compromised build agents are a standard initial access vector precisely because they tend to sit in trusted network ranges with broad permissions into production environments. The path from a compromised developer laptop to a production database should not exist, but in most hybrid cloud estates it does, encoded quietly in a security group rule that says 10.0.0.0/16.
The cloud did not eliminate flat networks. It gave many organisations the tools to build more sophisticated ones, and many of those organisations quietly replicated the same trust assumptions they had always made. sg-tightener exists because trust should be earned through observed behaviour, not inherited from a datacenter subnet designed fifteen years ago.
sg-tightener is an open source extension of CloudToRepo. Contributions welcome.
Andrew Baker · andrewbaker.ninja · Group CIO, Capitec Bank