Create / Migrate WordPress to AWS Graviton: Maximum Performance, Minimum Cost

Running WordPress on ARM-based Graviton instances delivers up to 40% better price-performance compared to x86 equivalents. This guide provides production-ready scripts to deploy an optimised WordPress stack in minutes, plus everything you need to migrate your existing site.

Why Graviton for WordPress?

AWS Graviton processors deliver:

  • 40% better price-performance vs comparable x86 instances
  • Up to 25% lower cost for equivalent workloads
  • 60% less energy consumption per compute hour
  • Native ARM64 builds and optimisations for PHP 8.x and MySQL

The t4g.small instance (2 vCPU, 2GB RAM) at ~$12/month handles most WordPress sites comfortably. For higher traffic, t4g.medium or c7g instances scale beautifully.
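
If you outgrow the instance later, you can resize in place rather than rebuilding. A minimal sketch using the AWS CLI (the instance ID and region below are placeholders); the instance is briefly stopped while the type changes:

# Resize an existing instance to a larger Graviton type
aws ec2 stop-instances --region eu-west-1 --instance-ids i-0123456789abcdef0
aws ec2 wait instance-stopped --region eu-west-1 --instance-ids i-0123456789abcdef0
aws ec2 modify-instance-attribute --region eu-west-1 \
    --instance-id i-0123456789abcdef0 --instance-type Value=t4g.medium
aws ec2 start-instances --region eu-west-1 --instance-ids i-0123456789abcdef0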

Architecture

┌─────────────────────────────────────────────────┐
│                   CloudFront                     │
│              (Optional CDN Layer)                │
└─────────────────────┬───────────────────────────┘
                      │
┌─────────────────────▼───────────────────────────┐
│              Graviton EC2 Instance               │
│  ┌─────────────────────────────────────────────┐│
│  │              Apache 2.4 Web Server          ││
│  │      mod_rewrite, TLS via Let's Encrypt     ││
│  └─────────────────────┬───────────────────────┘│
│                        │                         │
│  ┌─────────────────────▼───────────────────────┐│
│  │              PHP 8.3 (mod_php)               ││
│  │         OPcache Bytecode Caching            ││
│  └─────────────────────┬───────────────────────┘│
│                        │                         │
│  ┌─────────────────────▼───────────────────────┐│
│  │              MySQL 8.0                       ││
│  │         InnoDB, utf8mb4 Defaults            ││
│  └─────────────────────────────────────────────┘│
│                                                  │
│  ┌─────────────────────────────────────────────┐│
│  │              EBS gp3 Volume                  ││
│  │         3000 IOPS, 125 MB/s baseline        ││
│  └─────────────────────────────────────────────┘│
└─────────────────────────────────────────────────┘

Prerequisites

  • AWS CLI configured with appropriate permissions
  • A domain name with DNS you control
  • SSH key pair in your target region

If you’d prefer to download these scripts, check out https://github.com/Scr1ptW0lf/wordpress-graviton.
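
Before launching anything, it is worth confirming the CLI is pointed at the right account and can see EC2; a couple of read-only checks like the following will fail fast if credentials or permissions are missing:

# Preflight: confirm credentials and that Graviton instance types are visible
aws sts get-caller-identity
aws ec2 describe-instance-types --instance-types t4g.small \
    --query 'InstanceTypes[].ProcessorInfo.SupportedArchitectures' --output text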

Part 1: Launch the Instance

Save this as launch-graviton-wp.sh and run from AWS CloudShell:

#!/bin/bash

# AWS EC2 ARM Instance Launch Script with Elastic IP
# Launches ARM-based instances with Ubuntu 24.04 LTS ARM64

set -e

echo "=== AWS EC2 ARM Ubuntu Instance Launcher ==="
echo ""

# Function to get Ubuntu 24.04 ARM64 AMI for a region
get_ubuntu_ami() {
    local region=$1
    # Get the latest Ubuntu 24.04 LTS ARM64 AMI
    aws ec2 describe-images \
        --region "$region" \
        --owners 099720109477 \
        --filters "Name=name,Values=ubuntu/images/hvm-ssd-gp3/ubuntu-noble-24.04-arm64-server-*" \
                  "Name=state,Values=available" \
        --query 'Images | sort_by(@, &CreationDate) | [-1].ImageId' \
        --output text
}

# Check for default region
if [ -n "$AWS_DEFAULT_REGION" ]; then
    echo "AWS default region detected: $AWS_DEFAULT_REGION"
    read -p "Use this region? (y/n, default: y): " use_default
    use_default=${use_default:-y}
    
    if [ "$use_default" == "y" ]; then
        REGION="$AWS_DEFAULT_REGION"
        echo "Using region: $REGION"
    else
        use_default="n"
    fi
else
    use_default="n"
fi

# Prompt for region if not using default
if [ "$use_default" == "n" ]; then
    echo ""
    echo "Available regions for ARM instances:"
    echo "1. us-east-1 (N. Virginia)"
    echo "2. us-east-2 (Ohio)"
    echo "3. us-west-2 (Oregon)"
    echo "4. eu-west-1 (Ireland)"
    echo "5. eu-central-1 (Frankfurt)"
    echo "6. ap-southeast-1 (Singapore)"
    echo "7. ap-northeast-1 (Tokyo)"
    echo "8. Enter custom region"
    echo ""
    read -p "Select region (1-8): " region_choice

    case $region_choice in
        1) REGION="us-east-1" ;;
        2) REGION="us-east-2" ;;
        3) REGION="us-west-2" ;;
        4) REGION="eu-west-1" ;;
        5) REGION="eu-central-1" ;;
        6) REGION="ap-southeast-1" ;;
        7) REGION="ap-northeast-1" ;;
        8) read -p "Enter region code: " REGION ;;
        *) echo "Invalid choice"; exit 1 ;;
    esac
    
    echo "Selected region: $REGION"
fi

# Prompt for instance type
echo ""
echo "Select instance type (ARM/Graviton):"
echo "1. t4g.micro   (2 vCPU, 1 GB RAM)   - Free tier eligible"
echo "2. t4g.small   (2 vCPU, 2 GB RAM)   - ~\$0.0168/hr"
echo "3. t4g.medium  (2 vCPU, 4 GB RAM)   - ~\$0.0336/hr"
echo "4. t4g.large   (2 vCPU, 8 GB RAM)   - ~\$0.0672/hr"
echo "5. t4g.xlarge  (4 vCPU, 16 GB RAM)  - ~\$0.1344/hr"
echo "6. t4g.2xlarge (8 vCPU, 32 GB RAM)  - ~\$0.2688/hr"
echo "7. Enter custom ARM instance type"
echo ""
read -p "Select instance type (1-7): " instance_choice

case $instance_choice in
    1) INSTANCE_TYPE="t4g.micro" ;;
    2) INSTANCE_TYPE="t4g.small" ;;
    3) INSTANCE_TYPE="t4g.medium" ;;
    4) INSTANCE_TYPE="t4g.large" ;;
    5) INSTANCE_TYPE="t4g.xlarge" ;;
    6) INSTANCE_TYPE="t4g.2xlarge" ;;
    7) read -p "Enter instance type (e.g., c7g.medium): " INSTANCE_TYPE ;;
    *) echo "Invalid choice"; exit 1 ;;
esac

echo "Selected instance type: $INSTANCE_TYPE"
echo ""
echo "Fetching latest Ubuntu 24.04 ARM64 AMI..."

AMI_ID=$(get_ubuntu_ami "$REGION")

if [ -z "$AMI_ID" ]; then
    echo "Error: Could not find Ubuntu ARM64 AMI in region $REGION"
    exit 1
fi

echo "Found AMI: $AMI_ID"
echo ""

# List existing key pairs
echo "Fetching existing key pairs in $REGION..."
EXISTING_KEYS=$(aws ec2 describe-key-pairs \
    --region "$REGION" \
    --query 'KeyPairs[*].KeyName' \
    --output text 2>/dev/null || echo "")

if [ -n "$EXISTING_KEYS" ]; then
    echo "Existing key pairs in $REGION:"
    # Convert to array for number selection
    mapfile -t KEY_ARRAY < <(echo "$EXISTING_KEYS" | tr '\t' '\n')
    for i in "${!KEY_ARRAY[@]}"; do
        echo "$((i+1)). ${KEY_ARRAY[$i]}"
    done
    echo ""
else
    echo "No existing key pairs found in $REGION"
    echo ""
fi

# Prompt for key pair
read -p "Enter key pair name, number to select from list, or press Enter to create new: " KEY_INPUT
CREATE_NEW_KEY=false

if [ -z "$KEY_INPUT" ]; then
    KEY_NAME="arm-key-$(date +%s)"
    CREATE_NEW_KEY=true
    echo "Will create new key pair: $KEY_NAME"
elif [[ "$KEY_INPUT" =~ ^[0-9]+$ ]] && [ -n "$EXISTING_KEYS" ]; then
    # User entered a number
    if [ "$KEY_INPUT" -ge 1 ] && [ "$KEY_INPUT" -le "${#KEY_ARRAY[@]}" ]; then
        KEY_NAME="${KEY_ARRAY[$((KEY_INPUT-1))]}"
        echo "Will use existing key pair: $KEY_NAME"
    else
        echo "Invalid selection number"
        exit 1
    fi
else
    KEY_NAME="$KEY_INPUT"
    echo "Will use existing key pair: $KEY_NAME"
fi

echo ""

# List existing security groups
echo "Fetching existing security groups in $REGION..."
EXISTING_SGS=$(aws ec2 describe-security-groups \
    --region "$REGION" \
    --query 'SecurityGroups[*].[GroupId,GroupName,Description]' \
    --output text 2>/dev/null || echo "")

if [ -n "$EXISTING_SGS" ]; then
    echo "Existing security groups in $REGION:"
    # Convert to arrays for number selection
    mapfile -t SG_LINES < <(echo "$EXISTING_SGS")
    declare -a SG_ID_ARRAY
    declare -a SG_NAME_ARRAY
    declare -a SG_DESC_ARRAY

    for line in "${SG_LINES[@]}"; do
        read -r sg_id sg_name sg_desc <<< "$line"
        SG_ID_ARRAY+=("$sg_id")
        SG_NAME_ARRAY+=("$sg_name")
        SG_DESC_ARRAY+=("$sg_desc")
    done

    for i in "${!SG_ID_ARRAY[@]}"; do
        echo "$((i+1)). ${SG_ID_ARRAY[$i]} - ${SG_NAME_ARRAY[$i]} (${SG_DESC_ARRAY[$i]})"
    done
    echo ""
else
    echo "No existing security groups found in $REGION"
    echo ""
fi

# Prompt for security group
read -p "Enter security group ID, number to select from list, or press Enter to create new: " SG_INPUT
CREATE_NEW_SG=false

if [ -z "$SG_INPUT" ]; then
    SG_NAME="arm-sg-$(date +%s)"
    CREATE_NEW_SG=true
    echo "Will create new security group: $SG_NAME"
    echo "  - Port 22 (SSH) - open to 0.0.0.0/0"
    echo "  - Port 80 (HTTP) - open to 0.0.0.0/0"
    echo "  - Port 443 (HTTPS) - open to 0.0.0.0/0"
elif [[ "$SG_INPUT" =~ ^[0-9]+$ ]] && [ -n "$EXISTING_SGS" ]; then
    # User entered a number
    if [ "$SG_INPUT" -ge 1 ] && [ "$SG_INPUT" -le "${#SG_ID_ARRAY[@]}" ]; then
        SG_ID="${SG_ID_ARRAY[$((SG_INPUT-1))]}"
        echo "Will use existing security group: $SG_ID (${SG_NAME_ARRAY[$((SG_INPUT-1))]})"
        echo "Note: Ensure ports 22, 80, and 443 are open if needed"
    else
        echo "Invalid selection number"
        exit 1
    fi
else
    SG_ID="$SG_INPUT"
    echo "Will use existing security group: $SG_ID"
    echo "Note: Ensure ports 22, 80, and 443 are open if needed"
fi

echo ""

# Prompt for Elastic IP
read -p "Allocate and assign an Elastic IP? (y/n, default: n): " ALLOCATE_EIP
ALLOCATE_EIP=${ALLOCATE_EIP:-n}

echo ""
read -p "Enter instance name tag (default: ubuntu-arm-instance): " INSTANCE_NAME
INSTANCE_NAME=${INSTANCE_NAME:-ubuntu-arm-instance}

echo ""
echo "=== Launch Configuration ==="
echo "Region: $REGION"
echo "Instance Type: $INSTANCE_TYPE"
echo "AMI: $AMI_ID (Ubuntu 24.04 ARM64)"
echo "Key Pair: $KEY_NAME $([ "$CREATE_NEW_KEY" == true ] && echo '(will be created)')"
echo "Security Group: $([ "$CREATE_NEW_SG" == true ] && echo "$SG_NAME (will be created)" || echo "$SG_ID")"
echo "Name: $INSTANCE_NAME"
echo "Elastic IP: $([ "$ALLOCATE_EIP" == "y" ] && echo 'Yes' || echo 'No')"
echo ""
read -p "Launch instance? (y/n, default: y): " CONFIRM
CONFIRM=${CONFIRM:-y}

if [ "$CONFIRM" != "y" ]; then
    echo "Launch cancelled"
    exit 0
fi

echo ""
echo "Starting launch process..."

# Create key pair if needed
if [ "$CREATE_NEW_KEY" == true ]; then
    echo ""
    echo "Creating key pair: $KEY_NAME"
    aws ec2 create-key-pair \
        --region "$REGION" \
        --key-name "$KEY_NAME" \
        --query 'KeyMaterial' \
        --output text > "${KEY_NAME}.pem"
    chmod 400 "${KEY_NAME}.pem"
    echo "  ✓ Key saved to: ${KEY_NAME}.pem"
    echo "  ⚠️  IMPORTANT: Download this key file from CloudShell if you need it elsewhere!"
fi

# Create security group if needed
if [ "$CREATE_NEW_SG" == true ]; then
    echo ""
    echo "Creating security group: $SG_NAME"
    
    # Get default VPC
    VPC_ID=$(aws ec2 describe-vpcs \
        --region "$REGION" \
        --filters "Name=isDefault,Values=true" \
        --query 'Vpcs[0].VpcId' \
        --output text)
    
    if [ -z "$VPC_ID" ] || [ "$VPC_ID" == "None" ]; then
        echo "Error: No default VPC found. Please specify a security group ID."
        exit 1
    fi
    
    SG_ID=$(aws ec2 create-security-group \
        --region "$REGION" \
        --group-name "$SG_NAME" \
        --description "Security group for ARM instance with web access" \
        --vpc-id "$VPC_ID" \
        --query 'GroupId' \
        --output text)
    
    echo "  ✓ Created security group: $SG_ID"
    echo "  Adding security rules..."
    
    # Add SSH rule
    aws ec2 authorize-security-group-ingress \
        --region "$REGION" \
        --group-id "$SG_ID" \
        --ip-permissions \
        IpProtocol=tcp,FromPort=22,ToPort=22,IpRanges="[{CidrIp=0.0.0.0/0,Description='SSH'}]" \
        > /dev/null
    
    # Add HTTP rule
    aws ec2 authorize-security-group-ingress \
        --region "$REGION" \
        --group-id "$SG_ID" \
        --ip-permissions \
        IpProtocol=tcp,FromPort=80,ToPort=80,IpRanges="[{CidrIp=0.0.0.0/0,Description='HTTP'}]" \
        > /dev/null
    
    # Add HTTPS rule
    aws ec2 authorize-security-group-ingress \
        --region "$REGION" \
        --group-id "$SG_ID" \
        --ip-permissions \
        IpProtocol=tcp,FromPort=443,ToPort=443,IpRanges="[{CidrIp=0.0.0.0/0,Description='HTTPS'}]" \
        > /dev/null
    
    echo "  ✓ Port 22 (SSH) configured"
    echo "  ✓ Port 80 (HTTP) configured"
    echo "  ✓ Port 443 (HTTPS) configured"
fi

echo ""
echo "Launching instance..."

INSTANCE_ID=$(aws ec2 run-instances \
    --region "$REGION" \
    --image-id "$AMI_ID" \
    --instance-type "$INSTANCE_TYPE" \
    --key-name "$KEY_NAME" \
    --security-group-ids "$SG_ID" \
    --tag-specifications "ResourceType=instance,Tags=[{Key=Name,Value=$INSTANCE_NAME}]" \
    --query 'Instances[0].InstanceId' \
    --output text)

echo "  ✓ Instance launched: $INSTANCE_ID"
echo "  Waiting for instance to be running..."

aws ec2 wait instance-running \
    --region "$REGION" \
    --instance-ids "$INSTANCE_ID"

echo "  ✓ Instance is running!"

# Handle Elastic IP
if [ "$ALLOCATE_EIP" == "y" ]; then
    echo ""
    echo "Allocating Elastic IP..."
    
    ALLOCATION_OUTPUT=$(aws ec2 allocate-address \
        --region "$REGION" \
        --domain vpc \
        --tag-specifications "ResourceType=elastic-ip,Tags=[{Key=Name,Value=$INSTANCE_NAME-eip}]")
    
    ALLOCATION_ID=$(echo "$ALLOCATION_OUTPUT" | grep -o '"AllocationId": "[^"]*' | cut -d'"' -f4)
    ELASTIC_IP=$(echo "$ALLOCATION_OUTPUT" | grep -o '"PublicIp": "[^"]*' | cut -d'"' -f4)
    
    echo "  ✓ Elastic IP allocated: $ELASTIC_IP"
    echo "  Associating Elastic IP with instance..."
    
    ASSOCIATION_ID=$(aws ec2 associate-address \
        --region "$REGION" \
        --instance-id "$INSTANCE_ID" \
        --allocation-id "$ALLOCATION_ID" \
        --query 'AssociationId' \
        --output text)
    
    echo "  ✓ Elastic IP associated"
    PUBLIC_IP=$ELASTIC_IP
else
    PUBLIC_IP=$(aws ec2 describe-instances \
        --region "$REGION" \
        --instance-ids "$INSTANCE_ID" \
        --query 'Reservations[0].Instances[0].PublicIpAddress' \
        --output text)
fi

echo ""
echo "=========================================="
echo "=== Instance Ready ==="
echo "=========================================="
echo "Instance ID: $INSTANCE_ID"
echo "Instance Type: $INSTANCE_TYPE"
echo "Public IP: $PUBLIC_IP"
if [ "$ALLOCATE_EIP" == "y" ]; then
    echo "Elastic IP: Yes (IP will persist after stop/start)"
    echo "Allocation ID: $ALLOCATION_ID"
else
    echo "Elastic IP: No (IP will change if instance is stopped)"
fi
echo "Region: $REGION"
echo "Security: SSH (22), HTTP (80), HTTPS (443) open"
echo ""
echo "Connect with:"
echo "  ssh -i ${KEY_NAME}.pem ubuntu@${PUBLIC_IP}"
echo ""
echo "Test web access:"
echo "  curl http://${PUBLIC_IP}"
echo ""
echo "⏱️  Wait 30-60 seconds for SSH to become available"

if [ "$ALLOCATE_EIP" == "y" ]; then
    echo ""
    echo "=========================================="
    echo "⚠️  ELASTIC IP WARNING"
    echo "=========================================="
    echo "Elastic IPs cost \$0.005/hour when NOT"
    echo "associated with a running instance!"
    echo ""
    echo "To avoid charges, release the EIP if you"
    echo "delete the instance:"
    echo ""
    echo "aws ec2 release-address \\"
    echo "  --region $REGION \\"
    echo "  --allocation-id $ALLOCATION_ID"
fi

echo ""
echo "=========================================="

Run it:

chmod +x launch-graviton-wp.sh
./launch-graviton-wp.sh

Part 2: Install WordPress Stack

SSH into your new instance and save this as setup-wordpress.sh:

#!/bin/bash

# WordPress Installation Script for Ubuntu 24.04 ARM64
# Installs Apache, MySQL, PHP, and WordPress with automatic configuration

set -e

echo "=== WordPress Installation Script (Apache) ==="
echo "This script will install and configure:"
echo "  - Apache web server"
echo "  - MySQL database"
echo "  - PHP 8.3"
echo "  - WordPress (latest version)"
echo ""

# Check if running as root
if [ "$EUID" -ne 0 ]; then
    echo "Please run as root (use: sudo bash $0)"
    exit 1
fi

# Get configuration from user
echo "=== WordPress Configuration ==="
read -p "Enter your domain name (or press Enter to use server IP): " DOMAIN_NAME
read -p "Enter WordPress site title (default: My WordPress Site): " SITE_TITLE
SITE_TITLE=${SITE_TITLE:-My WordPress Site}
read -p "Enter WordPress admin username (default: admin): " WP_ADMIN_USER
WP_ADMIN_USER=${WP_ADMIN_USER:-admin}
read -sp "Enter WordPress admin password (or press Enter to generate): " WP_ADMIN_PASS
echo ""
if [ -z "$WP_ADMIN_PASS" ]; then
    WP_ADMIN_PASS=$(openssl rand -base64 16)
    echo "Generated password: $WP_ADMIN_PASS"
fi
read -p "Enter WordPress admin email: (default:test@example.com)" WP_ADMIN_EMAIL
WP_ADMIN_EMAIL=${WP_ADMIN_EMAIL:-test@example.com}

# Generate database credentials
DB_NAME="wordpress"
DB_USER="wpuser"
DB_PASS=$(openssl rand -base64 16)
DB_ROOT_PASS=$(openssl rand -base64 16)

echo ""
echo "=== Installation Summary ==="
echo "Domain: ${DOMAIN_NAME:-Server IP}"
echo "Site Title: $SITE_TITLE"
echo "Admin User: $WP_ADMIN_USER"
echo "Admin Email: $WP_ADMIN_EMAIL"
echo "Database: $DB_NAME"
echo ""
read -p "Proceed with installation? (y/n, default: y): " CONFIRM
CONFIRM=${CONFIRM:-y}

if [ "$CONFIRM" != "y" ]; then
    echo "Installation cancelled"
    exit 0
fi

echo ""
echo "Starting installation..."

# Update system
echo ""
echo "[1/8] Updating system packages..."
apt-get update -qq
apt-get upgrade -y -qq

# Install Apache
echo ""
echo "[2/8] Installing Apache..."
apt-get install -y -qq apache2

# Enable Apache modules
echo "Enabling Apache modules..."
a2enmod rewrite
a2enmod ssl
a2enmod headers

# Check if MySQL is already installed
MYSQL_INSTALLED=false
if systemctl is-active --quiet mysql || systemctl is-active --quiet mysqld; then
    MYSQL_INSTALLED=true
    echo ""
    echo "MySQL is already installed and running."
elif command -v mysql &> /dev/null; then
    MYSQL_INSTALLED=true
    echo ""
    echo "MySQL is already installed."
fi

if [ "$MYSQL_INSTALLED" = true ]; then
    echo ""
    echo "[3/8] Using existing MySQL installation..."
    read -sp "Enter MySQL root password (or press Enter to try without password): " EXISTING_ROOT_PASS
    echo ""

    MYSQL_CONNECTION_OK=false

    # Test the password
    if [ -n "$EXISTING_ROOT_PASS" ]; then
        if mysql -u root -p"${EXISTING_ROOT_PASS}" -e "SELECT 1;" &> /dev/null; then
            echo "Successfully connected to MySQL."
            DB_ROOT_PASS="$EXISTING_ROOT_PASS"
            MYSQL_CONNECTION_OK=true
        else
            echo "Error: Could not connect to MySQL with provided password."
        fi
    fi

    # Try without password if previous attempt failed or no password was provided
    if [ "$MYSQL_CONNECTION_OK" = false ]; then
        echo "Trying to connect without password..."
        if mysql -u root -e "SELECT 1;" &> /dev/null; then
            echo "Connected without password. Will set a password now."
            DB_ROOT_PASS=$(openssl rand -base64 16)
            mysql -u root -e "ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '${DB_ROOT_PASS}';"
            echo "New root password set: $DB_ROOT_PASS"
            MYSQL_CONNECTION_OK=true
        fi
    fi

    # If still cannot connect, offer to reinstall
    if [ "$MYSQL_CONNECTION_OK" = false ]; then
        echo ""
        echo "ERROR: Cannot connect to MySQL with any method."
        echo "This usually means MySQL is in an inconsistent state."
        echo ""
        read -p "Remove and reinstall MySQL? (y/n, default: y): " REINSTALL_MYSQL
        REINSTALL_MYSQL=${REINSTALL_MYSQL:-y}

        if [ "$REINSTALL_MYSQL" = "y" ]; then
            echo ""
            echo "Removing MySQL..."
            systemctl stop mysql 2>/dev/null || systemctl stop mysqld 2>/dev/null || true
            apt-get remove --purge -y mysql-server mysql-client mysql-common mysql-server-core-* mysql-client-core-* -qq
            apt-get autoremove -y -qq
            apt-get autoclean -qq
            rm -rf /etc/mysql /var/lib/mysql /var/log/mysql

            echo "Reinstalling MySQL..."
            export DEBIAN_FRONTEND=noninteractive
            apt-get update -qq
            apt-get install -y -qq mysql-server

            # Generate new root password
            DB_ROOT_PASS=$(openssl rand -base64 16)

            # Set root password and secure installation
            mysql -e "ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '${DB_ROOT_PASS}';"
            mysql -u root -p"${DB_ROOT_PASS}" -e "DELETE FROM mysql.user WHERE User='';"
            mysql -u root -p"${DB_ROOT_PASS}" -e "DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');"
            mysql -u root -p"${DB_ROOT_PASS}" -e "DROP DATABASE IF EXISTS test;"
            mysql -u root -p"${DB_ROOT_PASS}" -e "DELETE FROM mysql.db WHERE Db='test' OR Db='test\\_%';"
            mysql -u root -p"${DB_ROOT_PASS}" -e "FLUSH PRIVILEGES;"

            echo "MySQL reinstalled successfully."
            echo "New root password: $DB_ROOT_PASS"
        else
            echo "Installation cancelled."
            exit 1
        fi
    fi
else
    # Install MySQL
    echo ""
    echo "[3/8] Installing MySQL..."
    export DEBIAN_FRONTEND=noninteractive
    apt-get install -y -qq mysql-server

    # Secure MySQL installation
    echo ""
    echo "[4/8] Configuring MySQL..."
    mysql -e "ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '${DB_ROOT_PASS}';"
    mysql -u root -p"${DB_ROOT_PASS}" -e "DELETE FROM mysql.user WHERE User='';"
    mysql -u root -p"${DB_ROOT_PASS}" -e "DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');"
    mysql -u root -p"${DB_ROOT_PASS}" -e "DROP DATABASE IF EXISTS test;"
    mysql -u root -p"${DB_ROOT_PASS}" -e "DELETE FROM mysql.db WHERE Db='test' OR Db='test\\_%';"
    mysql -u root -p"${DB_ROOT_PASS}" -e "FLUSH PRIVILEGES;"
fi

# Check if WordPress database already exists
echo ""
echo "[4/8] Setting up WordPress database..."

# Create MySQL defaults file for safer password handling
MYSQL_CNF=$(mktemp)
cat > "$MYSQL_CNF" <<EOF
[client]
user=root
password=${DB_ROOT_PASS}
EOF
chmod 600 "$MYSQL_CNF"

# Test MySQL connection first
echo "Testing MySQL connection..."
if ! mysql --defaults-extra-file="$MYSQL_CNF" -e "SELECT 1;" &> /dev/null; then
    echo "ERROR: Cannot connect to MySQL to create database."
    rm -f "$MYSQL_CNF"
    exit 1
fi

echo "MySQL connection successful."

# Check if database exists
echo "Checking for existing database '${DB_NAME}'..."
DB_EXISTS=$(mysql --defaults-extra-file="$MYSQL_CNF" -e "SHOW DATABASES LIKE '${DB_NAME}';" 2>/dev/null | grep -c "${DB_NAME}" || true)

if [ "$DB_EXISTS" -gt 0 ]; then
    echo ""
    echo "WARNING: Database '${DB_NAME}' already exists!"
    read -p "Delete existing database and create fresh? (y/n, default: n): " DELETE_DB
    DELETE_DB=${DELETE_DB:-n}

    if [ "$DELETE_DB" = "y" ]; then
        echo "Dropping existing database..."
        mysql --defaults-extra-file="$MYSQL_CNF" -e "DROP DATABASE ${DB_NAME};"
        echo "Creating fresh WordPress database..."
        mysql --defaults-extra-file="$MYSQL_CNF" <<EOF
CREATE DATABASE ${DB_NAME} DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
EOF
    else
        echo "Using existing database '${DB_NAME}'."
    fi
else
    echo "Creating WordPress database..."
    mysql --defaults-extra-file="$MYSQL_CNF" <<EOF
CREATE DATABASE ${DB_NAME} DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
EOF
    echo "Database created successfully."
fi

# Check if WordPress user already exists
echo "Checking for existing database user '${DB_USER}'..."
USER_EXISTS=$(mysql --defaults-extra-file="$MYSQL_CNF" -e "SELECT User FROM mysql.user WHERE User='${DB_USER}';" 2>/dev/null | grep -c "${DB_USER}" || true)

if [ "$USER_EXISTS" -gt 0 ]; then
    echo "Database user '${DB_USER}' already exists. Updating password and permissions..."
    mysql --defaults-extra-file="$MYSQL_CNF" <<EOF
ALTER USER '${DB_USER}'@'localhost' IDENTIFIED BY '${DB_PASS}';
GRANT ALL PRIVILEGES ON ${DB_NAME}.* TO '${DB_USER}'@'localhost';
FLUSH PRIVILEGES;
EOF
    echo "User updated successfully."
else
    echo "Creating WordPress database user..."
    mysql --defaults-extra-file="$MYSQL_CNF" <<EOF
CREATE USER '${DB_USER}'@'localhost' IDENTIFIED BY '${DB_PASS}';
GRANT ALL PRIVILEGES ON ${DB_NAME}.* TO '${DB_USER}'@'localhost';
FLUSH PRIVILEGES;
EOF
    echo "User created successfully."
fi

echo "Database setup complete."
rm -f "$MYSQL_CNF"

# Install PHP
echo ""
echo "[5/8] Installing PHP and extensions..."
apt-get install -y -qq php8.3 php8.3-mysql php8.3-curl php8.3-gd php8.3-mbstring \
    php8.3-xml php8.3-xmlrpc php8.3-soap php8.3-intl php8.3-zip libapache2-mod-php8.3 php8.3-imagick

# Configure PHP
echo "Configuring PHP..."
sed -i 's/upload_max_filesize = .*/upload_max_filesize = 64M/' /etc/php/8.3/apache2/php.ini
sed -i 's/post_max_size = .*/post_max_size = 64M/' /etc/php/8.3/apache2/php.ini
sed -i 's/max_execution_time = .*/max_execution_time = 300/' /etc/php/8.3/apache2/php.ini

# Check if WordPress directory already exists
if [ -d "/var/www/html/wordpress" ]; then
    echo ""
    echo "WARNING: WordPress directory /var/www/html/wordpress already exists!"
    read -p "Delete existing WordPress installation? (y/n, default: n): " DELETE_WP
    DELETE_WP=${DELETE_WP:-n}

    if [ "$DELETE_WP" = "y" ]; then
        echo "Removing existing WordPress directory..."
        rm -rf /var/www/html/wordpress
    fi
fi

# Download WordPress
echo ""
echo "[6/8] Downloading WordPress..."
cd /tmp
wget -q https://wordpress.org/latest.tar.gz
tar -xzf latest.tar.gz
mv wordpress /var/www/html/
chown -R www-data:www-data /var/www/html/wordpress
rm -f latest.tar.gz

# Configure WordPress
echo ""
echo "[7/8] Configuring WordPress..."
cd /var/www/html/wordpress

# Generate WordPress salts
SALTS=$(curl -s https://api.wordpress.org/secret-key/1.1/salt/)

# Create wp-config.php
cat > wp-config.php <<EOF
<?php
define( 'DB_NAME', '${DB_NAME}' );
define( 'DB_USER', '${DB_USER}' );
define( 'DB_PASSWORD', '${DB_PASS}' );
define( 'DB_HOST', 'localhost' );
define( 'DB_CHARSET', 'utf8mb4' );
define( 'DB_COLLATE', '' );

${SALTS}

\$table_prefix = 'wp_';

define( 'WP_DEBUG', false );

if ( ! defined( 'ABSPATH' ) ) {
    define( 'ABSPATH', __DIR__ . '/' );
}

require_once ABSPATH . 'wp-settings.php';
EOF

chown www-data:www-data wp-config.php
chmod 640 wp-config.php

# Configure Apache
echo ""
echo "[8/8] Configuring Apache..."

# Determine server name
if [ -z "$DOMAIN_NAME" ]; then
    # Try to get EC2 public IP first
    SERVER_NAME=$(curl -sf --connect-timeout 5 http://169.254.169.254/latest/meta-data/public-ipv4 2>/dev/null || true)

    # If we got a valid public IP, use it
    if [ -n "$SERVER_NAME" ] && [[ ! "$SERVER_NAME" =~ ^172\. ]] && [[ ! "$SERVER_NAME" =~ ^10\. ]] && [[ ! "$SERVER_NAME" =~ ^192\.168\. ]]; then
        echo "Detected EC2 public IP: $SERVER_NAME"
    else
        # Fallback: try to get public IP from external service
        echo "EC2 metadata not available, trying external service..."
        SERVER_NAME=$(curl -sf --connect-timeout 5 https://api.ipify.org 2>/dev/null || curl -sf --connect-timeout 5 https://icanhazip.com 2>/dev/null || true)

        if [ -n "$SERVER_NAME" ]; then
            echo "Detected public IP from external service: $SERVER_NAME"
        else
            # Last resort: use local IP (but warn user)
            SERVER_NAME=$(hostname -I | awk '{print $1}')
            echo "WARNING: Using local IP address: $SERVER_NAME"
            echo "This is a private IP and won't be accessible from the internet."
            echo "Consider specifying a domain name or public IP."
        fi
    fi
else
    SERVER_NAME="$DOMAIN_NAME"
    echo "Using provided domain: $SERVER_NAME"
fi

# Create Apache virtual host
cat > /etc/apache2/sites-available/wordpress.conf <<EOF
<VirtualHost *:80>
    ServerName ${SERVER_NAME}
    ServerAdmin ${WP_ADMIN_EMAIL}
    DocumentRoot /var/www/html/wordpress

    <Directory /var/www/html/wordpress>
        Options FollowSymLinks
        AllowOverride All
        Require all granted
    </Directory>

    ErrorLog \${APACHE_LOG_DIR}/wordpress-error.log
    CustomLog \${APACHE_LOG_DIR}/wordpress-access.log combined
</VirtualHost>
EOF

# Enable WordPress site
echo "Enabling WordPress site..."
a2ensite wordpress.conf

# Disable default site if it exists
if [ -f /etc/apache2/sites-enabled/000-default.conf ]; then
    echo "Disabling default site..."
    a2dissite 000-default.conf
fi

# Test Apache configuration
echo ""
echo "Testing Apache configuration..."
if ! apache2ctl configtest 2>&1 | grep -q "Syntax OK"; then
    echo "ERROR: Apache configuration test failed!"
    apache2ctl configtest
    exit 1
fi

echo "Apache configuration is valid."

# Restart Apache
echo "Restarting Apache..."
systemctl restart apache2

# Enable services to start on boot
systemctl enable apache2
systemctl enable mysql

# Install WP-CLI for command line WordPress management
echo ""
echo "Installing WP-CLI..."
wget -q https://raw.githubusercontent.com/wp-cli/builds/gh-pages/phar/wp-cli.phar -O /usr/local/bin/wp
chmod +x /usr/local/bin/wp

# Complete WordPress installation via WP-CLI
echo ""
echo "Completing WordPress installation..."
cd /var/www/html/wordpress

# Determine WordPress URL
# If SERVER_NAME looks like a private IP, try to get public IP
if [[ "$SERVER_NAME" =~ ^172\. ]] || [[ "$SERVER_NAME" =~ ^10\. ]] || [[ "$SERVER_NAME" =~ ^192\.168\. ]]; then
    PUBLIC_IP=$(curl -sf --connect-timeout 5 http://169.254.169.254/latest/meta-data/public-ipv4 2>/dev/null || curl -sf --connect-timeout 5 https://api.ipify.org 2>/dev/null || true)
    if [ -n "$PUBLIC_IP" ]; then
        WP_URL="http://${PUBLIC_IP}"
        echo "Using public IP for WordPress URL: $PUBLIC_IP"
    else
        WP_URL="http://${SERVER_NAME}"
        echo "WARNING: Could not determine public IP, using private IP: $SERVER_NAME"
    fi
else
    WP_URL="http://${SERVER_NAME}"
fi

echo "WordPress URL will be: $WP_URL"

# Check if WordPress is already installed
if sudo -u www-data wp core is-installed 2>/dev/null; then
    echo ""
    echo "WARNING: WordPress is already installed!"
    read -p "Continue with fresh installation? (y/n, default: n): " REINSTALL_WP
    REINSTALL_WP=${REINSTALL_WP:-n}

    if [ "$REINSTALL_WP" = "y" ]; then
        echo "Reinstalling WordPress..."
        sudo -u www-data wp db reset --yes
        sudo -u www-data wp core install \
            --url="$WP_URL" \
            --title="${SITE_TITLE}" \
            --admin_user="${WP_ADMIN_USER}" \
            --admin_password="${WP_ADMIN_PASS}" \
            --admin_email="${WP_ADMIN_EMAIL}" \
            --skip-email
    fi
else
    sudo -u www-data wp core install \
        --url="$WP_URL" \
        --title="${SITE_TITLE}" \
        --admin_user="${WP_ADMIN_USER}" \
        --admin_password="${WP_ADMIN_PASS}" \
        --admin_email="${WP_ADMIN_EMAIL}" \
        --skip-email
fi

echo ""
echo "=========================================="
echo "=== WordPress Installation Complete! ==="
echo "=========================================="
echo ""
echo "Website URL: $WP_URL"
echo "Admin URL: $WP_URL/wp-admin"
echo ""
echo "WordPress Admin Credentials:"
echo "  Username: $WP_ADMIN_USER"
echo "  Password: $WP_ADMIN_PASS"
echo "  Email: $WP_ADMIN_EMAIL"
echo ""
echo "Database Credentials:"
echo "  Database: $DB_NAME"
echo "  User: $DB_USER"
echo "  Password: $DB_PASS"
echo ""
echo "MySQL Root Password: $DB_ROOT_PASS"
echo ""
echo "IMPORTANT: Save these credentials securely!"
echo ""

# Save credentials to file
CREDS_FILE="/root/wordpress-credentials.txt"
cat > "$CREDS_FILE" <<EOF
WordPress Installation Credentials
===================================
Date: $(date)

Website URL: $WP_URL
Admin URL: $WP_URL/wp-admin

WordPress Admin:
  Username: $WP_ADMIN_USER
  Password: $WP_ADMIN_PASS
  Email: $WP_ADMIN_EMAIL

Database:
  Name: $DB_NAME
  User: $DB_USER
  Password: $DB_PASS

MySQL Root Password: $DB_ROOT_PASS

WP-CLI installed at: /usr/local/bin/wp
Usage: sudo -u www-data wp <command>

Apache Configuration: /etc/apache2/sites-available/wordpress.conf
EOF

chmod 600 "$CREDS_FILE"

echo "Credentials saved to: $CREDS_FILE"
echo ""
echo "Next steps:"
echo "1. Visit $WP_URL/wp-admin to access your site"
echo "2. Consider setting up SSL/HTTPS with Let's Encrypt"
echo "3. Install a caching plugin for better performance"
echo "4. Configure regular backups"
echo ""

if [ -n "$DOMAIN_NAME" ]; then
    echo "To set up SSL with Let's Encrypt:"
    echo "  apt-get install -y certbot python3-certbot-apache"
    echo "  certbot --apache -d ${DOMAIN_NAME}"
    echo ""
fi

echo "To manage WordPress from command line:"
echo "  cd /var/www/html/wordpress"
echo "  sudo -u www-data wp plugin list"
echo "  sudo -u www-data wp theme list"
echo ""
echo "Apache logs:"
echo "  Error log: /var/log/apache2/wordpress-error.log"
echo "  Access log: /var/log/apache2/wordpress-access.log"
echo ""
echo "=========================================="

Run it:

chmod +x setup-wordpress.sh
sudo ./setup-wordpress.sh
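
On the smaller instance sizes (1-2 GB of RAM), a modest swap file is cheap insurance against PHP or MySQL being killed under memory pressure during plugin updates or traffic spikes. An optional sketch, run as root:

# Optional: add a 2 GB swap file on small instances
fallocate -l 2G /swapfile
chmod 600 /swapfile
mkswap /swapfile
swapon /swapfile
echo '/swapfile none swap sw 0 0' >> /etc/fstab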

Part 3: Migrate Your Existing Site

If you’re migrating from an existing WordPress installation, follow these steps.

What gets migrated:

  • All posts, pages, and media
  • All users and their roles
  • All plugins (files + database settings)
  • All themes (including customisations)
  • All plugin/theme configurations (stored in wp_options table)
  • Widgets, menus, and customizer settings
  • WooCommerce products, orders, customers (if applicable)
  • All custom database tables created by plugins

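Before exporting, it helps to know how much data will actually move so you can size the transfer window and the new instance's disk. A rough check on the old server (adjust the path to your install):

# Estimate migration size: files plus database
du -sh /var/www/html/wp-content
mysql -u root -p -e "SELECT table_schema AS db, ROUND(SUM(data_length + index_length)/1024/1024) AS size_mb FROM information_schema.tables GROUP BY table_schema;"
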
Step 3a: Export from Old Server

Run this on your existing WordPress server. Save as wp-export.sh:

#!/bin/bash
set -euo pipefail

# Configuration
WP_PATH="/var/www/html"           # Adjust to your WordPress path
EXPORT_DIR="/tmp/wp-migration"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)

# Detect WordPress path if not set correctly
if [ ! -f "${WP_PATH}/wp-config.php" ]; then
    for path in "/var/www/wordpress" "/var/www/html/wordpress" "/home/*/public_html" "/var/www/*/public_html"; do
        if [ -f "${path}/wp-config.php" ]; then
            WP_PATH="$path"
            break
        fi
    done
fi

if [ ! -f "${WP_PATH}/wp-config.php" ]; then
    echo "ERROR: wp-config.php not found. Please set WP_PATH correctly."
    exit 1
fi

echo "==> WordPress found at: ${WP_PATH}"

# Extract database credentials from wp-config.php
DB_NAME=$(grep "DB_NAME" "${WP_PATH}/wp-config.php" | cut -d "'" -f 4)
DB_USER=$(grep "DB_USER" "${WP_PATH}/wp-config.php" | cut -d "'" -f 4)
DB_PASS=$(grep "DB_PASSWORD" "${WP_PATH}/wp-config.php" | cut -d "'" -f 4)
DB_HOST=$(grep "DB_HOST" "${WP_PATH}/wp-config.php" | cut -d "'" -f 4)

echo "==> Database: ${DB_NAME}"

# Create export directory
mkdir -p "${EXPORT_DIR}"
cd "${EXPORT_DIR}"

echo "==> Exporting database..."
mysqldump -h "${DB_HOST}" -u "${DB_USER}" -p"${DB_PASS}" \
    --single-transaction \
    --quick \
    --lock-tables=false \
    --routines \
    --triggers \
    "${DB_NAME}" > database.sql

DB_SIZE=$(ls -lh database.sql | awk '{print $5}')
echo "    Database exported: ${DB_SIZE}"

echo "==> Exporting wp-content..."
tar czf wp-content.tar.gz -C "${WP_PATH}" wp-content

CONTENT_SIZE=$(ls -lh wp-content.tar.gz | awk '{print $5}')
echo "    wp-content exported: ${CONTENT_SIZE}"

echo "==> Exporting wp-config.php..."
cp "${WP_PATH}/wp-config.php" wp-config.php.bak

echo "==> Creating migration package..."
tar czf "wordpress-migration-${TIMESTAMP}.tar.gz" \
    database.sql \
    wp-content.tar.gz \
    wp-config.php.bak

rm -f database.sql wp-content.tar.gz wp-config.php.bak

PACKAGE_SIZE=$(ls -lh "wordpress-migration-${TIMESTAMP}.tar.gz" | awk '{print $5}')

echo ""
echo "============================================"
echo "Export complete!"
echo ""
echo "Package: ${EXPORT_DIR}/wordpress-migration-${TIMESTAMP}.tar.gz"
echo "Size:    ${PACKAGE_SIZE}"
echo ""
echo "Transfer to new server with:"
echo "  scp ${EXPORT_DIR}/wordpress-migration-${TIMESTAMP}.tar.gz ec2-user@NEW_IP:/tmp/"
echo "============================================"

Step 3b: Transfer the Export

scp /tmp/wp-migration/wordpress-migration-*.tar.gz ubuntu@YOUR_NEW_IP:/tmp/
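
For sites with a large uploads directory, scp of one multi-gigabyte tarball can be fragile; rsync resumes interrupted transfers and shows progress. A sketch assuming the same key pair (key path and host are placeholders):

rsync -avz --partial --progress -e "ssh -i ~/.ssh/your-key.pem" \
    /tmp/wp-migration/wordpress-migration-*.tar.gz ubuntu@YOUR_NEW_IP:/tmp/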

Step 3c: Import on New Server

Run this on your new Graviton instance. Save as wp-import.sh:

#!/bin/bash
set -euo pipefail

# Configuration - EDIT THESE
MIGRATION_FILE="${1:-/tmp/wordpress-migration-*.tar.gz}"
OLD_DOMAIN="oldsite.com"          # Your old domain
NEW_DOMAIN="newsite.com"          # Your new domain (can be same)
WP_PATH="/var/www/wordpress"

# Resolve migration file path
MIGRATION_FILE=$(ls -1 ${MIGRATION_FILE} 2>/dev/null | head -1)

if [ ! -f "${MIGRATION_FILE}" ]; then
    echo "ERROR: Migration file not found: ${MIGRATION_FILE}"
    echo "Usage: $0 /path/to/wordpress-migration-XXXXXX.tar.gz"
    exit 1
fi

echo "==> Using migration file: ${MIGRATION_FILE}"

# Get database credentials from existing wp-config
if [ ! -f "${WP_PATH}/wp-config.php" ]; then
    echo "ERROR: wp-config.php not found at ${WP_PATH}"
    echo "Please run the WordPress setup script first"
    exit 1
fi

DB_NAME=$(grep "DB_NAME" "${WP_PATH}/wp-config.php" | cut -d "'" -f 4)
DB_USER=$(grep "DB_USER" "${WP_PATH}/wp-config.php" | cut -d "'" -f 4)
DB_PASS=$(grep "DB_PASSWORD" "${WP_PATH}/wp-config.php" | cut -d "'" -f 4)
MYSQL_ROOT_PASS=$(grep "MySQL Root" /root/wordpress-credentials.txt | awk '{print $4}')

echo "==> Extracting migration package..."
TEMP_DIR=$(mktemp -d)
cd "${TEMP_DIR}"
tar xzf "${MIGRATION_FILE}"

echo "==> Backing up current installation..."
BACKUP_DIR="/var/backups/wordpress/pre-migration-$(date +%Y%m%d_%H%M%S)"
mkdir -p "${BACKUP_DIR}"
cp -r "${WP_PATH}/wp-content" "${BACKUP_DIR}/" 2>/dev/null || true
mysqldump -u root -p"${MYSQL_ROOT_PASS}" "${DB_NAME}" > "${BACKUP_DIR}/database.sql" 2>/dev/null || true

echo "==> Importing database..."
mysql -u root -p"${MYSQL_ROOT_PASS}" << EOF
DROP DATABASE IF EXISTS ${DB_NAME};
CREATE DATABASE ${DB_NAME} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
GRANT ALL PRIVILEGES ON ${DB_NAME}.* TO '${DB_USER}'@'localhost';
FLUSH PRIVILEGES;
EOF

mysql -u root -p"${MYSQL_ROOT_PASS}" "${DB_NAME}" < database.sql

echo "==> Importing wp-content..."
rm -rf "${WP_PATH}/wp-content"
tar xzf wp-content.tar.gz -C "${WP_PATH}"
chown -R www-data:www-data "${WP_PATH}/wp-content"
find "${WP_PATH}/wp-content" -type d -exec chmod 755 {} \;
find "${WP_PATH}/wp-content" -type f -exec chmod 644 {} \;

echo "==> Updating URLs in database..."
cd "${WP_PATH}"

OLD_URL_HTTP="http://${OLD_DOMAIN}"
OLD_URL_HTTPS="https://${OLD_DOMAIN}"
NEW_URL="https://${NEW_DOMAIN}"

# Install WP-CLI if not present
if ! command -v wp &> /dev/null; then
    curl -sO https://raw.githubusercontent.com/wp-cli/builds/gh-pages/phar/wp-cli.phar
    chmod +x wp-cli.phar
    mv wp-cli.phar /usr/local/bin/wp
fi

echo "    Replacing ${OLD_URL_HTTPS} with ${NEW_URL}..."
sudo -u www-data wp search-replace "${OLD_URL_HTTPS}" "${NEW_URL}" --all-tables --precise --skip-columns=guid 2>/dev/null || true

echo "    Replacing ${OLD_URL_HTTP} with ${NEW_URL}..."
sudo -u www-data wp search-replace "${OLD_URL_HTTP}" "${NEW_URL}" --all-tables --precise --skip-columns=guid 2>/dev/null || true

echo "    Replacing //${OLD_DOMAIN} with //${NEW_DOMAIN}..."
sudo -u www-data wp search-replace "//${OLD_DOMAIN}" "//${NEW_DOMAIN}" --all-tables --precise --skip-columns=guid 2>/dev/null || true

echo "==> Flushing caches and rewrite rules..."
sudo -u www-data wp cache flush
sudo -u www-data wp rewrite flush

echo "==> Reactivating plugins..."
# Some plugins may deactivate during migration - reactivate all
sudo -u www-data wp plugin activate --all 2>/dev/null || true

echo "==> Verifying import..."
POST_COUNT=$(sudo -u www-data wp post list --post_type=post --format=count)
PAGE_COUNT=$(sudo -u www-data wp post list --post_type=page --format=count)
USER_COUNT=$(sudo -u www-data wp user list --format=count)
PLUGIN_COUNT=$(sudo -u www-data wp plugin list --format=count)

echo ""
echo "============================================"
echo "Migration complete!"
echo ""
echo "Imported content:"
echo "  - Posts:   ${POST_COUNT}"
echo "  - Pages:   ${PAGE_COUNT}"
echo "  - Users:   ${USER_COUNT}"
echo "  - Plugins: ${PLUGIN_COUNT}"
echo ""
echo "Site URL: https://${NEW_DOMAIN}"
echo ""
echo "Pre-migration backup: ${BACKUP_DIR}"
echo "============================================"

rm -rf "${TEMP_DIR}"

Run it:

chmod +x wp-import.sh
sudo ./wp-import.sh /tmp/wordpress-migration-*.tar.gz
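
Before changing DNS, you can point a request at the new instance explicitly and confirm the migrated site responds there; curl's --resolve flag overrides DNS for a single request. The domain and IP below are placeholders, and if TLS has not yet been issued on the new box, test port 80 first:

# Test the migrated site against the new instance without touching DNS
curl --resolve newsite.com:80:NEW_INSTANCE_IP -sI http://newsite.com/ | head -5
curl --resolve newsite.com:443:NEW_INSTANCE_IP -sI https://newsite.com/ | head -5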

Step 3d: Verify Migration

#!/bin/bash
set -euo pipefail

WP_PATH="/var/www/wordpress"
cd "${WP_PATH}"

echo "==> WordPress Verification Report"
echo "=================================="
echo ""

echo "WordPress Version:"
sudo -u www-data wp core version
echo ""

echo "Site URL Configuration:"
sudo -u www-data wp option get siteurl
sudo -u www-data wp option get home
echo ""

echo "Database Status:"
sudo -u www-data wp db check
echo ""

echo "Content Summary:"
echo "  Posts:      $(sudo -u www-data wp post list --post_type=post --format=count)"
echo "  Pages:      $(sudo -u www-data wp post list --post_type=page --format=count)"
echo "  Media:      $(sudo -u www-data wp post list --post_type=attachment --format=count)"
echo "  Users:      $(sudo -u www-data wp user list --format=count)"
echo ""

echo "Plugin Status:"
sudo -u www-data wp plugin list --format=table
echo ""

echo "Uploads Directory:"
UPLOAD_COUNT=$(find "${WP_PATH}/wp-content/uploads" -type f 2>/dev/null | wc -l)
UPLOAD_SIZE=$(du -sh "${WP_PATH}/wp-content/uploads" 2>/dev/null | cut -f1)
echo "  Files: ${UPLOAD_COUNT}"
echo "  Size:  ${UPLOAD_SIZE}"
echo ""

echo "Service Status:"
echo "  PHP-FPM: $(systemctl is-active php-fpm)"
echo "  MariaDB: $(systemctl is-active mariadb)"
echo "  Caddy:   $(systemctl is-active caddy)"
echo ""

echo "Page Load Test:"
SITE_URL=$(sudo -u www-data wp option get siteurl)
curl -w "  Total time: %{time_total}s\n  HTTP code: %{http_code}\n" -o /dev/null -s "${SITE_URL}/"
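
The counts above will not catch hard-coded references to the old domain that survived the search-replace (some plugins serialise URLs in unusual places). A quick spot check, with the old domain as a placeholder:

# Look for any remaining references to the old domain
cd /var/www/html/wordpress
sudo -u www-data wp db search "oldsite.com" --all-tables | head -20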

Rollback if Needed

If something goes wrong:

#!/bin/bash
set -euo pipefail

BACKUP_DIR=$(ls -1d /var/backups/wordpress/pre-migration-* 2>/dev/null | tail -1)

if [ -z "${BACKUP_DIR}" ]; then
    echo "ERROR: No backup found"
    exit 1
fi

echo "==> Rolling back to: ${BACKUP_DIR}"

WP_PATH="/var/www/wordpress"
MYSQL_ROOT_PASS=$(cat /root/.wordpress/credentials | grep "MySQL Root" | awk '{print $4}')
DB_NAME=$(grep "DB_NAME" "${WP_PATH}/wp-config.php" | cut -d "'" -f 4)

mysql -u root -p"${MYSQL_ROOT_PASS}" "${DB_NAME}" < "${BACKUP_DIR}/database.sql"

rm -rf "${WP_PATH}/wp-content"
cp -r "${BACKUP_DIR}/wp-content" "${WP_PATH}/"
chown -R www-data:www-data "${WP_PATH}/wp-content"

cd "${WP_PATH}"
sudo -u www-data wp cache flush
sudo -u www-data wp rewrite flush

echo "Rollback complete!"

Part 4: Post-Installation Optimisations

After setup (or migration), run these additional optimisations:

#!/bin/bash

cd /var/www/html/wordpress

# Remove default content
sudo -u www-data wp post delete 1 2 --force 2>/dev/null || true
sudo -u www-data wp theme delete twentytwentytwo twentytwentythree 2>/dev/null || true

# Update everything
sudo -u www-data wp core update
sudo -u www-data wp plugin update --all
sudo -u www-data wp theme update --all

# Install and activate a page-caching plugin (WP Super Cache)
sudo -u www-data wp plugin install wp-super-cache --activate 2>/dev/null || true

# Set optimal permalink structure
sudo -u www-data wp rewrite structure '/%postname%/'
sudo -u www-data wp rewrite flush

echo "Optimisations complete!"

Performance Verification

Check your stack is running optimally:

# Verify PHP OPcache status
php -i | grep -i opcache

# Check Apache (PHP runs via mod_php) status
systemctl status apache2

# Test page load time
curl -w "@-" -o /dev/null -s "https://yourdomain.com" << 'EOF'
     time_namelookup:  %{time_namelookup}s
        time_connect:  %{time_connect}s
     time_appconnect:  %{time_appconnect}s
    time_pretransfer:  %{time_pretransfer}s
       time_redirect:  %{time_redirect}s
  time_starttransfer:  %{time_starttransfer}s
                     ----------
          time_total:  %{time_total}s
EOF

Cost Comparison

Instance   | vCPU | RAM  | Monthly Cost | Use Case
t4g.micro  | 2    | 1 GB | ~$6          | Dev/testing
t4g.small  | 2    | 2 GB | ~$12         | Small blogs
t4g.medium | 2    | 4 GB | ~$24         | Medium traffic
t4g.large  | 2    | 8 GB | ~$48         | High traffic
c7g.medium | 1    | 2 GB | ~$25         | CPU-intensive

All prices are approximate for eu-west-1 with on-demand pricing. Reserved instances or Savings Plans reduce costs by 30-60%.


Troubleshooting

500 Internal Server Error or blank page: check the Apache error log and that the PHP module is loaded

tail -n 50 /var/log/apache2/wordpress-error.log
apache2ctl -M | grep -i php

Database connection error: Check MySQL is running and the credentials in wp-config.php

systemctl status mysql
mysql -u wpuser -p wordpress

SSL certificate not working: Ensure DNS is pointing to instance IP

dig +short yourdomain.com
curl -I https://yourdomain.com

OPcache not working: Verify with phpinfo

php -r "phpinfo();" | grep -i opcache.enable

Quick Reference

# 1. Launch instance (local machine)
./launch-graviton-wp.sh

# 2. SSH in and setup WordPress
ssh -i ~/.ssh/key.pem ubuntu@IP
sudo ./setup-wordpress.sh

# 3. If migrating - on old server
./wp-export.sh
scp /tmp/wp-migration/wordpress-migration-*.tar.gz ubuntu@NEW_IP:/tmp/

# 4. If migrating - on new server
sudo ./wp-import.sh /tmp/wordpress-migration-*.tar.gz
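
When a test instance has served its purpose, tear it down so nothing keeps billing; a sketch with placeholder IDs (the launch script prints the real allocation ID if you created an Elastic IP):

# Tear down when finished
aws ec2 terminate-instances --region eu-west-1 --instance-ids i-0123456789abcdef0
aws ec2 wait instance-terminated --region eu-west-1 --instance-ids i-0123456789abcdef0
aws ec2 release-address --region eu-west-1 --allocation-id eipalloc-0123456789abcdef0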

This setup delivers a production-ready WordPress installation that will handle significant traffic while keeping your AWS bill minimal. The combination of Graviton’s price-performance, a lean Apache and MySQL stack, and properly-tuned PHP with OPcache creates a setup that punches well above its weight class.

Why Rubrik’s Architecture Matters: When Restore, Not Backup, Is the Product

1. Backups Should Be Boring (and That Is the Point)

Backups are boring. They should be boring. A backup system that generates excitement is usually signalling failure.

The only time backups become interesting is when they are missing, and that interest level is lethal. Emergency bridges. Frozen change windows. Executive escalation. Media briefings. Regulatory apology letters. Engineers being asked questions that have no safe answers.

Most backup platforms are built for the boring days. Rubrik is designed for the day boredom ends.

2. Backup Is Not the Product. Restore Is.

Many organisations still evaluate backup platforms on the wrong metric: how fast they can copy data somewhere else.

That metric is irrelevant during an incident.

When things go wrong, the only questions that matter are:

  • What can I restore?
  • How fast can it be used?
  • How many restores can run in parallel?
  • How little additional infrastructure is required?

Rubrik treats restore as the primary product, not a secondary feature.

3. Architectural Starting Point: Designed for Failure, Not Demos

Rubrik was built without tape era assumptions. There is no central backup server, no serial job controller, and no media server bottleneck. Instead, it uses a distributed, scale out architecture with a global metadata index and a stateless policy engine.

Restore becomes a metadata lookup problem, not a job replay problem. This distinction is invisible in demos and decisive during outages.

4. Performance Metrics That Actually Matter

Backup throughput is easy to optimise and easy to market. Restore performance is constrained by network fan out, restore concurrency, control plane orchestration, and application host contention.

Rubrik addresses this by default through parallel restore streams, linear scaling with node count, and minimal control plane chatter. Restore performance becomes predictable rather than optimistic.

5. Restore Semantics That Match Reality

The real test of any backup platform is not how elegantly it captures data, but how usefully it returns that data when needed. This is where architectural decisions made years earlier either pay dividends or extract penalties.

5.1 Instant Access Instead of Full Rehydration

Rubrik does not require full data copy back before access. It supports live mount of virtual machines, database mounts directly from backup storage, and file system mounts for selective recovery.

The recovery model becomes access first, copy later if needed. This is the difference between minutes and hours when production is down.

5.2 Dropping a Table Should Not Be a Crisis

Rubrik understands databases as structured systems, not opaque blobs.

It supports table level restores for SQL Server, mounting a database backup as a live database, extracting tables or schemas without restoring the full database, and point in time recovery without rollback.

Accidental table drops should be operational annoyances, not existential threats.
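
To make that concrete, here is a minimal sketch of what pulling a dropped table back through a live-mounted copy might look like on the SQL Server side. The instance, database, and table names are hypothetical, and the Live Mount itself would be created from the Rubrik console or API beforehand:

# Hypothetical: copy a dropped table back from a live-mounted backup database
# (run from the target SQL Server host; names below are assumptions)
SQL_INSTANCE="PRODSQL01"
MOUNTED_DB="SalesDB_LiveMount"   # database exposed by the Live Mount
TARGET_DB="SalesDB"              # production database missing the table
sqlcmd -S "$SQL_INSTANCE" -E -Q "
    SELECT * INTO ${TARGET_DB}.dbo.Orders FROM ${MOUNTED_DB}.dbo.Orders;
    SELECT COUNT(*) AS restored_rows FROM ${TARGET_DB}.dbo.Orders;"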

5.3 Supported Database Engines

Rubrik provides native protection for the major enterprise database platforms:

Database Engine      | Live Mount              | Point in Time Recovery       | Key Constraints
Microsoft SQL Server | Yes                     | Yes (transaction log replay) | SQL 2012+ supported; Always On AG, FCI, standalone
Oracle Database      | Yes                     | Yes (archive log replay)     | RAC, Data Guard, Exadata supported; SPFILE required for automated recovery
SAP HANA             | No                      | Yes                          | Backint API integration; uses native HANA backup scheduling
PostgreSQL           | No                      | Yes (up to 5 minute RPO)     | File level incremental; on premises and cloud (AWS, Azure, GCP)
IBM Db2              | Via Elastic App Service | Yes                          | Uses native Db2 backup utilities
MongoDB              | Via Elastic App Service | Yes                          | Sharded and unsharded clusters; no quiescing required
MySQL                | Via Elastic App Service | Yes                          | Uses native MySQL backup tools
Cassandra            | Via Elastic App Service | Yes                          | Via Rubrik Datos IO integration

The distinction between native integration and Elastic App Service matters operationally. Native integration means Rubrik handles discovery, scheduling, and orchestration directly. Elastic App Service means Rubrik provides managed volumes as backup targets while the database’s native tools handle the actual backup process. Both approaches deliver immutability and policy driven retention, but the operational experience differs.

5.4 Live Mount: Constraints and Caveats

Live Mount is Rubrik’s signature capability—mounting backups as live, queryable databases without copying data back to production storage. The database runs with its data files served directly from the Rubrik cluster over NFS (for Oracle) or SMB 3.0 (for SQL Server).

This capability is transformative for specific use cases. It is not a replacement for production storage.

What Live Mount Delivers:

  • Near instant database availability (seconds to minutes, regardless of database size)
  • Zero storage provisioning on the target host
  • Multiple concurrent mounts from the same backup
  • Point in time access across the entire retention window
  • Ideal for granular recovery, DBCC health checks, test/dev cloning, audit queries, and upgrade validation

What Live Mount Does Not Deliver:

  • Production grade I/O performance
  • High availability during Rubrik cluster maintenance
  • Persistence across host or cluster reboots

IOPS Constraints:

Live Mount performance is bounded by the Rubrik appliance’s ability to serve I/O, not by the target host’s storage subsystem. Published figures suggest approximately 30,000 IOPS per Rubrik appliance for Live Mount workloads. This is adequate for reporting queries, data extraction, and validation testing. It is not adequate for transaction heavy production workloads.

The performance characteristics are inherently different from production storage:

Metric               | Production SAN/Flash     | Rubrik Live Mount
Random read IOPS     | 100,000+                 | ~30,000 per appliance
Latency profile      | Sub millisecond          | Network + NFS overhead
Write optimisation   | Production tuned         | Backup optimised
Concurrent workloads | Designed for contention  | Shared with backup operations

SQL Server Live Mount Specifics:

  • Databases mount via SMB 3.0 shares with UNC paths
  • Transaction log replay occurs during mount for point in time positioning
  • The mounted database is read write, but writes go to the Rubrik cluster
  • Supported for standalone instances, Failover Cluster Instances, and Always On Availability Groups
  • Table level recovery requires mounting the database, then using T SQL to extract and import specific objects

Oracle Live Mount Specifics:

  • Data files mount via NFS; redo logs and control files remain on the target host
  • Automated recovery requires source and target configurations to match (RAC to RAC, single instance to single instance, ASM to ASM)
  • Files only recovery allows dissimilar configurations but requires DBA managed RMAN recovery
  • SPFILE is required for automated recovery; PFILE databases require manual intervention
  • Block change tracking (BCT) is disabled on Live Mount targets
  • Live Mount fails if the target host, RAC cluster, or Rubrik cluster reboots during the mount—requiring forced unmount to clean up metadata
  • Direct NFS (DNFS) is recommended on Oracle RAC nodes for improved recovery performance

What Live Mount Is Not:

Live Mount is explicitly designed for temporary access, not sustained production workloads. The use cases Rubrik markets—test/dev, DBCC validation, granular recovery, audit queries—all share a common characteristic: they are time bounded operations that tolerate moderate I/O performance in exchange for instant availability.

Running production transaction processing against a Live Mount database would be technically possible and operationally inadvisable. The I/O profile, the network dependency, and the lack of high availability guarantees make it unsuitable for workloads where performance and uptime matter.

5.5 The Recovery Hierarchy

Understanding when to use each recovery method matters:

Recovery Need                | Recommended Method | Time to Access         | Storage Required
Extract specific rows/tables | Live Mount + query | Minutes                | None
Validate backup integrity    | Live Mount + DBCC  | Minutes                | None
Clone for test/dev           | Live Mount         | Minutes                | None
Full database replacement    | Export/Restore     | Hours (size dependent) | Full database size
Disaster recovery cutover    | Instant Recovery   | Minutes (then migrate) | Temporary, then full

The strategic value of Live Mount is avoiding full restores when full restores are unnecessary. For a 5TB database where someone dropped a single table, Live Mount means extracting that table in minutes rather than waiting hours for a complete restore.

For actual disaster recovery, where the production database is gone and must be replaced, Live Mount provides bridge access while the full restore completes in parallel. The database is queryable immediately; production grade performance follows once data migration finishes.

6. Why Logical Streaming Is a Design Failure

Traditional restore models stream backup data through the database host. This guarantees CPU contention, IO pressure, and restore times proportional to database size rather than change size.

Figure 1 illustrates this contrast clearly. Traditional restore requires data to be copied back through the database server, creating high I/O, CPU and network load with correspondingly long restore times. Rubrik’s Live Mount approach mounts the backup copy directly, achieving near zero RTO with minimal data movement. The difference between these approaches becomes decisive when production is down and every minute of restore time translates to business impact.

Rubrik avoids this by mounting database images and extracting only required objects. The database host stops being collateral damage during recovery.

6.1 The VSS Tax: Why SQL Server Backups Cannot Escape Application Coordination

For VMware workloads without databases, Rubrik can leverage storage level snapshots that are instantaneous, application agnostic, and impose zero load on the guest operating system. The hypervisor freezes the VM state, the storage array captures the point in time image, and the backup completes before the application notices.

SQL Server cannot offer this simplicity. The reason is not a Microsoft limitation or a Rubrik constraint. The reason is transactional consistency.

6.1.1 The Crash Consistent Option Exists

Nothing technically prevents Rubrik, or any backup tool, from taking a pure storage snapshot of a SQL Server volume without application coordination. The snapshot would complete in milliseconds with zero database load.

The problem is what you would recover: a crash consistent image, not an application consistent one.

A crash consistent snapshot captures storage state mid flight. This includes partially written pages, uncommitted transactions, dirty buffers not yet flushed to disk, and potentially torn writes caught mid I/O. SQL Server is designed to recover from exactly this state. Every time the database engine starts after an unexpected shutdown, it runs crash recovery, rolling forward committed transactions from the log and rolling back uncommitted ones.

The database will become consistent. Eventually. Probably.

6.1.2 Why Probably Is Not Good Enough

Crash recovery works. It works reliably. It is tested millions of times daily across every SQL Server instance that experiences an unclean shutdown.

But restore confidence matters. When production is down and executives are asking questions, the difference between “this backup is guaranteed consistent” and “this backup should recover correctly after crash recovery completes” is operationally significant.

VSS exists to eliminate that uncertainty.

6.1.3 What VSS Actually Does

When a backup application requests an application consistent SQL Server snapshot, the sequence shown in Figure 2 executes. The backup server sends a signal through VSS Orchestration, which triggers the SQL Server VSS Writer to prepare the database. This preparation involves flushing dirty pages to storage, hardening transaction logs, and momentarily freezing I/O. Only then does the storage-level snapshot execute, capturing a point-in-time consistent image that requires no crash recovery on restore.

The result is a snapshot that requires no crash recovery on restore. The database is immediately consistent, immediately usable, and carries no uncertainty about transactional integrity.

6.1.4 The Coordination Cost

The VSS freeze window is typically brief, milliseconds to low seconds. But the preparation is not free.

Buffer pool flushes on large databases generate I/O pressure. Checkpoint operations compete with production workloads. The freeze, however short, introduces latency for in flight transactions. The database instance is actively participating in its own backup.

For databases measured in terabytes, with buffer pools consuming hundreds of gigabytes, this coordination overhead becomes operationally visible. Backup windows that appear instantaneous from the storage console are hiding real work inside the SQL Server instance.

6.1.5 The Architectural Asymmetry

This creates a fundamental difference in backup elegance across workload types:

Workload Type                | Backup Method             | Application Load  | Restore State
VMware VM (no database)      | Storage snapshot          | Zero              | Crash consistent (acceptable)
VMware VM (with SQL Server)  | VSS coordinated snapshot  | Moderate          | Application consistent
Physical SQL Server          | VSS coordinated snapshot  | Moderate to high  | Application consistent
Physical SQL Server          | Pure storage snapshot     | Zero              | Crash consistent (risky)
For a web server or file share, crash consistent is fine. The application has no transactional state worth protecting. For a database, crash consistent means trusting recovery logic rather than guaranteeing consistency.

6.1.6 The Uncomfortable Reality

The largest, most critical SQL Server databases, the ones that would benefit most from zero overhead instantaneous backup, are precisely the workloads where crash consistent snapshots carry the most risk. More transactions in flight. Larger buffer pools. More recovery time if something needs replay.

Rubrik supports VSS coordination because the alternative is shipping backups that might need crash recovery. That uncertainty is acceptable for test environments. It is rarely acceptable for production databases backing financial systems, customer records, or regulatory reporting.

The VSS tax is not a limitation imposed by Microsoft or avoided by competitors. It is the cost of consistency. Every backup platform that claims application consistent SQL Server protection is paying it. The only question is whether they admit the overhead exists.

7. Snapshot Based Protection Is Objectively Better (When You Can Get It)

The previous section explained why SQL Server backups cannot escape application coordination. VSS exists because transactional consistency requires it, and the coordination overhead is the price of certainty.

This makes the contrast with pure snapshot based protection even starker. Where snapshots work cleanly, they are not incrementally better. They are categorically superior.

7.1 What Pure Snapshots Deliver

Snapshot based backups in environments that support them provide:

  • Near instant capture: microseconds to milliseconds, regardless of dataset size
  • Zero application load: the workload never knows a backup occurred
  • Consistent recovery points: the storage layer guarantees point in time consistency
  • Predictable backup windows: duration is independent of data volume
  • No bandwidth consumption during capture: data movement happens later, asynchronously

A 50TB VMware datastore snapshots in the same time as a 50GB datastore. Backup windows become scheduling decisions rather than capacity constraints.

Rubrik exploits this deeply in VMware environments. Snapshot orchestration, instant VM recovery, and live mounts all depend on the hypervisor providing clean, consistent, zero overhead capture points.

7.2 Why This Is Harder Than It Looks

The elegance of snapshot based protection depends entirely on the underlying platform providing the right primitives. This is where the gap between VMware and everything else becomes painful.

VMware offers:

  • Native snapshot APIs with transactional semantics
  • Changed Block Tracking (CBT) for efficient incrementals
  • Hypervisor level consistency without guest coordination
  • Storage integration through VADP (vSphere APIs for Data Protection)

These are not accidental features. VMware invested years building a backup ecosystem because they understood that enterprise adoption required operational maturity, not just compute virtualisation.

Physical hosts offer none of this.

There is no universal snapshot API for bare metal servers. Storage arrays provide snapshot capabilities, but each vendor implements them differently, with different consistency guarantees, different integration points, and different failure modes. The operating system has no standard mechanism to coordinate application state with storage level capture.

7.3 The Physical Host Penalty

This is why physical SQL Server hosts face a compounding disadvantage:

  1. No hypervisor abstraction: there is no layer between the OS and storage that can freeze state cleanly
  2. VSS remains mandatory: application consistency still requires database coordination
  3. No standardised incremental tracking: without CBT or equivalent, every backup must rediscover what changed
  4. Storage integration is bespoke: each array, each SAN, each configuration requires specific handling

The result is that physical hosts with the largest databases, the workloads generating the most backup data, with the longest restore times, under the most operational pressure, receive the least architectural benefit from modern backup platforms.

They are stuck paying the VSS tax without receiving the snapshot dividend.

7.4 The Integration Hierarchy

Backup elegance follows a clear hierarchy based on platform integration depth:

Environment                        | Snapshot Quality | Incremental Efficiency | Application Consistency | Overall Experience
VMware (no database)               | Excellent        | CBT driven             | Not required            | Seamless
VMware (with SQL Server)           | Excellent        | CBT driven             | VSS coordinated         | Good with overhead
Cloud native (EBS, managed disks)  | Good             | Provider dependent     | Varies by workload      | Generally clean
Physical with enterprise SAN       | Possible         | Array dependent        | VSS coordinated         | Complex but workable
Physical with commodity storage    | Limited          | Often full scan        | VSS coordinated         | Painful

The further down this hierarchy, the more the backup platform must compensate for missing primitives. Rubrik handles this better than most, but even excellent software cannot conjure APIs that do not exist.

7.5 Why the Industry Irony Persists

The uncomfortable truth is that snapshot based protection delivers its greatest value precisely where it is least available.

A 500GB VMware VM snapshots effortlessly. The hypervisor provides everything needed. Backup is boring, as it should be.

A 50TB physical SQL Server, the database actually keeping the business running, containing years of transactional history, backing regulatory reporting and financial reconciliation, must coordinate through VSS, flush terabytes of buffer pool, sustain I/O pressure during capture, and hope the storage layer cooperates.

The workloads that need snapshot elegance the most are architecturally prevented from receiving it.

This is not a Rubrik limitation. It is not a Microsoft conspiracy. It is the accumulated consequence of decades of infrastructure evolution where virtualisation received backup investment and physical infrastructure did not.

7.6 What This Means for Architecture Decisions

Understanding this hierarchy should influence infrastructure strategy:

Virtualise where possible. The backup benefits alone often justify the overhead. A SQL Server VM with VSS coordination still benefits from CBT, instant recovery, and hypervisor level orchestration.

Choose storage with snapshot maturity. If physical hosts are unavoidable, enterprise arrays with proven snapshot integration reduce the backup penalty. This is not the place for commodity storage experimentation.

Accept the VSS overhead. For SQL Server workloads, crash consistent snapshots are technically possible but operationally risky. The coordination cost is worth paying. Budget for it in backup windows and I/O capacity.

Plan restore, not backup. Snapshot speed is irrelevant if restore requires hours of data rehydration. The architectural advantage of snapshots extends to recovery only if the platform supports instant mount and selective restore.

Rubrik’s value in this landscape is not eliminating the integration gaps (nobody can) but navigating them intelligently. Where snapshots work, Rubrik exploits them fully. Where they do not, Rubrik minimises the penalty through parallel restore, live mounts, and metadata driven recovery.

The goal remains the same: make restore the product, regardless of how constrained the backup capture had to be.

8. Rubrik Restore Policies: Strategy, Trade offs, and Gotchas

SLA Domains are Rubrik’s policy abstraction layer, and understanding how to configure them properly separates smooth recoveries from painful ones. The flexibility is substantial, but so are the consequences of misconfiguration.

8.1 Understanding SLA Domain Architecture

Rubrik’s policy model centres on SLA Domains, named policies that define retention, frequency, replication, and archival behaviour. Objects are assigned to SLA Domains rather than configured individually, which creates operational leverage but requires upfront design discipline.

The core parameters that matter for restore planning:

Snapshot Frequency determines your Recovery Point Objective (RPO). A 4-hour frequency means you could lose up to 4 hours of data. For SQL Server with log backup enabled, transaction logs between snapshots reduce effective RPO to minutes, but the full snapshot frequency still determines how quickly you can access a baseline restore point.

Local Retention controls how many snapshots remain on the Rubrik cluster for instant access. This is your Live Mount window. Data within local retention restores in minutes. Data beyond it requires rehydration from archive, which takes hours.

Replication copies snapshots to a secondary Rubrik cluster, typically in another location. This is your disaster recovery tier. Replication targets can serve Live Mount operations, meaning DR isn’t just “eventually consistent backup copies” but actual instant recovery capability at the secondary site.

Archival moves aged snapshots to object storage (S3, Azure Blob, Google Cloud Storage). Archive tier data cannot be Live Mounted; it must be retrieved first, which introduces retrieval latency and potentially egress costs.
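
To make the relationship between these parameters concrete, the sketch below models an SLA Domain as a plain Python dataclass and classifies whether a given restore point would be served from local retention (Live Mount) or from archive. The class and field names are invented for illustration; this is not Rubrik’s object model or API.

# sla_policy.py: illustrative sketch only, not Rubrik's SDK or API.
# Models the four policy parameters described above and classifies how a
# requested restore point would be accessed.
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class SlaDomain:
    name: str
    snapshot_frequency_hours: int   # drives the baseline RPO
    local_retention_days: int       # the Live Mount window
    replication_enabled: bool
    archive_retention_years: int

    def recovery_path(self, restore_point: datetime, now: datetime) -> str:
        """Return how this policy would serve a restore for the given point in time."""
        age = now - restore_point
        if age <= timedelta(days=self.local_retention_days):
            return "local snapshot: Live Mount in minutes"
        return "archive tier: retrieval first, hours rather than minutes"


gold = SlaDomain("Gold", snapshot_frequency_hours=8, local_retention_days=14,
                 replication_enabled=True, archive_retention_years=3)
print(gold.recovery_path(datetime(2024, 3, 1), now=datetime(2024, 6, 1)))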

8.2 The Retention vs. Recovery Speed Trade off

This is where most organisations get the policy design wrong.

The temptation is to keep minimal local retention and archive aggressively to reduce storage costs. The consequence is that any restore request older than a few days becomes a multi hour operation.

Consider the mathematics for a 5TB SQL Server database:

Recovery Scenario      | Local Retention         | Time to Access                   | Operational Impact
Yesterday’s backup     | Within local retention  | 2-5 minutes (Live Mount)         | Minimal
Last week’s backup     | Within local retention  | 2-5 minutes (Live Mount)         | Minimal
Last month’s backup    | Archived                | 4-8 hours (retrieval + restore)  | Significant
Last quarter’s backup  | Archived (cold tier)    | 12-24 hours                      | Major incident

The storage cost of keeping 30 days local versus 7 days local might seem significant when multiplied across the estate. But the operational cost of a 6 hour restore delay during an audit request or compliance investigation often exceeds years of incremental storage spend.

Recommendation: Size local retention to cover your realistic recovery scenarios, not your theoretical minimum. For most organisations, 14-30 days of local retention provides the right balance between cost and operational flexibility.
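
The comparison can be sanity checked with back-of-envelope arithmetic. Every figure in the sketch below (change rate, storage cost, cost of a delayed restore) is an assumption chosen only to illustrate the shape of the trade off, not vendor pricing:

# Back-of-envelope comparison: extra local retention cost vs. one delayed
# restore. Every figure here is an assumption chosen for illustration.
db_size_tb = 5
daily_change_rate = 0.03            # assume 3% of the database changes per day
extra_local_days = 30 - 7           # moving from 7 to 30 days of local retention
cost_per_tb_month = 25.0            # assumed cost of local backup capacity per TB-month

extra_capacity_tb = db_size_tb * daily_change_rate * extra_local_days
extra_cost_per_year = extra_capacity_tb * cost_per_tb_month * 12

restore_delay_hours = 6
cost_per_hour_of_delay = 5000.0     # assumed business cost of a stalled audit or incident

print(f"Extra local capacity needed: {extra_capacity_tb:.1f} TB")
print(f"Extra storage cost per year: ${extra_cost_per_year:,.0f}")
print(f"Cost of one {restore_delay_hours} hour restore delay: "
      f"${restore_delay_hours * cost_per_hour_of_delay:,.0f}")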

8.3 SLA Domain Design Patterns

8.3.1 Pattern 1: Tiered by Criticality

Create separate SLA Domains for different criticality levels:

  • Platinum: 4 hour snapshots, 30 day local retention, synchronous replication, 7 year archive
  • Gold: 8 hour snapshots, 14 day local retention, asynchronous replication, 3 year archive
  • Silver: Daily snapshots, 7 day local retention, no replication, 1 year archive
  • Bronze: Daily snapshots, 7 day local retention, no replication, 90 day archive

This pattern works well when criticality maps cleanly to workload types, but creates governance overhead when applications span tiers.

8.3.2 Pattern 2: Tiered by Recovery Requirements

Align SLA Domains to recovery time objectives rather than business criticality:

  • Instant Recovery: Maximum local retention, synchronous replication, Live Mount always available
  • Same Day Recovery: 14 day local retention, asynchronous replication
  • Next Day Recovery: 7 day local retention, archive first strategy

This pattern acknowledges that “critical” and “needs instant recovery” aren’t always the same thing. A compliance archive might be business critical but tolerate 24 hour recovery times.

8.3.3 Pattern 3: Application Aligned

Create SLA Domains per major application or database platform:

  • SQL Server Production
  • SQL Server Non Production
  • Oracle Production
  • VMware Infrastructure
  • File Shares

This pattern simplifies troubleshooting and reporting but can lead to policy sprawl as the estate grows.

8.4 Log Backup Policies: The Hidden Complexity

For SQL Server and Oracle, snapshot frequency alone doesn’t tell the full story. Transaction log backups between snapshots determine actual RPO.

Rubrik supports log backup frequencies down to 1 minute for SQL Server. The trade offs:

Aggressive Log Backup (1-5 minute frequency):

  • Sub 5 minute RPO
  • Higher metadata overhead on Rubrik cluster
  • More objects to manage during restore
  • Longer Live Mount preparation time (more logs to replay)

Conservative Log Backup (15-60 minute frequency):

  • Acceptable RPO for most workloads
  • Lower operational overhead
  • Faster Live Mount operations
  • Simpler troubleshooting

Gotcha: Log backup frequency creates a hidden I/O load on the source database. A 1 minute log backup interval on a high transaction database generates constant log backup traffic. For already I/O constrained databases, this can become the straw that breaks performance.

Recommendation: Match log backup frequency to actual RPO requirements, not aspirational ones. If the business can tolerate 15 minutes of data loss, don’t configure 1 minute log backups just because you can.
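
A quick calculation makes the trade off visible. The sketch below uses a deliberately simplified model in which worst-case data loss equals one log backup interval and every interval runs one log backup job against the source database:

# Effective RPO and operational load for different log backup intervals.
# Simplified model: worst-case data loss is roughly one interval, and every
# interval runs one log backup job against the source database.

def log_backup_profile(interval_minutes: int) -> dict:
    return {
        "interval_minutes": interval_minutes,
        "worst_case_rpo_minutes": interval_minutes,
        "log_backups_per_day": (24 * 60) // interval_minutes,
    }


for interval in (1, 5, 15, 60):
    profile = log_backup_profile(interval)
    print(f"{interval:>3} min interval -> RPO ~{profile['worst_case_rpo_minutes']} min, "
          f"{profile['log_backups_per_day']} log backup jobs per day")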

8.5 Replication Topology Gotchas

Replication seems straightforward: copy snapshots to another cluster. But the implementation details matter.

8.5.1 Gotcha 1: Replication Lag Under Load

Asynchronous replication means the target cluster is always behind the source. During high backup activity (month end processing, batch loads), this lag can extend to hours. If a disaster occurs during this window, you lose more data than your SLA suggests.

Monitor replication lag as an operational metric, not just a capacity planning number.
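
One way to treat lag as an operational metric is a simple threshold check fed by whatever exposes the lag figure in your environment. The sketch below is generic, not a Rubrik integration; the fetch function is a placeholder you would wire to your own monitoring source:

# Replication lag check: a generic monitoring sketch, not a Rubrik integration.
# fetch_replication_lag_minutes() is a placeholder for however lag is exposed
# in your environment (reporting API, metrics pipeline, etc.).
from datetime import timedelta

LAG_WARNING = timedelta(hours=1)
LAG_CRITICAL = timedelta(hours=4)


def fetch_replication_lag_minutes(target_cluster: str) -> int:
    return 135  # stub value; replace with a call to your monitoring source


def check_replication_lag(target_cluster: str) -> str:
    lag = timedelta(minutes=fetch_replication_lag_minutes(target_cluster))
    if lag >= LAG_CRITICAL:
        return f"CRITICAL: {target_cluster} lag {lag} exceeds DR assumptions"
    if lag >= LAG_WARNING:
        return f"WARNING: {target_cluster} lag {lag} is growing under load"
    return f"OK: {target_cluster} lag {lag}"


print(check_replication_lag("dr-cluster-01"))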

8.5.2 Gotcha 2: Bandwidth Contention with Production Traffic

Replication competes for the same network paths as production traffic. If your backup replication saturates a WAN link, production application performance degrades.

Either implement QoS policies to protect production traffic, or schedule replication during low utilisation windows. Rubrik supports replication scheduling, but the default is “as fast as possible,” which isn’t always appropriate.

8.5.3 Gotcha 3: Cascaded Replication Complexity

For multi site architectures, you might configure Site A → Site B → Site C replication. Each hop adds latency and failure modes. A Site B outage breaks the chain to Site C.

Consider whether hub and spoke (Site A replicates independently to both B and C) better matches your DR requirements, despite the additional bandwidth consumption.

8.6 Archive Tier Selection: Retrieval Time Matters

Object storage isn’t monolithic. The choice between storage classes has direct recovery implications.

Storage Class                  | Typical Retrieval Time             | Use Case
S3 Standard / Azure Hot        | Immediate                          | Frequently accessed archives
S3 Standard-IA / Azure Cool    | Immediate (higher retrieval cost)  | Infrequent but urgent access
S3 Glacier Instant Retrieval   | Milliseconds                       | Compliance archives with occasional audit access
S3 Glacier Flexible Retrieval  | 1-12 hours                         | Long-term retention with rare access
S3 Glacier Deep Archive        | 12-48 hours                        | Legal hold, never access unless subpoenaed

Gotcha: Rubrik’s archive policy assigns snapshots to a single storage class. If your retention spans 7 years, all 7 years of archives pay the same storage rate, even though year 1 archives are accessed far more frequently than year 7 archives.

Recommendation: Consider tiered archive policies, with recent archives in Standard-IA and aged archives in Glacier. This requires multiple SLA Domains and careful lifecycle management, but the cost savings compound significantly at scale.
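
If the archive target is S3, one way to approximate tiering without multiplying SLA Domains is a lifecycle rule on the archive bucket itself. The boto3 sketch below uses placeholder bucket and prefix names; confirm with your backup platform’s documentation that it tolerates objects transitioning to Glacier classes before applying anything like this underneath it.

# Tier aged archive objects with an S3 lifecycle rule (illustrative; bucket and
# prefix are placeholders). Recent archives stay quickly retrievable, old ones
# move to Glacier.
import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="backup-archive-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "tier-aged-backup-archives",
                "Filter": {"Prefix": "rubrik-archive/"},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": 90, "StorageClass": "STANDARD_IA"},
                    {"Days": 365, "StorageClass": "GLACIER"},
                ],
            }
        ]
    },
)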

8.7 Policy Assignment Gotchas

8.7.1 Gotcha 1: Inheritance and Override Conflicts

Rubrik supports hierarchical policy assignment (cluster → host → database). When policies conflict, the resolution logic isn’t always intuitive. A database with an explicit SLA assignment won’t inherit changes made to its parent host’s policy.

Document your policy hierarchy explicitly. During audits, the question “what policy actually applies to this database?” should have an immediate, verifiable answer.

8.7.2 Gotcha 2: Pre script and Post script Failures

Custom scripts for application quiescing or notification can fail, and failure handling varies. A pre script failure might skip the backup entirely (safe but creates a gap) or proceed without proper quiescing (dangerous).

Test script failure modes explicitly. Know what happens when your notification webhook is unreachable or your custom quiesce script times out.

8.7.3 Gotcha 3: Time Zone Confusion

Rubrik displays times in the cluster’s configured time zone, but SLA schedules operate in UTC unless explicitly configured otherwise. An “8 PM backup” might run at midnight local time if the time zone mapping is wrong.

Verify backup execution times after policy configuration, don’t trust the schedule display alone.

8.8 Testing Your Restore Policies

Policy design is theoretical until tested. The following tests should be regular operational practice:

Live Mount Validation: Mount a backup from local retention and verify application functionality. This proves both backup integrity and Live Mount operational capability.

Archive Retrieval Test: Retrieve a backup from archive tier and time the operation. Compare actual retrieval time against SLA commitments.

Replication Failover Test: Perform a Live Mount from the replication target, not the source cluster. This validates that DR actually works, not just that replication is running.

Point in Time Recovery Test: For databases with log backup enabled, recover to a specific timestamp between snapshots. This validates that log chain integrity is maintained.

Concurrent Restore Test: Simulate a ransomware scenario by triggering multiple simultaneous restores. Measure whether your infrastructure can sustain the required parallelism.
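
A lightweight way to keep these tests honest is to time them against an SLA target and record the result. The harness below is a generic sketch, not Rubrik tooling; run_test stands in for whichever of the tests above is being exercised:

# Minimal timing harness for the restore tests listed above. Nothing here is
# Rubrik specific; run_test is any callable that performs the test and raises
# on failure.
import time
from typing import Callable


def timed_restore_test(name: str, target_minutes: float,
                       run_test: Callable[[], None]) -> dict:
    start = time.monotonic()
    run_test()
    elapsed_minutes = (time.monotonic() - start) / 60
    return {
        "test": name,
        "elapsed_minutes": round(elapsed_minutes, 2),
        "target_minutes": target_minutes,
        "within_sla": elapsed_minutes <= target_minutes,
    }


# Example: an archive retrieval test with a 4 hour target (the sleep is a stand-in).
print(timed_restore_test("archive retrieval", target_minutes=240,
                         run_test=lambda: time.sleep(1)))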

8.9 Policy Review Triggers

SLA Domains shouldn’t be “set and forget.” Trigger policy reviews when:

  • Application criticality changes (promotion to production, decommissioning)
  • Recovery requirements change (new compliance mandates, audit findings)
  • Infrastructure changes (new replication targets, storage tier availability)
  • Performance issues emerge (backup windows exceeded, replication lag growing)
  • Cost optimisation cycles (storage spend review, cloud egress analysis)

The goal is proactive policy maintenance, not reactive incident response when a restore takes longer than expected.

9. Ransomware: Where Architecture Is Exposed

9.1 The Restore Storm Problem

After ransomware, the challenge is not backup availability. The challenge is restoring everything at once.

Constraints appear immediately. East-west traffic saturates. DWDM links run hot. Core switch buffers overflow. Cloud egress throttling kicks in.

Rubrik mitigates this through parallel restores, SLA based prioritisation, and live mounts for critical systems. What it cannot do is defeat physics. A good recovery plan avoids turning a data breach into a network outage.

10. SaaS vs Appliance: This Is a Network Decision

Functionally, Rubrik SaaS and on prem appliances share the same policy engine, metadata index, and restore semantics.

The difference is bandwidth reality.

On prem appliances provide fast local restores, predictable latency, and minimal WAN dependency. SaaS based protection provides excellent cloud workload coverage and operational simplicity, but restore speed is bounded by network capacity and egress costs.

Hybrid estates usually require both.

11. Why Rubrik in the Cloud?

Cloud providers offer native backup primitives. These are necessary but insufficient. They do not provide unified policy across environments, cross account recovery at scale, ransomware intelligence, or consistent restore semantics. Rubrik turns cloud backups into recoverable systems rather than isolated snapshots.

11.1 Should You Protect Your AWS Root and Crypto Accounts?

Yes, because losing the control plane is worse than losing data.

Rubrik protects IAM configuration, account state, and infrastructure metadata. After a compromise, restoring how the account was configured is as important as restoring the data itself.

12. Backup Meets Security (Finally)

Rubrik integrates threat awareness into recovery using entropy analysis, change rate anomaly detection, and snapshot divergence tracking. This answers the most dangerous question in recovery: which backup is actually safe to restore? Most platforms cannot answer this with confidence.

13. VMware First Class Citizen, Physical Hosts Still Lag

Rubrik’s deepest integrations exist in VMware environments, including snapshot orchestration, instant VM recovery, and live mounts.

The uncomfortable reality remains that physical hosts with the largest datasets would benefit most from snapshot based protection, yet receive the least integration. This is an industry gap, not just a tooling one.

14. When Rubrik Is Not the Right Tool

Rubrik is not universal.

It is less optimal when bandwidth is severely constrained, estates are very small, or tape workflows are legally mandated.

Rubrik’s value emerges at scale, under pressure, and during failure.

15. Conclusion: Boredom Is Success

Backups should be boring. Restores should be quiet. Executives should never know the platform exists.

The only time backups become exciting is when they fail, and that excitement is almost always lethal.

Rubrik is not interesting because it stores data. It is interesting because, when everything is already on fire, restore remains a controlled engineering exercise rather than a panic response.


Aurora PostgreSQL: Archiving and Restoring Partitions from Large Tables to Iceberg and Parquet on S3

A Complete Guide to Archiving, Restoring, and Querying Large Table Partitions

When dealing with multi-terabyte tables in Aurora PostgreSQL, keeping historical partitions online becomes increasingly expensive and operationally burdensome. This guide presents a complete solution for archiving partitions to S3 in Iceberg/Parquet format, restoring them when needed, and querying archived data directly via a Spring Boot API without database restoration.

1. Architecture Overview

The solution comprises three components:

  1. Archive Script: Exports a partition from Aurora PostgreSQL to Parquet files organised in Iceberg table format on S3
  2. Restore Script: Imports archived data from S3 back into a staging table for validation and migration to the main table
  3. Query API: A Spring Boot application that reads Parquet files directly from S3, applying predicate pushdown for efficient filtering

This approach reduces storage costs by approximately 70 to 80 percent compared to keeping data in Aurora, while maintaining full queryability through the API layer.

2. Prerequisites and Dependencies

2.1 Python Environment

pip install boto3 pyarrow psycopg2-binary pandas pyiceberg sqlalchemy

2.2 AWS Configuration

Ensure your environment has appropriate IAM permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::your-archive-bucket",
                "arn:aws:s3:::your-archive-bucket/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:CreateTable",
                "glue:GetTable",
                "glue:UpdateTable",
                "glue:DeleteTable"
            ],
            "Resource": "*"
        }
    ]
}

2.3 Java Dependencies

For the Spring Boot API, add these to your pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.14.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>3.3.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.6</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <version>2.25.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-parquet</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-aws</artifactId>
        <version>1.5.0</version>
    </dependency>
</dependencies>

3. The Archive Script

This script extracts a partition from Aurora PostgreSQL and writes it to S3 in Iceberg table format with Parquet data files.

3.1 Configuration Module

# config.py

from dataclasses import dataclass
from typing import Optional
import os


@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str
    user: str
    password: str

    @classmethod
    def from_environment(cls) -> "DatabaseConfig":
        return cls(
            host=os.environ["AURORA_HOST"],
            port=int(os.environ.get("AURORA_PORT", "5432")),
            database=os.environ["AURORA_DATABASE"],
            user=os.environ["AURORA_USER"],
            password=os.environ["AURORA_PASSWORD"]
        )

    @property
    def connection_string(self) -> str:
        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"


@dataclass
class S3Config:
    bucket: str
    prefix: str
    region: str

    @classmethod
    def from_environment(cls) -> "S3Config":
        return cls(
            bucket=os.environ["ARCHIVE_S3_BUCKET"],
            prefix=os.environ.get("ARCHIVE_S3_PREFIX", "aurora-archives"),
            region=os.environ.get("AWS_REGION", "eu-west-1")
        )


@dataclass  
class ArchiveConfig:
    table_name: str
    partition_column: str
    partition_value: str
    schema_name: str = "public"
    batch_size: int = 100000
    compression: str = "snappy"

3.2 Schema Introspection

# schema_inspector.py

from typing import Dict, List, Tuple, Any
import psycopg2
from psycopg2.extras import RealDictCursor
import pyarrow as pa


class SchemaInspector:
    """Inspects PostgreSQL table schema and converts to PyArrow schema."""

    POSTGRES_TO_ARROW = {
        "integer": pa.int32(),
        "bigint": pa.int64(),
        "smallint": pa.int16(),
        "real": pa.float32(),
        "double precision": pa.float64(),
        "numeric": pa.decimal128(38, 10),
        "text": pa.string(),
        "character varying": pa.string(),
        "varchar": pa.string(),
        "char": pa.string(),
        "character": pa.string(),
        "boolean": pa.bool_(),
        "date": pa.date32(),
        "timestamp without time zone": pa.timestamp("us"),
        "timestamp with time zone": pa.timestamp("us", tz="UTC"),
        "time without time zone": pa.time64("us"),
        "time with time zone": pa.time64("us"),
        "uuid": pa.string(),
        "json": pa.string(),
        "jsonb": pa.string(),
        "bytea": pa.binary(),
    }

    def __init__(self, connection_string: str):
        self.connection_string = connection_string

    def get_table_columns(
        self, 
        schema_name: str, 
        table_name: str
    ) -> List[Dict[str, Any]]:
        """Retrieve column definitions from information_schema."""
        query = """
            SELECT 
                column_name,
                data_type,
                is_nullable,
                column_default,
                ordinal_position,
                character_maximum_length,
                numeric_precision,
                numeric_scale
            FROM information_schema.columns
            WHERE table_schema = %s 
              AND table_name = %s
            ORDER BY ordinal_position
        """

        with psycopg2.connect(self.connection_string) as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(query, (schema_name, table_name))
                return list(cur.fetchall())

    def get_primary_key_columns(
        self, 
        schema_name: str, 
        table_name: str
    ) -> List[str]:
        """Retrieve primary key column names."""
        query = """
            SELECT a.attname
            FROM pg_index i
            JOIN pg_attribute a ON a.attrelid = i.indrelid 
                AND a.attnum = ANY(i.indkey)
            JOIN pg_class c ON c.oid = i.indrelid
            JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE i.indisprimary
              AND n.nspname = %s
              AND c.relname = %s
            ORDER BY array_position(i.indkey, a.attnum)
        """

        with psycopg2.connect(self.connection_string) as conn:
            with conn.cursor() as cur:
                cur.execute(query, (schema_name, table_name))
                return [row[0] for row in cur.fetchall()]

    def postgres_type_to_arrow(
        self, 
        pg_type: str, 
        precision: int = None, 
        scale: int = None
    ) -> pa.DataType:
        """Convert PostgreSQL type to PyArrow type."""
        pg_type_lower = pg_type.lower()

        if pg_type_lower == "numeric" and precision and scale:
            return pa.decimal128(precision, scale)

        if pg_type_lower in self.POSTGRES_TO_ARROW:
            return self.POSTGRES_TO_ARROW[pg_type_lower]

        if pg_type_lower.startswith("character varying"):
            return pa.string()

        if pg_type_lower.endswith("[]"):
            base_type = pg_type_lower[:-2]
            if base_type in self.POSTGRES_TO_ARROW:
                return pa.list_(self.POSTGRES_TO_ARROW[base_type])

        return pa.string()

    def build_arrow_schema(
        self, 
        schema_name: str, 
        table_name: str
    ) -> pa.Schema:
        """Build PyArrow schema from PostgreSQL table definition."""
        columns = self.get_table_columns(schema_name, table_name)

        fields = []
        for col in columns:
            arrow_type = self.postgres_type_to_arrow(
                col["data_type"],
                col.get("numeric_precision"),
                col.get("numeric_scale")
            )
            nullable = col["is_nullable"] == "YES"
            fields.append(pa.field(col["column_name"], arrow_type, nullable=nullable))

        return pa.schema(fields)

    def get_partition_info(
        self, 
        schema_name: str, 
        table_name: str
    ) -> Dict[str, Any]:
        """Get partition strategy information if table is partitioned."""
        query = """
            SELECT 
                pt.partstrat,
                array_agg(a.attname ORDER BY pos.idx) as partition_columns
            FROM pg_partitioned_table pt
            JOIN pg_class c ON c.oid = pt.partrelid
            JOIN pg_namespace n ON n.oid = c.relnamespace
            CROSS JOIN LATERAL unnest(pt.partattrs) WITH ORDINALITY AS pos(attnum, idx)
            JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum = pos.attnum
            WHERE n.nspname = %s AND c.relname = %s
            GROUP BY pt.partstrat
        """

        with psycopg2.connect(self.connection_string) as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(query, (schema_name, table_name))
                result = cur.fetchone()

                if result:
                    strategy_map = {"r": "range", "l": "list", "h": "hash"}
                    return {
                        "is_partitioned": True,
                        "strategy": strategy_map.get(result["partstrat"], "unknown"),
                        "partition_columns": result["partition_columns"]
                    }

                return {"is_partitioned": False}

3.3 Archive Script

#!/usr/bin/env python3
# archive_partition.py

"""
Archive a partition from Aurora PostgreSQL to S3 in Iceberg/Parquet format.

Usage:
    python archive_partition.py \
        --table transactions \
        --partition-column transaction_date \
        --partition-value 2024-01 \
        --schema public
"""

import argparse
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Generator, Dict, Any, Optional
import hashlib

import boto3
import pandas as pd
import psycopg2
from psycopg2.extras import RealDictCursor
import pyarrow as pa
import pyarrow.parquet as pq

from config import DatabaseConfig, S3Config, ArchiveConfig
from schema_inspector import SchemaInspector


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


class PartitionArchiver:
    """Archives PostgreSQL partitions to S3 in Iceberg format."""

    def __init__(
        self,
        db_config: DatabaseConfig,
        s3_config: S3Config,
        archive_config: ArchiveConfig
    ):
        self.db_config = db_config
        self.s3_config = s3_config
        self.archive_config = archive_config
        self.schema_inspector = SchemaInspector(db_config.connection_string)
        self.s3_client = boto3.client("s3", region_name=s3_config.region)

    def _generate_archive_path(self) -> str:
        """Generate S3 path for archive files."""
        return (
            f"{self.s3_config.prefix}/"
            f"{self.archive_config.schema_name}/"
            f"{self.archive_config.table_name}/"
            f"{self.archive_config.partition_column}={self.archive_config.partition_value}"
        )

    def _build_extraction_query(self) -> str:
        """Build query to extract partition data."""
        full_table = (
            f"{self.archive_config.schema_name}.{self.archive_config.table_name}"
        )

        partition_info = self.schema_inspector.get_partition_info(
            self.archive_config.schema_name,
            self.archive_config.table_name
        )

        if partition_info["is_partitioned"]:
            partition_table = (
                f"{self.archive_config.table_name}_"
                f"{self.archive_config.partition_value.replace('-', '_')}"
            )
            return f"SELECT * FROM {self.archive_config.schema_name}.{partition_table}"

        return (
            f"SELECT * FROM {full_table} "
            f"WHERE {self.archive_config.partition_column} = %s"
        )

    def _stream_partition_data(self) -> Generator[pd.DataFrame, None, None]:
        """Stream partition data in batches."""
        query = self._build_extraction_query()
        partition_info = self.schema_inspector.get_partition_info(
            self.archive_config.schema_name,
            self.archive_config.table_name
        )

        with psycopg2.connect(self.db_config.connection_string) as conn:
            with conn.cursor(
                name="archive_cursor",
                cursor_factory=RealDictCursor
            ) as cur:
                if partition_info["is_partitioned"]:
                    cur.execute(query)
                else:
                    cur.execute(query, (self.archive_config.partition_value,))

                while True:
                    rows = cur.fetchmany(self.archive_config.batch_size)
                    if not rows:
                        break
                    yield pd.DataFrame(rows)

    def _write_parquet_to_s3(
        self,
        table: pa.Table,
        file_number: int,
        arrow_schema: pa.Schema
    ) -> Dict[str, Any]:
        """Write a Parquet file to S3 and return metadata."""
        archive_path = self._generate_archive_path()
        file_name = f"data_{file_number:05d}.parquet"
        s3_key = f"{archive_path}/data/{file_name}"

        buffer = pa.BufferOutputStream()
        pq.write_table(
            table,
            buffer,
            compression=self.archive_config.compression,
            write_statistics=True
        )

        data = buffer.getvalue().to_pybytes()

        self.s3_client.put_object(
            Bucket=self.s3_config.bucket,
            Key=s3_key,
            Body=data,
            ContentType="application/octet-stream"
        )

        return {
            "path": f"s3://{self.s3_config.bucket}/{s3_key}",
            "file_size_bytes": len(data),
            "record_count": table.num_rows
        }

    def _write_iceberg_metadata(
        self,
        arrow_schema: pa.Schema,
        data_files: list,
        total_records: int
    ) -> None:
        """Write Iceberg table metadata files."""
        archive_path = self._generate_archive_path()

        schema_dict = {
            "type": "struct",
            "schema_id": 0,
            "fields": []
        }

        for i, field in enumerate(arrow_schema):
            field_type = self._arrow_to_iceberg_type(field.type)
            schema_dict["fields"].append({
                "id": i + 1,
                "name": field.name,
                "required": not field.nullable,
                "type": field_type
            })

        table_uuid = hashlib.md5(
            f"{self.archive_config.table_name}_{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()

        manifest_entries = []
        for df in data_files:
            manifest_entries.append({
                "status": 1,
                "snapshot_id": 1,
                "data_file": {
                    "file_path": df["path"],
                    "file_format": "PARQUET",
                    "record_count": df["record_count"],
                    "file_size_in_bytes": df["file_size_bytes"]
                }
            })

        metadata = {
            "format_version": 2,
            "table_uuid": table_uuid,
            "location": f"s3://{self.s3_config.bucket}/{archive_path}",
            "last_sequence_number": 1,
            "last_updated_ms": int(datetime.utcnow().timestamp() * 1000),
            "last_column_id": len(arrow_schema),
            "current_schema_id": 0,
            "schemas": [schema_dict],
            "partition_spec": [],
            "default_spec_id": 0,
            "last_partition_id": 0,
            "properties": {
                "source.table": self.archive_config.table_name,
                "source.schema": self.archive_config.schema_name,
                "source.partition_column": self.archive_config.partition_column,
                "source.partition_value": self.archive_config.partition_value,
                "archive.timestamp": datetime.utcnow().isoformat(),
                "archive.total_records": str(total_records)
            },
            "current_snapshot_id": 1,
            "snapshots": [
                {
                    "snapshot_id": 1,
                    "timestamp_ms": int(datetime.utcnow().timestamp() * 1000),
                    "summary": {
                        "operation": "append",
                        "added_data_files": str(len(data_files)),
                        "total_records": str(total_records)
                    },
                    "manifest_list": f"s3://{self.s3_config.bucket}/{archive_path}/metadata/manifest_list.json"
                }
            ]
        }

        self.s3_client.put_object(
            Bucket=self.s3_config.bucket,
            Key=f"{archive_path}/metadata/v1.metadata.json",
            Body=json.dumps(metadata, indent=2),
            ContentType="application/json"
        )

        self.s3_client.put_object(
            Bucket=self.s3_config.bucket,
            Key=f"{archive_path}/metadata/manifest_list.json",
            Body=json.dumps(manifest_entries, indent=2),
            ContentType="application/json"
        )

        self.s3_client.put_object(
            Bucket=self.s3_config.bucket,
            Key=f"{archive_path}/metadata/version_hint.text",
            Body="1",
            ContentType="text/plain"
        )

    def _arrow_to_iceberg_type(self, arrow_type: pa.DataType) -> str:
        """Convert PyArrow type to Iceberg type string."""
        type_mapping = {
            pa.int16(): "int",
            pa.int32(): "int",
            pa.int64(): "long",
            pa.float32(): "float",
            pa.float64(): "double",
            pa.bool_(): "boolean",
            pa.string(): "string",
            pa.binary(): "binary",
            pa.date32(): "date",
        }

        if arrow_type in type_mapping:
            return type_mapping[arrow_type]

        if pa.types.is_timestamp(arrow_type):
            if arrow_type.tz:
                return "timestamptz"
            return "timestamp"

        if pa.types.is_decimal(arrow_type):
            return f"decimal({arrow_type.precision},{arrow_type.scale})"

        if pa.types.is_list(arrow_type):
            inner = self._arrow_to_iceberg_type(arrow_type.value_type)
            return f"list<{inner}>"

        return "string"

    def _save_schema_snapshot(self, arrow_schema: pa.Schema) -> None:
        """Save schema information for restore validation."""
        archive_path = self._generate_archive_path()

        columns = self.schema_inspector.get_table_columns(
            self.archive_config.schema_name,
            self.archive_config.table_name
        )

        pk_columns = self.schema_inspector.get_primary_key_columns(
            self.archive_config.schema_name,
            self.archive_config.table_name
        )

        schema_snapshot = {
            "source_table": {
                "schema": self.archive_config.schema_name,
                "table": self.archive_config.table_name
            },
            "columns": columns,
            "primary_key_columns": pk_columns,
            "arrow_schema": arrow_schema.to_string(),
            "archived_at": datetime.utcnow().isoformat()
        }

        self.s3_client.put_object(
            Bucket=self.s3_config.bucket,
            Key=f"{archive_path}/schema_snapshot.json",
            Body=json.dumps(schema_snapshot, indent=2, default=str),
            ContentType="application/json"
        )

    def archive(self) -> Dict[str, Any]:
        """Execute the archive operation."""
        logger.info(
            f"Starting archive of {self.archive_config.schema_name}."
            f"{self.archive_config.table_name} "
            f"partition {self.archive_config.partition_column}="
            f"{self.archive_config.partition_value}"
        )

        arrow_schema = self.schema_inspector.build_arrow_schema(
            self.archive_config.schema_name,
            self.archive_config.table_name
        )

        self._save_schema_snapshot(arrow_schema)

        data_files = []
        total_records = 0
        file_number = 0

        for batch_df in self._stream_partition_data():
            table = pa.Table.from_pandas(batch_df, schema=arrow_schema)
            file_meta = self._write_parquet_to_s3(table, file_number, arrow_schema)
            data_files.append(file_meta)
            total_records += file_meta["record_count"]
            file_number += 1

            logger.info(
                f"Written file {file_number}: {file_meta['record_count']} records"
            )

        self._write_iceberg_metadata(arrow_schema, data_files, total_records)

        result = {
            "status": "success",
            "table": f"{self.archive_config.schema_name}.{self.archive_config.table_name}",
            "partition": f"{self.archive_config.partition_column}={self.archive_config.partition_value}",
            "location": f"s3://{self.s3_config.bucket}/{self._generate_archive_path()}",
            "total_records": total_records,
            "total_files": len(data_files),
            "total_bytes": sum(f["file_size_bytes"] for f in data_files),
            "archived_at": datetime.utcnow().isoformat()
        }

        logger.info(f"Archive complete: {total_records} records in {len(data_files)} files")
        return result


def main():
    parser = argparse.ArgumentParser(
        description="Archive Aurora PostgreSQL partition to S3"
    )
    parser.add_argument("--table", required=True, help="Table name")
    parser.add_argument("--partition-column", required=True, help="Partition column name")
    parser.add_argument("--partition-value", required=True, help="Partition value to archive")
    parser.add_argument("--schema", default="public", help="Schema name")
    parser.add_argument("--batch-size", type=int, default=100000, help="Batch size for streaming")

    args = parser.parse_args()

    db_config = DatabaseConfig.from_environment()
    s3_config = S3Config.from_environment()
    archive_config = ArchiveConfig(
        table_name=args.table,
        partition_column=args.partition_column,
        partition_value=args.partition_value,
        schema_name=args.schema,
        batch_size=args.batch_size
    )

    archiver = PartitionArchiver(db_config, s3_config, archive_config)
    result = archiver.archive()

    print(json.dumps(result, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())
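
To confirm what a run actually wrote, you can list the objects under the partition prefix. The snippet below assumes the example values from the usage block above; the bucket name and prefix come from your ARCHIVE_S3_BUCKET and ARCHIVE_S3_PREFIX settings.

# verify_archive.py: list what archive_partition.py wrote for one partition.
# Bucket name and prefix are examples; adjust to your environment.
import boto3

s3 = boto3.client("s3")
prefix = "aurora-archives/public/transactions/transaction_date=2024-01/"

paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="your-archive-bucket", Prefix=prefix):
    for obj in page.get("Contents", []):
        print(f"{obj['Size']:>12}  {obj['Key']}")

# Expected layout:
#   .../schema_snapshot.json
#   .../data/data_00000.parquet, data_00001.parquet, ...
#   .../metadata/v1.metadata.json, manifest_list.json, version_hint.text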

4. The Restore Script

This script reverses the archive operation by reading Parquet files from S3 and loading them into a staging table.

4.1 Restore Script

#!/usr/bin/env python3
# restore_partition.py

"""
Restore an archived partition from S3 back to Aurora PostgreSQL.

Usage:
    python restore_partition.py \
        --source-path s3://bucket/prefix/schema/table/partition_col=value \
        --target-table transactions_staging \
        --target-schema public
"""

import argparse
import io
import json
import logging
import sys
from datetime import datetime
from typing import Dict, Any, List, Optional
from urllib.parse import urlparse

import boto3
import pandas as pd
import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values
import pyarrow.parquet as pq

from config import DatabaseConfig


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


class PartitionRestorer:
    """Restores archived partitions from S3 to PostgreSQL."""

    def __init__(
        self,
        db_config: DatabaseConfig,
        source_path: str,
        target_schema: str,
        target_table: str,
        create_table: bool = True,
        batch_size: int = 10000
    ):
        self.db_config = db_config
        self.source_path = source_path
        self.target_schema = target_schema
        self.target_table = target_table
        self.create_table = create_table
        self.batch_size = batch_size

        parsed = urlparse(source_path)
        self.bucket = parsed.netloc
        self.prefix = parsed.path.lstrip("/")

        self.s3_client = boto3.client("s3")

    def _load_schema_snapshot(self) -> Dict[str, Any]:
        """Load the schema snapshot from the archive."""
        response = self.s3_client.get_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}/schema_snapshot.json"
        )
        return json.loads(response["Body"].read())

    def _load_iceberg_metadata(self) -> Dict[str, Any]:
        """Load Iceberg metadata."""
        response = self.s3_client.get_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}/metadata/v1.metadata.json"
        )
        return json.loads(response["Body"].read())

    def _list_data_files(self) -> List[str]:
        """List all Parquet data files in the archive."""
        data_prefix = f"{self.prefix}/data/"

        files = []
        paginator = self.s3_client.get_paginator("list_objects_v2")

        for page in paginator.paginate(Bucket=self.bucket, Prefix=data_prefix):
            for obj in page.get("Contents", []):
                if obj["Key"].endswith(".parquet"):
                    files.append(obj["Key"])

        return sorted(files)

    def _postgres_type_from_column_def(self, col: Dict[str, Any]) -> str:
        """Convert column definition to PostgreSQL type."""
        data_type = col["data_type"]

        if data_type == "character varying":
            max_len = col.get("character_maximum_length")
            if max_len:
                return f"varchar({max_len})"
            return "text"

        if data_type == "numeric":
            precision = col.get("numeric_precision")
            scale = col.get("numeric_scale")
            if precision and scale:
                return f"numeric({precision},{scale})"
            return "numeric"

        return data_type

    def _create_staging_table(
        self,
        schema_snapshot: Dict[str, Any],
        conn: psycopg2.extensions.connection
    ) -> None:
        """Create the staging table based on archived schema."""
        columns = schema_snapshot["columns"]

        column_defs = []
        for col in columns:
            pg_type = self._postgres_type_from_column_def(col)
            nullable = "" if col["is_nullable"] == "YES" else " NOT NULL"
            column_defs.append(f'    "{col["column_name"]}" {pg_type}{nullable}')

        create_sql = f"""
            DROP TABLE IF EXISTS {self.target_schema}.{self.target_table};
            CREATE TABLE {self.target_schema}.{self.target_table} (
{chr(10).join(column_defs)}
            )
        """

        with conn.cursor() as cur:
            cur.execute(create_sql)
        conn.commit()

        logger.info(f"Created staging table {self.target_schema}.{self.target_table}")

    def _insert_batch(
        self,
        df: pd.DataFrame,
        columns: List[str],
        conn: psycopg2.extensions.connection
    ) -> int:
        """Insert a batch of records into the staging table."""
        if df.empty:
            return 0

        for col in df.columns:
            if pd.api.types.is_datetime64_any_dtype(df[col]):
                df[col] = df[col].apply(
                    lambda x: x.isoformat() if pd.notna(x) else None
                )

        values = [tuple(row) for row in df[columns].values]

        column_names = ", ".join(f'"{c}"' for c in columns)
        insert_sql = f"""
            INSERT INTO {self.target_schema}.{self.target_table} ({column_names})
            VALUES %s
        """

        with conn.cursor() as cur:
            execute_values(cur, insert_sql, values, page_size=self.batch_size)

        return len(values)

    def restore(self) -> Dict[str, Any]:
        """Execute the restore operation."""
        logger.info(f"Starting restore from {self.source_path}")

        schema_snapshot = self._load_schema_snapshot()
        metadata = self._load_iceberg_metadata()
        data_files = self._list_data_files()

        logger.info(f"Found {len(data_files)} data files to restore")

        columns = [col["column_name"] for col in schema_snapshot["columns"]]

        with psycopg2.connect(self.db_config.connection_string) as conn:
            if self.create_table:
                self._create_staging_table(schema_snapshot, conn)

            total_records = 0

            for file_key in data_files:
                s3_uri = f"s3://{self.bucket}/{file_key}"
                logger.info(f"Restoring from {file_key}")

                response = self.s3_client.get_object(Bucket=self.bucket, Key=file_key)
                # StreamingBody is not seekable, so buffer the object before handing it to pyarrow
                table = pq.read_table(io.BytesIO(response["Body"].read()))
                df = table.to_pandas()

                file_records = 0
                for start in range(0, len(df), self.batch_size):
                    batch_df = df.iloc[start:start + self.batch_size]
                    inserted = self._insert_batch(batch_df, columns, conn)
                    file_records += inserted

                conn.commit()
                total_records += file_records
                logger.info(f"Restored {file_records} records from {file_key}")

        result = {
            "status": "success",
            "source": self.source_path,
            "target": f"{self.target_schema}.{self.target_table}",
            "total_records": total_records,
            "files_processed": len(data_files),
            "restored_at": datetime.utcnow().isoformat()
        }

        logger.info(f"Restore complete: {total_records} records")
        return result


def main():
    parser = argparse.ArgumentParser(
        description="Restore archived partition from S3 to PostgreSQL"
    )
    parser.add_argument("--source-path", required=True, help="S3 path to archived partition")
    parser.add_argument("--target-table", required=True, help="Target table name")
    parser.add_argument("--target-schema", default="public", help="Target schema name")
    parser.add_argument("--batch-size", type=int, default=10000, help="Insert batch size")
    parser.add_argument("--no-create", action="store_true", help="Don't create table, assume it exists")

    args = parser.parse_args()

    db_config = DatabaseConfig.from_environment()

    restorer = PartitionRestorer(
        db_config=db_config,
        source_path=args.source_path,
        target_schema=args.target_schema,
        target_table=args.target_table,
        create_table=not args.no_create,
        batch_size=args.batch_size
    )

    result = restorer.restore()
    print(json.dumps(result, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())
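
Once the restore completes, the reported total_records should line up with a plain count against the staging table. A minimal spot check, assuming the staging table name used in the usage examples later in this post (public.transactions_staging):

-- Compare this count with the "total_records" value printed by the restore script
SELECT COUNT(*) FROM public.transactions_staging;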

5. SQL Operations for Partition Migration

Once data is restored to a staging table, you need SQL operations to validate and migrate it to the main table.

5.1 Schema Validation

-- Validate that staging table schema matches the main table

CREATE OR REPLACE FUNCTION validate_table_schemas(
    p_source_schema TEXT,
    p_source_table TEXT,
    p_target_schema TEXT,
    p_target_table TEXT
) RETURNS TABLE (
    validation_type TEXT,
    column_name TEXT,
    source_value TEXT,
    target_value TEXT,
    is_valid BOOLEAN
) AS $$
BEGIN
    -- Check column count
    RETURN QUERY
    SELECT 
        'column_count'::TEXT,
        NULL::TEXT,
        src.cnt::TEXT,
        tgt.cnt::TEXT,
        src.cnt = tgt.cnt
    FROM 
        (SELECT COUNT(*)::INT AS cnt 
         FROM information_schema.columns 
         WHERE table_schema = p_source_schema 
           AND table_name = p_source_table) src,
        (SELECT COUNT(*)::INT AS cnt 
         FROM information_schema.columns 
         WHERE table_schema = p_target_schema 
           AND table_name = p_target_table) tgt;

    -- Check each column exists with matching type
    RETURN QUERY
    SELECT 
        'column_definition'::TEXT,
        src.column_name,
        src.data_type || COALESCE('(' || src.character_maximum_length::TEXT || ')', ''),
        COALESCE(tgt.data_type || COALESCE('(' || tgt.character_maximum_length::TEXT || ')', ''), 'MISSING'),
        src.data_type = COALESCE(tgt.data_type, '') 
            AND COALESCE(src.character_maximum_length, 0) = COALESCE(tgt.character_maximum_length, 0)
    FROM 
        information_schema.columns src
    LEFT JOIN 
        information_schema.columns tgt 
        ON tgt.table_schema = p_target_schema 
        AND tgt.table_name = p_target_table
        AND tgt.column_name = src.column_name
    WHERE 
        src.table_schema = p_source_schema 
        AND src.table_name = p_source_table
    ORDER BY src.ordinal_position;

    -- Check nullability
    RETURN QUERY
    SELECT 
        'nullability'::TEXT,
        src.column_name,
        src.is_nullable,
        COALESCE(tgt.is_nullable, 'MISSING'),
        src.is_nullable = COALESCE(tgt.is_nullable, '')
    FROM 
        information_schema.columns src
    LEFT JOIN 
        information_schema.columns tgt 
        ON tgt.table_schema = p_target_schema 
        AND tgt.table_name = p_target_table
        AND tgt.column_name = src.column_name
    WHERE 
        src.table_schema = p_source_schema 
        AND src.table_name = p_source_table
    ORDER BY src.ordinal_position;
END;
$$ LANGUAGE plpgsql;

-- Usage
SELECT * FROM validate_table_schemas('public', 'transactions_staging', 'public', 'transactions');
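
In practice it is usually easier to look only at the checks that fail. A small variation on the call above:

-- Show only the mismatches between staging and target
SELECT *
FROM validate_table_schemas('public', 'transactions_staging', 'public', 'transactions')
WHERE NOT is_valid;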

5.2 Comprehensive Validation Report

-- Generate a full validation report before migration

CREATE OR REPLACE FUNCTION generate_migration_report(
    p_staging_schema TEXT,
    p_staging_table TEXT,
    p_target_schema TEXT,
    p_target_table TEXT,
    p_partition_column TEXT,
    p_partition_value TEXT
) RETURNS TABLE (
    check_name TEXT,
    result TEXT,
    details JSONB
) AS $$
DECLARE
    v_staging_count BIGINT;
    v_existing_count BIGINT;
    v_schema_valid BOOLEAN;
    v_null_count BIGINT;
    rec RECORD;
BEGIN
    -- Get staging table count
    EXECUTE format(
        'SELECT COUNT(*) FROM %I.%I',
        p_staging_schema, p_staging_table
    ) INTO v_staging_count;

    RETURN QUERY SELECT 
        'staging_record_count'::TEXT,
        'INFO'::TEXT,
        jsonb_build_object('count', v_staging_count);

    -- Check for existing data in target partition
    BEGIN
        EXECUTE format(
            'SELECT COUNT(*) FROM %I.%I WHERE %I = $1',
            p_target_schema, p_target_table, p_partition_column
        ) INTO v_existing_count USING p_partition_value;

        IF v_existing_count > 0 THEN
            RETURN QUERY SELECT 
                'existing_partition_data'::TEXT,
                'WARNING'::TEXT,
                jsonb_build_object(
                    'count', v_existing_count,
                    'message', 'Target partition already contains data'
                );
        ELSE
            RETURN QUERY SELECT 
                'existing_partition_data'::TEXT,
                'OK'::TEXT,
                jsonb_build_object('count', 0);
        END IF;
    EXCEPTION WHEN undefined_column THEN
        RETURN QUERY SELECT 
            'partition_column_check'::TEXT,
            'ERROR'::TEXT,
            jsonb_build_object(
                'message', format('Partition column %s not found', p_partition_column)
            );
    END;

    -- Validate schemas match
    SELECT bool_and(is_valid) INTO v_schema_valid
    FROM validate_table_schemas(
        p_staging_schema, p_staging_table,
        p_target_schema, p_target_table
    );

    RETURN QUERY SELECT 
        'schema_validation'::TEXT,
        CASE WHEN v_schema_valid THEN 'OK' ELSE 'ERROR' END::TEXT,
        jsonb_build_object('schemas_match', v_schema_valid);

    -- Check for null values in NOT NULL target columns by counting nulls in the staging data
    FOR rec IN
        SELECT c.column_name
        FROM information_schema.columns c
        WHERE c.table_schema = p_target_schema
          AND c.table_name = p_target_table
          AND c.is_nullable = 'NO'
    LOOP
        EXECUTE format(
            'SELECT COUNT(*) FROM %I.%I WHERE %I IS NULL',
            p_staging_schema, p_staging_table, rec.column_name
        ) INTO v_null_count;

        RETURN QUERY SELECT 
            'null_check_' || rec.column_name::TEXT,
            CASE WHEN v_null_count > 0 THEN 'ERROR' ELSE 'OK' END::TEXT,
            jsonb_build_object('null_count', v_null_count);
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- Usage
SELECT * FROM generate_migration_report(
    'public', 'transactions_staging',
    'public', 'transactions',
    'transaction_date', '2024-01'
);

5.3 Partition Migration

-- Migrate data from staging table to main table

CREATE OR REPLACE PROCEDURE migrate_partition_data(
    p_staging_schema TEXT,
    p_staging_table TEXT,
    p_target_schema TEXT,
    p_target_table TEXT,
    p_partition_column TEXT,
    p_partition_value TEXT,
    p_delete_existing BOOLEAN DEFAULT FALSE,
    p_batch_size INTEGER DEFAULT 50000
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_columns TEXT;
    v_total_migrated BIGINT := 0;
    v_batch_migrated BIGINT;
    v_validation_passed BOOLEAN;
    v_partition_populated BOOLEAN;
    v_offset BIGINT := 0;
BEGIN
    -- Validate schemas match
    SELECT bool_and(is_valid) INTO v_validation_passed
    FROM validate_table_schemas(
        p_staging_schema, p_staging_table,
        p_target_schema, p_target_table
    );

    IF NOT v_validation_passed THEN
        RAISE EXCEPTION 'Schema validation failed. Run validate_table_schemas() for details.';
    END IF;

    -- Build column list
    SELECT string_agg(quote_ident(column_name), ', ' ORDER BY ordinal_position)
    INTO v_columns
    FROM information_schema.columns
    WHERE table_schema = p_staging_schema
      AND table_name = p_staging_table;

    -- Delete existing data if requested; otherwise refuse to migrate into a
    -- partition that already contains rows
    IF p_delete_existing THEN
        EXECUTE format(
            'DELETE FROM %I.%I WHERE %I = $1',
            p_target_schema, p_target_table, p_partition_column
        ) USING p_partition_value;

        RAISE NOTICE 'Deleted existing data for partition % = %', 
            p_partition_column, p_partition_value;
    ELSE
        EXECUTE format(
            'SELECT EXISTS (SELECT 1 FROM %I.%I WHERE %I = $1)',
            p_target_schema, p_target_table, p_partition_column
        ) INTO v_partition_populated USING p_partition_value;

        IF v_partition_populated THEN
            RAISE EXCEPTION 'Target partition % = % already contains data; rerun with p_delete_existing => TRUE',
                p_partition_column, p_partition_value;
        END IF;
    END IF;

    -- Migrate in batches. The staging table is assumed to be static during the
    -- migration, so ordering by ctid keeps the OFFSET paging stable across the
    -- intermediate commits.
    LOOP
        EXECUTE format($sql$
            INSERT INTO %I.%I (%s)
            SELECT %s
            FROM %I.%I
            ORDER BY ctid
            OFFSET $1 LIMIT $2
        $sql$,
            p_target_schema, p_target_table, v_columns,
            v_columns,
            p_staging_schema, p_staging_table
        ) USING v_offset, p_batch_size;

        GET DIAGNOSTICS v_batch_migrated = ROW_COUNT;

        IF v_batch_migrated = 0 THEN
            EXIT;
        END IF;

        v_total_migrated := v_total_migrated + v_batch_migrated;
        v_offset := v_offset + v_batch_migrated;

        RAISE NOTICE 'Migrated batch: % records (total: %)', v_batch_migrated, v_total_migrated;
        COMMIT;
    END LOOP;

    RAISE NOTICE 'Migration complete. Total records migrated: %', v_total_migrated;
END;
$$;

-- Usage
CALL migrate_partition_data(
    'public', 'transactions_staging',
    'public', 'transactions',
    'transaction_date', '2024-01',
    TRUE,   -- delete existing
    50000   -- batch size
);
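
Once the call returns, a quick spot check (using the same hypothetical staging and target tables as the call above) confirms the partition landed in the main table:

-- Staging rows should now be matched or exceeded by rows in the target partition
SELECT
    (SELECT COUNT(*) FROM public.transactions_staging) AS staging_rows,
    (SELECT COUNT(*) FROM public.transactions
      WHERE transaction_date = '2024-01')              AS target_partition_rows;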

5.4 Attach Partition (for Partitioned Tables)

-- For natively partitioned tables, attach the staging table as a partition

CREATE OR REPLACE PROCEDURE attach_restored_partition(
    p_staging_schema TEXT,
    p_staging_table TEXT,
    p_target_schema TEXT,
    p_target_table TEXT,
    p_partition_column TEXT,
    p_partition_start TEXT,
    p_partition_end TEXT
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_partition_name TEXT;
    v_constraint_name TEXT;
BEGIN
    -- Validate schemas match
    IF NOT (
        SELECT bool_and(is_valid)
        FROM validate_table_schemas(
            p_staging_schema, p_staging_table,
            p_target_schema, p_target_table
        )
    ) THEN
        RAISE EXCEPTION 'Schema validation failed';
    END IF;

    -- Add constraint to staging table that matches partition bounds
    v_constraint_name := p_staging_table || '_partition_check';

    EXECUTE format($sql$
        ALTER TABLE %I.%I 
        ADD CONSTRAINT %I 
        CHECK (%I >= %L AND %I < %L)
        NOT VALID
    $sql$,
        p_staging_schema, p_staging_table,
        v_constraint_name,
        p_partition_column, p_partition_start,
        p_partition_column, p_partition_end
    );

    -- Validate in a second step: VALIDATE CONSTRAINT takes a weaker lock than
    -- adding an already validated constraint, which would scan the table immediately
    EXECUTE format($sql$
        ALTER TABLE %I.%I 
        VALIDATE CONSTRAINT %I
    $sql$,
        p_staging_schema, p_staging_table,
        v_constraint_name
    );

    -- Detach the old partition if it exists, and rename it out of the way so the
    -- staging table can take over its name
    v_partition_name := p_target_table || '_' || replace(p_partition_start, '-', '_');

    BEGIN
        EXECUTE format($sql$
            ALTER TABLE %I.%I 
            DETACH PARTITION %I.%I
        $sql$,
            p_target_schema, p_target_table,
            p_target_schema, v_partition_name
        );
        EXECUTE format($sql$
            ALTER TABLE %I.%I RENAME TO %I
        $sql$,
            p_target_schema, v_partition_name,
            v_partition_name || '_detached'
        );
        RAISE NOTICE 'Detached existing partition % (kept as %_detached)', 
            v_partition_name, v_partition_name;
    EXCEPTION WHEN undefined_table THEN
        RAISE NOTICE 'No existing partition to detach';
    END;

    -- Rename staging table to partition name
    EXECUTE format($sql$
        ALTER TABLE %I.%I RENAME TO %I
    $sql$,
        p_staging_schema, p_staging_table,
        v_partition_name
    );

    -- Attach as partition
    EXECUTE format($sql$
        ALTER TABLE %I.%I 
        ATTACH PARTITION %I.%I 
        FOR VALUES FROM (%L) TO (%L)
    $sql$,
        p_target_schema, p_target_table,
        p_staging_schema, v_partition_name,
        p_partition_start, p_partition_end
    );

    RAISE NOTICE 'Successfully attached partition % to %', 
        v_partition_name, p_target_table;
END;
$$;

-- Usage for range partitioned table
CALL attach_restored_partition(
    'public', 'transactions_staging',
    'public', 'transactions',
    'transaction_date',
    '2024-01-01', '2024-02-01'
);
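
To confirm the attach succeeded, you can list the partitions of the parent table directly from the catalogs; this is a generic query, not specific to the procedure above:

-- Every attached partition of public.transactions
SELECT inhrelid::regclass AS partition_name
FROM pg_inherits
WHERE inhparent = 'public.transactions'::regclass;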

5.5 Cleanup Script

-- Clean up after successful migration

CREATE OR REPLACE PROCEDURE cleanup_after_migration(
    p_staging_schema TEXT,
    p_staging_table TEXT,
    p_verify_target_schema TEXT DEFAULT NULL,
    p_verify_target_table TEXT DEFAULT NULL,
    p_verify_count BOOLEAN DEFAULT TRUE
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_staging_count BIGINT;
    v_target_count BIGINT;
BEGIN
    IF p_verify_count AND p_verify_target_schema IS NOT NULL THEN
        EXECUTE format(
            'SELECT COUNT(*) FROM %I.%I',
            p_staging_schema, p_staging_table
        ) INTO v_staging_count;

        EXECUTE format(
            'SELECT COUNT(*) FROM %I.%I',
            p_verify_target_schema, p_verify_target_table
        ) INTO v_target_count;

        IF v_target_count < v_staging_count THEN
            RAISE WARNING 'Target count (%) is less than staging count (%). Migration may be incomplete.',
                v_target_count, v_staging_count;
            RETURN;
        END IF;
    END IF;

    EXECUTE format(
        'DROP TABLE IF EXISTS %I.%I',
        p_staging_schema, p_staging_table
    );

    RAISE NOTICE 'Dropped staging table %.%', p_staging_schema, p_staging_table;
END;
$$;

-- Usage
CALL cleanup_after_migration(
    'public', 'transactions_staging',
    'public', 'transactions',
    TRUE
);
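
A quick way to verify the staging table really is gone after cleanup (to_regclass returns NULL for a name that no longer resolves):

-- NULL means the staging table has been dropped
SELECT to_regclass('public.transactions_staging');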

6. Spring Boot Query API

This API allows querying archived data directly from S3 without restoring to the database.

6.1 Project Structure

src/main/java/com/example/archivequery/
    ArchiveQueryApplication.java
    config/
        AwsConfig.java
        ParquetConfig.java
    controller/
        ArchiveQueryController.java
    service/
        ParquetQueryService.java
        PredicateParser.java
    model/
        QueryRequest.java
        QueryResponse.java
        ArchiveMetadata.java
    predicate/
        Predicate.java
        ComparisonPredicate.java
        LogicalPredicate.java
        PredicateEvaluator.java

6.2 Application Configuration

// ArchiveQueryApplication.java
package com.example.archivequery;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ArchiveQueryApplication {
    public static void main(String[] args) {
        SpringApplication.run(ArchiveQueryApplication.class, args);
    }
}

6.3 AWS Configuration

// config/AwsConfig.java
package com.example.archivequery.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

@Configuration
public class AwsConfig {

    @Value("${aws.region:eu-west-1}")
    private String region;

    @Bean
    public S3Client s3Client() {
        return S3Client.builder()
            .region(Region.of(region))
            .credentialsProvider(DefaultCredentialsProvider.create())
            .build();
    }
}

6.4 Parquet Configuration

// config/ParquetConfig.java
package com.example.archivequery.config;

import org.apache.hadoop.conf.Configuration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;

@org.springframework.context.annotation.Configuration
public class ParquetConfig {

    @Value("${aws.region:eu-west-1}")
    private String region;

    @Bean
    public Configuration hadoopConfiguration() {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        conf.set("fs.s3a.aws.credentials.provider", 
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
        conf.set("fs.s3a.endpoint.region", region);
        return conf;
    }
}

6.5 Model Classes

// model/QueryRequest.java
package com.example.archivequery.model;

import java.util.List;
import java.util.Map;

public record QueryRequest(
    String archivePath,
    List<String> columns,
    Map<String, Object> filters,
    String filterExpression,
    Integer limit,
    Integer offset
) {
    public QueryRequest {
        if (limit == null) limit = 1000;
        if (offset == null) offset = 0;
    }
}
// model/QueryResponse.java
package com.example.archivequery.model;

import java.util.List;
import java.util.Map;

public record QueryResponse(
    List<Map<String, Object>> data,
    long totalMatched,
    long totalScanned,
    long executionTimeMs,
    Map<String, String> schema,
    String archivePath
) {}
// model/ArchiveMetadata.java
package com.example.archivequery.model;

import java.time.Instant;
import java.util.List;
import java.util.Map;

public record ArchiveMetadata(
    String tableUuid,
    String location,
    List<ColumnDefinition> columns,
    Map<String, String> properties,
    Instant archivedAt
) {
    public record ColumnDefinition(
        int id,
        String name,
        String type,
        boolean required
    ) {}
}

6.6 Predicate Classes

// predicate/Predicate.java
package com.example.archivequery.predicate;

import java.util.Map;

public sealed interface Predicate 
    permits ComparisonPredicate, LogicalPredicate {

    boolean evaluate(Map<String, Object> record);
}
// predicate/ComparisonPredicate.java
package com.example.archivequery.predicate;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Objects;

public record ComparisonPredicate(
    String column,
    Operator operator,
    Object value
) implements Predicate {

    public enum Operator {
        EQ, NE, GT, GE, LT, LE, LIKE, IN, IS_NULL, IS_NOT_NULL
    }

    @Override
    public boolean evaluate(Map<String, Object> record) {
        Object recordValue = record.get(column);

        return switch (operator) {
            case IS_NULL -> recordValue == null;
            case IS_NOT_NULL -> recordValue != null;
            case EQ -> Objects.equals(recordValue, convertValue(recordValue));
            case NE -> !Objects.equals(recordValue, convertValue(recordValue));
            case GT -> compare(recordValue, convertValue(recordValue)) > 0;
            case GE -> compare(recordValue, convertValue(recordValue)) >= 0;
            case LT -> compare(recordValue, convertValue(recordValue)) < 0;
            case LE -> compare(recordValue, convertValue(recordValue)) <= 0;
            case LIKE -> matchesLike(recordValue);
            case IN -> matchesIn(recordValue);
        };
    }

    private Object convertValue(Object recordValue) {
        if (recordValue == null || value == null) return value;

        if (recordValue instanceof LocalDate && value instanceof String s) {
            return LocalDate.parse(s);
        }
        if (recordValue instanceof LocalDateTime && value instanceof String s) {
            return LocalDateTime.parse(s);
        }
        if (recordValue instanceof BigDecimal && value instanceof Number n) {
            return new BigDecimal(n.toString());
        }
        if (recordValue instanceof Long && value instanceof Number n) {
            return n.longValue();
        }
        if (recordValue instanceof Integer && value instanceof Number n) {
            return n.intValue();
        }
        if (recordValue instanceof Double && value instanceof Number n) {
            return n.doubleValue();
        }

        return value;
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    private int compare(Object a, Object b) {
        if (a == null || b == null) return 0;
        if (a instanceof Comparable ca && b instanceof Comparable cb) {
            return ca.compareTo(cb);
        }
        return 0;
    }

    private boolean matchesLike(Object recordValue) {
        if (recordValue == null || value == null) return false;
        String pattern = value.toString()
            .replace("%", ".*")
            .replace("_", ".");
        return recordValue.toString().matches(pattern);
    }

    private boolean matchesIn(Object recordValue) {
        if (recordValue == null || value == null) return false;
        if (value instanceof Iterable<?> iterable) {
            for (Object item : iterable) {
                if (Objects.equals(recordValue, item)) return true;
            }
        }
        return false;
    }
}
// predicate/LogicalPredicate.java
package com.example.archivequery.predicate;

import java.util.List;
import java.util.Map;

public record LogicalPredicate(
    Operator operator,
    List<Predicate> predicates
) implements Predicate {

    public enum Operator {
        AND, OR, NOT
    }

    @Override
    public boolean evaluate(Map<String, Object> record) {
        return switch (operator) {
            case AND -> predicates.stream().allMatch(p -> p.evaluate(record));
            case OR -> predicates.stream().anyMatch(p -> p.evaluate(record));
            case NOT -> !predicates.getFirst().evaluate(record);
        };
    }
}
// predicate/PredicateEvaluator.java
package com.example.archivequery.predicate;

import java.util.Map;

public class PredicateEvaluator {

    private final Predicate predicate;

    public PredicateEvaluator(Predicate predicate) {
        this.predicate = predicate;
    }

    public boolean matches(Map<String, Object> record) {
        if (predicate == null) return true;
        return predicate.evaluate(record);
    }
}

6.7 Predicate Parser

// service/PredicateParser.java
package com.example.archivequery.service;

import com.example.archivequery.predicate.*;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Service
public class PredicateParser {

    private static final Pattern COMPARISON_PATTERN = Pattern.compile(
        "(\\w+)\\s*(=|!=|<>|>=|<=|>|<|LIKE|IN|IS NULL|IS NOT NULL)\\s*(.*)?"
    );

    public Predicate parse(String expression) {
        if (expression == null || expression.isBlank()) {
            return null;
        }
        return parseExpression(expression.trim());
    }

    public Predicate fromFilters(Map<String, Object> filters) {
        if (filters == null || filters.isEmpty()) {
            return null;
        }

        List<Predicate> predicates = new ArrayList<>();

        for (Map.Entry<String, Object> entry : filters.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();

            if (key.endsWith("_gt")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 3),
                    ComparisonPredicate.Operator.GT,
                    value
                ));
            } else if (key.endsWith("_gte")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 4),
                    ComparisonPredicate.Operator.GE,
                    value
                ));
            } else if (key.endsWith("_lt")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 3),
                    ComparisonPredicate.Operator.LT,
                    value
                ));
            } else if (key.endsWith("_lte")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 4),
                    ComparisonPredicate.Operator.LE,
                    value
                ));
            } else if (key.endsWith("_ne")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 3),
                    ComparisonPredicate.Operator.NE,
                    value
                ));
            } else if (key.endsWith("_like")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 5),
                    ComparisonPredicate.Operator.LIKE,
                    value
                ));
            } else if (key.endsWith("_in") && value instanceof List<?> list) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 3),
                    ComparisonPredicate.Operator.IN,
                    list
                ));
            } else {
                predicates.add(new ComparisonPredicate(
                    key,
                    ComparisonPredicate.Operator.EQ,
                    value
                ));
            }
        }

        if (predicates.size() == 1) {
            return predicates.getFirst();
        }

        return new LogicalPredicate(LogicalPredicate.Operator.AND, predicates);
    }

    private Predicate parseExpression(String expression) {
        String upper = expression.toUpperCase();

        int andIndex = findLogicalOperator(upper, " AND ");
        if (andIndex > 0) {
            String left = expression.substring(0, andIndex);
            String right = expression.substring(andIndex + 5);
            return new LogicalPredicate(
                LogicalPredicate.Operator.AND,
                List.of(parseExpression(left), parseExpression(right))
            );
        }

        int orIndex = findLogicalOperator(upper, " OR ");
        if (orIndex > 0) {
            String left = expression.substring(0, orIndex);
            String right = expression.substring(orIndex + 4);
            return new LogicalPredicate(
                LogicalPredicate.Operator.OR,
                List.of(parseExpression(left), parseExpression(right))
            );
        }

        if (upper.startsWith("NOT ")) {
            return new LogicalPredicate(
                LogicalPredicate.Operator.NOT,
                List.of(parseExpression(expression.substring(4)))
            );
        }

        if (expression.startsWith("(") && expression.endsWith(")")) {
            return parseExpression(expression.substring(1, expression.length() - 1));
        }

        return parseComparison(expression);
    }

    private int findLogicalOperator(String expression, String operator) {
        int depth = 0;
        int index = 0;

        while (index < expression.length()) {
            char c = expression.charAt(index);
            if (c == '(') depth++;
            else if (c == ')') depth--;
            else if (depth == 0 && expression.substring(index).startsWith(operator)) {
                return index;
            }
            index++;
        }
        return -1;
    }

    private Predicate parseComparison(String expression) {
        Matcher matcher = COMPARISON_PATTERN.matcher(expression.trim());

        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid expression: " + expression);
        }

        String column = matcher.group(1);
        String operatorStr = matcher.group(2).toUpperCase();
        String valueStr = matcher.group(3);

        ComparisonPredicate.Operator operator = switch (operatorStr) {
            case "=" -> ComparisonPredicate.Operator.EQ;
            case "!=", "<>" -> ComparisonPredicate.Operator.NE;
            case ">" -> ComparisonPredicate.Operator.GT;
            case ">=" -> ComparisonPredicate.Operator.GE;
            case "<" -> ComparisonPredicate.Operator.LT;
            case "<=" -> ComparisonPredicate.Operator.LE;
            case "LIKE" -> ComparisonPredicate.Operator.LIKE;
            case "IN" -> ComparisonPredicate.Operator.IN;
            case "IS NULL" -> ComparisonPredicate.Operator.IS_NULL;
            case "IS NOT NULL" -> ComparisonPredicate.Operator.IS_NOT_NULL;
            default -> throw new IllegalArgumentException("Unknown operator: " + operatorStr);
        };

        Object value = parseValue(valueStr, operator);

        return new ComparisonPredicate(column, operator, value);
    }

    private Object parseValue(String valueStr, ComparisonPredicate.Operator operator) {
        if (operator == ComparisonPredicate.Operator.IS_NULL || 
            operator == ComparisonPredicate.Operator.IS_NOT_NULL) {
            return null;
        }

        if (valueStr == null || valueStr.isBlank()) {
            return null;
        }

        valueStr = valueStr.trim();

        if (valueStr.startsWith("'") && valueStr.endsWith("'")) {
            return valueStr.substring(1, valueStr.length() - 1);
        }

        if (operator == ComparisonPredicate.Operator.IN) {
            if (valueStr.startsWith("(") && valueStr.endsWith(")")) {
                valueStr = valueStr.substring(1, valueStr.length() - 1);
            }
            return Arrays.stream(valueStr.split(","))
                .map(String::trim)
                .map(s -> s.startsWith("'") ? s.substring(1, s.length() - 1) : parseNumber(s))
                .toList();
        }

        return parseNumber(valueStr);
    }

    private Object parseNumber(String s) {
        try {
            if (s.contains(".")) {
                return Double.parseDouble(s);
            }
            return Long.parseLong(s);
        } catch (NumberFormatException e) {
            return s;
        }
    }
}

6.8 Parquet Query Service

// service/ParquetQueryService.java
package com.example.archivequery.service;

import com.example.archivequery.model.*;
import com.example.archivequery.predicate.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

import java.io.IOException;
import java.net.URI;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

@Service
public class ParquetQueryService {

    private static final Logger log = LoggerFactory.getLogger(ParquetQueryService.class);

    private final S3Client s3Client;
    private final Configuration hadoopConfig;
    private final PredicateParser predicateParser;
    private final ObjectMapper objectMapper;
    private final ExecutorService executorService;

    public ParquetQueryService(
        S3Client s3Client,
        Configuration hadoopConfig,
        PredicateParser predicateParser
    ) {
        this.s3Client = s3Client;
        this.hadoopConfig = hadoopConfig;
        this.predicateParser = predicateParser;
        this.objectMapper = new ObjectMapper();
        this.executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
        );
    }

    public QueryResponse query(QueryRequest request) {
        long startTime = System.currentTimeMillis();

        Predicate predicate = buildPredicate(request);
        PredicateEvaluator evaluator = new PredicateEvaluator(predicate);

        List<String> dataFiles = listDataFiles(request.archivePath());

        List<Map<String, Object>> allResults = Collections.synchronizedList(new ArrayList<>());
        long[] counters = new long[2];

        List<Future<?>> futures = new ArrayList<>();

        for (String dataFile : dataFiles) {
            futures.add(executorService.submit(() -> {
                try {
                    QueryFileResult result = queryFile(
                        dataFile, 
                        request.columns(), 
                        evaluator
                    );
                    synchronized (counters) {
                        counters[0] += result.matched();
                        counters[1] += result.scanned();
                    }
                    allResults.addAll(result.records());
                } catch (IOException e) {
                    log.error("Error reading file: " + dataFile, e);
                }
            }));
        }

        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                log.error("Error waiting for query completion", e);
            }
        }

        List<Map<String, Object>> paginatedResults = allResults.stream()
            .skip(request.offset())
            .limit(request.limit())
            .collect(Collectors.toList());

        Map<String, String> schema = extractSchema(request.archivePath());

        long executionTime = System.currentTimeMillis() - startTime;

        return new QueryResponse(
            paginatedResults,
            counters[0],
            counters[1],
            executionTime,
            schema,
            request.archivePath()
        );
    }

    public ArchiveMetadata getMetadata(String archivePath) {
        try {
            URI uri = URI.create(archivePath);
            String bucket = uri.getHost();
            String prefix = uri.getPath().substring(1);

            GetObjectRequest getRequest = GetObjectRequest.builder()
                .bucket(bucket)
                .key(prefix + "/metadata/v1.metadata.json")
                .build();

            byte[] content = s3Client.getObjectAsBytes(getRequest).asByteArray();
            Map<String, Object> metadata = objectMapper.readValue(content, Map.class);

            List<Map<String, Object>> schemas = (List<Map<String, Object>>) metadata.get("schemas");
            Map<String, Object> currentSchema = schemas.getFirst();
            List<Map<String, Object>> fields = (List<Map<String, Object>>) currentSchema.get("fields");

            List<ArchiveMetadata.ColumnDefinition> columns = fields.stream()
                .map(f -> new ArchiveMetadata.ColumnDefinition(
                    ((Number) f.get("id")).intValue(),
                    (String) f.get("name"),
                    (String) f.get("type"),
                    (Boolean) f.get("required")
                ))
                .toList();

            Map<String, String> properties = (Map<String, String>) metadata.get("properties");

            String archivedAtStr = properties.get("archive.timestamp");
            Instant archivedAt = archivedAtStr != null ? 
                Instant.parse(archivedAtStr) : Instant.now();

            return new ArchiveMetadata(
                (String) metadata.get("table_uuid"),
                (String) metadata.get("location"),
                columns,
                properties,
                archivedAt
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to read archive metadata", e);
        }
    }

    private Predicate buildPredicate(QueryRequest request) {
        Predicate filterPredicate = predicateParser.fromFilters(request.filters());
        Predicate expressionPredicate = predicateParser.parse(request.filterExpression());

        if (filterPredicate == null) return expressionPredicate;
        if (expressionPredicate == null) return filterPredicate;

        return new LogicalPredicate(
            LogicalPredicate.Operator.AND,
            List.of(filterPredicate, expressionPredicate)
        );
    }

    private List<String> listDataFiles(String archivePath) {
        URI uri = URI.create(archivePath);
        String bucket = uri.getHost();
        String prefix = uri.getPath().substring(1) + "/data/";

        ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
            .bucket(bucket)
            .prefix(prefix)
            .build();

        List<String> files = new ArrayList<>();
        ListObjectsV2Response response;

        do {
            response = s3Client.listObjectsV2(listRequest);

            for (S3Object obj : response.contents()) {
                if (obj.key().endsWith(".parquet")) {
                    files.add("s3a://" + bucket + "/" + obj.key());
                }
            }

            listRequest = listRequest.toBuilder()
                .continuationToken(response.nextContinuationToken())
                .build();

        } while (response.isTruncated());

        return files;
    }

    private record QueryFileResult(
        List<Map<String, Object>> records,
        long matched,
        long scanned
    ) {}

    private QueryFileResult queryFile(
        String filePath,
        List<String> columns,
        PredicateEvaluator evaluator
    ) throws IOException {
        List<Map<String, Object>> results = new ArrayList<>();
        long matched = 0;
        long scanned = 0;

        Path path = new Path(filePath);
        HadoopInputFile inputFile = HadoopInputFile.fromPath(path, hadoopConfig);

        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(inputFile)
                .withConf(hadoopConfig)
                .build()) {

            GenericRecord record;
            while ((record = reader.read()) != null) {
                scanned++;
                Map<String, Object> recordMap = recordToMap(record);

                if (evaluator.matches(recordMap)) {
                    matched++;

                    if (columns != null && !columns.isEmpty()) {
                        Map<String, Object> projected = new LinkedHashMap<>();
                        for (String col : columns) {
                            if (recordMap.containsKey(col)) {
                                projected.put(col, recordMap.get(col));
                            }
                        }
                        results.add(projected);
                    } else {
                        results.add(recordMap);
                    }
                }
            }
        }

        return new QueryFileResult(results, matched, scanned);
    }

    private Map<String, Object> recordToMap(GenericRecord record) {
        Map<String, Object> map = new LinkedHashMap<>();

        for (org.apache.avro.Schema.Field field : record.getSchema().getFields()) {
            String name = field.name();
            Object value = record.get(name);
            map.put(name, convertValue(value));
        }

        return map;
    }

    private Object convertValue(Object value) {
        if (value == null) return null;

        if (value instanceof org.apache.avro.util.Utf8) {
            return value.toString();
        }

        if (value instanceof org.apache.avro.generic.GenericRecord nested) {
            return recordToMap(nested);
        }

        if (value instanceof java.nio.ByteBuffer buffer) {
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            return Base64.getEncoder().encodeToString(bytes);
        }

        // Heuristic: Parquet/Iceberg DATE columns are stored as days since the Unix
        // epoch, so integer values are surfaced as LocalDate here (a plain integer
        // column would need schema-aware handling via Avro logical types)
        if (value instanceof Integer i) {
            return LocalDate.ofEpochDay(i);
        }

        // Heuristic: large longs are assumed to be epoch microseconds (the Iceberg
        // timestamp encoding) and are converted to UTC timestamps
        if (value instanceof Long l && l > 1_000_000_000_000L) {
            return LocalDateTime.ofInstant(
                Instant.ofEpochMilli(l / 1000), 
                ZoneOffset.UTC
            );
        }

        return value;
    }

    private Map<String, String> extractSchema(String archivePath) {
        try {
            ArchiveMetadata metadata = getMetadata(archivePath);
            return metadata.columns().stream()
                .collect(Collectors.toMap(
                    ArchiveMetadata.ColumnDefinition::name,
                    ArchiveMetadata.ColumnDefinition::type,
                    (a, b) -> a,
                    LinkedHashMap::new
                ));
        } catch (Exception e) {
            log.warn("Failed to extract schema", e);
            return Collections.emptyMap();
        }
    }
}

6.9 REST Controller

// controller/ArchiveQueryController.java
package com.example.archivequery.controller;

import com.example.archivequery.model.*;
import com.example.archivequery.service.ParquetQueryService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/v1/archives")
public class ArchiveQueryController {

    private final ParquetQueryService queryService;

    public ArchiveQueryController(ParquetQueryService queryService) {
        this.queryService = queryService;
    }

    @PostMapping("/query")
    public ResponseEntity<QueryResponse> query(@RequestBody QueryRequest request) {
        QueryResponse response = queryService.query(request);
        return ResponseEntity.ok(response);
    }

    @GetMapping("/query")
    public ResponseEntity<QueryResponse> queryGet(
        @RequestParam String archivePath,
        @RequestParam(required = false) List<String> columns,
        @RequestParam(required = false) String filter,
        @RequestParam(defaultValue = "1000") Integer limit,
        @RequestParam(defaultValue = "0") Integer offset
    ) {
        QueryRequest request = new QueryRequest(
            archivePath,
            columns,
            null,
            filter,
            limit,
            offset
        );

        QueryResponse response = queryService.query(request);
        return ResponseEntity.ok(response);
    }

    @GetMapping("/metadata")
    public ResponseEntity<ArchiveMetadata> getMetadata(
        @RequestParam String archivePath
    ) {
        ArchiveMetadata metadata = queryService.getMetadata(archivePath);
        return ResponseEntity.ok(metadata);
    }

    @GetMapping("/schema/{schema}/{table}/{partitionValue}")
    public ResponseEntity<ArchiveMetadata> getMetadataByTable(
        @PathVariable String schema,
        @PathVariable String table,
        @PathVariable String partitionValue,
        @RequestParam String bucket,
        @RequestParam(defaultValue = "aurora-archives") String prefix
    ) {
        String archivePath = String.format(
            "s3://%s/%s/%s/%s/%s",
            bucket, prefix, schema, table, partitionValue
        );

        ArchiveMetadata metadata = queryService.getMetadata(archivePath);
        return ResponseEntity.ok(metadata);
    }

    @PostMapping("/query/{schema}/{table}/{partitionValue}")
    public ResponseEntity<QueryResponse> queryByTable(
        @PathVariable String schema,
        @PathVariable String table,
        @PathVariable String partitionValue,
        @RequestParam String bucket,
        @RequestParam(defaultValue = "aurora-archives") String prefix,
        @RequestBody Map<String, Object> requestBody
    ) {
        String archivePath = String.format(
            "s3://%s/%s/%s/%s/%s",
            bucket, prefix, schema, table, partitionValue
        );

        List<String> columns = requestBody.containsKey("columns") ?
            (List<String>) requestBody.get("columns") : null;

        Map<String, Object> filters = requestBody.containsKey("filters") ?
            (Map<String, Object>) requestBody.get("filters") : null;

        String filterExpression = requestBody.containsKey("filterExpression") ?
            (String) requestBody.get("filterExpression") : null;

        Integer limit = requestBody.containsKey("limit") ?
            ((Number) requestBody.get("limit")).intValue() : 1000;

        Integer offset = requestBody.containsKey("offset") ?
            ((Number) requestBody.get("offset")).intValue() : 0;

        QueryRequest request = new QueryRequest(
            archivePath,
            columns,
            filters,
            filterExpression,
            limit,
            offset
        );

        QueryResponse response = queryService.query(request);
        return ResponseEntity.ok(response);
    }
}

6.10 Application Properties

# application.yml
server:
  port: 8080

aws:
  region: ${AWS_REGION:eu-west-1}

spring:
  application:
    name: archive-query-service

logging:
  level:
    com.example.archivequery: DEBUG
    org.apache.parquet: WARN
    org.apache.hadoop: WARN

7. Usage Examples

7.1 Archive a Partition

# Set environment variables
export AURORA_HOST=your-cluster.cluster-xxxx.eu-west-1.rds.amazonaws.com
export AURORA_DATABASE=production
export AURORA_USER=admin
export AURORA_PASSWORD=your-password
export ARCHIVE_S3_BUCKET=my-archive-bucket
export ARCHIVE_S3_PREFIX=aurora-archives
export AWS_REGION=eu-west-1

# Archive January 2024 transactions
python archive_partition.py \
    --table transactions \
    --partition-column transaction_month \
    --partition-value 2024-01 \
    --schema public \
    --batch-size 100000

7.2 Query Archived Data via API

# Get archive metadata
curl -X GET "http://localhost:8080/api/v1/archives/metadata?archivePath=s3://my-archive-bucket/aurora-archives/public/transactions/transaction_month=2024-01"

# Query with filters (POST)
curl -X POST "http://localhost:8080/api/v1/archives/query" \
    -H "Content-Type: application/json" \
    -d '{
        "archivePath": "s3://my-archive-bucket/aurora-archives/public/transactions/transaction_month=2024-01",
        "columns": ["transaction_id", "customer_id", "amount", "transaction_date"],
        "filters": {
            "amount_gte": 1000,
            "status": "completed"
        },
        "limit": 100,
        "offset": 0
    }'

# Query with expression filter (GET)
curl -X GET "http://localhost:8080/api/v1/archives/query" \
    --data-urlencode "archivePath=s3://my-archive-bucket/aurora-archives/public/transactions/transaction_month=2024-01" \
    --data-urlencode "filter=amount >= 1000 AND status = 'completed'" \
    --data-urlencode "columns=transaction_id,customer_id,amount" \
    --data-urlencode "limit=50"

7.3 Restore Archived Data

# Restore to staging table
python restore_partition.py \
    --source-path s3://my-archive-bucket/aurora-archives/public/transactions/transaction_month=2024-01 \
    --target-table transactions_staging \
    --target-schema public \
    --batch-size 10000

7.4 Migrate to Main Table

-- Validate before migration
SELECT * FROM generate_migration_report(
    'public', 'transactions_staging',
    'public', 'transactions',
    'transaction_month', '2024-01'
);

-- Migrate data
CALL migrate_partition_data(
    'public', 'transactions_staging',
    'public', 'transactions',
    'transaction_month', '2024-01',
    TRUE,    -- delete existing data in partition
    50000    -- batch size
);

-- Clean up staging table
CALL cleanup_after_migration(
    'public', 'transactions_staging',
    'public', 'transactions',
    TRUE     -- verify counts
);

8. Operational Considerations

8.1 Cost Analysis

The cost savings from this approach are significant:

Storage Type                    Monthly Cost per TB
Aurora PostgreSQL               $230
S3 Standard                     $23
S3 Intelligent Tiering          $23 (hot) to $4 (archive)
S3 Glacier Instant Retrieval    $4

For a 10TB historical dataset:

  • Aurora: $2,300/month
  • S3 with Parquet (7:1 compression): ~$33/month
  • Savings: ~98.5%

8.2 Query Performance

The Spring Boot API performance depends on:

  1. Data file size: Smaller files (100MB to 250MB) enable better parallelism
  2. Predicate selectivity: Higher selectivity means fewer records to deserialise
  3. Column projection: Requesting fewer columns reduces I/O
  4. S3 latency: Use same region as the bucket

Expected performance for a 1TB partition (compressed to ~150GB Parquet):

Query Type                       Typical Latency
Point lookup (indexed column)    500ms to 2s
Range scan (10% selectivity)     5s to 15s
Full scan with aggregation       30s to 60s

8.3 Monitoring and Alerting

Implement these CloudWatch metrics for production use:

@Component
public class QueryMetrics {

    private final MeterRegistry meterRegistry;

    public QueryMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    public void recordQuery(QueryResponse response) {
        meterRegistry.counter("archive.query.count").increment();

        meterRegistry.timer("archive.query.duration")
            .record(response.executionTimeMs(), TimeUnit.MILLISECONDS);

        // Distribution summaries record per-query counts; a plain gauge would only
        // ever expose the most recent value
        meterRegistry.summary("archive.query.records_scanned").record(response.totalScanned());
        meterRegistry.summary("archive.query.records_matched").record(response.totalMatched());
    }
}

9. Conclusion

This solution provides a complete data lifecycle management approach for large Aurora PostgreSQL tables. The archive script efficiently exports partitions to the cost-effective Iceberg/Parquet format on S3, while the restore script enables seamless data recovery when needed. The Spring Boot API bridges the gap by allowing direct queries against archived data, eliminating the need for restoration in many analytical scenarios.

Key benefits:

  1. Cost reduction: 90 to 98 percent storage cost savings compared to keeping data in Aurora
  2. Operational flexibility: Query archived data without restoration
  3. Schema preservation: Full schema metadata maintained for reliable restores
  4. Partition management: Clean attach/detach operations for partitioned tables
  5. Predicate pushdown: Efficient filtering reduces data transfer and processing

The Iceberg format ensures compatibility with the broader data ecosystem, allowing tools like Athena, Spark, and Trino to query the same archived data when needed for more complex analytical workloads.


Deep Dive into PostgreSQL Prepared Statements: When Plan Caching Goes Wrong and Leads to Memory Exhaustion

Prepared statements are one of PostgreSQL’s most powerful features for query optimization. By parsing and planning queries once, then reusing those plans for subsequent executions, they can dramatically improve performance. But this optimization comes with a hidden danger: reusing the same cached plan for every execution can sometimes lead to catastrophic memory exhaustion and severe performance degradation.

In this deep dive, we’ll explore how prepared statement plan caching works, when it fails spectacularly, and how PostgreSQL has evolved to address these challenges.

1. Understanding Prepared Statements and Plan Caching

When you execute a prepared statement in PostgreSQL, the database goes through several phases:

  1. Parsing: Converting the SQL text into a parse tree
  2. Planning: Creating an execution plan based on statistics and parameters
  3. Execution: Running the plan against actual data

The promise of prepared statements is simple: do steps 1 and 2 once, then reuse the results for repeated executions with different parameter values.

-- Prepare the statement
PREPARE get_orders AS
SELECT * FROM orders WHERE customer_id = $1;

-- Execute multiple times with different parameters
EXECUTE get_orders(123);
EXECUTE get_orders(456);
EXECUTE get_orders(789);

PostgreSQL uses a clever heuristic to decide when to cache plans. For the first five executions, it creates a custom plan specific to the parameter values. Starting with the sixth execution, it evaluates whether a generic plan (one that works for any parameter value) would be more efficient. If the average cost of the custom plans is close enough to the generic plan’s cost, PostgreSQL switches to reusing the generic plan.
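
If you want to watch this switch happen, PostgreSQL 14 and later expose per-statement plan counters in pg_prepared_statements. A short session sketch, reusing the get_orders statement from above:

-- Run the statement a handful of times, then inspect the counters
EXECUTE get_orders(123);
-- ... repeat several times with different values ...

SELECT name, custom_plans, generic_plans
FROM pg_prepared_statements
WHERE name = 'get_orders';

-- Once the planner switches, generic_plans starts climbing while custom_plans stops at 5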

2. The Dark Side: Memory Exhaustion from Plan Caching

Here’s where things can go catastrophically wrong. Consider a partitioned table:

CREATE TABLE events (
    id BIGSERIAL,
    event_date DATE,
    user_id INTEGER,
    event_type TEXT,
    data JSONB
) PARTITION BY RANGE (event_date);

-- Create 365 partitions, one per day
CREATE TABLE events_2024_01_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-01-02');
CREATE TABLE events_2024_01_02 PARTITION OF events
    FOR VALUES FROM ('2024-01-02') TO ('2024-01-03');
-- ... 363 more partitions

Now consider this prepared statement:

PREPARE get_events AS
SELECT * FROM events WHERE event_date = $1;

The Problem: Generic Plans Can’t Prune Partitions

When PostgreSQL creates a generic plan for this query, it doesn’t know which specific date you’ll query at execution time. Without this knowledge, the planner cannot perform partition pruning, the critical optimization that eliminates irrelevant partitions from consideration.

Here’s what happens:

  1. Custom plan (first 5 executions): PostgreSQL sees the actual date value, realizes only one partition is relevant, and creates a plan that touches only that partition. Fast and efficient.
  2. Generic plan (6th execution onward): PostgreSQL creates a plan that must be valid for ANY date value. Since it can’t know which partition you’ll need, it includes ALL 365 partitions in the plan.

The result: Instead of reading from 1 partition, PostgreSQL’s generic plan prepares to read from all 365 partitions. This leads to:

  • Memory exhaustion: The query plan itself becomes enormous, containing nodes for every partition
  • Planning overhead: Even though the plan is cached, initializing it for execution requires allocating memory for all partition nodes
  • Execution inefficiency: The executor must check every partition, even though 364 of them will return zero rows

In extreme cases with thousands of partitions, this can consume gigabytes of memory per connection and bring your database to its knees.
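
You can reproduce both behaviors on demand with the plan_cache_mode setting (available from PostgreSQL 12 onwards), which forces a particular plan type for the session. A sketch using the get_events statement above:

-- Force the generic plan: the Append node lists every partition
SET plan_cache_mode = force_generic_plan;
EXPLAIN EXECUTE get_events('2024-06-01');

-- Force custom planning: only the single matching partition appears
SET plan_cache_mode = force_custom_plan;
EXPLAIN EXECUTE get_events('2024-06-01');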

3. Partition Pruning: The Critical Optimization and How It Works

Partition pruning is the process of eliminating partitions that cannot possibly contain relevant data based on query constraints. Understanding partition pruning in depth is essential for working with partitioned tables effectively.

3.1 What Is Partition Pruning?

At its core, partition pruning is PostgreSQL’s mechanism for avoiding unnecessary work. When you query a partitioned table, the database analyzes your WHERE clause and determines which partitions could possibly contain matching rows. All other partitions are excluded from the query execution entirely.

Consider a table partitioned by date range:

CREATE TABLE sales (
    sale_id BIGINT,
    sale_date DATE,
    amount NUMERIC,
    product_id INTEGER
) PARTITION BY RANGE (sale_date);

CREATE TABLE sales_2023_q1 PARTITION OF sales
    FOR VALUES FROM ('2023-01-01') TO ('2023-04-01');
    
CREATE TABLE sales_2023_q2 PARTITION OF sales
    FOR VALUES FROM ('2023-04-01') TO ('2023-07-01');
    
CREATE TABLE sales_2023_q3 PARTITION OF sales
    FOR VALUES FROM ('2023-07-01') TO ('2023-10-01');
    
CREATE TABLE sales_2023_q4 PARTITION OF sales
    FOR VALUES FROM ('2023-10-01') TO ('2024-01-01');

When you execute:

SELECT * FROM sales WHERE sale_date = '2023-05-15';

PostgreSQL performs partition pruning by examining the partition constraints. It determines that only sales_2023_q2 can contain rows with sale_date = ‘2023-05-15’, so it completely ignores the other three partitions. They never get opened, scanned, or loaded into memory.

3.2 The Two Stages of Partition Pruning

PostgreSQL performs partition pruning at two distinct stages in query execution, and understanding the difference is crucial for troubleshooting performance issues.

Stage 1: Plan Time Pruning (Static Pruning)

Plan time pruning happens during the query planning phase, before execution begins. This is the ideal scenario because pruned partitions never appear in the execution plan at all.

When it occurs:

  • The query contains literal values in the WHERE clause
  • The partition key columns are directly compared to constants
  • The planner can evaluate the partition constraints at planning time

Example:

EXPLAIN SELECT * FROM sales WHERE sale_date = '2023-05-15';

Output might show:

Seq Scan on sales_2023_q2 sales
  Filter: (sale_date = '2023-05-15'::date)

Notice that only one partition appears in the plan. The other three partitions were pruned away during planning, and they consume zero resources.

What makes plan time pruning possible:

The planner evaluates the WHERE clause condition against each partition’s constraint. For sales_2023_q2, the constraint is:

sale_date >= '2023-04-01' AND sale_date < '2023-07-01'

The planner performs boolean logic: “Can sale_date = ‘2023-05-15’ be true if the constraint requires sale_date >= ‘2023-04-01’ AND sale_date < ‘2023-07-01’?” Yes, it can. For the other partitions, the answer is no, so they’re eliminated.

Performance characteristics:

  • No runtime overhead for pruned partitions
  • Minimal memory usage
  • Optimal query performance
  • The execution plan is lean and specific

Stage 2: Execution Time Pruning (Dynamic Pruning)

Execution time pruning, also called runtime pruning, happens during query execution rather than planning. This occurs when the planner cannot determine which partitions to prune until the query actually runs.

When it occurs:

  • Parameters or variables are used instead of literal values
  • Subqueries provide the filter values
  • Join conditions determine which partitions are needed
  • Prepared statements with parameters

Example:

PREPARE get_sales AS 
SELECT * FROM sales WHERE sale_date = $1;

EXPLAIN (ANALYZE) EXECUTE get_sales('2023-05-15');

With execution time pruning, the plan initially includes all partitions, but the output shows:

Append (actual rows=100)
  Subplans Removed: 3
  -> Seq Scan on sales_2023_q2 sales (actual rows=100)
       Filter: (sale_date = '2023-05-15'::date)

The key indicator is “Subplans Removed: 3”, which tells you that three partitions were pruned at execution time.

How execution time pruning works:

During the initialization phase of query execution, PostgreSQL evaluates the actual parameter values and applies the same constraint checking logic as plan time pruning. However, instead of eliminating partitions from the plan, it marks them as “pruned” and skips their initialization and execution.

The critical difference:

Even though execution time pruning skips scanning the pruned partitions, the plan still contains nodes for all partitions. This means:

  • Memory is allocated for all partition nodes (though less than full initialization)
  • The plan structure is larger
  • There is a small runtime cost to check each partition
  • More complex bookkeeping is required

This is why execution time pruning, while much better than no pruning, is not quite as efficient as plan time pruning.

3.3 Partition Pruning with Different Partition Strategies

PostgreSQL supports multiple partitioning strategies, and pruning works differently for each.

Range Partitioning

Range partitioning is the most common and supports the most effective pruning:

CREATE TABLE measurements (
    measurement_time TIMESTAMPTZ,
    sensor_id INTEGER,
    value NUMERIC
) PARTITION BY RANGE (measurement_time);

Pruning logic: PostgreSQL uses range comparison. Given a filter like measurement_time >= '2024-01-01' AND measurement_time < '2024-02-01', it identifies all partitions whose ranges overlap with this query range.

Pruning effectiveness: Excellent. Range comparisons are computationally cheap and highly selective.

List Partitioning

List partitioning groups rows by discrete values:

CREATE TABLE orders (
    order_id BIGINT,
    country_code TEXT,
    amount NUMERIC
) PARTITION BY LIST (country_code);

CREATE TABLE orders_us PARTITION OF orders
    FOR VALUES IN ('US');
    
CREATE TABLE orders_uk PARTITION OF orders
    FOR VALUES IN ('UK');
    
CREATE TABLE orders_eu PARTITION OF orders
    FOR VALUES IN ('DE', 'FR', 'IT', 'ES');

Pruning logic: PostgreSQL checks if the query value matches any value in each partition’s list.

SELECT * FROM orders WHERE country_code = 'FR';

Only orders_eu is accessed because ‘FR’ appears in its value list.

Pruning effectiveness: Very good for equality comparisons. Less effective for OR conditions across many values or pattern matching.

Hash Partitioning

Hash partitioning distributes rows using a hash function:

CREATE TABLE users (
    user_id BIGINT,
    username TEXT,
    email TEXT
) PARTITION BY HASH (user_id);

CREATE TABLE users_p0 PARTITION OF users
    FOR VALUES WITH (MODULUS 4, REMAINDER 0);
    
CREATE TABLE users_p1 PARTITION OF users
    FOR VALUES WITH (MODULUS 4, REMAINDER 1);
-- ... p2, p3

Pruning logic: PostgreSQL computes the hash of the query value and determines which partition it maps to.

SELECT * FROM users WHERE user_id = 12345;

PostgreSQL calculates hash(12345) % 4 and accesses only the matching partition.

Pruning effectiveness: Excellent for equality on the partition key. Completely ineffective for range queries, pattern matching, or anything except exact equality matches.
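
A quick way to see this limitation (assuming the four users_p0 through users_p3 partitions above exist and contain data) is to compare an equality predicate with a range predicate:

-- Equality on the partition key: expect the plan to show a single users_pN partition
EXPLAIN SELECT * FROM users WHERE user_id = 12345;

-- Range on the partition key: hash pruning cannot help here, so expect an Append
-- node covering all four partitions
EXPLAIN SELECT * FROM users WHERE user_id BETWEEN 10000 AND 20000;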

3.4 Complex Partition Pruning Scenarios

Real world queries are often more complex than simple equality comparisons. Here’s how pruning handles various scenarios:

Multi Column Partition Keys

CREATE TABLE events (
    event_date DATE,
    region TEXT,
    data JSONB
) PARTITION BY RANGE (event_date, region);

Pruning works on the leading columns of the partition key. A query filtering only on event_date can still prune effectively. A query filtering only on region cannot prune at all because region is not the leading column.
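
To see the leading-column rule in action, here is a minimal sketch (the partition names and bounds are hypothetical, purely for illustration) that creates two partitions and compares the plans:

CREATE TABLE events_2024_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01', MINVALUE) TO ('2024-02-01', MINVALUE);
CREATE TABLE events_2024_02 PARTITION OF events
    FOR VALUES FROM ('2024-02-01', MINVALUE) TO ('2024-03-01', MINVALUE);

-- Leading column in the filter: expect only events_2024_01 in the plan
EXPLAIN SELECT * FROM events WHERE event_date = '2024-01-15';

-- Only the second column in the filter: expect both partitions to be scanned
EXPLAIN SELECT * FROM events WHERE region = 'EU';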

OR Conditions

SELECT * FROM sales 
WHERE sale_date = '2023-05-15' OR sale_date = '2023-08-20';

PostgreSQL must access partitions for both dates (Q2 and Q3), so it keeps both and prunes Q1 and Q4. OR conditions reduce pruning effectiveness.

Inequality Comparisons

SELECT * FROM sales WHERE sale_date >= '2023-05-01';

PostgreSQL prunes partitions entirely before the date (Q1) but must keep all partitions from Q2 onward. Range queries reduce pruning selectivity.

Joins Between Partitioned Tables

SELECT * FROM sales s
JOIN products p ON s.product_id = p.product_id
WHERE s.sale_date = '2023-05-15';

If sales is partitioned by sale_date, partition pruning works normally. If products is also partitioned, PostgreSQL attempts partitionwise joins where possible, enabling pruning on both sides.

Subqueries Providing Values

SELECT * FROM sales 
WHERE sale_date = (SELECT MAX(order_date) FROM orders);

This requires execution time pruning because the subquery must run before PostgreSQL knows which partition to access.
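
To confirm where the pruning happens, run the same query under EXPLAIN (ANALYZE). Assuming the orders table has an order_date column as in the query above, only sales_2023_q2 should actually be scanned; the remaining partitions are either reported under "Subplans Removed" or marked "(never executed)", depending on the PostgreSQL version and plan shape:

EXPLAIN (ANALYZE)
SELECT * FROM sales
WHERE sale_date = (SELECT MAX(order_date) FROM orders);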

3.5 Monitoring Partition Pruning

To verify partition pruning is working, use EXPLAIN:

EXPLAIN (ANALYZE, BUFFERS) 
SELECT * FROM sales WHERE sale_date = '2023-05-15';

What to look for:

Plan time pruning succeeded:

Seq Scan on sales_2023_q2

Only one partition appears in the plan at all.

Execution time pruning succeeded:

Append
  Subplans Removed: 3
  -> Seq Scan on sales_2023_q2

All partitions appear in the plan structure, but “Subplans Removed” shows pruning happened.

No pruning occurred:

Append
  -> Seq Scan on sales_2023_q1
  -> Seq Scan on sales_2023_q2
  -> Seq Scan on sales_2023_q3
  -> Seq Scan on sales_2023_q4

All partitions were scanned. This indicates a problem.

3.6 Why Partition Pruning Fails

Understanding why pruning fails helps you fix it:

  1. Query doesn’t filter on partition key: If your WHERE clause doesn’t reference the partition column(s), PostgreSQL cannot prune.
  2. Function calls on partition key: WHERE EXTRACT(YEAR FROM sale_date) = 2023 prevents pruning because PostgreSQL can’t map the function result back to partition ranges. Use WHERE sale_date >= '2023-01-01' AND sale_date < '2024-01-01' instead; the sketch after this list shows the difference in the plans.
  3. Type mismatches: If your partition key is DATE but you compare to TEXT without explicit casting, pruning may fail.
  4. Generic plans in prepared statements: As discussed in the main article, generic plans prevent plan time pruning and older PostgreSQL versions struggled with execution time pruning.
  5. OR conditions with non-partition columns: WHERE sale_date = '2023-05-15' OR customer_id = 100 prevents pruning because customer_id isn’t the partition key.
  6. Non-immutable functions: WHERE sale_date = CURRENT_DATE may prevent plan time pruning because the value of CURRENT_DATE is not known at planning time (but it should work with execution time pruning).
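
As a quick illustration of point 2, the two forms below should produce visibly different plans against the sales table from earlier (all four partitions for the function-wrapped predicate, only sales_2023_q2 for the sargable rewrite):

-- Function on the partition key: no pruning, expect all four sales partitions
EXPLAIN SELECT * FROM sales WHERE EXTRACT(MONTH FROM sale_date) = 5;

-- Range rewrite on the raw column (equivalent for this 2023-only table):
-- expect only sales_2023_q2 in the plan
EXPLAIN SELECT * FROM sales
WHERE sale_date >= '2023-05-01' AND sale_date < '2023-06-01';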

3.7 Partition Pruning Performance Impact

The performance difference between pruned and unpruned queries can be staggering:

Example scenario: 1000 partitions, each with 1 million rows. Query targets one partition.

With pruning:

  • Partitions opened: 1
  • Rows scanned: 1 million
  • Memory for plan nodes: ~10KB
  • Query time: 50ms

Without pruning:

  • Partitions opened: 1000
  • Rows scanned: 1 billion (returning ~1 million)
  • Memory for plan nodes: ~10MB
  • Query time: 45 seconds

The difference is not incremental: the unpruned overhead grows in proportion to the partition count, so at thousands of partitions it dominates the cost of the query.

4. Partition Pruning in Prepared Statements: The Core Problem

Let me illustrate the severity with a real-world scenario:

-- Table with 1000 partitions
CREATE TABLE metrics (
    timestamp TIMESTAMPTZ,
    metric_name TEXT,
    value NUMERIC
) PARTITION BY RANGE (timestamp);

-- Create 1000 daily partitions...

-- Prepared statement
PREPARE get_metrics AS
SELECT * FROM metrics 
WHERE timestamp >= $1 AND timestamp < $2;

After the first five executions, PostgreSQL typically switches to a generic plan. Each subsequent execution then:

  1. Allocates memory for 1000 partition nodes
  2. Initializes executor state for 1000 partitions
  3. Checks 1000 partition constraints
  4. Returns data from just 1-2 partitions

If you have 100 connections each executing this prepared statement, you’re multiplying this overhead by 100. With connection poolers reusing connections (and thus reusing prepared statements), the problem compounds.

5. The Fix: Evolution Across PostgreSQL Versions

PostgreSQL has steadily improved partition pruning for prepared statements:

PostgreSQL 11: Execution-Time Pruning Introduced

PostgreSQL 11 introduced run-time partition pruning, but it had significant limitations with prepared statements. Generic plans still included all partitions in memory, even if they could be skipped during execution.

PostgreSQL 12: Better Prepared Statement Pruning

PostgreSQL 12 made substantial improvements:

  • Generic plans gained the ability to defer partition pruning to execution time more effectively
  • The planner became smarter about when to use generic vs. custom plans for partitioned tables
  • Memory consumption for generic plans improved significantly

However, issues remained in edge cases, particularly with:

  • Multi level partitioning
  • Complex join queries involving partitioned tables
  • Prepared statements in stored procedures

PostgreSQL 13-14: Refined Heuristics

These versions improved the cost model for deciding between custom and generic plans:

  • Better accounting for partition pruning benefits in the cost calculation
  • More accurate statistics gathering on partitioned tables
  • Improved handling of partitionwise joins

PostgreSQL 15-16: The Real Game Changers

PostgreSQL 15 and 16 brought transformative improvements:

PostgreSQL 15:

  • Dramatically reduced memory usage for generic plans on partitioned tables
  • Improved execution-time pruning performance
  • Better handling of prepared statements with partition pruning

PostgreSQL 16:

  • Introduced incremental sorting improvements that benefit partitioned queries
  • Enhanced partition-wise aggregation
  • More aggressive execution-time pruning

The key breakthrough: PostgreSQL now builds “stub” plans that allocate minimal memory for partitions that will be pruned, rather than fully initializing all partition nodes.

Workarounds for Older Versions

If you’re stuck on older PostgreSQL versions, here are strategies to avoid the prepared statement pitfall:

1. Disable Generic Plans

Force PostgreSQL to always use custom plans:

-- Set at session level
SET plan_cache_mode = force_custom_plan;

-- Or for specific prepared statement contexts
PREPARE get_events AS
SELECT * FROM events WHERE event_date = $1;

-- Before execution
SET LOCAL plan_cache_mode = force_custom_plan;
EXECUTE get_events('2024-06-15');

This sacrifices the planning time savings but ensures proper partition pruning.

2. Use Statement Level Caching Instead

Many ORMs and database drivers can send parameterised queries without creating long-lived server-side prepared statements, so no generic plan is ever cached across executions:

# psycopg2 example - a plain parameterised query is planned per execution,
# so no plan persists on the server
cursor = connection.cursor()
cursor.execute(
    "SELECT * FROM events WHERE event_date = %s",
    (date_value,)
)

3. Adjust plan_cache_mode Per Query

PostgreSQL 12+ provides plan_cache_mode:

-- auto (default): use PostgreSQL's heuristics
-- force_generic_plan: always use generic plan
-- force_custom_plan: always use custom plan

SET plan_cache_mode = force_custom_plan;

For partitioned tables, force_custom_plan is often the right choice.

4. Increase Custom Plan Count

The threshold of 5 custom plans before switching to generic is hardcoded, but you can work around it by using different prepared statement names or by periodically deallocating and recreating prepared statements:

DEALLOCATE get_events;
PREPARE get_events AS SELECT * FROM events WHERE event_date = $1;

5. Partition Pruning Hints

In PostgreSQL 12+, you can sometimes coerce the planner into better behavior:

-- Using an explicit constraint that helps the planner
SELECT * FROM events 
WHERE event_date = $1 
  AND event_date >= CURRENT_DATE - INTERVAL '1 year';

This additional constraint provides a hint about the parameter range.

Best Practices

  1. Monitor your query plans: Use EXPLAIN (ANALYZE, BUFFERS) to check if partition pruning is happening:

EXPLAIN (ANALYZE, BUFFERS) EXECUTE get_events('2024-06-15');

Look for "Subplans Removed" in the output, or for only the relevant partitions appearing in the plan.

  2. Check prepared statement statistics: Query pg_prepared_statements to see generic vs. custom plan usage:

SELECT name,
       generic_plans,
       custom_plans
FROM pg_prepared_statements;

  3. Upgrade PostgreSQL: If you're dealing with large partitioned tables, the improvements in PostgreSQL 15+ are worth the upgrade effort.
  4. Design partitions appropriately: Don't over-partition. Having 10,000 tiny partitions creates problems even with perfect pruning.
  5. Use connection pooling wisely: Prepared statements persist per connection. With connection pooling, long-lived connections accumulate many prepared statements. Configure your pooler to occasionally recycle connections.
  6. Benchmark both modes: Test your specific workload with both custom and generic plans to measure the actual impact.

Conclusion

Prepared statements are a powerful optimization, but their interaction with partitioned tables exposes a fundamental tension: caching for reuse versus specificity for efficiency. PostgreSQL’s evolution from version 11 through 16 represents a masterclass in addressing this challenge.

The key takeaway: if you’re using prepared statements with partitioned tables on PostgreSQL versions older than 15, be vigilant about plan caching behavior. Monitor memory usage, check execution plans, and don’t hesitate to force custom plans when generic plans cause problems.

For modern PostgreSQL installations (15+), the improvements are substantial enough that the traditional guidance of “be careful with prepared statements on partitioned tables” is becoming outdated. The database now handles these scenarios with far more intelligence and efficiency.

But understanding the history and mechanics remains crucial, because the next time you see mysterious memory growth in your PostgreSQL connections, you’ll know exactly where to look.


AWS: Install and configure the AWS CLI on a Macbook

You can absolutely get the following from the AWS help pages; but this is the lazy way to get everything you need for a simple single account setup.

Run the two commands below to drop the package on your Mac.

$ curl "https://awscli.amazonaws.com/AWSCLIV2.pkg" -o "AWSCLIV2.pkg"
$ sudo installer -pkg AWSCLIV2.pkg -target /

Then check the versions you have installed:

$ which aws
$ aws --version

Next you need to set up your environment. Note: this is NOT the recommended way (as it uses long-term credentials).

The following example configures a default profile using sample values. Replace them with your own values as described in the following sections.

$ aws configure
AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [None]: secretaccesskey
Default region name [None]: af-south-1
Default output format [None]: json

You can also use named profiles. The following example configures a profile named userprod using sample values. Replace them with your own values as described in the following sections.

$ aws configure --profile userprod
AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [None]: secretaccesskey
Default region name [None]: af-south-1
Default output format [None]: json
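
A quick way to confirm a profile is working is to ask STS who you are authenticated as (the profile name below matches the example above):

$ aws sts get-caller-identity --profile userprod
$ aws s3 ls --profile userprod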

Get your access keys

  1. Sign in to the AWS Management Console and open the IAM console at https://console.aws.amazon.com/iam/.
  2. In the navigation pane of the IAM console, select Users and then select the User name of the user that you created previously.
  3. On the user’s page, select the Security credentials page. Then, under Access keys, select Create access key.
  4. For Create access key Step 1, choose Command Line Interface (CLI).
  5. For Create access key Step 2, enter an optional tag and select Next.
  6. For Create access key Step 3, select Download .csv file to save a .csv file with your IAM user’s access key and secret access key. You need this information for later.
  7. Select Done.

AWS: Automatically Stop and Start your EC2 Services

Below is a quick (I'm busy) outline of how to automatically stop and start your EC2 instances.

Step 1: Tag your resources

To decide which instances to stop and start, you first need to add an auto-start-stop: Yes tag to all the instances you want the start / stop functions to affect. Note: you can use "Resource Groups and Tag Editor" to bulk apply these tags to the resources you want the Lambda functions you are about to create to act on (click the orange button called "Manage tags of Selected Resources").
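
If you prefer the CLI to the Tag Editor, the same tag can be applied directly (the instance ID below is a placeholder):

# Apply the auto-start-stop tag to one or more instances
aws ec2 create-tags \
    --resources i-0123456789abcdef0 \
    --tags Key=auto-start-stop,Value=Yes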

Step 2: Create a new role for our lambda functions

First we need to create the IAM role that the Lambda functions will run under. Go to IAM and click the "Create Role" button. Then select "AWS Service" from the "Trusted entity options" and select Lambda from the "Use cases" options. Click "Next", followed by "Create Policy". To specify the permissions, click the JSON button on the right of the screen and enter the policy below (substituting your own region and account ID):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "StartStopTaggedInstances",
            "Effect": "Allow",
            "Action": [
                "ec2:StartInstances",
                "ec2:StopInstances"
            ],
            "Resource": "arn:aws:ec2:<region>:<accountID>:instance/*",
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/auto-start-stop": "Yes"
                }
            }
        },
        {
            "Sid": "DescribeAndLogging",
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeInstances",
                "ec2:DescribeTags",
                "ec2:DescribeInstanceTypes",
                "ec2:DescribeInstanceStatus",
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

Hit "Next" and, under "Review and create", save the above policy as ec2-lambda-start-stop by clicking the "Create Policy" button. Next, search for the newly created policy, select it, and hit "Next".

You will now see the “Name, review, and create” screen. Here you simply need to hit “Create Role” after you enter the role name as ec2-lambda-start-stop-role.

Note the start and stop actions are restricted to EC2 instances that carry the auto-start-stop: Yes tag (least privilege); the Describe and logging actions do not support resource-level restrictions, so they use a wildcard resource.

If you want to review your role, check that your own region and account number appear in the policy.

Step 3: Create Lambda Functions To Start/Stop EC2 Instances

In this section we will create two lambda functions, one to start the instances and the other to stop the instances.

Step 3a: Add the Stop EC2 instance function

  • Go to the Lambda console and click on "Create function"
  • Create a Lambda function with a function name of stop-ec2-instance-lambda, the Python 3.11 runtime, and the ec2-lambda-start-stop-role role.

Next add the Lambda stop function code below and deploy it as stop-ec2-instance-lambda. Note, you will need to change the value of the region_name parameter accordingly.

import boto3

# Connect to EC2 in the target region (change region_name accordingly)
ec2 = boto3.resource('ec2', region_name='af-south-1')

def lambda_handler(event, context):
    # Find running instances carrying the auto-start-stop=Yes tag
    instances = ec2.instances.filter(Filters=[
        {'Name': 'instance-state-name', 'Values': ['running']},
        {'Name': 'tag:auto-start-stop', 'Values': ['Yes']}
    ])
    for instance in instances:
        instance.stop()
        print("Stopping instance: " + instance.id)
    return "success"


Step 3b: Add the Start EC2 instance function

  • Go to the Lambda console and click on "Create function"
  • Create a Lambda function named start-ec2-instance-lambda, with the Python 3.11 runtime and the ec2-lambda-start-stop-role role.
  • Then add the code below and deploy the function.

Note, you will need to change the value of the region_name parameter accordingly.

import boto3

# Connect to EC2 in the target region (change region_name accordingly)
ec2 = boto3.resource('ec2', region_name='af-south-1')

def lambda_handler(event, context):
    # Find stopped instances carrying the auto-start-stop=Yes tag
    instances = ec2.instances.filter(Filters=[
        {'Name': 'instance-state-name', 'Values': ['stopped']},
        {'Name': 'tag:auto-start-stop', 'Values': ['Yes']}
    ])
    for instance in instances:
        instance.start()
        print("Starting instance: " + instance.id)
    return "success"

Step 4: Summary

If either of the above Lambda functions is triggered, it will start or stop your EC2 instances based on the instance state and the value of the auto-start-stop tag. To automate this you can set up cron jobs, Step Functions, Amazon EventBridge schedules, Jenkins, etc.
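
As an example, a pair of EventBridge schedules can trigger the two functions on weekdays; the rule names, cron expressions and account ID below are illustrative placeholders:

# Stop instances at 18:00 UTC on weekdays
aws events put-rule --name stop-ec2-nightly \
    --schedule-expression "cron(0 18 ? * MON-FRI *)"
aws events put-targets --rule stop-ec2-nightly \
    --targets "Id"="1","Arn"="arn:aws:lambda:af-south-1:<accountID>:function:stop-ec2-instance-lambda"
# Allow EventBridge to invoke the Lambda function
aws lambda add-permission --function-name stop-ec2-instance-lambda \
    --statement-id eventbridge-stop --action lambda:InvokeFunction \
    --principal events.amazonaws.com \
    --source-arn arn:aws:events:af-south-1:<accountID>:rule/stop-ec2-nightly

# Repeat with a morning schedule (e.g. cron(0 6 ? * MON-FRI *)) for start-ec2-instance-lambda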


How to trigger Scaling Events using Stress-ng Command

If you are testing how your autoscaling policies respond to CPU load, a really simple way to test this is with the "stress-ng" command. Note: this is a very crude testing mechanism, and wherever possible you should try to generate synthetic application load instead.

#!/bin/bash

# DESCRIPTION: After updating the package lists, installs stress-ng, a tool used to create various types of system load for testing purposes.
sudo apt update
# Install stress-ng
sudo apt install -y stress-ng

# CPU spike: Run a CPU spike for 5 seconds
uptime
stress-ng --cpu 4 --timeout 5s --metrics-brief
uptime

# Disk Test: Start N (2) workers continually writing, reading and removing temporary files:
stress-ng --disk 2 --timeout 5s --metrics-brief

# Memory stress test
# Populate memory. Use mmap N bytes per vm worker, the default is 256MB. 
# You can also specify the size as % of total available memory or in units of 
# Bytes, KBytes, MBytes and GBytes using the suffix b, k, m or g:
# Note: The --vm 2 will start N workers (2 workers) continuously calling 
# mmap/munmap and writing to the allocated memory. Note that this can cause 
# systems to trip the kernel OOM killer on Linux if there is not enough
# physical memory and swap available
stress-ng --vm 2 --vm-bytes 1G --timeout 5s

# Combination Stress
# To run for 5 seconds with 4 cpu stressors, 2 io stressors and 1 vm 
# stressor using 1GB of virtual memory, enter:
stress-ng --cpu 4 --io 2 --vm 1 --vm-bytes 1G --timeout 5s --metrics-brief
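
While the load is running, you can check whether the scaling policy actually reacts; the Auto Scaling group name below is a placeholder:

# Watch recent scaling activity for your Auto Scaling group
aws autoscaling describe-scaling-activities \
    --auto-scaling-group-name my-asg --max-items 5

# Check the average CPU the policy is reacting to (last 15 minutes)
aws cloudwatch get-metric-statistics --namespace AWS/EC2 \
    --metric-name CPUUtilization \
    --dimensions Name=AutoScalingGroupName,Value=my-asg \
    --statistics Average --period 60 \
    --start-time "$(date -u -d '15 minutes ago' +%Y-%m-%dT%H:%M:%S)" \
    --end-time "$(date -u +%Y-%m-%dT%H:%M:%S)"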


Part 2: Increasing your Cloud consumption (the sane way)

Introduction

This article follows on from the “Cloud Migrations Crusade” blog post…

A single tenancy datacenter is a fixed scale, fixed price service on a closed network. The costs of the resources in the datacenter are divided up and shared out to the enterprise constituents on a semi-random basis. If anyone uses fewer resources than forecast, this generates waste, which is shared back to the enterprise. If there is more demand than forecast, it will generate service degradation, panic or an outage! This model is clearly fragile and doesn't respond quickly to change; it is also wasteful, as it requires a level of overprovisioning based on forecast consumption (otherwise you will experience project delays, service degradation or reduced resilience).

Cloud, on the other hand, is a multi-tenanted, on-demand service which you pay for as you use it. But surely having multiple tenants running on the same fixed capacity actually increases the risks, and just because it's in the cloud doesn't mean you can get away without overprovisioning – so who sits with the overprovisioned costs? The cloud providers have to build this into their rates. So cloud providers have to deal with a balance sheet of fixed capacity shared amongst customers running on-demand infrastructure. They do this with very clever forecasting, very short provisioning cycles, and by asking their customers for forecasts and then offering discounts for pre-commits.

Anything that moves you back towards managing resource levels and forecasting will destroy a huge portion of the value of moving to the cloud in the first instance. For example, if you have ever been to a Re:Invent you will be floored by the rate of innovation and also by how easy it is to absorb these new innovative products. But wait – you just signed a 5yr cost commit and now you learn about Aurora's new serverless database model. You realise that you can save millions of dollars; but you have to wait for your 5yr commits to expire before you adopt, or maybe start mining bitcoin with all your excess commits! This is anti-innovation and anti-customer.

What's even worse is that pre-commits are typically signed up front on day 1 – this is total madness!!! At the point where you know nothing about your brave new world, you use the old costs as a proxy to predict the new costs so that you can squeeze a lousy 5% saving at the risk of 100% of the commit size! What you will start to learn is that your cloud success is NOT based on the commercial contract that you sign with your cloud provider; it's actually based on the quality of the engineering talent that your organisation is able to attract. Cloud is an IP war – it's not a legal/sourcing war. Allow yourself to learn, don't box yourself in on day 1. When you sign the pre-commit you will notice your first year utilisation projections are actually tiny and therefore the savings are small. So what's the point of signing so early on, when the risk is at a maximum and the gains are at a minimum? When you sign this deal you are essentially turning the cloud into a "financial data center" – you have destroyed the cloud before you even started!

A Lesson from the field – Solving Hadoop Compute Demand Spike:

We moved 7000 cores of burst compute to AWS to solve a capacity issue on premise. That's expensive, so let's "fix the costs"! We could go and sign an RI (reserved instance), play with spot, buy savings plans or even beg / barter for some EDP relief. But instead we plugged the service usage into QuickSight and analysed the queries. We found one query was using 60 percent of the entire bank's compute! Nobody confessed to owning the query, so we just disabled it (if you need a reason for your change management, describe the change as "disabling a financial DDOS"). We quickly found the service owner and explained that running a table scan across billions of rows to return a report with just last month's data is not a good idea. We also explained that if they didn't fix this we would start billing them in 6 weeks' time (a few million dollars). The team deployed a fix and now we run the bank's big data stack at half the cost – just by tuning one query!!!

So the point of the above is that there is no substitute for engineering excellence. You have to understand and engineer the cloud to win; you cannot contract yourself into the cloud. The more contracts you sign, the more failures you will experience. This leads me to point 2…

Step 2: Training, Training, Training

Start the biggest training campaign you possibly can – make this your crusade. Train everyone: business, finance, security, infrastructure – you name it, you train it. Don't limit what anyone can train on; training is cheap – feast as much as you can. Look at Udemy, ACloudGuru, YouTube, WhizLabs etc. If you get this wrong then you will find your organisation fills up with expensive consultants and bespoke migration products that you don't need and can easily do yourself, via open source or with your cloud provider's toolsets. In fact I would go one step further – if you're not prepared to learn about the cloud, you're not ready to go there.

Step 3: The OS Build

When you do start your cloud migration and begin to review your base OS images, go right back to the very beginning and remove every single product from these base builds. Look at what you can get out of the box from your cloud provider and really push yourself hard on what you actually need versus what is merely nice to have. The trick is that to get the real benefit from a cloud migration, you have to start by making your builds as "naked" as possible. Nothing should move into the base build without a good reason. Ownership and reporting lines are not a good enough reason for someone's special "tool" to make it into the build. This process, if done correctly, should deliver between 20-40% of your cloud migration savings. Do this badly and your costs, complexity and support will all head in the wrong direction.

Security HAS to be a first class citizen of your new world. In most organizations this will likely make for some awkward cultural collisions (control and ownership vs agility) and some difficult dialogs. The cloud, by definition, should be liberating – so how do you secure it without creating a “cloud bunker” that nobody can actually use? More on this later… 🙂

Step 4: Hybrid Networking

For any organisation with data centers – make no mistake, if you get this wrong it's over before it starts.


AWS: Making use of S3s ETags to check if a file has been altered

I was playing with S3 the other day and I noticed that a file which I had uploaded twice, to two different locations, had an identical ETag. This immediately made me think that this tag was some kind of hash. So I had a quick look at the AWS documentation, and this ETag turns out to be marginally useful. ETag is an "Entity Tag", and it's basically an MD5 hash of the file (although for objects uploaded via multipart upload, which typically includes files larger than 5GB, the ETag is no longer a simple MD5 of the whole file; it is derived from the MD5 hashes of the individual parts).

So if you ever want to compare a local copy of a file with an AWS S3 copy of the file, you just need md5sum (the steps below are for Ubuntu Linux; md5sum itself normally ships preinstalled as part of coreutils):

# Update your Ubuntu package lists
sudo apt update
# Perform the upgrade
sudo apt-get upgrade -y
# Now install common utils (md5sum itself is part of coreutils, which is normally preinstalled)
sudo apt install -y ucommon-utils
# Upgrades involving the Linux kernel, changing dependencies, adding / removing new packages etc
sudo apt-get dist-upgrade

Next to view the MD5 hash of a file simple type:

# View the MD5 hash of a file
md5sum myfilename.myextension
2aa318899bdf388488656c46127bd814  myfilename.myextension
# The first value above will match your S3 ETag if the file has not been altered
# (single-part uploads only)
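
You can also read the ETag straight from the CLI instead of the console; the bucket and key below are placeholders:

# Fetch the ETag for the uploaded object
aws s3api head-object --bucket my-bucket --key myfilename.myextension \
    --query ETag --output text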

In the S3 console, the object's properties show the ETag, which you can compare directly with the MD5 hash above.


Using TPC-H tools to Create Test Data for AWS Redshift and AWS EMR

If you need to test out your big data tools, below is a useful set of scripts that I have used in the past to generate test data for AWS EMR and AWS Redshift:

# Install git and make
sudo yum install make git -y
# Install the tpch-kit
git clone https://github.com/gregrahn/tpch-kit
cd tpch-kit/dbgen
sudo yum install gcc -y
# Compile the tpch kit
make OS=LINUX
# Go home
cd ~
# Now make your emr data
mkdir emrdata
# Tell tpch to use this dir
export DSS_PATH=$HOME/emrdata
cd tpch-kit/dbgen
# Now run dbgen in verbose mode, order tables (orders/lineitem) only, 10gb data size
./dbgen -v -T o -s 10
# Move the data to an s3 bucket
cd $HOME/emrdata
aws s3api create-bucket --bucket andrewbakerbigdata --region af-south-1 \
    --create-bucket-configuration LocationConstraint=af-south-1
aws s3 cp $HOME/emrdata s3://andrewbakerbigdata/emrdata --recursive
cd $HOME
mkdir redshiftdata
# Tell tpch to use this dir
export DSS_PATH=$HOME/redshiftdata
# Now make your redshift data
cd tpch-kit/dbgen
# Now run dbgen in verbose mode, order tables (orders/lineitem) only, 40gb data size
./dbgen -v -T o -s 40
# These are big files, so let's find out how big they are and split them
# Count lines
cd $HOME/redshiftdata
wc -l orders.tbl
# Now split orders into 15m lines per file
split -d -l 15000000 -a 4 orders.tbl orders.tbl.
# Now split line items
wc -l lineitem.tbl
split -d -l 60000000 -a 4 lineitem.tbl lineitem.tbl.
# Now clean up the master files
rm orders.tbl
rm lineitem.tbl
# Move the split data to an s3 bucket
aws s3 cp $HOME/redshiftdata s3://andrewbakerbigdata/redshiftdata --recursive
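
Once the split files are in S3, each table can be loaded into Redshift with a single COPY per file prefix. This assumes you have already created the orders table using the TPC-H DDL, and the IAM role ARN is a placeholder (TPC-H .tbl files are pipe-delimited):

COPY orders
FROM 's3://andrewbakerbigdata/redshiftdata/orders.tbl.'
IAM_ROLE 'arn:aws:iam::<accountID>:role/myRedshiftRole'
DELIMITER '|';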