disinto/.woodpecker/detect-duplicates.py

329 lines
12 KiB
Python

#!/usr/bin/env python3
"""detect-duplicates.py — Find copy-pasted code blocks across shell files.
Two detection passes:
1. Known anti-patterns (grep-style): flags specific hardcoded patterns
that should use shared helpers instead.
2. Sliding-window hash: finds N-line blocks that appear verbatim in
multiple files (catches structural copy-paste).
When DIFF_BASE is set (e.g. "main"), compares findings against that base
branch and only fails (exit 1) when new duplicates are introduced by the
PR. Pre-existing findings are reported as informational.
Without DIFF_BASE the script reports all findings and exits 0
(informational only — no base to compare against).
"""
import sys
import os
import hashlib
import re
import subprocess
import tempfile
import shutil
from pathlib import Path
from collections import defaultdict
# Number of consecutive meaningful lines hashed together as one block.
# Overridable through the DUP_WINDOW environment variable.
WINDOW = int(os.getenv("DUP_WINDOW", "5"))
# Minimum number of distinct files a block must appear in to be reported.
# Overridable through the DUP_MIN_FILES environment variable.
MIN_FILES = int(os.getenv("DUP_MIN_FILES", "2"))
# ---------------------------------------------------------------------------
# Known anti-patterns — patterns that should use shared helpers instead
# ---------------------------------------------------------------------------
# Each entry is (regex, message): the regex is searched for in every
# non-comment line, and the message is reported alongside each hit.
ANTI_PATTERNS = [
    # Equality test of the quoted $CI_STATE against the literal "success".
    (
        r'"\$CI_STATE"\s*=\s*"success"',
        'Hardcoded CI_STATE="success" check — extract ci_passed() to lib/ and call it here',
    ),
    # Inequality test of $CI_STATE (quoted or not) against "success".
    (
        r'"?\$CI_STATE"?\s*!=\s*"success"',
        'Hardcoded CI_STATE!="success" check — extract ci_passed() to lib/ and call it here',
    ),
    # Assignment of a literal non-zero number to WOODPECKER_REPO_ID.
    (
        r'WOODPECKER_REPO_ID\s*=\s*[1-9][0-9]*',
        'Hardcoded WOODPECKER_REPO_ID — load from project TOML via load-project.sh instead',
    ),
]
def check_anti_patterns(sh_files):
    """Scan shell files for the regexes listed in ANTI_PATTERNS.

    Lines whose first non-blank character is ``#`` are skipped. A file
    that cannot be read is reported on stderr and skipped.

    Returns a list of ``(file, lineno, line, message)`` tuples, one per
    (line, pattern) match, where ``file`` is ``str(path)``, ``lineno`` is
    1-based and ``line`` has trailing whitespace removed.
    """
    findings = []
    for script in sh_files:
        try:
            contents = script.read_text(errors="replace")
        except OSError as exc:
            print(f"Warning: cannot read {script}: {exc}", file=sys.stderr)
            continue
        script_name = str(script)
        for number, raw in enumerate(contents.splitlines(), 1):
            # Comment-only lines never count as anti-pattern hits.
            if raw.lstrip().startswith("#"):
                continue
            findings.extend(
                (script_name, number, raw.rstrip(), message)
                for pattern, message in ANTI_PATTERNS
                if re.search(pattern, raw)
            )
    return findings
# ---------------------------------------------------------------------------
# Sliding-window duplicate detection
# ---------------------------------------------------------------------------
def meaningful_lines(path):
    """Return the non-blank, non-comment lines of *path*.

    Each entry is ``(original_lineno, line)`` with a 1-based line number
    and trailing whitespace stripped from the line. If the file cannot be
    read, a warning is written to stderr and an empty list is returned.
    """
    try:
        contents = path.read_text(errors="replace")
    except OSError as exc:
        print(f"Warning: cannot read {path}: {exc}", file=sys.stderr)
        return []
    return [
        (number, raw.rstrip())
        for number, raw in enumerate(contents.splitlines(), 1)
        # Keep only lines with content that is not a "#" comment.
        if raw.strip() and not raw.lstrip().startswith("#")
    ]
def sliding_windows(lines, window_size):
    """Yield ``(start_lineno, content_hash, window_text)`` per window.

    *lines* is a sequence of ``(lineno, text)`` pairs. Every run of
    *window_size* consecutive entries is joined with newlines and hashed
    with MD5; ``start_lineno`` is the line number of the run's first
    entry. Nothing is yielded when *lines* is shorter than the window.
    """
    last_start = len(lines) - window_size
    for start in range(last_start + 1):
        chunk = lines[start:start + window_size]
        text = "\n".join(entry[1] for entry in chunk)
        digest = hashlib.md5(text.encode()).hexdigest()
        yield lines[start][0], digest, text
