Installation
Install with CLI
Recommended
gh skills-hub install sql-server-table-reconciliation Don't have the extension? Run gh extension install samueltauil/skills-hub first.
Download and extract to your repository:
.github/skills/sql-server-table-reconciliation/ Extract the ZIP to .github/skills/ in your repo. The folder name must match sql-server-table-reconciliation for Copilot to auto-discover it.
Skill Files (2)
SKILL.md 5.7 KB
---
name: sql-server-table-reconciliation
description: "Use when: comparing SQL Server tables across instances, data migration validation, ETL verification, row mismatch detection, schema drift, reconciliation report, production vs staging comparison. Uses mssql-python driver with Apache Arrow for fast columnar data transfer and comparison."
---
# SQL Server Table Reconciliation
Compare identical tables across two SQL Server instances using Python with `mssql-python` driver and Apache Arrow. Detect missing rows, column mismatches, schema drift, and produce a reconciliation report.
## Workflow
1. Collect connection details for source and target
2. Identify primary key / composite key
3. Detect schema differences
4. Extract data via Arrow for efficient columnar transfer
5. Compare rows and columns
6. Generate reconciliation report
## Collect Inputs
| Parameter | Required | Description |
|-----------|----------|-------------|
| Source server | Yes | Source SQL Server (e.g. `prod-server.database.windows.net`) |
| Source database | Yes | Source database name |
| Target server | Yes | Target SQL Server (e.g. `staging-server.database.windows.net`) |
| Target database | Yes | Target database name |
| Tables | Yes | Comma-separated `schema.table` names, or `schema.*` wildcard (e.g. `dbo.Orders,dbo.Items` or `dbo.*`) |
| Auth mode | Yes | `sql` (user/password) or `entra` (Azure AD/token) |
| Primary key | Auto-detect | Column(s) forming the row identity. Auto-detect from metadata if not provided. |
| Columns to compare | All | Subset of columns, or all non-PK columns |
| Chunk size | `100000` | Rows per batch for large tables |
| Output format | `console` | `console`, `csv`, `parquet`, or `json` |
## Bundled Script
The reconciliation logic is provided as a standalone script at `scripts/reconcile.py`. Invoke it with the appropriate arguments based on user inputs:
```bash
python scripts/reconcile.py \
--source-server <source_server> \
--source-database <source_database> \
--target-server <target_server> \
--target-database <target_database> \
--tables "<table_spec>" \
--auth <sql|entra> \
--chunk-size <chunk_size> \
--output <console|csv|json>
```
### Optional arguments
| Argument | Description |
|----------|-------------|
| `--primary-key` | Comma-separated PK column(s). Omit to auto-detect. |
| `--columns` | Comma-separated columns to compare. Omit to compare all non-PK columns. |
### Example invocations
Single table with SQL auth:
```bash
python scripts/reconcile.py \
--source-server prod-server.database.windows.net \
--source-database ProdDB \
--target-server staging-server.database.windows.net \
--target-database StagingDB \
--tables "dbo.Orders" \
--auth sql \
--output console
```
Wildcard with Entra auth and CSV output:
```bash
python scripts/reconcile.py \
--source-server prod-server.database.windows.net \
--source-database ProdDB \
--target-server staging-server.database.windows.net \
--target-database StagingDB \
--tables "dbo.*" \
--auth entra \
--output csv
```
### Prerequisites
Install required packages before running:
```bash
pip install mssql-python pyarrow pandas
```
## Comparison Rules
- **Normalize types before comparing**: cast decimals to same precision, trim strings, normalize datetime to UTC
- **NULL handling**: `NULL == NULL` is considered a match (both sides missing = no diff)
- **Ignore row order**: always compare by PK join, never positional
- **Large tables**: chunk extraction with `OFFSET/FETCH` or `ROW_NUMBER()` partitioning
## Hash-Based Optimization (for large tables)
When table has >1M rows, generate a hash pre-check:
```sql
SELECT {pk_cols},
HASHBYTES('SHA2_256', CONCAT_WS('|', col1, col2, ...)) AS row_hash
FROM {table}
```
Compare hashes first; only fetch full rows for mismatched hashes. This reduces data transfer significantly.
## Report Format
```
Reconciling dbo.EMPLOYEES...
Reconciling dbo.DEPARTMENTS...
Reconciling dbo.JOBS...
--- dbo.EMPLOYEES ---
Source: 107 Target: 107
Missing: 0 Extra: 0 Mismatches: 0
Result: โ IDENTICAL
--- dbo.DEPARTMENTS ---
Source: 27 Target: 27
Missing: 0 Extra: 0 Mismatches: 3
Result: โ DIFFERENCES FOUND
--- dbo.JOBS ---
Source: 19 Target: 19
Missing: 0 Extra: 0 Mismatches: 0
Result: โ IDENTICAL
=== Summary: 2 passed, 1 failed, 0 skipped / 3 tables ===
```
When a single table is provided, include full detail (schema drift, sample rows, mismatches). When multiple tables, use the compact per-table format above with full detail only for tables with `FAIL` status.
## Performance Considerations
| Scenario | Strategy |
|----------|----------|
| < 100K rows | Single Arrow fetch, in-memory pandas compare |
| 100Kโ1M rows | Chunked extraction (100K batches), streaming comparison |
| > 1M rows | Hash pre-check โ only fetch mismatched rows |
| Wide tables (100+ cols) | Compare PK + hash first, drill into specific columns on mismatch |
| Network-constrained | Use Arrow columnar format (10-50x smaller than row-by-row) |
## Constraints
- Always use `mssql-python` driver (not pyodbc, pymssql)
- Always use Apache Arrow via cursor (`cursor.arrow()`) for data extraction
- Connection MUST use connection string format, not keyword arguments (kwargs like `encrypt=True` throw errors)
- Never compare without identifying PK first โ ask user if auto-detect fails
- Handle connection failures gracefully with retry logic
- **Never hardcode credentials** in generated scripts โ use `os.environ` / `getpass` (env vars: `MSSQL_USER`, `MSSQL_PASSWORD`)
- Do not print credentials in output or logs
- Use parameterized queries (`?` placeholders) for metadata lookups โ never f-string interpolate user input into SQL
scripts/
reconcile.py 12.9 KB
#!/usr/bin/env python3
"""SQL Server Table Reconciliation Script.
Compare identical tables across two SQL Server instances using mssql-python
driver and Apache Arrow. Detect missing rows, column mismatches, schema drift,
and produce a reconciliation report.
Usage:
python reconcile.py \
--source-server prod-server.database.windows.net \
--source-database ProdDB \
--target-server staging-server.database.windows.net \
--target-database StagingDB \
--tables "dbo.Orders,dbo.Items" \
--auth entra \
--output console \
--chunk-size 100000
Environment variables for credentials (when --auth sql):
MSSQL_USER - SQL Server username
MSSQL_PASSWORD - SQL Server password
"""
import argparse
import os
import sys
from getpass import getpass
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
from mssql_python import connect as mssql_connect
# --- Connection Setup ---
def connect(server, database, auth_mode, user=None, password=None):
"""Connect using mssql-python driver.
Reads credentials from env vars or prompts interactively. Never hardcodes."""
if auth_mode == "sql":
user = user or os.environ.get("MSSQL_USER") or input("Username: ")
password = password or os.environ.get("MSSQL_PASSWORD") or getpass("Password: ")
conn_str = (
f"Server={server};Database={database};"
f"UID={user};PWD={password};"
f"TrustServerCertificate=yes;Encrypt=yes"
)
else:
# Entra (Azure AD) authentication
conn_str = (
f"Server={server};Database={database};"
f"Authentication=ActiveDirectoryDefault;"
f"TrustServerCertificate=yes;Encrypt=yes"
)
return mssql_connect(conn_str)
# --- Table Discovery ---
def resolve_tables(conn, table_spec):
"""Resolve table spec to list of schema.table names.
Accepts: 'dbo.*', 'dbo.Orders,dbo.Items', or 'dbo.Orders'."""
tables = []
for spec in table_spec.split(","):
spec = spec.strip()
schema, tbl = spec.split(".")
if tbl == "*":
query = """
SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_NAME
"""
cur = conn.cursor()
cur.execute(query, [schema])
rows = cur.arrow().to_pandas()
tables.extend(f"{schema}.{t}" for t in rows["TABLE_NAME"])
else:
tables.append(spec)
return tables
# --- Schema Comparison ---
def compare_schema(source_conn, target_conn, table):
"""Compare column names, types, nullability. Return drift report and common columns."""
query = """
SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, CHARACTER_MAXIMUM_LENGTH,
NUMERIC_PRECISION, NUMERIC_SCALE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = ?
AND TABLE_NAME = ?
ORDER BY ORDINAL_POSITION
"""
schema_name, table_name = table.split(".")
src_cur = source_conn.cursor()
src_cur.execute(query, [schema_name, table_name])
source_schema = src_cur.arrow().to_pandas()
tgt_cur = target_conn.cursor()
tgt_cur.execute(query, [schema_name, table_name])
target_schema = tgt_cur.arrow().to_pandas()
src_cols = set(source_schema["COLUMN_NAME"])
tgt_cols = set(target_schema["COLUMN_NAME"])
drift = []
only_in_source = src_cols - tgt_cols
only_in_target = tgt_cols - src_cols
if only_in_source:
drift.append(f"Columns only in source: {sorted(only_in_source)}")
if only_in_target:
drift.append(f"Columns only in target: {sorted(only_in_target)}")
common_cols = sorted(src_cols & tgt_cols)
# Check type differences for common columns
src_types = source_schema.set_index("COLUMN_NAME")
tgt_types = target_schema.set_index("COLUMN_NAME")
for col in common_cols:
if col in src_types.index and col in tgt_types.index:
s = src_types.loc[col]
t = tgt_types.loc[col]
if s["DATA_TYPE"] != t["DATA_TYPE"]:
drift.append(
f" {col}: type {s['DATA_TYPE']} vs {t['DATA_TYPE']}"
)
return drift, common_cols
# --- Primary Key Detection ---
def detect_primary_key(conn, table):
"""Auto-detect PK columns from sys.index_columns."""
schema, tbl = table.split(".")
query = """
SELECT c.name
FROM sys.indexes i
JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
WHERE i.is_primary_key = 1
AND OBJECT_SCHEMA_NAME(i.object_id) = ?
AND OBJECT_NAME(i.object_id) = ?
ORDER BY ic.key_ordinal
"""
cur = conn.cursor()
cur.execute(query, [schema, tbl])
result = cur.arrow()
return result.column("name").to_pylist()
# --- Data Extraction (Arrow) ---
def extract_table(conn, table, pk_cols, chunk_size=100000):
"""Extract table data as Arrow Table, using Arrow columnar transfer."""
pk_order = ", ".join(pk_cols)
query = f"SELECT * FROM {table} ORDER BY {pk_order}"
cur = conn.cursor()
cur.execute(query)
return cur.arrow()
# --- Hash Pre-check (for large tables) ---
def extract_hashes(conn, table, pk_cols, compare_cols):
"""Extract PK + row hash for large table optimization."""
pk_select = ", ".join(pk_cols)
col_concat = ", ".join(compare_cols)
query = f"""
SELECT {pk_select},
HASHBYTES('SHA2_256', CONCAT_WS('|', {col_concat})) AS row_hash
FROM {table}
ORDER BY {pk_select}
"""
cur = conn.cursor()
cur.execute(query)
return cur.arrow()
# --- Comparison Logic ---
def reconcile(source_table, target_table, pk_cols, compare_cols):
"""Compare two Arrow tables.
1. Convert to pandas with PK as index
2. Identify missing/extra rows
3. Compare column values for matching rows
4. Handle NULL vs non-NULL (NULL == NULL is a match)
"""
src_df = source_table.to_pandas().set_index(pk_cols)
tgt_df = target_table.to_pandas().set_index(pk_cols)
# Missing/extra rows
src_keys = set(src_df.index.tolist() if len(pk_cols) > 1 else src_df.index)
tgt_keys = set(tgt_df.index.tolist() if len(pk_cols) > 1 else tgt_df.index)
missing_in_target = src_keys - tgt_keys
extra_in_target = tgt_keys - src_keys
common_keys = src_keys & tgt_keys
# Column-level mismatches on common rows
common_src = src_df.loc[src_df.index.isin(common_keys), compare_cols]
common_tgt = tgt_df.loc[tgt_df.index.isin(common_keys), compare_cols]
diff = common_src.compare(common_tgt, keep_shape=False)
return {
"missing_in_target": missing_in_target,
"extra_in_target": extra_in_target,
"mismatches": diff,
"total_source": len(src_df),
"total_target": len(tgt_df),
}
# --- Per-Table Pipeline ---
def reconcile_table(source_conn, target_conn, table, pk_override=None, columns=None,
chunk_size=100000):
"""Run full reconciliation for one table. Returns result dict."""
schema_drift, common_cols = compare_schema(source_conn, target_conn, table)
pk_cols = pk_override
if not pk_cols:
pk_cols = detect_primary_key(source_conn, table)
if not pk_cols:
pk_cols = detect_primary_key(target_conn, table)
if not pk_cols:
return {"table": table, "error": "No PK detected", "status": "SKIPPED"}
compare_cols = columns if columns else [c for c in common_cols if c not in pk_cols]
source_data = extract_table(source_conn, table, pk_cols, chunk_size)
target_data = extract_table(target_conn, table, pk_cols, chunk_size)
result = reconcile(source_data, target_data, pk_cols, compare_cols)
result["table"] = table
result["schema_drift"] = schema_drift
result["status"] = (
"PASS"
if not (result["missing_in_target"] or result["extra_in_target"] or len(result["mismatches"]))
else "FAIL"
)
return result
# --- Report Generation ---
def generate_report(all_results, output_format="console"):
"""Output per-table details + combined summary."""
for r in all_results:
print(f"\n--- {r['table']} ---")
if r.get("error"):
print(f" SKIPPED: {r['error']}")
continue
print(f" Source: {r['total_source']:,} Target: {r['total_target']:,}")
print(
f" Missing: {len(r['missing_in_target'])} "
f"Extra: {len(r['extra_in_target'])} "
f"Mismatches: {len(r['mismatches'])}"
)
print(
f" Result: {'โ IDENTICAL' if r['status'] == 'PASS' else 'โ DIFFERENCES FOUND'}"
)
if r.get("schema_drift"):
print(" Schema drift:")
for d in r["schema_drift"]:
print(f" {d}")
# Summary
passed = sum(1 for r in all_results if r["status"] == "PASS")
failed = sum(1 for r in all_results if r["status"] == "FAIL")
skipped = sum(1 for r in all_results if r["status"] == "SKIPPED")
print(
f"\n=== Summary: {passed} passed, {failed} failed, "
f"{skipped} skipped / {len(all_results)} tables ==="
)
# Export if requested
if output_format == "csv":
rows = [
{
"table": r["table"],
"status": r["status"],
"source_rows": r.get("total_source", 0),
"target_rows": r.get("total_target", 0),
"missing": len(r.get("missing_in_target", [])),
"extra": len(r.get("extra_in_target", [])),
"mismatches": len(r.get("mismatches", [])),
}
for r in all_results
]
df = pd.DataFrame(rows)
df.to_csv("reconciliation_report.csv", index=False)
print("\nReport saved to reconciliation_report.csv")
elif output_format == "json":
import json
rows = [
{
"table": r["table"],
"status": r["status"],
"source_rows": r.get("total_source", 0),
"target_rows": r.get("total_target", 0),
"missing": len(r.get("missing_in_target", [])),
"extra": len(r.get("extra_in_target", [])),
"mismatches": len(r.get("mismatches", [])),
}
for r in all_results
]
with open("reconciliation_report.json", "w") as f:
json.dump(rows, f, indent=2)
print("\nReport saved to reconciliation_report.json")
# --- Main ---
def main():
parser = argparse.ArgumentParser(
description="Compare SQL Server tables across two instances."
)
parser.add_argument("--source-server", required=True, help="Source SQL Server host")
parser.add_argument("--source-database", required=True, help="Source database name")
parser.add_argument("--target-server", required=True, help="Target SQL Server host")
parser.add_argument("--target-database", required=True, help="Target database name")
parser.add_argument(
"--tables",
required=True,
help="Comma-separated schema.table names or schema.* wildcard",
)
parser.add_argument(
"--auth",
choices=["sql", "entra"],
default="sql",
help="Authentication mode (default: sql)",
)
parser.add_argument(
"--primary-key",
default=None,
help="Comma-separated PK column(s). Auto-detected if omitted.",
)
parser.add_argument(
"--columns",
default=None,
help="Comma-separated columns to compare. All non-PK columns if omitted.",
)
parser.add_argument(
"--chunk-size",
type=int,
default=100000,
help="Rows per batch for large tables (default: 100000)",
)
parser.add_argument(
"--output",
choices=["console", "csv", "json"],
default="console",
help="Output format (default: console)",
)
args = parser.parse_args()
pk_override = [c.strip() for c in args.primary_key.split(",")] if args.primary_key else None
columns = [c.strip() for c in args.columns.split(",")] if args.columns else None
print(f"Connecting to source: {args.source_server}/{args.source_database}")
source_conn = connect(args.source_server, args.source_database, args.auth)
print(f"Connecting to target: {args.target_server}/{args.target_database}")
target_conn = connect(args.target_server, args.target_database, args.auth)
tables = resolve_tables(source_conn, args.tables)
print(f"Tables to reconcile: {tables}")
results = []
for table in tables:
print(f"Reconciling {table}...")
results.append(
reconcile_table(
source_conn, target_conn, table,
pk_override=pk_override,
columns=columns,
chunk_size=args.chunk_size,
)
)
generate_report(results, output_format=args.output)
if __name__ == "__main__":
main()
License (MIT)
View full license text
MIT License Copyright GitHub, Inc. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.