Aurora PostgreSQL: Archiving and Restoring Partitions from Large Tables to Iceberg and Parquet on S3

A Complete Guide to Archiving, Restoring, and Querying Large Table Partitions

When dealing with multi-terabyte tables in Aurora PostgreSQL, keeping historical partitions online becomes increasingly expensive and operationally burdensome. This guide presents a complete solution for archiving partitions to S3 in Iceberg/Parquet format, restoring them when needed, and querying archived data directly via a Spring Boot API without database restoration.

1. Architecture Overview

The solution comprises three components:

  1. Archive Script: Exports a partition from Aurora PostgreSQL to Parquet files organised in Iceberg table format on S3
  2. Restore Script: Imports archived data from S3 back into a staging table for validation and migration to the main table
  3. Query API: A Spring Boot application that reads Parquet files directly from S3, applying predicate pushdown for efficient filtering

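This approach can reduce storage costs by roughly 70 to 80 percent compared to keeping the data in Aurora, although the exact figure depends on how well the data compresses and on the S3 storage class you choose. The archived data stays fully queryable through the API layer.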
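
All three components share one S3 layout, so it helps to see it spelled out once. The sketch below builds the object keys a single archived partition would occupy, mirroring the path scheme used by the archive script in section 3 (prefix/schema/table/column=value, with data and metadata subfolders). The function name archive_keys and the example values are illustrative assumptions, not part of the scripts themselves.

```python
def archive_keys(prefix: str, schema: str, table: str,
                 partition_column: str, partition_value: str,
                 file_count: int) -> dict:
    """Return the S3 keys that one archived partition would occupy."""
    root = f"{prefix}/{schema}/{table}/{partition_column}={partition_value}"
    return {
        "root": root,
        # One Parquet file per exported batch, numbered from zero.
        "data": [f"{root}/data/data_{n:05d}.parquet" for n in range(file_count)],
        "metadata": f"{root}/metadata/v1.metadata.json",
        "schema_snapshot": f"{root}/schema_snapshot.json",
    }


if __name__ == "__main__":
    keys = archive_keys("aurora-archives", "public", "transactions",
                        "transaction_date", "2024-01", file_count=2)
    for key in keys["data"]:
        print(key)
```

Keeping the partition value in the key (column=value) means the restore script and the query API can locate a partition from its path alone.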

2. Prerequisites and Dependencies

2.1 Python Environment

pip install boto3 pyarrow psycopg2-binary pandas pyiceberg sqlalchemy

2.2 AWS Configuration

Ensure your environment has appropriate IAM permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::your-archive-bucket",
                "arn:aws:s3:::your-archive-bucket/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:CreateTable",
                "glue:GetTable",
                "glue:UpdateTable",
                "glue:DeleteTable"
            ],
            "Resource": "*"
        }
    ]
}
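
The bucket name in the policy above is a placeholder. As a minimal sketch, the S3 statement can be generated for a specific bucket so that the two resource ARNs always stay in step. The helper build_s3_statement is hypothetical and not part of any AWS SDK.

```python
import json


def build_s3_statement(bucket: str) -> dict:
    """Build the S3 statement from the policy above for one bucket."""
    return {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject",
            "s3:GetObject",
            "s3:ListBucket",
            "s3:DeleteObject",
        ],
        "Resource": [
            f"arn:aws:s3:::{bucket}",    # bucket-level ARN, used by s3:ListBucket
            f"arn:aws:s3:::{bucket}/*",  # object-level ARN, used by Get/Put/Delete
        ],
    }


if __name__ == "__main__":
    policy = {
        "Version": "2012-10-17",
        "Statement": [build_s3_statement("your-archive-bucket")],
    }
    print(json.dumps(policy, indent=4))
```

Both ARNs are needed because listing is authorised against the bucket itself, while reads, writes, and deletes are authorised against the objects inside it.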

2.3 Java Dependencies

For the Spring Boot API, add these to your pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.14.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>3.3.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.6</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <version>2.25.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-parquet</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-aws</artifactId>
        <version>1.5.0</version>
    </dependency>
</dependencies>

3. The Archive Script

This script extracts a partition from Aurora PostgreSQL and writes it to S3 in Iceberg table format with Parquet data files.

3.1 Configuration Module

# config.py

from dataclasses import dataclass
from typing import Optional
import os


@dataclass
class DatabaseConfig:
    host: str
    port: int
    database: str
    user: str
    password: str

    @classmethod
    def from_environment(cls) -> "DatabaseConfig":
        return cls(
            host=os.environ["AURORA_HOST"],
            port=int(os.environ.get("AURORA_PORT", "5432")),
            database=os.environ["AURORA_DATABASE"],
            user=os.environ["AURORA_USER"],
            password=os.environ["AURORA_PASSWORD"]
        )

    @property
    def connection_string(self) -> str:
        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"


@dataclass
class S3Config:
    bucket: str
    prefix: str
    region: str

    @classmethod
    def from_environment(cls) -> "S3Config":
        return cls(
            bucket=os.environ["ARCHIVE_S3_BUCKET"],
            prefix=os.environ.get("ARCHIVE_S3_PREFIX", "aurora-archives"),
            region=os.environ.get("AWS_REGION", "eu-west-1")
        )


@dataclass
class ArchiveConfig:
    table_name: str
    partition_column: str
    partition_value: str
    schema_name: str = "public"
    batch_size: int = 100000
    compression: str = "snappy"
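
One thing to watch with the connection_string property above: a password containing characters such as @, / or : would produce an ambiguous URI. The sketch below shows one way to percent-encode the credentials using only the standard library. The helper build_connection_string and the example host are illustrative assumptions, not part of the configuration module.

```python
from urllib.parse import quote


def build_connection_string(user: str, password: str, host: str,
                            port: int, database: str) -> str:
    """Build a PostgreSQL connection URI with percent-encoded credentials."""
    # safe="" forces reserved characters such as '@', ':' and '/' to be encoded.
    encoded_user = quote(user, safe="")
    encoded_password = quote(password, safe="")
    return f"postgresql://{encoded_user}:{encoded_password}@{host}:{port}/{database}"


if __name__ == "__main__":
    print(build_connection_string(
        "archiver", "p@ss/w:rd", "db.example.internal", 5432, "sales"
    ))
```

An alternative is to skip the URI entirely and pass host, port, user, and password to psycopg2.connect as keyword arguments, which avoids the encoding question altogether.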

3.2 Schema Introspection

# schema_inspector.py

from typing import Dict, List, Tuple, Any
import psycopg2
from psycopg2.extras import RealDictCursor
import pyarrow as pa


class SchemaInspector:
    """Inspects PostgreSQL table schema and converts to PyArrow schema."""

    POSTGRES_TO_ARROW = {
        "integer": pa.int32(),
        "bigint": pa.int64(),
        "smallint": pa.int16(),
        "real": pa.float32(),
        "double precision": pa.float64(),
        "numeric": pa.decimal128(38, 10),
        "text": pa.string(),
        "character varying": pa.string(),
        "varchar": pa.string(),
        "char": pa.string(),
        "character": pa.string(),
        "boolean": pa.bool_(),
        "date": pa.date32(),
        "timestamp without time zone": pa.timestamp("us"),
        "timestamp with time zone": pa.timestamp("us", tz="UTC"),
        "time without time zone": pa.time64("us"),
        "time with time zone": pa.time64("us"),
        "uuid": pa.string(),
        "json": pa.string(),
        "jsonb": pa.string(),
        "bytea": pa.binary(),
    }

    def __init__(self, connection_string: str):
        self.connection_string = connection_string

    def get_table_columns(
        self, 
        schema_name: str, 
        table_name: str
    ) -> List[Dict[str, Any]]:
        """Retrieve column definitions from information_schema."""
        query = """
            SELECT 
                column_name,
                data_type,
                is_nullable,
                column_default,
                ordinal_position,
                character_maximum_length,
                numeric_precision,
                numeric_scale
            FROM information_schema.columns
            WHERE table_schema = %s 
              AND table_name = %s
            ORDER BY ordinal_position
        """

        with psycopg2.connect(self.connection_string) as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(query, (schema_name, table_name))
                return list(cur.fetchall())

    def get_primary_key_columns(
        self, 
        schema_name: str, 
        table_name: str
    ) -> List[str]:
        """Retrieve primary key column names."""
        query = """
            SELECT a.attname
            FROM pg_index i
            JOIN pg_attribute a ON a.attrelid = i.indrelid 
                AND a.attnum = ANY(i.indkey)
            JOIN pg_class c ON c.oid = i.indrelid
            JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE i.indisprimary
              AND n.nspname = %s
              AND c.relname = %s
            ORDER BY array_position(i.indkey, a.attnum)
        """

        with psycopg2.connect(self.connection_string) as conn:
            with conn.cursor() as cur:
                cur.execute(query, (schema_name, table_name))
                return [row[0] for row in cur.fetchall()]

    def postgres_type_to_arrow(
        self, 
        pg_type: str, 
        precision: int = None, 
        scale: int = None
    ) -> pa.DataType:
        """Convert PostgreSQL type to PyArrow type."""
        pg_type_lower = pg_type.lower()

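        # Use explicit None checks: a scale of 0, as in numeric(10,0), is falsy.
        # decimal128 holds at most 38 digits, so wider columns use the default.
        if (
            pg_type_lower == "numeric"
            and precision is not None
            and scale is not None
            and precision <= 38
        ):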
            return pa.decimal128(precision, scale)

        if pg_type_lower in self.POSTGRES_TO_ARROW:
            return self.POSTGRES_TO_ARROW[pg_type_lower]

        if pg_type_lower.startswith("character varying"):
            return pa.string()

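        # Note: information_schema.columns reports array columns with the
        # data_type "ARRAY" (the element type is in udt_name), so this branch
        # only applies when the caller passes a type spelled with a "[]" suffix.
        if pg_type_lower.endswith("[]"):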
            base_type = pg_type_lower[:-2]
            if base_type in self.POSTGRES_TO_ARROW:
                return pa.list_(self.POSTGRES_TO_ARROW[base_type])

        return pa.string()

    def build_arrow_schema(
        self, 
        schema_name: str, 
        table_name: str
    ) -> pa.Schema:
        """Build PyArrow schema from PostgreSQL table definition."""
        columns = self.get_table_columns(schema_name, table_name)

        fields = []
        for col in columns:
            arrow_type = self.postgres_type_to_arrow(
                col["data_type"],
                col.get("numeric_precision"),
                col.get("numeric_scale")
            )
            nullable = col["is_nullable"] == "YES"
            fields.append(pa.field(col["column_name"], arrow_type, nullable=nullable))

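        # pa.schema() is the factory function; the pa.Schema class itself
        # cannot be instantiated directly.
        return pa.schema(fields)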

    def get_partition_info(
        self, 
        schema_name: str, 
        table_name: str
    ) -> Dict[str, Any]:
        """Get partition strategy information if table is partitioned."""
        query = """
            SELECT 
                pt.partstrat,
                array_agg(a.attname ORDER BY pos.idx) as partition_columns
            FROM pg_partitioned_table pt
            JOIN pg_class c ON c.oid = pt.partrelid
            JOIN pg_namespace n ON n.oid = c.relnamespace
            CROSS JOIN LATERAL unnest(pt.partattrs) WITH ORDINALITY AS pos(attnum, idx)
            JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum = pos.attnum
            WHERE n.nspname = %s AND c.relname = %s
            GROUP BY pt.partstrat
        """

        with psycopg2.connect(self.connection_string) as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(query, (schema_name, table_name))
                result = cur.fetchone()

                if result:
                    strategy_map = {"r": "range", "l": "list", "h": "hash"}
                    return {
                        "is_partitioned": True,
                        "strategy": strategy_map.get(result["partstrat"], "unknown"),
                        "partition_columns": result["partition_columns"]
                    }

                return {"is_partitioned": False}
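
For array columns, information_schema.columns reports data_type as "ARRAY" and puts the element type in udt_name with a leading underscore (for example _int4). A minimal sketch of resolving that element type is shown below. It assumes the column query is extended to select udt_name as well, and the lookup table UDT_TO_DATA_TYPE and the helper array_element_type are illustrative names that cover only a handful of common types.

```python
from typing import Optional

# Internal PostgreSQL element names (udt_name without the leading underscore)
# mapped to the data_type spellings used as keys in POSTGRES_TO_ARROW.
UDT_TO_DATA_TYPE = {
    "int2": "smallint",
    "int4": "integer",
    "int8": "bigint",
    "float4": "real",
    "float8": "double precision",
    "bool": "boolean",
    "text": "text",
    "varchar": "character varying",
    "uuid": "uuid",
    "date": "date",
}


def array_element_type(data_type: str, udt_name: str) -> Optional[str]:
    """Return the element data_type of an array column.

    Returns None when the column is not an array or when the element
    type is missing from the lookup table.
    """
    if data_type != "ARRAY" or not udt_name.startswith("_"):
        return None
    return UDT_TO_DATA_TYPE.get(udt_name[1:])


if __name__ == "__main__":
    print(array_element_type("ARRAY", "_int4"))
    print(array_element_type("integer", "int4"))
```

The returned name could then be looked up in POSTGRES_TO_ARROW and wrapped in pa.list_(), with unknown element types falling back to a string column as the inspector already does.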

3.3 Archive Script

#!/usr/bin/env python3
# archive_partition.py

"""
Archive a partition from Aurora PostgreSQL to S3 in Iceberg/Parquet format.

Usage:
    python archive_partition.py \
        --table transactions \
        --partition-column transaction_date \
        --partition-value 2024-01 \
        --schema public
"""

import argparse
import json
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Generator, Dict, Any, Optional
import hashlib

import boto3
import pandas as pd
import psycopg2
from psycopg2.extras import RealDictCursor
import pyarrow as pa
import pyarrow.parquet as pq

from config import DatabaseConfig, S3Config, ArchiveConfig
from schema_inspector import SchemaInspector


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


class PartitionArchiver:
    """Archives PostgreSQL partitions to S3 in Iceberg format."""

    def __init__(
        self,
        db_config: DatabaseConfig,
        s3_config: S3Config,
        archive_config: ArchiveConfig
    ):
        self.db_config = db_config
        self.s3_config = s3_config
        self.archive_config = archive_config
        self.schema_inspector = SchemaInspector(db_config.connection_string)
        self.s3_client = boto3.client("s3", region_name=s3_config.region)

    def _generate_archive_path(self) -> str:
        """Generate S3 path for archive files."""
        return (
            f"{self.s3_config.prefix}/"
            f"{self.archive_config.schema_name}/"
            f"{self.archive_config.table_name}/"
            f"{self.archive_config.partition_column}={self.archive_config.partition_value}"
        )

    def _build_extraction_query(self) -> str:
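        """Build query to extract partition data.

        Assumes child partitions are named <table>_<value>, with hyphens in the
        value replaced by underscores (for example transactions_2024_01).
        Identifiers are interpolated directly into the SQL text, so table and
        column names must come from trusted input.
        """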
        full_table = (
            f"{self.archive_config.schema_name}.{self.archive_config.table_name}"
        )

        partition_info = self.schema_inspector.get_partition_info(
            self.archive_config.schema_name,
            self.archive_config.table_name
        )

        if partition_info["is_partitioned"]:
            partition_table = (
                f"{self.archive_config.table_name}_"
                f"{self.archive_config.partition_value.replace('-', '_')}"
            )
            return f"SELECT * FROM {self.archive_config.schema_name}.{partition_table}"

        return (
            f"SELECT * FROM {full_table} "
            f"WHERE {self.archive_config.partition_column} = %s"
        )

    def _stream_partition_data(self) -> Generator[pd.DataFrame, None, None]:
        """Stream partition data in batches."""
        query = self._build_extraction_query()
        partition_info = self.schema_inspector.get_partition_info(
            self.archive_config.schema_name,
            self.archive_config.table_name
        )

        with psycopg2.connect(self.db_config.connection_string) as conn:
            with conn.cursor(
                name="archive_cursor",
                cursor_factory=RealDictCursor
            ) as cur:
                if partition_info["is_partitioned"]:
                    cur.execute(query)
                else:
                    cur.execute(query, (self.archive_config.partition_value,))

                while True:
                    rows = cur.fetchmany(self.archive_config.batch_size)
                    if not rows:
                        break
                    yield pd.DataFrame(rows)

    def _write_parquet_to_s3(
        self,
        table: pa.Table,
        file_number: int,
        arrow_schema: pa.Schema
    ) -> Dict[str, Any]:
        """Write a Parquet file to S3 and return metadata."""
        archive_path = self._generate_archive_path()
        file_name = f"data_{file_number:05d}.parquet"
        s3_key = f"{archive_path}/data/{file_name}"

        buffer = pa.BufferOutputStream()
        pq.write_table(
            table,
            buffer,
            compression=self.archive_config.compression,
            write_statistics=True
        )

        data = buffer.getvalue().to_pybytes()

        self.s3_client.put_object(
            Bucket=self.s3_config.bucket,
            Key=s3_key,
            Body=data,
            ContentType="application/octet-stream"
        )

        return {
            "path": f"s3://{self.s3_config.bucket}/{s3_key}",
            "file_size_bytes": len(data),
            "record_count": table.num_rows
        }

    def _write_iceberg_metadata(
        self,
        arrow_schema: pa.Schema,
        data_files: list,
        total_records: int
    ) -> None:
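        """Write Iceberg-style table metadata files.

        This is a simplified JSON layout modelled on Iceberg metadata, not a
        spec-compliant table: the Iceberg specification uses hyphenated keys
        (for example "format-version") and Avro manifest files, so query
        engines will not read these files as an Iceberg table without changes.
        """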
        archive_path = self._generate_archive_path()

        schema_dict = {
            "type": "struct",
            "schema_id": 0,
            "fields": []
        }

        for i, field in enumerate(arrow_schema):
            field_type = self._arrow_to_iceberg_type(field.type)
            schema_dict["fields"].append({
                "id": i + 1,
                "name": field.name,
                "required": not field.nullable,
                "type": field_type
            })

        table_uuid = hashlib.md5(
            f"{self.archive_config.table_name}_{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()

        manifest_entries = []
        for df in data_files:
            manifest_entries.append({
                "status": 1,
                "snapshot_id": 1,
                "data_file": {
                    "file_path": df["path"],
                    "file_format": "PARQUET",
                    "record_count": df["record_count"],
                    "file_size_in_bytes": df["file_size_bytes"]
                }
            })

        metadata = {
            "format_version": 2,
            "table_uuid": table_uuid,
            "location": f"s3://{self.s3_config.bucket}/{archive_path}",
            "last_sequence_number": 1,
            "last_updated_ms": int(datetime.utcnow().timestamp() * 1000),
            "last_column_id": len(arrow_schema),
            "current_schema_id": 0,
            "schemas": [schema_dict],
            "partition_spec": [],
            "default_spec_id": 0,
            "last_partition_id": 0,
            "properties": {
                "source.table": self.archive_config.table_name,
                "source.schema": self.archive_config.schema_name,
                "source.partition_column": self.archive_config.partition_column,
                "source.partition_value": self.archive_config.partition_value,
                "archive.timestamp": datetime.utcnow().isoformat(),
                "archive.total_records": str(total_records)
            },
            "current_snapshot_id": 1,
            "snapshots": [
                {
                    "snapshot_id": 1,
                    "timestamp_ms": int(datetime.utcnow().timestamp() * 1000),
                    "summary": {
                        "operation": "append",
                        "added_data_files": str(len(data_files)),
                        "total_records": str(total_records)
                    },
                    "manifest_list": f"s3://{self.s3_config.bucket}/{archive_path}/metadata/manifest_list.json"
                }
            ]
        }

        self.s3_client.put_object(
            Bucket=self.s3_config.bucket,
            Key=f"{archive_path}/metadata/v1.metadata.json",
            Body=json.dumps(metadata, indent=2),
            ContentType="application/json"
        )

        self.s3_client.put_object(
            Bucket=self.s3_config.bucket,
            Key=f"{archive_path}/metadata/manifest_list.json",
            Body=json.dumps(manifest_entries, indent=2),
            ContentType="application/json"
        )

        self.s3_client.put_object(
            Bucket=self.s3_config.bucket,
            Key=f"{archive_path}/metadata/version_hint.text",
            Body="1",
            ContentType="text/plain"
        )

    def _arrow_to_iceberg_type(self, arrow_type: pa.DataType) -> str:
        """Convert PyArrow type to Iceberg type string."""
        type_mapping = {
            pa.int16(): "int",
            pa.int32(): "int",
            pa.int64(): "long",
            pa.float32(): "float",
            pa.float64(): "double",
            pa.bool_(): "boolean",
            pa.string(): "string",
            pa.binary(): "binary",
            pa.date32(): "date",
        }

        if arrow_type in type_mapping:
            return type_mapping[arrow_type]

        if pa.types.is_timestamp(arrow_type):
            if arrow_type.tz:
                return "timestamptz"
            return "timestamp"

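        # Time columns are written as Arrow time64, so report them as "time"
        # rather than letting them fall through to "string".
        if pa.types.is_time(arrow_type):
            return "time"

        if pa.types.is_decimal(arrow_type):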
            return f"decimal({arrow_type.precision},{arrow_type.scale})"

        if pa.types.is_list(arrow_type):
            inner = self._arrow_to_iceberg_type(arrow_type.value_type)
            return f"list<{inner}>"

        return "string"

    def _save_schema_snapshot(self, arrow_schema: pa.Schema) -> None:
        """Save schema information for restore validation."""
        archive_path = self._generate_archive_path()

        columns = self.schema_inspector.get_table_columns(
            self.archive_config.schema_name,
            self.archive_config.table_name
        )

        pk_columns = self.schema_inspector.get_primary_key_columns(
            self.archive_config.schema_name,
            self.archive_config.table_name
        )

        schema_snapshot = {
            "source_table": {
                "schema": self.archive_config.schema_name,
                "table": self.archive_config.table_name
            },
            "columns": columns,
            "primary_key_columns": pk_columns,
            "arrow_schema": arrow_schema.to_string(),
            "archived_at": datetime.utcnow().isoformat()
        }

        self.s3_client.put_object(
            Bucket=self.s3_config.bucket,
            Key=f"{archive_path}/schema_snapshot.json",
            Body=json.dumps(schema_snapshot, indent=2, default=str),
            ContentType="application/json"
        )

    def archive(self) -> Dict[str, Any]:
        """Execute the archive operation."""
        logger.info(
            f"Starting archive of {self.archive_config.schema_name}."
            f"{self.archive_config.table_name} "
            f"partition {self.archive_config.partition_column}="
            f"{self.archive_config.partition_value}"
        )

        arrow_schema = self.schema_inspector.build_arrow_schema(
            self.archive_config.schema_name,
            self.archive_config.table_name
        )

        self._save_schema_snapshot(arrow_schema)

        data_files = []
        total_records = 0
        file_number = 0

        for batch_df in self._stream_partition_data():
            table = pa.Table.from_pandas(batch_df, schema=arrow_schema)
            file_meta = self._write_parquet_to_s3(table, file_number, arrow_schema)
            data_files.append(file_meta)
            total_records += file_meta["record_count"]
            file_number += 1

            logger.info(
                f"Written file {file_number}: {file_meta['record_count']} records"
            )

        self._write_iceberg_metadata(arrow_schema, data_files, total_records)

        result = {
            "status": "success",
            "table": f"{self.archive_config.schema_name}.{self.archive_config.table_name}",
            "partition": f"{self.archive_config.partition_column}={self.archive_config.partition_value}",
            "location": f"s3://{self.s3_config.bucket}/{self._generate_archive_path()}",
            "total_records": total_records,
            "total_files": len(data_files),
            "total_bytes": sum(f["file_size_bytes"] for f in data_files),
            "archived_at": datetime.utcnow().isoformat()
        }

        logger.info(f"Archive complete: {total_records} records in {len(data_files)} files")
        return result


def main():
    parser = argparse.ArgumentParser(
        description="Archive Aurora PostgreSQL partition to S3"
    )
    parser.add_argument("--table", required=True, help="Table name")
    parser.add_argument("--partition-column", required=True, help="Partition column name")
    parser.add_argument("--partition-value", required=True, help="Partition value to archive")
    parser.add_argument("--schema", default="public", help="Schema name")
    parser.add_argument("--batch-size", type=int, default=100000, help="Batch size for streaming")

    args = parser.parse_args()

    db_config = DatabaseConfig.from_environment()
    s3_config = S3Config.from_environment()
    archive_config = ArchiveConfig(
        table_name=args.table,
        partition_column=args.partition_column,
        partition_value=args.partition_value,
        schema_name=args.schema,
        batch_size=args.batch_size
    )

    archiver = PartitionArchiver(db_config, s3_config, archive_config)
    result = archiver.archive()

    print(json.dumps(result, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())
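
The archive script derives the child partition name from the table name and the partition value, then interpolates it into the SQL text. As a hedged sketch, the same convention can be wrapped in a helper that rejects anything that is not a plain identifier before it reaches a query. The function child_partition_name and its regular expression are assumptions for illustration. Partitions that follow a different naming scheme would need their own lookup, for instance via pg_inherits.

```python
import re

# Conservative pattern: letters, digits, and underscores, not starting with a digit.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def child_partition_name(table_name: str, partition_value: str) -> str:
    """Derive the child partition name using the <table>_<value> convention."""
    candidate = f"{table_name}_{partition_value.replace('-', '_')}"
    if not _IDENTIFIER.match(candidate):
        raise ValueError(f"Unsafe or unexpected partition name: {candidate!r}")
    return candidate


if __name__ == "__main__":
    print(child_partition_name("transactions", "2024-01"))
```

Failing fast here is cheaper than discovering a bad partition name halfway through a multi-hour export.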

4. The Restore Script

This script reverses the archive operation by reading Parquet files from S3 and loading them into a staging table.
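
The restore script takes the archive location as a single s3:// path and splits it into a bucket and a key prefix. A minimal sketch of that step, with a little validation added, is shown below. The helper split_s3_path is a hypothetical name. Unlike the script, it also strips a trailing slash so that keys joined with "/" do not end up with a doubled separator.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_path(source_path: str) -> Tuple[str, str]:
    """Split an s3://bucket/prefix path into (bucket, prefix)."""
    parsed = urlparse(source_path)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Expected an s3://bucket/prefix path, got: {source_path!r}")
    # Strip slashes on both ends so "<prefix>/data/" never becomes "<prefix>//data/".
    return parsed.netloc, parsed.path.strip("/")


if __name__ == "__main__":
    bucket, prefix = split_s3_path(
        "s3://your-archive-bucket/aurora-archives/public/transactions/transaction_date=2024-01/"
    )
    print(bucket)
    print(prefix)
```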

4.1 Restore Script

#!/usr/bin/env python3
# restore_partition.py

"""
Restore an archived partition from S3 back to Aurora PostgreSQL.

Usage:
    python restore_partition.py \
        --source-path s3://bucket/prefix/schema/table/partition_col=value \
        --target-table transactions_staging \
        --target-schema public
"""

import argparse
import json
import logging
import sys
from datetime import datetime
from typing import Dict, Any, List, Optional
from urllib.parse import urlparse

import boto3
import pandas as pd
import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values
import pyarrow.parquet as pq

from config import DatabaseConfig


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)


class PartitionRestorer:
    """Restores archived partitions from S3 to PostgreSQL."""

    def __init__(
        self,
        db_config: DatabaseConfig,
        source_path: str,
        target_schema: str,
        target_table: str,
        create_table: bool = True,
        batch_size: int = 10000
    ):
        self.db_config = db_config
        self.source_path = source_path
        self.target_schema = target_schema
        self.target_table = target_table
        self.create_table = create_table
        self.batch_size = batch_size

        parsed = urlparse(source_path)
        self.bucket = parsed.netloc
        self.prefix = parsed.path.lstrip("/")

        self.s3_client = boto3.client("s3")

    def _load_schema_snapshot(self) -> Dict[str, Any]:
        """Load the schema snapshot from the archive."""
        response = self.s3_client.get_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}/schema_snapshot.json"
        )
        return json.loads(response["Body"].read())

    def _load_iceberg_metadata(self) -> Dict[str, Any]:
        """Load Iceberg metadata."""
        response = self.s3_client.get_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}/metadata/v1.metadata.json"
        )
        return json.loads(response["Body"].read())

    def _list_data_files(self) -> List[str]:
        """List all Parquet data files in the archive."""
        data_prefix = f"{self.prefix}/data/"

        files = []
        paginator = self.s3_client.get_paginator("list_objects_v2")

        for page in paginator.paginate(Bucket=self.bucket, Prefix=data_prefix):
            for obj in page.get("Contents", []):
                if obj["Key"].endswith(".parquet"):
                    files.append(obj["Key"])

        return sorted(files)

    def _postgres_type_from_column_def(self, col: Dict[str, Any]) -> str:
        """Convert column definition to PostgreSQL type."""
        data_type = col["data_type"]

        if data_type == "character varying":
            max_len = col.get("character_maximum_length")
            if max_len:
                return f"varchar({max_len})"
            return "text"

        if data_type == "numeric":
            precision = col.get("numeric_precision")
            scale = col.get("numeric_scale")
            # Explicit None checks keep numeric(p,0) columns, whose scale is 0.
            if precision is not None and scale is not None:
                return f"numeric({precision},{scale})"
            return "numeric"

        return data_type

    def _create_staging_table(
        self,
        schema_snapshot: Dict[str, Any],
        conn: psycopg2.extensions.connection
    ) -> None:
        """Create the staging table based on archived schema."""
        columns = schema_snapshot["columns"]

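        # Compose identifiers with psycopg2.sql so schema, table, and column
        # names are quoted safely. Type strings come from the archived snapshot.
        column_defs = []
        for col in columns:
            pg_type = self._postgres_type_from_column_def(col)
            nullable = "" if col["is_nullable"] == "YES" else " NOT NULL"
            column_defs.append(
                sql.SQL("{} {}").format(
                    sql.Identifier(col["column_name"]),
                    sql.SQL(pg_type + nullable)
                )
            )

        target = sql.Identifier(self.target_schema, self.target_table)
        create_sql = sql.SQL(
            "DROP TABLE IF EXISTS {target}; CREATE TABLE {target} ({columns})"
        ).format(target=target, columns=sql.SQL(", ").join(column_defs))

        with conn.cursor() as cur:
            cur.execute(create_sql)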
        conn.commit()

        logger.info(f"Created staging table {self.target_schema}.{self.target_table}")

    def _insert_batch(
        self,
        df: pd.DataFrame,
        columns: List[str],
        conn: psycopg2.extensions.connection
    ) -> int:
        """Insert a batch of records into the staging table."""
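        if df.empty:
            return 0

        # Copy before mutating: the caller passes an iloc slice of a larger frame.
        df = df[columns].copy()

        for col in df.columns:
            if pd.api.types.is_datetime64_any_dtype(df[col]):
                df[col] = df[col].apply(
                    lambda x: x.isoformat() if pd.notna(x) else None
                )

        # Convert to plain Python objects and map NaN/NaT to None so that
        # psycopg2 can adapt every value and missing values become SQL NULL.
        df = df.astype(object).where(df.notna(), None)
        values = list(df.itertuples(index=False, name=None))

        # Quote identifiers safely; execute_values needs the SQL as a string.
        insert_sql = sql.SQL("INSERT INTO {} ({}) VALUES %s").format(
            sql.Identifier(self.target_schema, self.target_table),
            sql.SQL(", ").join(sql.Identifier(c) for c in columns)
        ).as_string(conn)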

        with conn.cursor() as cur:
            execute_values(cur, insert_sql, values, page_size=self.batch_size)

        return len(values)

    def restore(self) -> Dict[str, Any]:
        """Execute the restore operation."""
        logger.info(f"Starting restore from {self.source_path}")

        schema_snapshot = self._load_schema_snapshot()
        metadata = self._load_iceberg_metadata()
        data_files = self._list_data_files()

        logger.info(f"Found {len(data_files)} data files to restore")

        columns = [col["column_name"] for col in schema_snapshot["columns"]]

        with psycopg2.connect(self.db_config.connection_string) as conn:
            if self.create_table:
                self._create_staging_table(schema_snapshot, conn)

            total_records = 0

            for file_key in data_files:
                s3_uri = f"s3://{self.bucket}/{file_key}"
                logger.info(f"Restoring from {file_key}")

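                response = self.s3_client.get_object(Bucket=self.bucket, Key=file_key)
                # The boto3 StreamingBody is not seekable, but Parquet readers must
                # seek to the file footer, so buffer the object in memory first.
                import io
                table = pq.read_table(io.BytesIO(response["Body"].read()))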
                df = table.to_pandas()

                file_records = 0
                for start in range(0, len(df), self.batch_size):
                    batch_df = df.iloc[start:start + self.batch_size]
                    inserted = self._insert_batch(batch_df, columns, conn)
                    file_records += inserted

                conn.commit()
                total_records += file_records
                logger.info(f"Restored {file_records} records from {file_key}")

        result = {
            "status": "success",
            "source": self.source_path,
            "target": f"{self.target_schema}.{self.target_table}",
            "total_records": total_records,
            "files_processed": len(data_files),
            "restored_at": datetime.utcnow().isoformat()
        }

        logger.info(f"Restore complete: {total_records} records")
        return result


def main():
    parser = argparse.ArgumentParser(
        description="Restore archived partition from S3 to PostgreSQL"
    )
    parser.add_argument("--source-path", required=True, help="S3 path to archived partition")
    parser.add_argument("--target-table", required=True, help="Target table name")
    parser.add_argument("--target-schema", default="public", help="Target schema name")
    parser.add_argument("--batch-size", type=int, default=10000, help="Insert batch size")
    parser.add_argument("--no-create", action="store_true", help="Don't create table, assume it exists")

    args = parser.parse_args()

    db_config = DatabaseConfig.from_environment()

    restorer = PartitionRestorer(
        db_config=db_config,
        source_path=args.source_path,
        target_schema=args.target_schema,
        target_table=args.target_table,
        create_table=not args.no_create,
        batch_size=args.batch_size
    )

    result = restorer.restore()
    print(json.dumps(result, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())

5. SQL Operations for Partition Migration

Once data is restored to a staging table, you need SQL operations to validate and migrate it to the main table.

5.1 Schema Validation

-- Validate that staging table schema matches the main table

CREATE OR REPLACE FUNCTION validate_table_schemas(
    p_source_schema TEXT,
    p_source_table TEXT,
    p_target_schema TEXT,
    p_target_table TEXT
) RETURNS TABLE (
    validation_type TEXT,
    column_name TEXT,
    source_value TEXT,
    target_value TEXT,
    is_valid BOOLEAN
) AS $$
BEGIN
    -- Check column count
    RETURN QUERY
    SELECT 
        'column_count'::TEXT,
        NULL::TEXT,
        src.cnt::TEXT,
        tgt.cnt::TEXT,
        src.cnt = tgt.cnt
    FROM 
        (SELECT COUNT(*)::INT AS cnt 
         FROM information_schema.columns 
         WHERE table_schema = p_source_schema 
           AND table_name = p_source_table) src,
        (SELECT COUNT(*)::INT AS cnt 
         FROM information_schema.columns 
         WHERE table_schema = p_target_schema 
           AND table_name = p_target_table) tgt;

    -- Check each column exists with matching type
    RETURN QUERY
    SELECT 
        'column_definition'::TEXT,
        src.column_name::TEXT,  -- information_schema uses domain types; RETURN QUERY needs exact TEXT
        src.data_type || COALESCE('(' || src.character_maximum_length::TEXT || ')', ''),
        COALESCE(tgt.data_type || COALESCE('(' || tgt.character_maximum_length::TEXT || ')', ''), 'MISSING'),
        src.data_type = COALESCE(tgt.data_type, '') 
            AND COALESCE(src.character_maximum_length, 0) = COALESCE(tgt.character_maximum_length, 0)
    FROM 
        information_schema.columns src
    LEFT JOIN 
        information_schema.columns tgt 
        ON tgt.table_schema = p_target_schema 
        AND tgt.table_name = p_target_table
        AND tgt.column_name = src.column_name
    WHERE 
        src.table_schema = p_source_schema 
        AND src.table_name = p_source_table
    ORDER BY src.ordinal_position;

    -- Check nullability
    RETURN QUERY
    SELECT 
        'nullability'::TEXT,
        src.column_name::TEXT,
        src.is_nullable::TEXT,
        COALESCE(tgt.is_nullable, 'MISSING')::TEXT,
        src.is_nullable = COALESCE(tgt.is_nullable, '')
    FROM 
        information_schema.columns src
    LEFT JOIN 
        information_schema.columns tgt 
        ON tgt.table_schema = p_target_schema 
        AND tgt.table_name = p_target_table
        AND tgt.column_name = src.column_name
    WHERE 
        src.table_schema = p_source_schema 
        AND src.table_name = p_source_table
    ORDER BY src.ordinal_position;
END;
$$ LANGUAGE plpgsql;

-- Usage
SELECT * FROM validate_table_schemas('public', 'transactions_staging', 'public', 'transactions');
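
When the restore is driven from a script, the same check can be run from Python and reduced to the rows that failed. The following is a minimal sketch, assuming a DB-API cursor such as one from psycopg2 (installed in section 2.1); the helper name failed_schema_checks is hypothetical and not part of the scripts above.

```python
from typing import Any, List, Tuple

VALIDATION_SQL = "SELECT * FROM validate_table_schemas(%s, %s, %s, %s)"


def failed_schema_checks(cursor: Any, source_schema: str, source_table: str,
                         target_schema: str, target_table: str) -> List[Tuple]:
    """Return the validation rows whose is_valid flag is not true."""
    cursor.execute(
        VALIDATION_SQL,
        (source_schema, source_table, target_schema, target_table),
    )
    # Row layout follows the function's RETURNS TABLE clause:
    # (validation_type, column_name, source_value, target_value, is_valid)
    return [row for row in cursor.fetchall() if row[4] is not True]
```

An empty list means every check passed; anything else lists the columns to fix before migrating.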

5.2 Comprehensive Validation Report

-- Generate a full validation report before migration

CREATE OR REPLACE FUNCTION generate_migration_report(
    p_staging_schema TEXT,
    p_staging_table TEXT,
    p_target_schema TEXT,
    p_target_table TEXT,
    p_partition_column TEXT,
    p_partition_value TEXT
) RETURNS TABLE (
    check_name TEXT,
    result TEXT,
    details JSONB
) AS $$
DECLARE
    v_staging_count BIGINT;
    v_existing_count BIGINT;
    v_schema_valid BOOLEAN;
BEGIN
    -- Get staging table count
    EXECUTE format(
        'SELECT COUNT(*) FROM %I.%I',
        p_staging_schema, p_staging_table
    ) INTO v_staging_count;

    RETURN QUERY SELECT 
        'staging_record_count'::TEXT,
        'INFO'::TEXT,
        jsonb_build_object('count', v_staging_count);

    -- Check for existing data in target partition
    BEGIN
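        -- Compare as text so the check also works for date and numeric partition
        -- columns; p_partition_value must match the column's text representation
        EXECUTE format(
            'SELECT COUNT(*) FROM %I.%I WHERE %I::TEXT = $1',
            p_target_schema, p_target_table, p_partition_column
        ) INTO v_existing_count USING p_partition_value;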

        IF v_existing_count > 0 THEN
            RETURN QUERY SELECT 
                'existing_partition_data'::TEXT,
                'WARNING'::TEXT,
                jsonb_build_object(
                    'count', v_existing_count,
                    'message', 'Target partition already contains data'
                );
        ELSE
            RETURN QUERY SELECT 
                'existing_partition_data'::TEXT,
                'OK'::TEXT,
                jsonb_build_object('count', 0);
        END IF;
    EXCEPTION WHEN undefined_column THEN
        RETURN QUERY SELECT 
            'partition_column_check'::TEXT,
            'ERROR'::TEXT,
            jsonb_build_object(
                'message', format('Partition column %s not found', p_partition_column)
            );
    END;

    -- Validate schemas match
    SELECT bool_and(is_valid) INTO v_schema_valid
    FROM validate_table_schemas(
        p_staging_schema, p_staging_table,
        p_target_schema, p_target_table
    );

    RETURN QUERY SELECT 
        'schema_validation'::TEXT,
        CASE WHEN v_schema_valid THEN 'OK' ELSE 'ERROR' END::TEXT,
        jsonb_build_object('schemas_match', v_schema_valid);

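    -- Check for null values in columns the target declares NOT NULL
    DECLARE
        v_column TEXT;
        v_null_count BIGINT;
    BEGIN
        FOR v_column IN
            SELECT c.column_name::TEXT
            FROM information_schema.columns c
            WHERE c.table_schema = p_target_schema
              AND c.table_name = p_target_table
              AND c.is_nullable = 'NO'
            ORDER BY c.ordinal_position
        LOOP
            BEGIN
                -- Count NULLs in the matching staging column
                EXECUTE format(
                    'SELECT COUNT(*) FROM %I.%I WHERE %I IS NULL',
                    p_staging_schema, p_staging_table, v_column
                ) INTO v_null_count;

                RETURN QUERY SELECT
                    ('null_check_' || v_column)::TEXT,
                    CASE WHEN v_null_count > 0 THEN 'ERROR' ELSE 'OK' END::TEXT,
                    jsonb_build_object('null_count', v_null_count);
            EXCEPTION WHEN undefined_column THEN
                RETURN QUERY SELECT
                    ('null_check_' || v_column)::TEXT,
                    'ERROR'::TEXT,
                    jsonb_build_object('message', 'Column missing from staging table');
            END;
        END LOOP;
    END;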
END;
$$ LANGUAGE plpgsql;

-- Usage
SELECT * FROM generate_migration_report(
    'public', 'transactions_staging',
    'public', 'transactions',
    'transaction_date', '2024-01'
);
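
The report is most useful as a gate in front of the migration step. As a rough sketch, the rows it returns can be reduced to a go or no-go decision; the function name migration_verdict, the severity ranking, and the choice to treat unknown result values as errors are assumptions of this example rather than part of the SQL above.

```python
from typing import Iterable, List, Tuple

# Severity ranking for the result column of generate_migration_report().
SEVERITY = {"OK": 0, "INFO": 0, "WARNING": 1, "ERROR": 2}


def migration_verdict(report_rows: Iterable[Tuple[str, str, dict]],
                      allow_warnings: bool = False) -> Tuple[bool, List[str]]:
    """Return (proceed, blocking_check_names) for a report result set."""
    threshold = 2 if allow_warnings else 1
    blocking = [
        check_name
        for check_name, result, _details in report_rows
        # Unknown result values are treated as errors (an assumption of this sketch).
        if SEVERITY.get(result, 2) >= threshold
    ]
    return (not blocking, blocking)
```

With allow_warnings left at False, a WARNING such as existing_partition_data stops the run, which forces an explicit decision about overwriting data that is already in the target.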

5.3 Partition Migration

-- Migrate data from staging table to main table

CREATE OR REPLACE PROCEDURE migrate_partition_data(
    p_staging_schema TEXT,
    p_staging_table TEXT,
    p_target_schema TEXT,
    p_target_table TEXT,
    p_partition_column TEXT,
    p_partition_value TEXT,
    p_delete_existing BOOLEAN DEFAULT FALSE,
    p_batch_size INTEGER DEFAULT 50000
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_columns TEXT;
    v_total_migrated BIGINT := 0;
    v_batch_migrated BIGINT;
    v_validation_passed BOOLEAN;
BEGIN
    -- Validate schemas match
    SELECT bool_and(is_valid) INTO v_validation_passed
    FROM validate_table_schemas(
        p_staging_schema, p_staging_table,
        p_target_schema, p_target_table
    );

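    -- bool_and() yields NULL when no rows are returned (for example, a missing
    -- staging table), so treat anything other than TRUE as a failure
    IF v_validation_passed IS NOT TRUE THEN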
        RAISE EXCEPTION 'Schema validation failed. Run validate_table_schemas() for details.';
    END IF;

    -- Build column list
    SELECT string_agg(quote_ident(column_name), ', ' ORDER BY ordinal_position)
    INTO v_columns
    FROM information_schema.columns
    WHERE table_schema = p_staging_schema
      AND table_name = p_staging_table;

    -- Delete existing data if requested
    IF p_delete_existing THEN
        -- Compare as text: $1 is bound as TEXT, and operators such as date = text do not exist
        EXECUTE format(
            'DELETE FROM %I.%I WHERE %I::TEXT = $1',
            p_target_schema, p_target_table, p_partition_column
        ) USING p_partition_value;

        RAISE NOTICE 'Deleted existing data for partition % = %', 
            p_partition_column, p_partition_value;
    END IF;

    -- Move rows in batches. Each batch is deleted from staging and inserted into
    -- the target in a single statement, so a row is never present in both tables
    -- and every batch makes progress. The staging table is empty afterwards.
    -- Existing target rows are not checked here; run generate_migration_report() first.
    LOOP
        EXECUTE format($sql$
            WITH batch AS (
                SELECT ctid 
                FROM %I.%I 
                LIMIT $1
            ),
            moved AS (
                DELETE FROM %I.%I s
                WHERE s.ctid IN (SELECT ctid FROM batch)
                RETURNING %s
            ),
            inserted AS (
                INSERT INTO %I.%I (%s)
                SELECT %s 
                FROM moved
                RETURNING 1
            )
            SELECT COUNT(*) FROM inserted
        $sql$,
            p_staging_schema, p_staging_table,
            p_staging_schema, p_staging_table,
            v_columns,
            p_target_schema, p_target_table, v_columns,
            v_columns
        ) INTO v_batch_migrated USING p_batch_size;

        v_total_migrated := v_total_migrated + v_batch_migrated;

        IF v_batch_migrated = 0 THEN
            EXIT;
        END IF;

        RAISE NOTICE 'Migrated batch: % records (total: %)', v_batch_migrated, v_total_migrated;
        COMMIT;
    END LOOP;

    RAISE NOTICE 'Migration complete. Total records migrated: %', v_total_migrated;
END;
$$;

-- Usage
CALL migrate_partition_data(
    'public', 'transactions_staging',
    'public', 'transactions',
    'transaction_date', '2024-01',
    TRUE,   -- delete existing
    50000   -- batch size
);
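
One practical detail when calling this procedure from a client: because it issues COMMIT between batches, PostgreSQL rejects the CALL if it runs inside an already open transaction block, and drivers such as psycopg2 open one implicitly unless autocommit is enabled. The sketch below assumes a psycopg2-style connection object; the wrapper name call_migrate_partition is hypothetical.

```python
from typing import Any


def call_migrate_partition(conn: Any, staging_table: str, target_table: str,
                           partition_column: str, partition_value: str,
                           delete_existing: bool = False,
                           batch_size: int = 50000,
                           schema: str = "public") -> None:
    """Invoke migrate_partition_data() outside of a client-side transaction."""
    previous = conn.autocommit
    # The procedure commits between batches, which only works when the CALL
    # is not wrapped in a transaction started by the driver.
    conn.autocommit = True
    try:
        with conn.cursor() as cur:
            cur.execute(
                "CALL migrate_partition_data(%s, %s, %s, %s, %s, %s, %s, %s)",
                (schema, staging_table, schema, target_table,
                 partition_column, partition_value,
                 delete_existing, batch_size),
            )
    finally:
        # Put the connection back the way the caller had it
        conn.autocommit = previous
```

The caller should make sure no transaction is in progress on the connection before invoking the wrapper, since the autocommit setting cannot normally be changed mid-transaction.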

5.4 Attach Partition (for Partitioned Tables)

-- For natively partitioned tables, attach the staging table as a partition

CREATE OR REPLACE PROCEDURE attach_restored_partition(
    p_staging_schema TEXT,
    p_staging_table TEXT,
    p_target_schema TEXT,
    p_target_table TEXT,
    p_partition_column TEXT,
    p_partition_start TEXT,
    p_partition_end TEXT
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_partition_name TEXT;
    v_constraint_name TEXT;
BEGIN
    -- Validate schemas match
    -- IS NOT TRUE also catches the NULL that bool_and() returns for an empty result
    IF (
        SELECT bool_and(is_valid)
        FROM validate_table_schemas(
            p_staging_schema, p_staging_table,
            p_target_schema, p_target_table
        )
    ) IS NOT TRUE THEN
        RAISE EXCEPTION 'Schema validation failed';
    END IF;

    -- Add constraint to staging table that matches partition bounds
    v_constraint_name := p_staging_table || '_partition_check';

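    -- NOT VALID skips the table scan here; VALIDATE CONSTRAINT below performs it
    EXECUTE format($sql$
        ALTER TABLE %I.%I 
        ADD CONSTRAINT %I 
        CHECK (%I IS NOT NULL AND %I >= %L AND %I < %L) NOT VALID
    $sql$,
        p_staging_schema, p_staging_table,
        v_constraint_name,
        p_partition_column,
        p_partition_column, p_partition_start,
        p_partition_column, p_partition_end
    );

    -- Validate under a SHARE UPDATE EXCLUSIVE lock, which does not block reads or writes.
    -- A validated constraint that matches the bounds lets ATTACH PARTITION skip its own scan.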
    EXECUTE format($sql$
        ALTER TABLE %I.%I 
        VALIDATE CONSTRAINT %I
    $sql$,
        p_staging_schema, p_staging_table,
        v_constraint_name
    );

    -- Detach old partition if exists
    v_partition_name := p_target_table || '_' || replace(p_partition_start, '-', '_');

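    BEGIN
        EXECUTE format($sql$
            ALTER TABLE %I.%I 
            DETACH PARTITION %I.%I
        $sql$,
            p_target_schema, p_target_table,
            p_target_schema, v_partition_name
        );
        -- Move the detached table aside so the staging table can take its name
        EXECUTE format(
            'ALTER TABLE %I.%I RENAME TO %I',
            p_target_schema, v_partition_name, v_partition_name || '_old'
        );
        RAISE NOTICE 'Detached existing partition % and renamed it to %_old',
            v_partition_name, v_partition_name;
    EXCEPTION WHEN undefined_table THEN
        RAISE NOTICE 'No existing partition to detach';
    END;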

    -- Rename staging table to partition name
    EXECUTE format($sql$
        ALTER TABLE %I.%I RENAME TO %I
    $sql$,
        p_staging_schema, p_staging_table,
        v_partition_name
    );

    -- Attach as partition
    EXECUTE format($sql$
        ALTER TABLE %I.%I 
        ATTACH PARTITION %I.%I 
        FOR VALUES FROM (%L) TO (%L)
    $sql$,
        p_target_schema, p_target_table,
        p_staging_schema, v_partition_name,
        p_partition_start, p_partition_end
    );

    RAISE NOTICE 'Successfully attached partition % to %', 
        v_partition_name, p_target_table;
END;
$$;

-- Usage for range partitioned table
CALL attach_restored_partition(
    'public', 'transactions_staging',
    'public', 'transactions',
    'transaction_date',
    '2024-01-01', '2024-02-01'
);
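
The start and end arguments form a half-open range: the start date is included and the end date is excluded, so the end of a monthly partition is the first day of the following month. A small helper can derive both bounds and the resulting partition name. This is a sketch under the assumption of monthly partitions; the function names are hypothetical, and partition_table_name simply mirrors the naming expression used inside the procedure.

```python
from datetime import date
from typing import Tuple


def month_partition_bounds(year: int, month: int) -> Tuple[str, str]:
    """Return (start, end) ISO dates for a monthly range partition.

    The end bound is exclusive, matching FOR VALUES FROM (...) TO (...).
    """
    start = date(year, month, 1)
    # Roll over to January of the next year after December
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start.isoformat(), end.isoformat()


def partition_table_name(target_table: str, partition_start: str) -> str:
    """Target table name plus the start bound with hyphens replaced by underscores."""
    return f"{target_table}_{partition_start.replace('-', '_')}"
```

For January 2024 this yields the bounds used in the CALL above, and the staging table ends up named transactions_2024_01_01.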

5.5 Cleanup Script

-- Clean up after successful migration

CREATE OR REPLACE PROCEDURE cleanup_after_migration(
    p_staging_schema TEXT,
    p_staging_table TEXT,
    p_verify_target_schema TEXT DEFAULT NULL,
    p_verify_target_table TEXT DEFAULT NULL,
    p_verify_count BOOLEAN DEFAULT TRUE
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_staging_count BIGINT;
    v_target_count BIGINT;
BEGIN
    IF p_verify_count AND p_verify_target_schema IS NOT NULL THEN
        EXECUTE format(
            'SELECT COUNT(*) FROM %I.%I',
            p_staging_schema, p_staging_table
        ) INTO v_staging_count;

        EXECUTE format(
            'SELECT COUNT(*) FROM %I.%I',
            p_verify_target_schema, p_verify_target_table
        ) INTO v_target_count;

        IF v_target_count < v_staging_count THEN
            RAISE WARNING 'Target count (%) is less than staging count (%). Migration may be incomplete.',
                v_target_count, v_staging_count;
            RETURN;
        END IF;
    END IF;

    EXECUTE format(
        'DROP TABLE IF EXISTS %I.%I',
        p_staging_schema, p_staging_table
    );

    RAISE NOTICE 'Dropped staging table %.%', p_staging_schema, p_staging_table;
END;
$$;

-- Usage
CALL cleanup_after_migration(
    'public', 'transactions_staging',
    'public', 'transactions',
    TRUE
);

6. Spring Boot Query API

This API allows querying archived data directly from S3 without restoring to the database.

6.1 Project Structure

src/main/java/com/example/archivequery/
    ArchiveQueryApplication.java
    config/
        AwsConfig.java
        ParquetConfig.java
    controller/
        ArchiveQueryController.java
    service/
        ParquetQueryService.java
        PredicateParser.java
    model/
        QueryRequest.java
        QueryResponse.java
        ArchiveMetadata.java
    predicate/
        Predicate.java
        ComparisonPredicate.java
        LogicalPredicate.java
        PredicateEvaluator.java

6.2 Application Configuration

// ArchiveQueryApplication.java
package com.example.archivequery;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ArchiveQueryApplication {
    public static void main(String[] args) {
        SpringApplication.run(ArchiveQueryApplication.class, args);
    }
}

6.3 AWS Configuration

// config/AwsConfig.java
package com.example.archivequery.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

@Configuration
public class AwsConfig {

    @Value("${aws.region:eu-west-1}")
    private String region;

    @Bean
    public S3Client s3Client() {
        return S3Client.builder()
            .region(Region.of(region))
            .credentialsProvider(DefaultCredentialsProvider.create())
            .build();
    }
}

6.4 Parquet Configuration

// config/ParquetConfig.java
package com.example.archivequery.config;

import org.apache.hadoop.conf.Configuration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;

@org.springframework.context.annotation.Configuration
public class ParquetConfig {

    @Value("${aws.region:eu-west-1}")
    private String region;

    @Bean
    public Configuration hadoopConfiguration() {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        conf.set("fs.s3a.aws.credentials.provider", 
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
        conf.set("fs.s3a.endpoint.region", region);
        return conf;
    }
}

6.5 Model Classes

// model/QueryRequest.java
package com.example.archivequery.model;

import java.util.List;
import java.util.Map;

public record QueryRequest(
    String archivePath,
    List<String> columns,
    Map<String, Object> filters,
    String filterExpression,
    Integer limit,
    Integer offset
) {
    public QueryRequest {
        if (limit == null) limit = 1000;
        if (offset == null) offset = 0;
    }
}

// model/QueryResponse.java
package com.example.archivequery.model;

import java.util.List;
import java.util.Map;

public record QueryResponse(
    List<Map<String, Object>> data,
    long totalMatched,
    long totalScanned,
    long executionTimeMs,
    Map<String, String> schema,
    String archivePath
) {}

// model/ArchiveMetadata.java
package com.example.archivequery.model;

import java.time.Instant;
import java.util.List;
import java.util.Map;

public record ArchiveMetadata(
    String tableUuid,
    String location,
    List<ColumnDefinition> columns,
    Map<String, String> properties,
    Instant archivedAt
) {
    public record ColumnDefinition(
        int id,
        String name,
        String type,
        boolean required
    ) {}
}

6.6 Predicate Classes

// predicate/Predicate.java
package com.example.archivequery.predicate;

import java.util.Map;

public sealed interface Predicate 
    permits ComparisonPredicate, LogicalPredicate {

    boolean evaluate(Map<String, Object> record);
}

// predicate/ComparisonPredicate.java
package com.example.archivequery.predicate;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Objects;

public record ComparisonPredicate(
    String column,
    Operator operator,
    Object value
) implements Predicate {

    public enum Operator {
        EQ, NE, GT, GE, LT, LE, LIKE, IN, IS_NULL, IS_NOT_NULL
    }

    @Override
    public boolean evaluate(Map<String, Object> record) {
        Object recordValue = record.get(column);

        return switch (operator) {
            case IS_NULL -> recordValue == null;
            case IS_NOT_NULL -> recordValue != null;
            case EQ -> Objects.equals(recordValue, convertValue(recordValue));
            case NE -> !Objects.equals(recordValue, convertValue(recordValue));
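            // As in SQL, ordering comparisons involving NULL never match
            case GT -> recordValue != null && value != null
                && compare(recordValue, convertValue(recordValue)) > 0;
            case GE -> recordValue != null && value != null
                && compare(recordValue, convertValue(recordValue)) >= 0;
            case LT -> recordValue != null && value != null
                && compare(recordValue, convertValue(recordValue)) < 0;
            case LE -> recordValue != null && value != null
                && compare(recordValue, convertValue(recordValue)) <= 0;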
            case LIKE -> matchesLike(recordValue);
            case IN -> matchesIn(recordValue);
        };
    }

    private Object convertValue(Object recordValue) {
        if (recordValue == null || value == null) return value;

        if (recordValue instanceof LocalDate && value instanceof String s) {
            return LocalDate.parse(s);
        }
        if (recordValue instanceof LocalDateTime && value instanceof String s) {
            return LocalDateTime.parse(s);
        }
        if (recordValue instanceof BigDecimal && value instanceof Number n) {
            return new BigDecimal(n.toString());
        }
        if (recordValue instanceof Long && value instanceof Number n) {
            return n.longValue();
        }
        if (recordValue instanceof Integer && value instanceof Number n) {
            return n.intValue();
        }
        if (recordValue instanceof Double && value instanceof Number n) {
            return n.doubleValue();
        }

        return value;
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    private int compare(Object a, Object b) {
        if (a == null || b == null) return 0;
        if (a instanceof Comparable ca && b instanceof Comparable cb) {
            return ca.compareTo(cb);
        }
        return 0;
    }

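    private boolean matchesLike(Object recordValue) {
        if (recordValue == null || value == null) return false;
        // Translate SQL wildcards and quote everything else, so characters such
        // as '.' or '(' in the pattern are matched literally
        StringBuilder regex = new StringBuilder();
        for (char c : value.toString().toCharArray()) {
            switch (c) {
                case '%' -> regex.append(".*");
                case '_' -> regex.append('.');
                default -> regex.append(java.util.regex.Pattern.quote(String.valueOf(c)));
            }
        }
        return recordValue.toString().matches(regex.toString());
    }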

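    private boolean matchesIn(Object recordValue) {
        if (recordValue == null || value == null) return false;
        if (value instanceof Iterable<?> iterable) {
            for (Object item : iterable) {
                // Coerce each item like the other operators do, so that an
                // Integer column still matches a literal parsed as Long
                Object candidate = new ComparisonPredicate(column, Operator.EQ, item)
                    .convertValue(recordValue);
                if (Objects.equals(recordValue, candidate)) return true;
            }
        }
        return false;
    }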
}

// predicate/LogicalPredicate.java
package com.example.archivequery.predicate;

import java.util.List;
import java.util.Map;

public record LogicalPredicate(
    Operator operator,
    List<Predicate> predicates
) implements Predicate {

    public enum Operator {
        AND, OR, NOT
    }

    @Override
    public boolean evaluate(Map<String, Object> record) {
        return switch (operator) {
            case AND -> predicates.stream().allMatch(p -> p.evaluate(record));
            case OR -> predicates.stream().anyMatch(p -> p.evaluate(record));
            case NOT -> !predicates.getFirst().evaluate(record);
        };
    }
}

// predicate/PredicateEvaluator.java
package com.example.archivequery.predicate;

import java.util.Map;

public class PredicateEvaluator {

    private final Predicate predicate;

    public PredicateEvaluator(Predicate predicate) {
        this.predicate = predicate;
    }

    public boolean matches(Map<String, Object> record) {
        if (predicate == null) return true;
        return predicate.evaluate(record);
    }
}
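
The LIKE operator in ComparisonPredicate works by mapping the SQL wildcards % and _ onto regular-expression wildcards. The mapping is easy to get subtly wrong, because any other regex metacharacter in the pattern (a dot in a company name, for instance) has to be matched literally. The following Python sketch illustrates the intended semantics; it is an illustration of the technique, not a port of the Java class, and the function names are hypothetical.

```python
import re


def like_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a SQL LIKE pattern into a regular expression."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")           # % matches any run of characters
        elif ch == "_":
            parts.append(".")            # _ matches exactly one character
        else:
            parts.append(re.escape(ch))  # everything else is literal
    return re.compile("".join(parts), re.DOTALL)


def sql_like(value, pattern) -> bool:
    """NULL on either side never matches, as in SQL."""
    if value is None or pattern is None:
        return False
    # fullmatch anchors the pattern at both ends, like LIKE does
    return like_to_regex(pattern).fullmatch(str(value)) is not None
```

Here sql_like("a.b", "a.b") matches while sql_like("axb", "a.b") does not, which is the behaviour a caller writing SQL-style filters would expect.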

6.7 Predicate Parser

// service/PredicateParser.java
package com.example.archivequery.service;

import com.example.archivequery.predicate.*;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Service
public class PredicateParser {

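    // Keyword operators are matched case-insensitively and only on word
    // boundaries, so a column such as "plain" is not read as "pla IN"
    private static final Pattern COMPARISON_PATTERN = Pattern.compile(
        "(\\w+)\\s*(=|!=|<>|>=|<=|>|<|\\bLIKE\\b|\\bIN\\b|\\bIS NULL\\b|\\bIS NOT NULL\\b)\\s*(.*)?",
        Pattern.CASE_INSENSITIVE
    );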

    public Predicate parse(String expression) {
        if (expression == null || expression.isBlank()) {
            return null;
        }
        return parseExpression(expression.trim());
    }

    public Predicate fromFilters(Map<String, Object> filters) {
        if (filters == null || filters.isEmpty()) {
            return null;
        }

        List<Predicate> predicates = new ArrayList<>();

        for (Map.Entry<String, Object> entry : filters.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();

            if (key.endsWith("_gt")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 3),
                    ComparisonPredicate.Operator.GT,
                    value
                ));
            } else if (key.endsWith("_gte")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 4),
                    ComparisonPredicate.Operator.GE,
                    value
                ));
            } else if (key.endsWith("_lt")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 3),
                    ComparisonPredicate.Operator.LT,
                    value
                ));
            } else if (key.endsWith("_lte")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 4),
                    ComparisonPredicate.Operator.LE,
                    value
                ));
            } else if (key.endsWith("_ne")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 3),
                    ComparisonPredicate.Operator.NE,
                    value
                ));
            } else if (key.endsWith("_like")) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 5),
                    ComparisonPredicate.Operator.LIKE,
                    value
                ));
            } else if (key.endsWith("_in") && value instanceof List<?> list) {
                predicates.add(new ComparisonPredicate(
                    key.substring(0, key.length() - 3),
                    ComparisonPredicate.Operator.IN,
                    list
                ));
            } else {
                predicates.add(new ComparisonPredicate(
                    key,
                    ComparisonPredicate.Operator.EQ,
                    value
                ));
            }
        }

        if (predicates.size() == 1) {
            return predicates.getFirst();
        }

        return new LogicalPredicate(LogicalPredicate.Operator.AND, predicates);
    }

    private Predicate parseExpression(String expression) {
        expression = expression.trim();
        String upper = expression.toUpperCase(Locale.ROOT);

        // Split on OR first: it binds more loosely than AND, so it must end up
        // higher in the predicate tree ("a OR b AND c" means "a OR (b AND c)")
        int orIndex = findLogicalOperator(upper, " OR ");
        if (orIndex > 0) {
            String left = expression.substring(0, orIndex);
            String right = expression.substring(orIndex + 4);
            return new LogicalPredicate(
                LogicalPredicate.Operator.OR,
                List.of(parseExpression(left), parseExpression(right))
            );
        }

        int andIndex = findLogicalOperator(upper, " AND ");
        if (andIndex > 0) {
            String left = expression.substring(0, andIndex);
            String right = expression.substring(andIndex + 5);
            return new LogicalPredicate(
                LogicalPredicate.Operator.AND,
                List.of(parseExpression(left), parseExpression(right))
            );
        }

        if (upper.startsWith("NOT ")) {
            return new LogicalPredicate(
                LogicalPredicate.Operator.NOT,
                List.of(parseExpression(expression.substring(4)))
            );
        }

        if (expression.startsWith("(") && expression.endsWith(")")) {
            return parseExpression(expression.substring(1, expression.length() - 1));
        }

        return parseComparison(expression);
    }

    private int findLogicalOperator(String expression, String operator) {
        int depth = 0;
        boolean inQuote = false;

        for (int index = 0; index < expression.length(); index++) {
            char c = expression.charAt(index);
            if (c == '\'') {
                inQuote = !inQuote;
            } else if (inQuote) {
                continue; // ignore keywords and parentheses inside string literals
            } else if (c == '(') {
                depth++;
            } else if (c == ')') {
                depth--;
            } else if (depth == 0 && expression.startsWith(operator, index)) {
                return index;
            }
        }
        return -1;
    }

    private Predicate parseComparison(String expression) {
        Matcher matcher = COMPARISON_PATTERN.matcher(expression.trim());

        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid expression: " + expression);
        }

        String column = matcher.group(1);
        String operatorStr = matcher.group(2).toUpperCase();
        String valueStr = matcher.group(3);

        ComparisonPredicate.Operator operator = switch (operatorStr) {
            case "=" -> ComparisonPredicate.Operator.EQ;
            case "!=", "<>" -> ComparisonPredicate.Operator.NE;
            case ">" -> ComparisonPredicate.Operator.GT;
            case ">=" -> ComparisonPredicate.Operator.GE;
            case "<" -> ComparisonPredicate.Operator.LT;
            case "<=" -> ComparisonPredicate.Operator.LE;
            case "LIKE" -> ComparisonPredicate.Operator.LIKE;
            case "IN" -> ComparisonPredicate.Operator.IN;
            case "IS NULL" -> ComparisonPredicate.Operator.IS_NULL;
            case "IS NOT NULL" -> ComparisonPredicate.Operator.IS_NOT_NULL;
            default -> throw new IllegalArgumentException("Unknown operator: " + operatorStr);
        };

        Object value = parseValue(valueStr, operator);

        return new ComparisonPredicate(column, operator, value);
    }

    private Object parseValue(String valueStr, ComparisonPredicate.Operator operator) {
        if (operator == ComparisonPredicate.Operator.IS_NULL || 
            operator == ComparisonPredicate.Operator.IS_NOT_NULL) {
            return null;
        }

        if (valueStr == null || valueStr.isBlank()) {
            return null;
        }

        valueStr = valueStr.trim();

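        // Handle IN before the quote check, otherwise an unparenthesised list
        // such as 'a','b' would be read as one quoted string
        if (operator == ComparisonPredicate.Operator.IN) {
            if (valueStr.startsWith("(") && valueStr.endsWith(")")) {
                valueStr = valueStr.substring(1, valueStr.length() - 1);
            }
            // Note: a plain split does not support commas inside quoted items
            return Arrays.stream(valueStr.split(","))
                .map(String::trim)
                .map(s -> s.length() >= 2 && s.startsWith("'") && s.endsWith("'")
                    ? s.substring(1, s.length() - 1)
                    : parseNumber(s))
                .toList();
        }

        if (valueStr.length() >= 2 && valueStr.startsWith("'") && valueStr.endsWith("'")) {
            // Strip the quotes and unescape doubled single quotes
            return valueStr.substring(1, valueStr.length() - 1).replace("''", "'");
        }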

        return parseNumber(valueStr);
    }

    private Object parseNumber(String s) {
        try {
            if (s.contains(".")) {
                return Double.parseDouble(s);
            }
            return Long.parseLong(s);
        } catch (NumberFormatException e) {
            return s;
        }
    }
}
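
Operator precedence is the part of a hand-written filter parser that most often goes wrong. In SQL, AND binds more tightly than OR, so a parser that splits an expression recursively should look for a top-level OR first and only then for AND; it should also ignore keywords that appear inside parentheses or quoted strings. The Python sketch below shows the idea on a nested-tuple tree. It is a simplified illustration with hypothetical function names, it does not handle NOT or escaped quotes, and it is not a translation of PredicateParser.

```python
def split_top_level(expr: str, keyword: str) -> list:
    """Split expr on keyword, ignoring text inside parentheses or quotes."""
    token = f" {keyword} "
    parts, depth, in_quote, start, i = [], 0, False, 0, 0
    while i < len(expr):
        ch = expr[i]
        if ch == "'":
            in_quote = not in_quote
        elif not in_quote and ch == "(":
            depth += 1
        elif not in_quote and ch == ")":
            depth -= 1
        elif (not in_quote and depth == 0
              and expr[i:i + len(token)].upper() == token):
            parts.append(expr[start:i].strip())
            i += len(token)
            start = i
            continue
        i += 1
    parts.append(expr[start:].strip())
    return parts


def parse(expr: str):
    """Return a nested tuple tree: OR binds loosest, then AND, then leaves."""
    expr = expr.strip()
    for keyword in ("OR", "AND"):
        parts = split_top_level(expr, keyword)
        if len(parts) > 1:
            return (keyword, [parse(p) for p in parts])
    if expr.startswith("(") and expr.endswith(")"):
        return parse(expr[1:-1])
    return expr  # a single comparison such as "amount > 100"
```

Parsing "a = 1 OR b = 2 AND c = 3" with this sketch puts OR at the root with the AND group as its second child, which matches how a SQL engine would evaluate the same WHERE clause.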

6.8 Parquet Query Service

// service/ParquetQueryService.java
package com.example.archivequery.service;

import com.example.archivequery.model.*;
import com.example.archivequery.predicate.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

import java.io.IOException;
import java.net.URI;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

@Service
public class ParquetQueryService {

    private static final Logger log = LoggerFactory.getLogger(ParquetQueryService.class);

    private final S3Client s3Client;
    private final Configuration hadoopConfig;
    private final PredicateParser predicateParser;
    private final ObjectMapper objectMapper;
    private final ExecutorService executorService;

    public ParquetQueryService(
        S3Client s3Client,
        Configuration hadoopConfig,
        PredicateParser predicateParser
    ) {
        this.s3Client = s3Client;
        this.hadoopConfig = hadoopConfig;
        this.predicateParser = predicateParser;
        this.objectMapper = new ObjectMapper();
        this.executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
        );
    }

    public QueryResponse query(QueryRequest request) {
        long startTime = System.currentTimeMillis();

        Predicate predicate = buildPredicate(request);
        PredicateEvaluator evaluator = new PredicateEvaluator(predicate);

        List<String> dataFiles = listDataFiles(request.archivePath());

        List<Map<String, Object>> allResults = Collections.synchronizedList(new ArrayList<>());
        long[] counters = new long[2];

        List<Future<?>> futures = new ArrayList<>();

        for (String dataFile : dataFiles) {
            futures.add(executorService.submit(() -> {
                try {
                    QueryFileResult result = queryFile(
                        dataFile, 
                        request.columns(), 
                        evaluator
                    );
                    synchronized (counters) {
                        counters[0] += result.matched();
                        counters[1] += result.scanned();
                    }
                    allResults.addAll(result.records());
                } catch (IOException e) {
                    // Note: a failed file is skipped, so the response may be partial
                    log.error("Error reading file: {}", dataFile, e);
                }
            }));
        }

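        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting on the remaining files
                Thread.currentThread().interrupt();
                log.error("Interrupted while waiting for query completion", e);
                break;
            } catch (ExecutionException e) {
                log.error("Error waiting for query completion", e.getCause());
            }
        }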

        List<Map<String, Object>> paginatedResults = allResults.stream()
            .skip(request.offset())
            .limit(request.limit())
            .collect(Collectors.toList());

        Map<String, String> schema = extractSchema(request.archivePath());

        long executionTime = System.currentTimeMillis() - startTime;

        return new QueryResponse(
            paginatedResults,
            counters[0],
            counters[1],
            executionTime,
            schema,
            request.archivePath()
        );
    }

    public ArchiveMetadata getMetadata(String archivePath) {
        try {
            URI uri = URI.create(archivePath);
            String bucket = uri.getHost();
            String prefix = uri.getPath().substring(1);

            GetObjectRequest getRequest = GetObjectRequest.builder()
                .bucket(bucket)
                .key(prefix + "/metadata/v1.metadata.json")
                .build();

            byte[] content = s3Client.getObjectAsBytes(getRequest).asByteArray();
            Map<String, Object> metadata = objectMapper.readValue(content, Map.class);

            List<Map<String, Object>> schemas = (List<Map<String, Object>>) metadata.get("schemas");
            Map<String, Object> currentSchema = schemas.getFirst();
            List<Map<String, Object>> fields = (List<Map<String, Object>>) currentSchema.get("fields");

            List<ArchiveMetadata.ColumnDefinition> columns = fields.stream()
                .map(f -> new ArchiveMetadata.ColumnDefinition(
                    ((Number) f.get("id")).intValue(),
                    (String) f.get("name"),
                    (String) f.get("type"),
                    (Boolean) f.get("required")
                ))
                .toList();

            Map<String, String> properties = (Map<String, String>) metadata.get("properties");

            String archivedAtStr = properties.get("archive.timestamp");
            Instant archivedAt = archivedAtStr != null ? 
                Instant.parse(archivedAtStr) : Instant.now();

            return new ArchiveMetadata(
                (String) metadata.get("table_uuid"),
                (String) metadata.get("location"),
                columns,
                properties,
                archivedAt
            );
        } catch (Exception e) {
            throw new RuntimeException("Failed to read archive metadata", e);
        }
    }

    private Predicate buildPredicate(QueryRequest request) {
        Predicate filterPredicate = predicateParser.fromFilters(request.filters());
        Predicate expressionPredicate = predicateParser.parse(request.filterExpression());

        if (filterPredicate == null) return expressionPredicate;
        if (expressionPredicate == null) return filterPredicate;

        return new LogicalPredicate(
            LogicalPredicate.Operator.AND,
            List.of(filterPredicate, expressionPredicate)
        );
    }

    private List<String> listDataFiles(String archivePath) {
        URI uri = URI.create(archivePath);
        String bucket = uri.getHost();
        String prefix = uri.getPath().substring(1) + "/data/";

        ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
            .bucket(bucket)
            .prefix(prefix)
            .build();

        List<String> files = new ArrayList<>();
        ListObjectsV2Response response;

        do {
            response = s3Client.listObjectsV2(listRequest);

            for (S3Object obj : response.contents()) {
                if (obj.key().endsWith(".parquet")) {
                    files.add("s3a://" + bucket + "/" + obj.key());
                }
            }

            listRequest = listRequest.toBuilder()
                .continuationToken(response.nextContinuationToken())
                .build();

        } while (response.isTruncated());

        return files;
    }

    private record QueryFileResult(
        List<Map<String, Object>> records,
        long matched,
        long scanned
    ) {}

    private QueryFileResult queryFile(
        String filePath,
        List<String> columns,
        PredicateEvaluator evaluator
    ) throws IOException {
        List<Map<String, Object>> results = new ArrayList<>();
        long matched = 0;
        long scanned = 0;

        Path path = new Path(filePath);
        HadoopInputFile inputFile = HadoopInputFile.fromPath(path, hadoopConfig);

        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(inputFile)
                .withConf(hadoopConfig)
                .build()) {

            GenericRecord record;
            while ((record = reader.read()) != null) {
                scanned++;
                Map<String, Object> recordMap = recordToMap(record);

                if (evaluator.matches(recordMap)) {
                    matched++;

                    if (columns != null && !columns.isEmpty()) {
                        Map<String, Object> projected = new LinkedHashMap<>();
                        for (String col : columns) {
                            if (recordMap.containsKey(col)) {
                                projected.put(col, recordMap.get(col));
                            }
                        }
                        results.add(projected);
                    } else {
                        results.add(recordMap);
                    }
                }
            }
        }

        return new QueryFileResult(results, matched, scanned);
    }

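    private Map<String, Object> recordToMap(GenericRecord record) {
        Map<String, Object> map = new LinkedHashMap<>();

        for (org.apache.avro.Schema.Field field : record.getSchema().getFields()) {
            String name = field.name();
            Object value = record.get(name);
            map.put(name, convertValue(value, field.schema()));
        }

        return map;
    }

    private Object convertValue(Object value, org.apache.avro.Schema schema) {
        if (value == null) return null;

        if (value instanceof org.apache.avro.util.Utf8) {
            return value.toString();
        }

        if (value instanceof org.apache.avro.generic.GenericRecord nested) {
            return recordToMap(nested);
        }

        if (value instanceof java.nio.ByteBuffer buffer) {
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            return Base64.getEncoder().encodeToString(bytes);
        }

        // Convert only columns that declare a temporal logical type, so plain
        // INT and BIGINT columns such as customer_id pass through unchanged.
        String logicalType = logicalTypeName(schema);

        if (value instanceof Integer i && "date".equals(logicalType)) {
            return LocalDate.ofEpochDay(i);
        }

        if (value instanceof Long l && "timestamp-micros".equals(logicalType)) {
            return LocalDateTime.ofEpochSecond(
                Math.floorDiv(l, 1_000_000L),
                (int) Math.floorMod(l, 1_000_000L) * 1000,
                ZoneOffset.UTC
            );
        }

        if (value instanceof Long l && "timestamp-millis".equals(logicalType)) {
            return LocalDateTime.ofInstant(Instant.ofEpochMilli(l), ZoneOffset.UTC);
        }

        return value;
    }

    private String logicalTypeName(org.apache.avro.Schema schema) {
        // Nullable columns arrive as a union such as ["null", "int"]; inspect the non-null branch
        if (schema.getType() == org.apache.avro.Schema.Type.UNION) {
            for (org.apache.avro.Schema branch : schema.getTypes()) {
                if (branch.getType() != org.apache.avro.Schema.Type.NULL) {
                    schema = branch;
                    break;
                }
            }
        }
        return schema.getLogicalType() != null ? schema.getLogicalType().getName() : null;
    }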

    private Map<String, String> extractSchema(String archivePath) {
        try {
            ArchiveMetadata metadata = getMetadata(archivePath);
            return metadata.columns().stream()
                .collect(Collectors.toMap(
                    ArchiveMetadata.ColumnDefinition::name,
                    ArchiveMetadata.ColumnDefinition::type,
                    (a, b) -> a,
                    LinkedHashMap::new
                ));
        } catch (Exception e) {
            log.warn("Failed to extract schema", e);
            return Collections.emptyMap();
        }
    }
}

6.9 REST Controller

// controller/ArchiveQueryController.java
package com.example.archivequery.controller;

import com.example.archivequery.model.*;
import com.example.archivequery.service.ParquetQueryService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/v1/archives")
public class ArchiveQueryController {

    private final ParquetQueryService queryService;

    public ArchiveQueryController(ParquetQueryService queryService) {
        this.queryService = queryService;
    }

    @PostMapping("/query")
    public ResponseEntity<QueryResponse> query(@RequestBody QueryRequest request) {
        QueryResponse response = queryService.query(request);
        return ResponseEntity.ok(response);
    }

    @GetMapping("/query")
    public ResponseEntity<QueryResponse> queryGet(
        @RequestParam String archivePath,
        @RequestParam(required = false) List<String> columns,
        @RequestParam(required = false) String filter,
        @RequestParam(defaultValue = "1000") Integer limit,
        @RequestParam(defaultValue = "0") Integer offset
    ) {
        QueryRequest request = new QueryRequest(
            archivePath,
            columns,
            null,
            filter,
            limit,
            offset
        );

        QueryResponse response = queryService.query(request);
        return ResponseEntity.ok(response);
    }

    @GetMapping("/metadata")
    public ResponseEntity<ArchiveMetadata> getMetadata(
        @RequestParam String archivePath
    ) {
        ArchiveMetadata metadata = queryService.getMetadata(archivePath);
        return ResponseEntity.ok(metadata);
    }

    @GetMapping("/schema/{schema}/{table}/{partitionValue}")
    public ResponseEntity<ArchiveMetadata> getMetadataByTable(
        @PathVariable String schema,
        @PathVariable String table,
        @PathVariable String partitionValue,
        @RequestParam String bucket,
        @RequestParam(defaultValue = "aurora-archives") String prefix
    ) {
        String archivePath = String.format(
            "s3://%s/%s/%s/%s/%s",
            bucket, prefix, schema, table, partitionValue
        );

        ArchiveMetadata metadata = queryService.getMetadata(archivePath);
        return ResponseEntity.ok(metadata);
    }

    @PostMapping("/query/{schema}/{table}/{partitionValue}")
    public ResponseEntity<QueryResponse> queryByTable(
        @PathVariable String schema,
        @PathVariable String table,
        @PathVariable String partitionValue,
        @RequestParam String bucket,
        @RequestParam(defaultValue = "aurora-archives") String prefix,
        @RequestBody Map<String, Object> requestBody
    ) {
        String archivePath = String.format(
            "s3://%s/%s/%s/%s/%s",
            bucket, prefix, schema, table, partitionValue
        );

        List<String> columns = requestBody.containsKey("columns") ?
            (List<String>) requestBody.get("columns") : null;

        Map<String, Object> filters = requestBody.containsKey("filters") ?
            (Map<String, Object>) requestBody.get("filters") : null;

        String filterExpression = requestBody.containsKey("filterExpression") ?
            (String) requestBody.get("filterExpression") : null;

        Integer limit = requestBody.containsKey("limit") ?
            ((Number) requestBody.get("limit")).intValue() : 1000;

        Integer offset = requestBody.containsKey("offset") ?
            ((Number) requestBody.get("offset")).intValue() : 0;

        QueryRequest request = new QueryRequest(
            archivePath,
            columns,
            filters,
            filterExpression,
            limit,
            offset
        );

        QueryResponse response = queryService.query(request);
        return ResponseEntity.ok(response);
    }
}

6.10 Application Properties

# application.yml
server:
  port: 8080

aws:
  region: ${AWS_REGION:eu-west-1}

spring:
  application:
    name: archive-query-service

logging:
  level:
    com.example.archivequery: DEBUG
    org.apache.parquet: WARN
    org.apache.hadoop: WARN

7. Usage Examples

7.1 Archive a Partition

# Set environment variables
export AURORA_HOST=your-cluster.cluster-xxxx.eu-west-1.rds.amazonaws.com
export AURORA_DATABASE=production
export AURORA_USER=admin
export AURORA_PASSWORD=your-password
export ARCHIVE_S3_BUCKET=my-archive-bucket
export ARCHIVE_S3_PREFIX=aurora-archives
export AWS_REGION=eu-west-1

# Archive January 2024 transactions
python archive_partition.py \
    --table transactions \
    --partition-column transaction_month \
    --partition-value 2024-01 \
    --schema public \
    --batch-size 100000

7.2 Query Archived Data via API

# Get archive metadata
curl -X GET "http://localhost:8080/api/v1/archives/metadata?archivePath=s3://my-archive-bucket/aurora-archives/public/transactions/transaction_month=2024-01"

# Query with filters (POST)
curl -X POST "http://localhost:8080/api/v1/archives/query" \
    -H "Content-Type: application/json" \
    -d '{
        "archivePath": "s3://my-archive-bucket/aurora-archives/public/transactions/transaction_month=2024-01",
        "columns": ["transaction_id", "customer_id", "amount", "transaction_date"],
        "filters": {
            "amount_gte": 1000,
            "status": "completed"
        },
        "limit": 100,
        "offset": 0
    }'

# Query with expression filter (GET)
curl -G "http://localhost:8080/api/v1/archives/query" \
    --data-urlencode "archivePath=s3://my-archive-bucket/aurora-archives/public/transactions/transaction_month=2024-01" \
    --data-urlencode "filter=amount >= 1000 AND status = 'completed'" \
    --data-urlencode "columns=transaction_id,customer_id,amount" \
    --data-urlencode "limit=50"
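
The same GET request can be issued from a script. The sketch below only builds the URL, using Python's standard library to percent-encode each parameter. `build_query_url` is a hypothetical helper name; the host, path, and parameter names are taken from the curl examples above.

```python
from urllib.parse import urlencode


def build_query_url(base_url, archive_path, filter_expr=None, columns=None,
                    limit=1000, offset=0):
    """Build the GET /query URL with every parameter percent-encoded."""
    params = [("archivePath", archive_path)]
    if filter_expr:
        params.append(("filter", filter_expr))
    if columns:
        # The controller declares List<String> columns; a comma-separated
        # value is assumed to bind to that list.
        params.append(("columns", ",".join(columns)))
    params.append(("limit", limit))
    params.append(("offset", offset))
    return f"{base_url}/api/v1/archives/query?{urlencode(params)}"


url = build_query_url(
    "http://localhost:8080",
    "s3://my-archive-bucket/aurora-archives/public/transactions/transaction_month=2024-01",
    filter_expr="amount >= 1000 AND status = 'completed'",
    columns=["transaction_id", "customer_id", "amount"],
    limit=50,
)
print(url)
```

Encoding matters here because the filter expression contains spaces, quotes, and comparison operators that are not valid in a raw query string.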

7.3 Restore Archived Data

# Restore to staging table
python restore_partition.py \
    --source-path s3://my-archive-bucket/aurora-archives/public/transactions/transaction_month=2024-01 \
    --target-table transactions_staging \
    --target-schema public \
    --batch-size 10000

7.4 Migrate to Main Table

-- Validate before migration
SELECT * FROM generate_migration_report(
    'public', 'transactions_staging',
    'public', 'transactions',
    'transaction_month', '2024-01'
);

-- Migrate data
CALL migrate_partition_data(
    'public', 'transactions_staging',
    'public', 'transactions',
    'transaction_month', '2024-01',
    TRUE,    -- delete existing data in partition
    50000    -- batch size
);

-- Clean up staging table
CALL cleanup_after_migration(
    'public', 'transactions_staging',
    'public', 'transactions',
    TRUE     -- verify counts
);

8. Operational Considerations

8.1 Cost Analysis

The cost savings from this approach are significant:

Storage Type | Monthly Cost per TB
--- | ---
Aurora PostgreSQL | $230
S3 Standard | $23
S3 Intelligent Tiering | $23 (hot) to $4 (archive)
S3 Glacier Instant Retrieval | $4

For a 10TB historical dataset:

  • Aurora: $2,300/month
  • S3 with Parquet (7:1 compression): ~$33/month
  • Savings: ~98.5%
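
The arithmetic behind these figures can be reproduced with a short sketch. The per-TB prices and the 7:1 compression ratio are the ones quoted above; `monthly_cost` is a hypothetical helper, and actual prices vary by region and configuration.

```python
def monthly_cost(size_tb, price_per_tb, compression_ratio=1.0):
    """Monthly storage cost in dollars after applying a compression ratio."""
    return size_tb / compression_ratio * price_per_tb


aurora = monthly_cost(10, 230)                          # 10 TB kept in Aurora
s3_parquet = monthly_cost(10, 23, compression_ratio=7)  # same data as Parquet on S3 Standard
savings_pct = (1 - s3_parquet / aurora) * 100

print(f"Aurora:     ${aurora:,.0f}/month")
print(f"S3 Parquet: ${s3_parquet:,.0f}/month")
print(f"Savings:    {savings_pct:.1f}%")
```

Changing `compression_ratio` or the price per TB shows how sensitive the savings are to the data's compressibility and the chosen storage class.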

8.2 Query Performance

The Spring Boot API performance depends on:

  1. Data file size: Smaller files (100MB to 250MB) enable better parallelism
  2. Predicate selectivity: Higher selectivity means fewer records to deserialise
  3. Column projection: Requesting fewer columns reduces I/O
  4. S3 latency: Run the API in the same region as the bucket

Expected performance for a 1TB partition (compressed to ~150GB Parquet):

Query Type | Typical Latency
--- | ---
Point lookup (indexed column) | 500ms to 2s
Range scan (10% selectivity) | 5s to 15s
Full scan with aggregation | 30s to 60s

8.3 Monitoring and Alerting

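Record these metrics through Micrometer, which can publish them to CloudWatch, for production use:

import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class QueryMetrics {

    private final MeterRegistry meterRegistry;

    public QueryMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    public void recordQuery(QueryResponse response) {
        meterRegistry.counter("archive.query.count").increment();

        meterRegistry.timer("archive.query.duration")
            .record(response.executionTimeMs(), TimeUnit.MILLISECONDS);

        // Summaries record one sample per query; a gauge over a boxed value
        // would only hold a weak reference to a number that never changes
        meterRegistry.summary("archive.query.records_scanned").record(response.totalScanned());
        meterRegistry.summary("archive.query.records_matched").record(response.totalMatched());
    }
}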

9. Conclusion

This solution provides a complete data lifecycle management approach for large Aurora PostgreSQL tables. The archive script efficiently exports partitions to the cost effective Iceberg/Parquet format on S3, while the restore script enables seamless data recovery when needed. The Spring Boot API bridges the gap by allowing direct queries against archived data, eliminating the need for restoration in many analytical scenarios.

Key benefits:

  1. Cost reduction: 90 to 98 percent storage cost savings compared to keeping data in Aurora
  2. Operational flexibility: Query archived data without restoration
  3. Schema preservation: Full schema metadata maintained for reliable restores
  4. Partition management: Clean attach/detach operations for partitioned tables
  5. Predicate pushdown: Efficient filtering reduces data transfer and processing

The Iceberg format ensures compatibility with the broader data ecosystem, allowing tools like Athena, Spark, and Trino to query the same archived data when needed for more complex analytical workloads.


Deep Dive into PostgreSQL Prepared Statements: When Plan Caching Goes Wrong and Leads to Memory Exhaustion

Prepared statements are one of PostgreSQL’s most powerful features for query optimization. By parsing and planning queries once, then reusing those plans for subsequent executions, they can dramatically improve performance. But this optimization comes with a hidden danger: sometimes caching the same plan for every execution can lead to catastrophic memory exhaustion and performance degradation.

In this deep dive, we’ll explore how prepared statement plan caching works, when it fails spectacularly, and how PostgreSQL has evolved to address these challenges.

1. Understanding Prepared Statements and Plan Caching

When you execute a prepared statement in PostgreSQL, the database goes through several phases:

  1. Parsing: Converting the SQL text into a parse tree
  2. Planning: Creating an execution plan based on statistics and parameters
  3. Execution: Running the plan against actual data

The promise of prepared statements is simple: do steps 1 and 2 once, then reuse the results for repeated executions with different parameter values.

-- Prepare the statement
PREPARE get_orders AS
SELECT * FROM orders WHERE customer_id = $1;

-- Execute multiple times with different parameters
EXECUTE get_orders(123);
EXECUTE get_orders(456);
EXECUTE get_orders(789);

PostgreSQL uses a clever heuristic to decide when to cache plans. For the first five executions, it creates a custom plan specific to the parameter values. Starting with the sixth execution, it evaluates whether a generic plan (one that works for any parameter value) would be more efficient. If the average cost of the custom plans is close enough to the generic plan’s cost, PostgreSQL switches to reusing the generic plan.
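
The heuristic can be sketched as a small decision function. This is a simplified model, not PostgreSQL's source: the real planner also charges an estimated planning cost to the custom plans, and the function and constant names here are hypothetical.

```python
CUSTOM_PLAN_TRIES = 5  # executions that always get a custom plan


def choose_plan(custom_costs, generic_cost):
    """Return 'custom' or 'generic' for the next execution.

    custom_costs: estimated costs of the custom plans built so far.
    generic_cost: estimated cost of the parameter-independent plan.
    """
    if len(custom_costs) < CUSTOM_PLAN_TRIES:
        return "custom"
    avg_custom = sum(custom_costs) / len(custom_costs)
    # Simplified rule: reuse the generic plan only when it is estimated
    # to be cheaper than the average custom plan.
    return "generic" if generic_cost < avg_custom else "custom"


history = [12.0, 11.5, 12.5, 12.0, 12.0]
print(choose_plan(history[:3], generic_cost=11.0))  # still within the first five executions
print(choose_plan(history, generic_cost=11.0))      # generic estimate is cheaper than the average
print(choose_plan(history, generic_cost=400.0))     # generic estimate is far more expensive
```

The important property is that the decision rests on cost estimates, so a generic plan whose estimated cost looks acceptable can still be chosen even when it behaves badly at run time.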

2. The Dark Side: Memory Exhaustion from Plan Caching

Here’s where things can go catastrophically wrong. Consider a partitioned table:

CREATE TABLE events (
    id BIGSERIAL,
    event_date DATE,
    user_id INTEGER,
    event_type TEXT,
    data JSONB
) PARTITION BY RANGE (event_date);

-- Create 365 partitions, one per day
CREATE TABLE events_2024_01_01 PARTITION OF events
    FOR VALUES FROM ('2024-01-01') TO ('2024-01-02');
CREATE TABLE events_2024_01_02 PARTITION OF events
    FOR VALUES FROM ('2024-01-02') TO ('2024-01-03');
-- ... 363 more partitions

Now consider this prepared statement:

PREPARE get_events AS
SELECT * FROM events WHERE event_date = $1;

The Problem: Generic Plans Can’t Prune Partitions

When PostgreSQL creates a generic plan for this query, it doesn’t know which specific date you’ll query at execution time. Without this knowledge, the planner cannot perform partition pruning, the critical optimization that eliminates irrelevant partitions from consideration.

Here’s what happens:

  1. Custom plan (first 5 executions): PostgreSQL sees the actual date value, realizes only one partition is relevant, and creates a plan that touches only that partition. Fast and efficient.
  2. Generic plan (6th execution onward): PostgreSQL creates a plan that must be valid for ANY date value. Since it can’t know which partition you’ll need, it includes ALL 365 partitions in the plan.

The result: Instead of reading from 1 partition, PostgreSQL’s generic plan prepares to read from all 365 partitions. This leads to:

  • Memory exhaustion: The query plan itself becomes enormous, containing nodes for every partition
  • Planning overhead: Even though the plan is cached, initializing it for execution requires allocating memory for all partition nodes
  • Execution inefficiency: The executor must check every partition, even though 364 of them will return zero rows

In extreme cases with thousands of partitions, this can consume gigabytes of memory per connection and bring your database to its knees.

3. Partition Pruning: The Critical Optimization and How It Works

Partition pruning is the process of eliminating partitions that cannot possibly contain relevant data based on query constraints. Understanding partition pruning in depth is essential for working with partitioned tables effectively.

3.1 What Is Partition Pruning?

At its core, partition pruning is PostgreSQL’s mechanism for avoiding unnecessary work. When you query a partitioned table, the database analyzes your WHERE clause and determines which partitions could possibly contain matching rows. All other partitions are excluded from the query execution entirely.

Consider a table partitioned by date range:

CREATE TABLE sales (
    sale_id BIGINT,
    sale_date DATE,
    amount NUMERIC,
    product_id INTEGER
) PARTITION BY RANGE (sale_date);

CREATE TABLE sales_2023_q1 PARTITION OF sales
    FOR VALUES FROM ('2023-01-01') TO ('2023-04-01');
    
CREATE TABLE sales_2023_q2 PARTITION OF sales
    FOR VALUES FROM ('2023-04-01') TO ('2023-07-01');
    
CREATE TABLE sales_2023_q3 PARTITION OF sales
    FOR VALUES FROM ('2023-07-01') TO ('2023-10-01');
    
CREATE TABLE sales_2023_q4 PARTITION OF sales
    FOR VALUES FROM ('2023-10-01') TO ('2024-01-01');

When you execute:

SELECT * FROM sales WHERE sale_date = '2023-05-15';

PostgreSQL performs partition pruning by examining the partition constraints. It determines that only sales_2023_q2 can contain rows with sale_date = '2023-05-15', so it completely ignores the other three partitions. They never get opened, scanned, or loaded into memory.

3.2 The Two Stages of Partition Pruning

PostgreSQL performs partition pruning at two distinct stages in query execution, and understanding the difference is crucial for troubleshooting performance issues.

Stage 1: Plan Time Pruning (Static Pruning)

Plan time pruning happens during the query planning phase, before execution begins. This is the ideal scenario because pruned partitions never appear in the execution plan at all.

When it occurs:

  • The query contains literal values in the WHERE clause
  • The partition key columns are directly compared to constants
  • The planner can evaluate the partition constraints at planning time

Example:

EXPLAIN SELECT * FROM sales WHERE sale_date = '2023-05-15';

Output might show:

Seq Scan on sales_2023_q2 sales
  Filter: (sale_date = '2023-05-15'::date)

Notice that only one partition appears in the plan. The other three partitions were pruned away during planning, and they consume zero resources.

What makes plan time pruning possible:

The planner evaluates the WHERE clause condition against each partition’s constraint. For sales_2023_q2, the constraint is:

sale_date >= '2023-04-01' AND sale_date < '2023-07-01'

The planner performs boolean logic: “Can sale_date = '2023-05-15' be true if the constraint requires sale_date >= '2023-04-01' AND sale_date < '2023-07-01'?” Yes, it can. For the other partitions, the answer is no, so they’re eliminated.
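
As a conceptual sketch of that check, the function below tests a value against each partition's half-open range. It models the logic only; it is not how PostgreSQL implements pruning internally, and the names are hypothetical.

```python
from datetime import date

# (partition name, inclusive lower bound, exclusive upper bound)
PARTITIONS = [
    ("sales_2023_q1", date(2023, 1, 1), date(2023, 4, 1)),
    ("sales_2023_q2", date(2023, 4, 1), date(2023, 7, 1)),
    ("sales_2023_q3", date(2023, 7, 1), date(2023, 10, 1)),
    ("sales_2023_q4", date(2023, 10, 1), date(2024, 1, 1)),
]


def prune_for_equality(partitions, value):
    """Keep only partitions whose constraint lo <= value < hi can be true."""
    return [name for name, lo, hi in partitions if lo <= value < hi]


print(prune_for_equality(PARTITIONS, date(2023, 5, 15)))  # ['sales_2023_q2']
```

A value outside every range, such as a date in 2025, yields an empty list: no partition needs to be scanned at all.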

Performance characteristics:

  • No runtime overhead for pruned partitions
  • Minimal memory usage
  • Optimal query performance
  • The execution plan is lean and specific

Stage 2: Execution Time Pruning (Dynamic Pruning)

Execution time pruning, also called runtime pruning, happens during query execution rather than planning. This occurs when the planner cannot determine which partitions to prune until the query actually runs.

When it occurs:

  • Parameters or variables are used instead of literal values
  • Subqueries provide the filter values
  • Join conditions determine which partitions are needed
  • Prepared statements with parameters

Example:

PREPARE get_sales AS 
SELECT * FROM sales WHERE sale_date = $1;

-- Force a generic plan (PostgreSQL 12+); otherwise the first executions use
-- custom plans and show plan time pruning instead
SET plan_cache_mode = force_generic_plan;

EXPLAIN (ANALYZE) EXECUTE get_sales('2023-05-15');

With execution time pruning, the plan initially includes all partitions, but the output shows:

Append (actual rows=100)
  Subplans Removed: 3
  -> Seq Scan on sales_2023_q2 sales (actual rows=100)
       Filter: (sale_date = $1)

The key indicator is “Subplans Removed: 3”, which tells you that three partitions were pruned at execution time.

How execution time pruning works:

During the initialization phase of query execution, PostgreSQL evaluates the actual parameter values and applies the same constraint checking logic as plan time pruning. However, instead of eliminating partitions from the plan, it marks them as “pruned” and skips their initialization and execution.

The critical difference:

Even though execution time pruning skips scanning the pruned partitions, the plan still contains nodes for all partitions. This means:

  • Memory is allocated for all partition nodes (though less than full initialization)
  • The plan structure is larger
  • There is a small runtime cost to check each partition
  • More complex bookkeeping is required

This is why execution time pruning, while much better than no pruning, is not quite as efficient as plan time pruning.

3.3 Partition Pruning with Different Partition Strategies

PostgreSQL supports multiple partitioning strategies, and pruning works differently for each.

Range Partitioning

Range partitioning is the most common and supports the most effective pruning:

CREATE TABLE measurements (
    measurement_time TIMESTAMPTZ,
    sensor_id INTEGER,
    value NUMERIC
) PARTITION BY RANGE (measurement_time);

Pruning logic: PostgreSQL uses range comparison. Given a filter like measurement_time >= '2024-01-01' AND measurement_time < '2024-02-01', it identifies all partitions whose ranges overlap with this query range.

Pruning effectiveness: Excellent. Range comparisons are computationally cheap and highly selective.
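
The overlap test can be sketched as follows. The monthly partition names and bounds are hypothetical, since the article does not define partitions for measurements, and the function models the logic rather than PostgreSQL's implementation.

```python
from datetime import datetime, timezone


def utc(year, month, day):
    return datetime(year, month, day, tzinfo=timezone.utc)


# Hypothetical monthly partitions: (name, inclusive lower, exclusive upper)
MONTHLY = [
    ("measurements_2023_12", utc(2023, 12, 1), utc(2024, 1, 1)),
    ("measurements_2024_01", utc(2024, 1, 1), utc(2024, 2, 1)),
    ("measurements_2024_02", utc(2024, 2, 1), utc(2024, 3, 1)),
]


def prune_for_range(partitions, query_lo, query_hi):
    """Keep partitions whose [lo, hi) range overlaps the query range [query_lo, query_hi)."""
    return [name for name, lo, hi in partitions
            if lo < query_hi and query_lo < hi]


print(prune_for_range(MONTHLY, utc(2024, 1, 1), utc(2024, 2, 1)))
# ['measurements_2024_01']
```

Because both ranges are half-open, a query that ends exactly on a partition boundary does not pull in the next partition.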

List Partitioning

List partitioning groups rows by discrete values:

CREATE TABLE orders (
    order_id BIGINT,
    country_code TEXT,
    amount NUMERIC
) PARTITION BY LIST (country_code);

CREATE TABLE orders_us PARTITION OF orders
    FOR VALUES IN ('US');
    
CREATE TABLE orders_uk PARTITION OF orders
    FOR VALUES IN ('UK');
    
CREATE TABLE orders_eu PARTITION OF orders
    FOR VALUES IN ('DE', 'FR', 'IT', 'ES');

Pruning logic: PostgreSQL checks if the query value matches any value in each partition’s list.

SELECT * FROM orders WHERE country_code = 'FR';

Only orders_eu is accessed because 'FR' appears in its value list.

Pruning effectiveness: Very good for equality comparisons. Less effective for OR conditions across many values or pattern matching.
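
A minimal sketch of list pruning, using the partition definitions above. The mapping and function name are illustrative only.

```python
# Partition name -> set of country codes it accepts
LIST_PARTITIONS = {
    "orders_us": {"US"},
    "orders_uk": {"UK"},
    "orders_eu": {"DE", "FR", "IT", "ES"},
}


def prune_for_values(partitions, wanted):
    """Keep partitions whose value list intersects the requested values."""
    wanted = set(wanted)
    return sorted(name for name, values in partitions.items() if values & wanted)


print(prune_for_values(LIST_PARTITIONS, ["FR"]))        # ['orders_eu']
print(prune_for_values(LIST_PARTITIONS, ["FR", "US"]))  # ['orders_eu', 'orders_us']
```

The second call shows why OR or IN conditions across many values prune less: every extra value can add another partition to the result.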

Hash Partitioning

Hash partitioning distributes rows using a hash function:

CREATE TABLE users (
    user_id BIGINT,
    username TEXT,
    email TEXT
) PARTITION BY HASH (user_id);

CREATE TABLE users_p0 PARTITION OF users
    FOR VALUES WITH (MODULUS 4, REMAINDER 0);
    
CREATE TABLE users_p1 PARTITION OF users
    FOR VALUES WITH (MODULUS 4, REMAINDER 1);
-- ... p2, p3

Pruning logic: PostgreSQL computes the hash of the query value and determines which partition it maps to.

SELECT * FROM users WHERE user_id = 12345;

PostgreSQL calculates hash(12345) % 4 and accesses only the matching partition.

Pruning effectiveness: Excellent for equality on the partition key. Completely ineffective for range queries, pattern matching, or anything except exact equality matches.
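
The sketch below illustrates why equality prunes well and ranges do not. It uses plain modulo as a stand-in hash, which is an assumption made for readability: PostgreSQL applies its own internal hash functions, so the real partition for a given key will generally differ.

```python
def partition_for_key(user_id, modulus=4):
    """Map a key to a partition name using a stand-in hash (plain modulo)."""
    return f"users_p{user_id % modulus}"


# Equality on the partition key resolves to exactly one partition.
print(partition_for_key(12345))

# A range of consecutive keys spreads across every partition, so nothing can be pruned.
touched = {partition_for_key(uid) for uid in range(100, 108)}
print(sorted(touched))
```

Any reasonable hash scatters neighbouring keys, which is the point of hash partitioning and also the reason range predicates cannot narrow the partition set.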

3.4 Complex Partition Pruning Scenarios

Real world queries are often more complex than simple equality comparisons. Here’s how pruning handles various scenarios:

Multi Column Partition Keys

CREATE TABLE events (
    event_date DATE,
    region TEXT,
    data JSONB
) PARTITION BY RANGE (event_date, region);

Pruning works on the leading columns of the partition key. A query filtering only on event_date can still prune effectively. A query filtering only on region cannot prune at all because region is not the leading column.

OR Conditions

SELECT * FROM sales 
WHERE sale_date = '2023-05-15' OR sale_date = '2023-08-20';

PostgreSQL must access partitions for both dates (Q2 and Q3), so it keeps both and prunes Q1 and Q4. OR conditions reduce pruning effectiveness.

Inequality Comparisons

SELECT * FROM sales WHERE sale_date >= '2023-05-01';

PostgreSQL prunes partitions entirely before the date (Q1) but must keep all partitions from Q2 onward. Range queries reduce pruning selectivity.

Joins Between Partitioned Tables

SELECT * FROM sales s
JOIN products p ON s.product_id = p.product_id
WHERE s.sale_date = '2023-05-15';

If sales is partitioned by sale_date, then partition pruning works normally. If products is also partitioned, PostgreSQL can use partitionwise joins when enable_partitionwise_join is on (it is off by default) and the join is on matching partition keys, enabling pruning on both sides.

Subqueries Providing Values

SELECT * FROM sales 
WHERE sale_date = (SELECT MAX(order_date) FROM orders);

This requires execution time pruning because the subquery must run before PostgreSQL knows which partition to access.

3.5 Monitoring Partition Pruning

To verify partition pruning is working, use EXPLAIN:

EXPLAIN (ANALYZE, BUFFERS) 
SELECT * FROM sales WHERE sale_date = '2023-05-15';

What to look for:

Plan time pruning succeeded:

Seq Scan on sales_2023_q2

Only one partition appears in the plan at all.

Execution time pruning succeeded:

Append
  Subplans Removed: 3
  -> Seq Scan on sales_2023_q2

All partitions appear in the plan structure, but “Subplans Removed” shows pruning happened.

No pruning occurred:

Append
  -> Seq Scan on sales_2023_q1
  -> Seq Scan on sales_2023_q2
  -> Seq Scan on sales_2023_q3
  -> Seq Scan on sales_2023_q4

All partitions were scanned. This indicates a problem.

3.6 Why Partition Pruning Fails

Understanding why pruning fails helps you fix it:

  1. Query doesn’t filter on partition key: If your WHERE clause doesn’t reference the partition column(s), PostgreSQL cannot prune.
  2. Function calls on partition key: WHERE EXTRACT(YEAR FROM sale_date) = 2023 prevents pruning because PostgreSQL can’t map the function result back to partition ranges. Use WHERE sale_date >= '2023-01-01' AND sale_date < '2024-01-01' instead.
  3. Type mismatches: If your partition key is DATE but you compare to TEXT without explicit casting, pruning may fail.
  4. Generic plans in prepared statements: As discussed in Section 2, generic plans prevent plan time pruning and older PostgreSQL versions struggled with execution time pruning.
  5. OR conditions with non-partition columns: WHERE sale_date = '2023-05-15' OR customer_id = 100 prevents pruning because customer_id isn’t the partition key.
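  6. Stable and volatile functions: WHERE sale_date = CURRENT_DATE cannot be pruned at plan time, because CURRENT_DATE is a stable function whose value is only known at execution, but execution time pruning still applies. Truly volatile functions such as random() prevent pruning altogether.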
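
For the function call case in item 2, the rewrite can be done in application code before the query is sent. A minimal sketch, assuming a psycopg2-style driver with %s placeholders; `year_to_range` is a hypothetical helper.

```python
from datetime import date


def year_to_range(year):
    """Half-open [start, end) date range covering one calendar year."""
    return date(year, 1, 1), date(year + 1, 1, 1)


start, end = year_to_range(2023)

# Filter on the bare partition column instead of EXTRACT(YEAR FROM sale_date)
sql = "SELECT * FROM sales WHERE sale_date >= %s AND sale_date < %s"
params = (start, end)

print(sql, params)
```

Comparing the partition column directly against a range keeps the predicate in a form the planner can match to partition bounds.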

3.7 Partition Pruning Performance Impact

The performance difference between pruned and unpruned queries can be staggering:

Example scenario: 1000 partitions, each with 1 million rows. Query targets one partition.

With pruning:

  • Partitions opened: 1
  • Rows scanned: 1 million
  • Memory for plan nodes: ~10KB
  • Query time: 50ms

Without pruning:

  • Partitions opened: 1000
  • Rows scanned: 1 billion (returning ~1 million)
  • Memory for plan nodes: ~10MB
  • Query time: 45 seconds

The difference is not a fixed overhead; the cost of an unpruned query grows in proportion to the partition count.

4. Partition Pruning in Prepared Statements: The Core Problem

Let me illustrate the severity with a real-world scenario:

-- Table with 1000 partitions
CREATE TABLE metrics (
    timestamp TIMESTAMPTZ,
    metric_name TEXT,
    value NUMERIC
) PARTITION BY RANGE (timestamp);

-- Create 1000 daily partitions...

-- Prepared statement
PREPARE get_metrics AS
SELECT * FROM metrics 
WHERE timestamp >= $1 AND timestamp < $2;

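After five executions with custom plans, PostgreSQL may switch to a generic plan from the sixth execution onward (it does so when the generic plan's estimated cost is not much higher than the average custom plan cost). Each subsequent execution: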

  1. Allocates memory for 1000 partition nodes
  2. Initializes executor state for 1000 partitions
  3. Checks 1000 partition constraints
  4. Returns data from just 1-2 partitions

If you have 100 connections each executing this prepared statement, you’re multiplying this overhead by 100. With connection poolers reusing connections (and thus reusing prepared statements), the problem compounds.

6. The Fix: Evolution Across PostgreSQL Versions

PostgreSQL has steadily improved partition pruning for prepared statements:

PostgreSQL 11: Execution-Time Pruning Introduced

PostgreSQL 11 introduced run-time partition pruning, but it had significant limitations with prepared statements. Generic plans still included all partitions in memory, even if they could be skipped during execution.

PostgreSQL 12: Better Prepared Statement Pruning

PostgreSQL 12 made substantial improvements:

  • Generic plans gained the ability to defer partition pruning to execution time more effectively
  • The planner became smarter about when to use generic vs. custom plans for partitioned tables
  • Memory consumption for generic plans improved significantly

However, issues remained in edge cases, particularly with:

  • Multi level partitioning
  • Complex join queries involving partitioned tables
  • Prepared statements in stored procedures

PostgreSQL 13-14: Refined Heuristics

These versions improved the cost model for deciding between custom and generic plans:

  • Better accounting for partition pruning benefits in the cost calculation
  • More accurate statistics gathering on partitioned tables
  • Improved handling of partitionwise joins

PostgreSQL 15-16: The Real Game Changers

PostgreSQL 15 and 16 brought transformative improvements:

PostgreSQL 15:

  • Dramatically reduced memory usage for generic plans on partitioned tables
  • Improved execution-time pruning performance
  • Better handling of prepared statements with partition pruning

PostgreSQL 16:

  • Introduced incremental sorting improvements that benefit partitioned queries
  • Enhanced partition-wise aggregation
  • More aggressive execution-time pruning

The key breakthrough: PostgreSQL now builds “stub” plans that allocate minimal memory for partitions that will be pruned, rather than fully initializing all partition nodes.

Workarounds for Older Versions

If you’re stuck on older PostgreSQL versions, here are strategies to avoid the prepared statement pitfall:

1. Disable Generic Plans

Force PostgreSQL to always use custom plans:

-- Set at session level
SET plan_cache_mode = force_custom_plan;

-- Or for specific prepared statement contexts
PREPARE get_events AS
SELECT * FROM events WHERE event_date = $1;

-- Before execution (SET LOCAL only takes effect inside a transaction block)
BEGIN;
SET LOCAL plan_cache_mode = force_custom_plan;
EXECUTE get_events('2024-06-15');
COMMIT;

This sacrifices the planning time savings but ensures proper partition pruning.

2. Use Statement Level Caching Instead

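Many ORMs and database drivers can send each query as a plain statement rather than as a server-side prepared statement, so no plan is cached across executions:

# psycopg2 example - parameters are interpolated client-side, so the
# server plans each execution afresh and never switches to a generic plan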
cursor = connection.cursor()
cursor.execute(
    "SELECT * FROM events WHERE event_date = %s",
    (date_value,)
)

3. Adjust plan_cache_mode Per Query

PostgreSQL 12+ provides plan_cache_mode:

-- auto (default): use PostgreSQL's heuristics
-- force_generic_plan: always use generic plan
-- force_custom_plan: always use custom plan

SET plan_cache_mode = force_custom_plan;

For partitioned tables, force_custom_plan is often the right choice.
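If you want to scope the setting to a single call from application code, a minimal sketch might look like the following. It assumes a psycopg2-style connection (autocommit off, so the first statement opens a transaction) and a prepared statement that already exists in the session; the helper name is made up for this example.

```python
def execute_with_custom_plan(conn, sql, params):
    """Run one statement with plan_cache_mode forced to custom plans.

    SET LOCAL only lasts until the end of the current transaction, so the
    setting does not leak into later work on a pooled connection.
    """
    with conn.cursor() as cur:
        cur.execute("SET LOCAL plan_cache_mode = force_custom_plan")
        cur.execute(sql, params)
        rows = cur.fetchall()
    conn.commit()  # ends the transaction and discards the SET LOCAL
    return rows


# Usage (assumes get_events was PREPAREd earlier on this connection):
# rows = execute_with_custom_plan(conn, "EXECUTE get_events(%s)", ("2024-06-15",))
```

Keeping the override transaction-local is a design choice: other queries on the same connection keep the default auto behaviour.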

4. Increase Custom Plan Count

The threshold of 5 custom plans before switching to generic is hardcoded, but you can work around it by using different prepared statement names or by periodically deallocating and recreating prepared statements:

DEALLOCATE get_events;
PREPARE get_events AS SELECT * FROM events WHERE event_date = $1;

5. Partition Pruning Hints

In PostgreSQL 12+, you can sometimes coerce the planner into better behavior:

-- Using an explicit constraint that helps the planner
SELECT * FROM events 
WHERE event_date = $1 
  AND event_date >= CURRENT_DATE - INTERVAL '1 year';

This additional constraint provides a hint about the parameter range.

Best Practices

  1. Monitor your query plans: Use EXPLAIN (ANALYZE, BUFFERS) to check if partition pruning is happening:
EXPLAIN (ANALYZE, BUFFERS) EXECUTE get_events('2024-06-15');

Look for “Subplans Removed” in the output; partitions pruned during execution show as “(never executed)”.

  2. Check prepared statement statistics: Query pg_prepared_statements to see generic vs. custom plan usage (the generic_plans and custom_plans columns are available from PostgreSQL 14):
SELECT name, 
       generic_plans, 
       custom_plans 
FROM pg_prepared_statements;
  3. Upgrade PostgreSQL: If you’re dealing with large partitioned tables, the improvements in PostgreSQL 15+ are worth the upgrade effort.
  4. Design partitions appropriately: Don’t over-partition. Having 10,000 tiny partitions creates problems even with perfect pruning.
  5. Use connection pooling wisely: Prepared statements persist per connection. With connection pooling, long-lived connections accumulate many prepared statements. Configure your pooler to occasionally recycle connections.
  6. Benchmark both modes: Test your specific workload with both custom and generic plans to measure the actual impact.
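The plan check in the first practice can be automated. The sketch below scans EXPLAIN text for the "Subplans Removed: N" line that PostgreSQL prints on an Append node when partitions are pruned at executor startup. The sample plan is hand-written and abbreviated for illustration, not captured output, and the function name is an assumption.

```python
import re

_SUBPLANS_REMOVED = re.compile(r"Subplans Removed:\s*(\d+)")


def subplans_removed(explain_lines):
    """Sum every 'Subplans Removed: N' value found in EXPLAIN output lines."""
    return sum(
        int(match.group(1))
        for line in explain_lines
        for match in _SUBPLANS_REMOVED.finditer(line)
    )


# Hand-written, abbreviated plan text (illustrative only)
sample_plan = [
    "Append",
    "  Subplans Removed: 999",
    "  ->  Seq Scan on events_2024_06_15",
]

print(subplans_removed(sample_plan))
```

A result of zero on a query that should touch one partition is a hint that pruning happened at plan time (pruned partitions simply do not appear) or not at all, so compare it with the number of scan nodes in the plan.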

Conclusion

Prepared statements are a powerful optimization, but their interaction with partitioned tables exposes a fundamental tension: caching for reuse versus specificity for efficiency. PostgreSQL’s evolution from version 11 through 16 represents a masterclass in addressing this challenge.

The key takeaway: if you’re using prepared statements with partitioned tables on PostgreSQL versions older than 15, be vigilant about plan caching behavior. Monitor memory usage, check execution plans, and don’t hesitate to force custom plans when generic plans cause problems.

For modern PostgreSQL installations (15+), the improvements are substantial enough that the traditional guidance of “be careful with prepared statements on partitioned tables” is becoming outdated. The database now handles these scenarios with far more intelligence and efficiency.

But understanding the history and mechanics remains crucial, because the next time you see mysterious memory growth in your PostgreSQL connections, you’ll know exactly where to look.


AWS: Install and configure the AWS CLI on a MacBook

You can absolutely get the following from the AWS help pages; but this is the lazy way to get everything you need for a simple single account setup.

Run the two commands below to drop the package on your Mac.

$ curl "https://awscli.amazonaws.com/AWSCLIV2.pkg" -o "AWSCLIV2.pkg"
$ sudo installer -pkg AWSCLIV2.pkg -target /

Then check the versions you have installed:

$ which aws
$ aws --version

Next you need to set up your environment. Note: This is NOT the recommended way (as it uses long-term credentials).

The following example configures a default profile using sample values. Replace them with your own values as described in the following sections.

$ aws configure
AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [None]: secretaccesskey
Default region name [None]: af-south-1
Default output format [None]: json

You can also use named profiles. The following example configures a profile named userprod using sample values. Replace them with your own values as described in the following sections.

$ aws configure --profile userprod
AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [None]: secretaccesskey
Default region name [None]: af-south-1
Default output format [None]: json
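aws configure stores the keys in an INI-style file at ~/.aws/credentials, with one section per profile. As a quick sanity check, a sketch like the one below lists the profiles found in that format. The sample text reuses the placeholder values from the examples above, and the function name is an assumption for this example.

```python
import configparser


def list_profiles(credentials_text):
    """Return the profile names found in AWS credentials-file text."""
    parser = configparser.ConfigParser()
    parser.read_string(credentials_text)
    return parser.sections()


# Placeholder values copied from the aws configure examples above
SAMPLE_CREDENTIALS = """
[default]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = secretaccesskey

[userprod]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = secretaccesskey
"""

print(list_profiles(SAMPLE_CREDENTIALS))
```

To check a real machine, read the file with pathlib.Path.home() / ".aws" / "credentials" and pass its text to the function.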

Get your access keys

  1. Sign in to the AWS Management Console and open the IAM console at https://console.aws.amazon.com/iam/.
  2. In the navigation pane of the IAM console, select Users and then select the User name of the user that you created previously.
  3. On the user’s page, select the Security credentials page. Then, under Access keys, select Create access key.
  4. For Create access key Step 1, choose Command Line Interface (CLI).
  5. For Create access key Step 2, enter an optional tag and select Next.
  6. For Create access key Step 3, select Download .csv file to save a .csv file with your IAM user’s access key and secret access key. You need this information for later.
  7. Select Done.

AWS: Automatically Stop and Start your EC2 Services

Below is a quick (am busy) outline on how to automatically stop and start your EC2 instances.

Step 1: Tag your resources

In order to decide which instances stop and start you first need to add an auto-start-stop: Yes tag to all the instances you want to be affected by the start / stop functions. Note: You can use “Resource Groups and Tag Editor” to bulk apply these tags to the resources you want to be affected by the lambda functions you are going to create. See below (click the orange button called “Manage tags of Selected Resources”).

Step 2: Create a new role for our lambda functions

First we need to create the IAM role to run the Lambda functions. Go to IAM and click the “Create Role” button. Then select “AWS Service” from the “Trusted entity options”, and select Lambda from the “Use Cases” options. Then click “Next”, followed by “Create Policy”. To specify the permission, simply Click the JSON button on the right of the screen and enter the below policy (swapping the region and account id for your region and account id):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DescribeInstances",
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeInstances",
                "ec2:DescribeTags",
                "ec2:DescribeInstanceTypes",
                "ec2:DescribeInstanceStatus"
            ],
            "Resource": "*"
        },
        {
            "Sid": "StartStopTaggedInstances",
            "Effect": "Allow",
            "Action": [
                "ec2:StartInstances",
                "ec2:StopInstances"
            ],
            "Resource": "arn:aws:ec2:<region>:<accountID>:instance/*",
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/auto-start-stop": "Yes"
                }
            }
        },
        {
            "Sid": "WriteLogs",
            "Effect": "Allow",
            "Action": "logs:*",
            "Resource": "arn:aws:logs:<region>:<accountID>:*"
        }
    ]
}

Hit next and under “Review and create”, save the above policy as ec2-lambda-start-stop by clicking the “Create Policy” button. Next, search for this newly created policy and select it as per below and hit “Next”.

You will now see the “Name, review, and create” screen. Here you simply need to hit “Create Role” after you enter the role name as ec2-lambda-start-stop-role.

Note the policy is restricted to only have access to EC2 instances that contain auto-start-stop: Yes tags (least privilege).

If you want to review your role, this is how it should look. You can see I have filled in my region and account number in the policy:

Step 3: Create Lambda Functions To Start/Stop EC2 Instances

In this section we will create two lambda functions, one to start the instances and the other to stop the instances.

Step 3a: Add the Stop EC2 instance function

  • Go to the Lambda console and click on create function
  • Create a lambda function with a function name of stop-ec2-instance-lambda, python3.11 runtime, and ec2-lambda-start-stop-role (see image below).

Next add the lambda stop function and save it as stop-ec2-instance-lambda. Note, you will need to change the value of the region_name parameter accordingly.

import boto3

ec2 = boto3.resource('ec2', region_name='af-south-1')

def lambda_handler(event, context):
    # Find running instances that carry the auto-start-stop: Yes tag
    instances = ec2.instances.filter(Filters=[
        {'Name': 'instance-state-name', 'Values': ['running']},
        {'Name': 'tag:auto-start-stop', 'Values': ['Yes']},
    ])
    for instance in instances:
        instance.stop()
        print("Instance ID is stopped:- " + instance.id)
    return "success"

This is how your Lambda function should look:

Step 3b: Add the Start EC2 instance function

  • Go to the Lambda console and click on create function
  • Create a lambda function with a function name of start-ec2-instance-lambda, python3.11 runtime, and ec2-lambda-start-stop-role.
  • Then add the below code and save the function as start-ec2-instance-lambda.

Note, you will need to change the value of the region_name parameter accordingly.

import boto3

ec2 = boto3.resource('ec2', region_name='af-south-1')

def lambda_handler(event, context):
    # Find stopped instances that carry the auto-start-stop: Yes tag
    instances = ec2.instances.filter(Filters=[
        {'Name': 'instance-state-name', 'Values': ['stopped']},
        {'Name': 'tag:auto-start-stop', 'Values': ['Yes']},
    ])
    for instance in instances:
        instance.start()
        print("Instance ID is started:- " + instance.id)
    return "success"

4. Summary

If either of the above lambda functions is triggered, it will start or stop your EC2 instances based on the instance state and the value of the auto-start-stop tag. To automate this you can simply set up cron jobs, step functions, Amazon EventBridge, Jenkins etc.
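If you go the EventBridge route, each Lambda needs a schedule expression. EventBridge cron expressions have six fields (minutes, hours, day of month, month, day of week, year) and are evaluated in UTC. The sketch below builds such an expression; the helper name and the example hours are assumptions, so adjust them for your own timezone offset.

```python
def eventbridge_cron(hour_utc, minute=0, days="MON-FRI"):
    """Build an EventBridge schedule expression for a fixed time of day (UTC).

    Day-of-month is set to '?' because day-of-week is specified; EventBridge
    does not allow both fields to be set at once.
    """
    if not 0 <= hour_utc <= 23:
        raise ValueError("hour_utc must be between 0 and 23")
    if not 0 <= minute <= 59:
        raise ValueError("minute must be between 0 and 59")
    return f"cron({minute} {hour_utc} ? * {days} *)"


# Example: stop at 18:00 UTC and start at 05:00 UTC on weekdays
print("stop :", eventbridge_cron(18))
print("start:", eventbridge_cron(5))
```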


How to trigger Scaling Events using Stress-ng Command

If you are testing how your autoscaling policies respond to CPU load then a really simple way to test this is using the “stress-ng” command. Note: this is a very crude mechanism to test and wherever possible you should try and generate synthetic application load.

#!/bin/bash

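# DESCRIPTION: After updating from the repo, installs stress-ng, a tool used to create various system load for testing purposes.
sudo yum update -y
# Install stress-ng (on Debian/Ubuntu use: sudo apt install -y stress-ng;
# some yum-based distributions may need the EPEL repository enabled first)
sudo yum install -y stress-ng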

# CPU spike: Run a CPU spike for 5 seconds
uptime
stress-ng --cpu 4 --timeout 5s --metrics-brief
uptime

# Disk Test: Start N (2) workers continually writing, reading and removing temporary files:
stress-ng --hdd 2 --timeout 5s --metrics-brief

# Memory stress test
# Populate memory. Use mmap N bytes per vm worker, the default is 256MB. 
# You can also specify the size as % of total available memory or in units of 
# Bytes, KBytes, MBytes and GBytes using the suffix b, k, m or g:
# Note: The --vm 2 will start N workers (2 workers) continuously calling 
# mmap/munmap and writing to the allocated memory. Note that this can cause 
# systems to trip the kernel OOM killer on Linux systems if not enough 
# physical memory and swap is not available
stress-ng --vm 2 --vm-bytes 1G --timeout 5s

# Combination Stress
# To run for 5 seconds with 4 cpu stressors, 2 io stressors and 1 vm 
# stressor using 1GB of virtual memory, enter:
stress-ng --cpu 4 --io 2 --vm 1 --vm-bytes 1G --timeout 5s --metrics-brief


Part 2: Increasing your Cloud consumption (the sane way)

Introduction

This article follows on from the “Cloud Migrations Crusade” blog post…

A single tenancy datacenter is a fixed scale, fixed price service on a closed network. The costs of the resources in the datacenter are divided up and shared out to the enterprise constituents on a semi-random basis. If anyone uses less resources than the forecast this generates waste which is shared back to the enterprise. If there is more demand than forecasted, it will either generate service degradation, panic or an outage! This model is clearly fragile and doesn’t respond quickly to change; it is also wasteful as it requires a level of overprovisioning based on forecast consumption (otherwise you will experience delays in projects, service degradation or have reduced resilience).

Cloud, on the other hand, is a multi-tenanted on demand software service which you pay for as you use. But surely having multiple tenants running on the same fixed capacity actually increases the risks, and just because it’s in the cloud it doesn’t mean that you can get away without over provisioning – so who sits with the over provisioned costs? The cloud providers have to build this into their rates. So cloud providers have to deal with a balance sheet of fixed capacity shared amongst customers running on demand infrastructure. They do this with very clever forecasting, very short provisioning cycles and asking their customers for forecasts and then offering discounts for pre-commits.

Anything that moves you back towards managing resource levels / forecasting will destroy a huge portion of the value of moving to the cloud in the first instance. For example, if you have ever been to a Re:Invent you will be floored by the rate of innovation and also how easy it is to absorb these new innovative products. But wait – you just signed a 5yr cost commit and now you learn about Aurora’s new serverless database model. You realise that you can save millions of dollars; but you have to wait for your 5yr commits to expire before you adopt or maybe start mining bitcoin with all your excess commits! This is anti-innovation and anti-customer.

What’s even worse is that pre-commits are typically signed up front on day 1 – this is total madness!!! At the point where you know nothing about your brave new world, you use the old costs as a proxy to predict the new costs so that you can squeeze a lousy 5% saving at the risk of 100% of the commit size! What you will start to learn is that your cloud success is NOT based on the commercial contract that you sign with your cloud provider; it’s actually based on the quality of the engineering talent that your organisation is able to attract. Cloud is an IP war – it’s not a legal/sourcing war. Allow yourself to learn, don’t box yourself in on day 1. When you sign the pre-commit you will notice your first year utilisation projections are actually tiny and therefore the savings are small. So what’s the point of signing so early on when the risk is at a maximum and the gains are at a minimum? When you sign this deal you are essentially turning the cloud into a “financial data center” – you have destroyed the cloud before you even started!

A Lesson from the field – Solving Hadoop Compute Demand Spike:

We moved 7000 cores of burst compute to AWS to solve a capacity issue on premise. That’s expensive, so let’s “fix the costs”! We can go and sign an RI (reserved instance), play with spot, buy savings plans or even beg / barter for some EDP relief. But instead we plugged the service usage into Quicksight and analysed the queries. We found one query was using 60 percent of the entire bank’s compute! Nobody confessed to owning the query, so we just disabled it (if you need a reason for your change management; describe the change as “disabling a financial DDOS”). We quickly found the service owner and explained that running a table scan across billions of rows to return a report with just last month’s data is not a good idea. We also explained that if they don’t fix this we will start billing them in 6 weeks’ time (a few million dollars). The team deployed a fix and now we run the bank’s big data stack at half the cost – just by tuning one query!!!

So the point of the above is that there is no substitute for engineering excellence. You have to understand and engineer the cloud to win, you cannot contract yourself into the cloud. The more contracts you sign the more failures you will experience. This leads me to point 2…

Step 2: Training, Training, Training

Start the biggest training campaign you possibly can – make this your crusade. Train everyone; business, finance, security, infrastructure – you name it, you train it. Don’t limit what anyone can train on, training is cheap – feast as much as you can. Look at Udemy, ACloudGuru, Youtube, WhizLabs etc etc etc. If you get this wrong then you will find your organisation fills up with expensive consultants and bespoke migration products that you don’t need ++ can easily do yourself, via opensource or with your cloud provider toolsets. In fact I would go one step further – if you’re not prepared to learn about the cloud, you’re not ready to go there.

Step 3: The OS Build

When you do start your cloud migration and begin to review your base OS images – go right back to the very beginning, remove every single product in all of these base builds. Look at what you can get out of the box from your cloud provider and really push yourself hard on what you really need vs what is nice to have. The trick is that to get the real benefit from a cloud migration, you have to start by making your builds as “naked” as possible. Nothing should move into the base build without a good reason. Ownership and report lines are not a good enough reason for someone’s special “tool” to make it into the build. This process, if done correctly, should deliver you between 20-40% of your cloud migration savings. Do this badly and your costs, complexity and support will all head in the wrong direction.

Security HAS to be a first class citizen of your new world. In most organizations this will likely make for some awkward cultural collisions (control and ownership vs agility) and some difficult dialogs. The cloud, by definition, should be liberating – so how do you secure it without creating a “cloud bunker” that nobody can actually use? More on this later… 🙂

Step 4: Hybrid Networking

For any organisation with data centers – make no mistake, if you get this wrong it’s over before it starts.


AWS: Making use of S3s ETags to check if a file has been altered

I was playing with S3 the other day and I noticed that a file which I had uploaded twice, in two different locations, had an identical ETag. This immediately made me think that this tag was some kind of hash. So I had a quick look at the AWS documentation and this ETag turns out to be marginally useful. ETag is an “Entity Tag” and it’s basically an MD5 hash of the file (although this only holds for objects uploaded in a single PUT, which is limited to 5gb; objects uploaded via multipart upload, or encrypted with SSE-KMS or SSE-C, get an ETag that is not a plain MD5 of the content).

So if you ever want to compare a local copy of a file with an AWS S3 copy of a file you just need to install MD5 (the below steps are for ubuntu linux):

# Update your ubuntu
# Download the latest package lists
sudo apt update
# Perform the upgrade
sudo apt-get upgrade -y
# Now install common utils (inc MD5)
sudo apt install -y ucommon-utils
# Upgrades involving the Linux kernel, changing dependencies, adding / removing new packages etc
sudo apt-get dist-upgrade

Next, to view the MD5 hash of a file simply type:

# View MD5 hash of a file
md5sum myfilename.myextension
2aa318899bdf388488656c46127bd814  myfilename.myextension
# The first number above will match your S3 Etag if it's not been altered

Below is the screenshot of the properties that you will see in S3 with a matching MD5 hash:
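The same comparison can be scripted. The sketch below hashes a local file with Python's hashlib and compares it with an ETag string as returned by S3 (the API wraps the value in double quotes). It deliberately refuses ETags that contain a "-", since those come from multipart uploads and are not a plain MD5. The function names are made up for this example.

```python
import hashlib


def md5_hex(path, chunk_size=1024 * 1024):
    """Return the hex MD5 digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_etag(path, etag):
    """Compare a local file against an S3 ETag from a single-part upload."""
    etag = etag.strip('"')
    if "-" in etag:
        # Multipart ETags look like '<hash>-<part count>' and are not an MD5
        raise ValueError("multipart ETag cannot be compared with a plain MD5")
    return md5_hex(path) == etag.lower()


# Usage, with the ETag copied from the S3 console or a head-object call:
# matches_etag("myfilename.myextension", '"2aa318899bdf388488656c46127bd814"')
```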


Using TPC-H tools to Create Test Data for AWS Redshift and AWS EMR

If you need to test out your big data tools, below is a useful set of scripts that I have used in the past to generate test data for AWS EMR and Redshift:

# install git
sudo yum install make git -y
# install the tpch-kit
git clone https://github.com/gregrahn/tpch-kit
cd tpch-kit/dbgen
sudo yum install gcc -y
# Compile the tpch kit
make OS=LINUX
# Go home
cd ~
# Now make your emr data
mkdir emrdata
# Tell tpch to use this dir
export DSS_PATH=$HOME/emrdata
cd tpch-kit/dbgen
# Now run dbgen in verbose mode, with tables (orders and lineitem), 10gb data size
./dbgen -v -T o -s 10
# move the data to a s3 bucket
cd $HOME/emrdata
aws s3api create-bucket --bucket andrewbakerbigdata --region af-south-1 --create-bucket-configuration LocationConstraint=af-south-1
aws s3 cp $HOME/emrdata s3://andrewbakerbigdata/emrdata --recursive
cd $HOME
mkdir redshiftdata
# Tell tpch to use this dir
export DSS_PATH=$HOME/redshiftdata
# Now make your redshift data
cd tpch-kit/dbgen
# Now run dbgen in verbose mode, with tables (orders and lineitem), 40gb data size
./dbgen -v -T o -s 40
# These are big files, so lets find out how big they are and split them
# Count lines
cd $HOME/redshiftdata
wc -l orders.tbl
# Now split orders into 15m lines per file
split -d -l 15000000 -a 4 orders.tbl orders.tbl.
# Now split line items
wc -l lineitem.tbl
split -d -l 60000000 -a 4 lineitem.tbl lineitem.tbl.
# Now clean up the master files
rm orders.tbl
rm lineitem.tbl
# move the split data to a s3 bucket
aws s3 cp $HOME/redshiftdata s3://andrewbakerbigdata/redshiftdata --recursive
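To sanity check the split step before uploading, you can work out how many chunk files to expect from the wc -l count. The sketch below assumes the orders table holds 1,500,000 rows per unit of scale factor (the TPC-H definition) and mirrors the 4-digit numeric suffixes produced by split -d -a 4; the helper names are assumptions for this example.

```python
def split_file_count(total_lines, lines_per_file):
    """Number of files that 'split -l lines_per_file' produces."""
    return -(-total_lines // lines_per_file)  # integer ceiling division


def split_names(prefix, count, width=4):
    """File names produced by 'split -d -a <width> <input> <prefix>'."""
    return [f"{prefix}{index:0{width}d}" for index in range(count)]


scale_factor = 40
orders_rows = 1_500_000 * scale_factor  # TPC-H orders cardinality

chunks = split_file_count(orders_rows, 15_000_000)
print(chunks, split_names("orders.tbl.", chunks))
```

Several similarly sized files let Redshift load in parallel, which is the reason for splitting in the first place.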

AWS: Please Fix Poor Error Messages, API standards and Bad Defaulting


This is a short blog, and it’s actually just a simple plea to AWS. Please can you do three things?

  1. North Virginia appears to be the AWS master node. Having this region as a master region causes a large number of support issues (for example S3, KMS, Cloudfront, ACM all use this pet region and all of their APIs suffer as a result). This coupled with point 2) creates some material angst.
  2. Work a little harder on your error messages – they are often really (really) bad. I will post some examples at the bottom of this post over time. But you have to do some basics like reject unknown parameters (yes it’s useful to know there is a typo vs just ignore the parameter).
  3. Use standard parameters across your APIs (eg make specifying the region consistent (even within single products it’s not consistently applied) and make your verbs consistent).

As a simple example, below I am logged into an EC2 instance in af-south-1 and I can create an S3 bucket in North Virginia, but not in af-south-1. I am sure there is a “fix” (change some config, find out an API parameter was invalid and was silently ignored etc) – but this isn’t the point. The risk (and it’s real) is that in an attempt to debug this, developers will tend to open up security groups, open up NACLs, widen IAM roles etc. When the devs finally fix the issue, they will be very unlikely to retrace all their steps and restore everything else that they changed. This means that you end up with debugging scars that create overly permissive services, due to poor error messages, inconsistent API parameters/behaviors and a regional bias. Note: I am aware of commercial products, like Radware’s CWP – but that’s not the point. I shouldn’t ever need to debug by dialling back security. Observability was supposed to be there from day 1. The combination of tangential error messages, inconsistent APIs and lack of decent debug information from core services like IAM and S3 is creating a problem that shouldn’t exist.

AWS is a global cloud provider – services should work identically across all regions, and APIs should have standards, APIs shouldn’t silently ignore mistyped parameters, the base config required should either come from context (ie am running in region x) or config (aws config) – not a global default region.

Please note: I deleted the bucket between running the two commands ++ aws configure seemed to be ignored by create-bucket

[ec2-user@ip-172-31-24-139 emrdata]$ aws s3api create-bucket --bucket ajbbigdatabucketlab2021
{
    "Location": "/ajbbigdatabucketlab2021"
}
[ec2-user@ip-172-31-24-139 emrdata]$ aws s3api create-bucket --bucket ajbbigdatabucketlab2021 --region af-south-1

An error occurred (IllegalLocationConstraintException) when calling the CreateBucket operation: 
The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.

Note, I worked around the create-bucket behavior by replacing it with mb:

[ec2-user@ip-172-31-24-139 emrdata]$ aws s3 mb s3://ajbbigdatabucketlab2021 --region af-south-1
make_bucket: ajbbigdatabucketlab2021

Thanks to the AWS dudes for letting me know how to get this working. It turns out the create-bucket and mb APIs don’t use standard parameters. See below (outside North Virginia, the region has to be passed in a verbose bucket config parameter):

aws s3api create-bucket --bucket ajbbigdatabucketlab2021 --create-bucket-configuration LocationConstraint=af-south-1
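The same quirk shows up in boto3, where create_bucket takes a CreateBucketConfiguration with a LocationConstraint for every region except us-east-1. A small sketch of how you might hide that special case behind a helper (the helper name is an assumption; the boto3 call in the comments needs credentials to run):

```python
def create_bucket_kwargs(bucket, region):
    """Build the keyword arguments for boto3's S3 create_bucket call.

    us-east-1 is the default location, so it takes no LocationConstraint;
    every other region needs one.
    """
    kwargs = {"Bucket": bucket}
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


print(create_bucket_kwargs("ajbbigdatabucketlab2021", "af-south-1"))

# Usage (requires boto3 and AWS credentials):
# import boto3
# s3 = boto3.client("s3", region_name="af-south-1")
# s3.create_bucket(**create_bucket_kwargs("ajbbigdatabucketlab2021", "af-south-1"))
```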


Part 1: The Great Public Cloud Crusade…

“Not all cloud transformations are created equally…!”

The cloud is hot…. not just a little hot, but smokin hot!! Covid is messing with the economy, customers are battling financially, the macro economic outlook is problematic, vendor costs are high and climbing and security needs more investment every year. What on earth do we do??!! I know…. lets start a crusade – lets go to the cloud!!!!

Cloud used to be just for the cool kids, the start ups, the hipsters… but not anymore, now corporates are coming and they are coming in their droves. The cloud transformation conversation is playing out globally for almost all sectors, from health care, to pharmaceuticals and finance. The hype and urban legends around public cloud are creating a lot of FOMO.

For finance teams under severe cost pressures, the cloud has to be an obvious place to seek out some much needed pain relief. CIOs are giving glorious on stage testimonials, declaring victory after having gone live with their first “bot in the cloud”. So what is there to blog about, it’s all wonderful right…? Maybe not…

The Backdrop…

Imagine you’re a CIO or CTO, you haven’t cut code for a while or maybe you have a finance background. Anyway your architecture skills are a bit rusty/vacant, you have been outsourcing technology work for years, you are awash with vendor products, all the integration points are “custom” (aka arc welded) and hence your stack is very fragile. In fact it’s so fragile you can trigger outages when someone closes your datacentre door a little too hard! Your technology teams all have low/zero cloud knowledge and now you have been asked to transform your organisation by shipping it off to the cloud… So what do you do???

Lots of organisations believe this challenge is simply a case of finding the cheapest cloud provider, write a legal document, some SLAs, find a vendor who can whiz your servers into the cloud – then you simply cut a cheque. But the truth is the cloud requires IP and if you don’t have IP (aka engineers) then you have a problem…

Plan A: Project Borg

This is an easy problem – right? Just ask the AWS borg to assimilate you!!! The “Borg” strategy can be achieved by:

  1. Install some software agents in your data centers to come up with a total thumb suck on how much you think you will spend in the cloud. Note: your lack of any real understanding of how the cloud works should not ring any warning bells.
  2. Factor down this thumb suck using another made up / arbitrary “risk factor”.
  3. Next, sign an intergalactic cloud commit with your cloud provider of choice and try to squeeze more than a 10% discount out for taking this enormous risk.
  4. Finally pick up the phone to one of the big 5 consultants and get them to “assimilate” you in the cloud (using some tool to perform a bitwise copy of your servers into the cloud).

Before you know it you’re peppering your board and excos with those ghastly cloud packs, you are sending out group wide emails with pictures of clouds on them, you are telling your teams to become “cloud ready”. What’s worse, you’re burning serious money as the consultancy team you called in did the usual land and expand. But you can’t seem to get a sense of any meaningful progress (and no, a BOT in the cloud doesn’t count as progress).

To fund this new cloud expense line you have to start strangling your existing production spending, maybe you are running your servers for an extra year or two, strangling the network spend, keep these storage arrays for just a little while longer. But don’t worry, before you know it you will be in the cloud – right??

The Problem Statement

The problem is that public cloud was never about physically hosting your iffy datacentre software with someone else; it was supposed to be about transformation of this software. The legacy software in your datacentre is almost certainly poisonous and its interdependencies will be as lethal as they are opaque. If you move it, pain will follow and you won’t see any real commercial benefits for years.

Put another way, your datacentre is the technical equivalent of a swamp. Luckily those lovely cloud people have built you a nice clean swimming pool. BUT don’t go and pump your swamp into this new swimming pool!

Crusades have never given us rational outcomes. You forgot to imagine where the customer was in this painful sideways move; what exactly did you want from this? In fact cloud crusades suffer from a list of oversights, weaknesses and risks:

  1. Actual digital “transformation” will take years to realise (if ever). All you did was change your hosting and how you pay for technology – nothing else actually changed.
  2. Your customer value proposition will be totally unchanged, sadly you are still as digital as a fax machine!
  3. Key infrastructure teams will start realising there is no future for them and start wandering, creating even more instability.
  4. Stability will be problematic as your hybrid network has created a BGP bird’s nest.
  5. Your company signed a 5 year cloud commit. You took your current tech spend, halved it and then asked your cloud provider to give you discounts on this projected spend. You will likely see around a 10%-15% reduction in your EDP (enterprise discount program) rates, and for this you are taking ENORMOUS downside risks. You’re also accidentally discouraging efficient utilisation of resources, in favour of a culture of “ram it in the cloud and review it once our EDP period expires”.
  6. Your balance sheet will balloon, such that you will end up with a cost base not dissimilar to NASA’s, you will need a PhD to diagnose issues and your delivery cadence will be close to zero. Additionally, you will need to create an impairment factory to deal with all your stranded assets.

So what does this approach actually achieve? You will have added a ton of intangible assets by balance sheeting a bunch of professional fees, you will likely be less stable and even less secure (more on this later), and you know that this is an unsustainable project, the equivalent of an organisational heart transplant. The only people who now understand your organisation are a team of well-paid consultants on a 5x salary multiple, and sadly you cannot stop this process – you have to keep paying and praying. Put simply, cloud mass migration (aka assimilation) is a bad idea – so don’t do it!

The key here is that your tech teams have to transform themselves. Nothing can act on them; the transformation has to come from within. When you review organisations that have been around for a while, that have had a few mergers, high vendor dependencies and low technology skills, you tend to find that the combined, systemic complexity suffers from something similar to quantum entanglement. Next we ask an external agency to unpack this unobservable, irreducible complexity with a few tools, and then we pay expensive external forces to reverse engineer these entangled systems and recreate them somewhere else. This is not reasonable or rational – it’s daft and we should stop doing this.

If not this, then what?

The “then what” bit is even longer than the “not this” bit. So I am posting this as is, and if I get 100 hits I will write up the other way – little by little 🙂

Click here to read the work in progress link on another approach to scaling cloud usage…