def check_duplicates(sh_files):
    """Find WINDOW-line blocks that occur in MIN_FILES or more files.

    Returns a list of ``(hash, locations)`` groups, where ``locations``
    is a sorted list of ``(file, start_lineno, preview)`` tuples and
    ``preview`` holds the first three lines of the block. Within a single
    file only the first occurrence of a given block is recorded. Groups
    are ordered so that blocks spread over the most files come first.
    """
    locations_by_hash = defaultdict(list)
    for path in sh_files:
        lines = meaningful_lines(path)
        if len(lines) < WINDOW:
            continue
        file_name = str(path)
        recorded = set()  # hashes already stored for this file
        for start, digest, block in sliding_windows(lines, WINDOW):
            if digest not in recorded:
                recorded.add(digest)
                preview = "\n".join(block.splitlines()[:3])
                locations_by_hash[digest].append((file_name, start, preview))

    def file_count(locations):
        # Number of distinct files a block was found in.
        return len({entry[0] for entry in locations})

    groups = [
        (digest, sorted(locations))
        for digest, locations in locations_by_hash.items()
        if file_count(locations) >= MIN_FILES
    ]
    # Most widely duplicated blocks first.
    groups.sort(key=lambda group: -file_count(group[1]))
    return groups
# ---------------------------------------------------------------------------
# Baseline comparison helpers
# ---------------------------------------------------------------------------
def prepare_baseline(base_ref):
    """Extract the .sh files of ``origin/<base_ref>`` into a temp directory.

    The ref is fetched first (needed in shallow CI clones); the fetch
    result is deliberately not checked, because a failure surfaces in the
    ``git ls-tree`` call that follows. Each listed ``.sh`` file is then
    copied out via ``git show``. Files whose ``git show`` fails are
    skipped.

    File contents are copied as raw bytes, so a script that is not valid
    text in the current locale's encoding cannot abort the extraction.

    Returns the temp directory as a ``Path`` (the caller is responsible
    for removing it), or ``None`` when the file listing fails. If an
    exception interrupts the extraction, the partially filled temp
    directory is removed before the exception propagates.
    """
    # Fetch the base branch (CI clones are typically shallow)
    subprocess.run(
        ["git", "fetch", "origin", base_ref, "--depth=1"],
        capture_output=True,
    )
    ref = f"origin/{base_ref}"
    listing = subprocess.run(
        ["git", "ls-tree", "-r", "--name-only", ref],
        capture_output=True, text=True,
    )
    if listing.returncode != 0:
        print(f"Warning: cannot list files in {ref}: "
              f"{listing.stderr.strip()}", file=sys.stderr)
        return None
    sh_paths = [
        f for f in listing.stdout.splitlines()
        if f.endswith(".sh") and ".git/" not in f
    ]
    tmpdir = Path(tempfile.mkdtemp(prefix="dup-baseline-"))
    try:
        for f in sh_paths:
            # No text=True here: keep the blob byte-for-byte.
            shown = subprocess.run(
                ["git", "show", f"{ref}:{f}"],
                capture_output=True,
            )
            if shown.returncode == 0:
                target = tmpdir / f
                target.parent.mkdir(parents=True, exist_ok=True)
                target.write_bytes(shown.stdout)
    except BaseException:
        # Do not leave a half-populated baseline behind.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return tmpdir
def collect_findings(root):
    """Run both detection passes on the .sh files under *root*.

    Files inside a ``.git`` directory and the files named in the
    exclusion set are skipped. The exclusion set is matched against each
    file's path relative to *root*, so the same files are skipped whether
    *root* is the working tree or an extracted baseline directory.

    Returns ``(ap_hits, dup_groups)`` with file paths relative to *root*:
    ``ap_hits`` is a list of ``(file, lineno, line, message)`` and
    ``dup_groups`` a list of ``(hash, [(file, lineno, preview)])``.
    """
    root = Path(root)
    # Skip architect scripts for duplicate detection (stub formulas, see #99)
    excluded_files = {"architect/architect-run.sh"}

    sh_files = []
    for candidate in sorted(root.rglob("*.sh")):
        relative = candidate.relative_to(root)
        if ".git" in relative.parts:
            continue
        # as_posix() so the "/"-separated exclusion entries match on any OS.
        if relative.as_posix() in excluded_files:
            continue
        sh_files.append(candidate)

    ap_hits = check_anti_patterns(sh_files)
    dup_groups = check_duplicates(sh_files)

    def rel(p):
        # Findings carry str(path); report them relative to root if possible.
        try:
            return str(Path(p).relative_to(root))
        except ValueError:
            return p

    ap_hits = [(rel(f), ln, line, msg) for f, ln, line, msg in ap_hits]
    dup_groups = [
        (h, [(rel(f), ln, prev) for f, ln, prev in locs])
        for h, locs in dup_groups
    ]
    return ap_hits, dup_groups
# ---------------------------------------------------------------------------
# Reporting helpers
# ---------------------------------------------------------------------------
def print_anti_patterns(hits, label=""):
    """Write the anti-pattern hits to stdout; print nothing when empty.

    *hits* is a list of ``(file, lineno, line, message)`` tuples. When
    *label* is non-empty it is placed before the section title. The
    offending source line is truncated to 120 characters.
    """
    if hits:
        prefix = "" if not label else f"{label} "
        print(f"=== {prefix}Anti-pattern findings ===")
        for hit in hits:
            file, lineno, line, message = hit
            print(f" {file}:{lineno}: {message}")
            print(f" > {line[:120]}")
        # Blank line closes the section.
        print()
def print_duplicates(groups, label=""):
    """Write the duplicate-block groups to stdout; print nothing when empty.

    *groups* is a list of ``(hash, [(file, lineno, preview)])``. For each
    group the first eight characters of the hash, the number of distinct
    files, every location, and up to three preview lines taken from the
    first location are printed. When *label* is non-empty it is placed
    before the section title.
    """
    if groups:
        prefix = "" if not label else f"{label} "
        print(f"=== {prefix}Duplicate code blocks (window={WINDOW} lines) ===")
        for h, locs in groups:
            files = set(entry[0] for entry in locs)
            print(f"\n [{h[:8]}] appears in {len(files)} file(s):")
            for file, lineno, preview in locs:
                print(f" {file}:{lineno}")
            # The preview shown comes from the first listed location.
            for ln in locs[0][2].splitlines()[:3]:
                print(f" | {ln}")
        print()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
    """Scan the working tree and report findings; return the exit code.

    When the DIFF_BASE environment variable is set and a baseline can be
    prepared from it, findings are split into pre-existing and new ones
    and the function returns 1 if any new finding exists, else 0. When
    DIFF_BASE is unset, or the baseline cannot be prepared, all findings
    are reported as informational and 0 is returned.

    The extracted baseline directory is removed even if scanning it
    raises.
    """
    # Skip architect scripts for duplicate detection (stub formulas, see #99)
    EXCLUDED_FILES = {"architect/architect-run.sh"}
    sh_files = sorted(
        p for p in Path(".").rglob("*.sh")
        if ".git" not in p.parts and str(p) not in EXCLUDED_FILES
    )
    if not sh_files:
        print("No .sh files found.")
        return 0

    print(f"Scanning {len(sh_files)} shell files "
          f"(window={WINDOW} lines, min_files={MIN_FILES})...\n")

    # --- Collect current findings (paths relative to ".") ---
    cur_ap, cur_dups = collect_findings(".")

    # --- Baseline comparison mode ---
    diff_base = os.environ.get("DIFF_BASE", "").strip()
    baseline_dir = None
    if diff_base:
        print(f"Baseline comparison: diffing against {diff_base}\n")
        baseline_dir = prepare_baseline(diff_base)
        if baseline_dir is None:
            # No baseline: fall through to informational mode below.
            print(f"Warning: could not prepare baseline from {diff_base}, "
                  f"falling back to informational mode.\n", file=sys.stderr)

    if baseline_dir is not None:
        try:
            base_ap, base_dups = collect_findings(baseline_dir)
        finally:
            # Always drop the extracted baseline, even if the scan failed.
            shutil.rmtree(baseline_dir)

        # Anti-pattern diff: key by (relative_path, stripped_line, message)
        # so that a hit which merely moved to another line is not "new".
        def ap_key(hit):
            return (hit[0], hit[2].strip(), hit[3])

        base_ap_keys = {ap_key(h) for h in base_ap}
        new_ap = [h for h in cur_ap if ap_key(h) not in base_ap_keys]
        pre_ap = [h for h in cur_ap if ap_key(h) in base_ap_keys]

        # Duplicate diff: key by content hash only. A block that is already
        # duplicated in the base counts as pre-existing even if this change
        # adds a further copy of it.
        base_dup_hashes = {g[0] for g in base_dups}
        new_dups = [g for g in cur_dups if g[0] not in base_dup_hashes]
        pre_dups = [g for g in cur_dups if g[0] in base_dup_hashes]

        # Report pre-existing as info
        if pre_ap or pre_dups:
            print(f"Pre-existing (not introduced by this PR): "
                  f"{len(pre_ap)} anti-pattern(s), "
                  f"{len(pre_dups)} duplicate block(s).")
            print_anti_patterns(pre_ap, "Pre-existing")
            print_duplicates(pre_dups, "Pre-existing")

        # Report and fail on new findings
        if new_ap or new_dups:
            print(f"NEW findings introduced by this PR: "
                  f"{len(new_ap)} anti-pattern(s), "
                  f"{len(new_dups)} duplicate block(s).")
            print_anti_patterns(new_ap, "NEW")
            print_duplicates(new_dups, "NEW")
            return 1

        total = len(cur_ap) + len(cur_dups)
        if total > 0:
            print(f"Total findings: {len(cur_ap)} anti-pattern(s), "
                  f"{len(cur_dups)} duplicate block(s) — "
                  f"all pre-existing, no regressions.")
        else:
            print("No duplicate code or anti-pattern findings.")
        return 0

    # --- Informational mode (no baseline available) ---
    print_anti_patterns(cur_ap)
    print_duplicates(cur_dups)
    total_issues = len(cur_ap) + len(cur_dups)
    if total_issues == 0:
        print("No duplicate code or anti-pattern findings.")
    else:
        print(f"Summary: {len(cur_ap)} anti-pattern hit(s), "
              f"{len(cur_dups)} duplicate block(s).")
        print("Consider extracting shared patterns to lib/ helpers.")
    return 0


if __name__ == "__main__":
    sys.exit(main())