disinto/.woodpecker/detect-duplicates.py

338 lines
12 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""detect-duplicates.py — Find copy-pasted code blocks across shell files.
Two detection passes:
1. Known anti-patterns (grep-style): flags specific hardcoded patterns
that should use shared helpers instead.
2. Sliding-window hash: finds N-line blocks that appear verbatim in
multiple files (catches structural copy-paste).
When DIFF_BASE is set (e.g. "main"), compares findings against that base
branch and only fails (exit 1) when new duplicates are introduced by the
PR. Pre-existing findings are reported as informational.
Without DIFF_BASE the script reports all findings and exits 0
(informational only — no base to compare against).
"""
import sys
import os
import hashlib
import re
import subprocess
import tempfile
import shutil
from pathlib import Path
from collections import defaultdict
# Tunable knobs (overridable via environment variables).
WINDOW = int(os.environ.get("DUP_WINDOW", "5"))        # sliding-window size in lines
MIN_FILES = int(os.environ.get("DUP_MIN_FILES", "2"))  # distinct files a block must span

# ---------------------------------------------------------------------------
# Known anti-patterns — patterns that should use shared helpers instead
# ---------------------------------------------------------------------------
# Each entry pairs a regex with the remediation message printed on a hit.
ANTI_PATTERNS = [
    (r'"\$CI_STATE"\s*=\s*"success"',
     'Hardcoded CI_STATE="success" check — extract ci_passed() to lib/ and call it here'),
    (r'"?\$CI_STATE"?\s*!=\s*"success"',
     'Hardcoded CI_STATE!="success" check — extract ci_passed() to lib/ and call it here'),
    (r'WOODPECKER_REPO_ID\s*=\s*[1-9][0-9]*',
     'Hardcoded WOODPECKER_REPO_ID — load from project TOML via load-project.sh instead'),
]
def check_anti_patterns(sh_files):
    """Scan *sh_files* for known anti-pattern regexes.

    Returns a list of ``(file, lineno, line, message)`` tuples, one per
    pattern hit.  Comment-only lines never match; files that cannot be
    read produce a warning on stderr and are skipped.
    """
    findings = []
    for script in sh_files:
        try:
            body = script.read_text(errors="replace")
        except OSError as exc:
            print(f"Warning: cannot read {script}: {exc}", file=sys.stderr)
            continue
        for num, raw in enumerate(body.splitlines(), 1):
            if raw.strip().startswith("#"):
                continue  # comment-only line — never a real hit
            findings.extend(
                (str(script), num, raw.rstrip(), msg)
                for rx, msg in ANTI_PATTERNS
                if re.search(rx, raw)
            )
    return findings
# ---------------------------------------------------------------------------
# Sliding-window duplicate detection
# ---------------------------------------------------------------------------
def meaningful_lines(path):
    """Read *path* and return ``[(lineno, line)]`` for substantive lines.

    Blank lines and comment-only lines are dropped.  Line numbers are
    1-based and refer to the original file.  An unreadable file yields an
    empty list, with a warning on stderr.
    """
    try:
        source = path.read_text(errors="replace")
    except OSError as exc:
        print(f"Warning: cannot read {path}: {exc}", file=sys.stderr)
        return []
    kept = []
    for num, raw in enumerate(source.splitlines(), 1):
        bare = raw.strip()
        if bare and not bare.startswith("#"):
            kept.append((num, raw.rstrip()))
    return kept
def sliding_windows(lines, window_size):
    """Yield ``(start_lineno, md5_hex, window_text)`` for every window.

    *lines* is the ``(lineno, text)`` list from meaningful_lines().  Each
    window joins *window_size* consecutive texts with newlines and hashes
    the result, so identical blocks collide on the same digest.  Yields
    nothing when fewer than *window_size* lines are available.
    """
    texts = [text for _, text in lines]
    for start in range(len(lines) - window_size + 1):
        blob = "\n".join(texts[start : start + window_size])
        digest = hashlib.md5(blob.encode()).hexdigest()
        yield lines[start][0], digest, blob
def check_duplicates(sh_files):
    """Find WINDOW-line blocks repeated verbatim across files.

    Returns ``[(hash, [(file, lineno, preview), ...]), ...]`` where each
    group spans at least MIN_FILES distinct files, ordered with the most
    widespread group first.  A digest is recorded at most once per file,
    so repetition *within* one file cannot form a group by itself.
    """
    locations = defaultdict(list)  # digest -> [(file_str, start_lineno, preview)]
    for path in sh_files:
        kept = meaningful_lines(path)
        if len(kept) < WINDOW:
            continue  # too short to contain even one window
        recorded = set()
        for start, digest, blob in sliding_windows(kept, WINDOW):
            if digest not in recorded:
                recorded.add(digest)
                locations[digest].append(
                    (str(path), start, "\n".join(blob.splitlines()[:3]))
                )
    groups = [
        (digest, sorted(locs))
        for digest, locs in locations.items()
        if len({f for f, _, _ in locs}) >= MIN_FILES
    ]
    # Most duplicated (most distinct files) first; sort is stable.
    groups.sort(key=lambda grp: -len({f for f, _, _ in grp[1]}))
    return groups
# ---------------------------------------------------------------------------
# Baseline comparison helpers
# ---------------------------------------------------------------------------
def prepare_baseline(base_ref):
    """Materialize the .sh files of ``origin/<base_ref>`` into a temp dir.

    The ref is fetched first (CI clones are typically shallow), then each
    shell file is copied out with ``git show``.  Returns the temp
    directory as a Path, or None when the ref cannot be listed.  The
    caller is responsible for removing the returned directory.
    """
    # Best-effort fetch; if it fails, ls-tree below reports the problem.
    subprocess.run(
        ["git", "fetch", "origin", base_ref, "--depth=1"],
        capture_output=True,
    )
    ref = f"origin/{base_ref}"
    listing = subprocess.run(
        ["git", "ls-tree", "-r", "--name-only", ref],
        capture_output=True, text=True,
    )
    if listing.returncode != 0:
        print(f"Warning: cannot list files in {ref}: "
              f"{listing.stderr.strip()}", file=sys.stderr)
        return None
    wanted = [
        name for name in listing.stdout.splitlines()
        if name.endswith(".sh") and ".git/" not in name
    ]
    stage = Path(tempfile.mkdtemp(prefix="dup-baseline-"))
    for name in wanted:
        shown = subprocess.run(
            ["git", "show", f"{ref}:{name}"],
            capture_output=True, text=True,
        )
        if shown.returncode == 0:
            dest = stage / name
            dest.parent.mkdir(parents=True, exist_ok=True)
            dest.write_text(shown.stdout)
    return stage
def collect_findings(root):
    """Run both detection passes on .sh files under *root*.

    Returns ``(ap_hits, dup_groups)`` with file paths relative to *root*.
    """
    root = Path(root)
    # Skip architect scripts for duplicate detection (stub formulas, see #99)
    EXCLUDED_SUFFIXES = ("architect/architect-run.sh",)

    def is_excluded(p):
        """True for paths inside .git or matching an excluded suffix.

        BUGFIX: the previous expression excluded a file only when it was
        *outside* .git AND matched an excluded suffix, so .sh files under
        .git/ were scanned.  Skip .git contents unconditionally, matching
        the '".git/" not in f' filter used by prepare_baseline().
        """
        if ".git" in p.parts:
            return True
        return any(str(p).endswith(suffix) for suffix in EXCLUDED_SUFFIXES)

    sh_files = sorted(p for p in root.rglob("*.sh") if not is_excluded(p))
    ap_hits = check_anti_patterns(sh_files)
    dup_groups = check_duplicates(sh_files)

    def rel(p):
        """Relativize *p* to root; leave it untouched if outside root."""
        try:
            return str(Path(p).relative_to(root))
        except ValueError:
            return p

    ap_hits = [(rel(f), ln, line, msg) for f, ln, line, msg in ap_hits]
    dup_groups = [
        (h, [(rel(f), ln, prev) for f, ln, prev in locs])
        for h, locs in dup_groups
    ]
    return ap_hits, dup_groups
# ---------------------------------------------------------------------------
# Reporting helpers
# ---------------------------------------------------------------------------
def print_anti_patterns(hits, label=""):
    """Pretty-print anti-pattern *hits* under a section header.

    *label* (e.g. "NEW", "Pre-existing") prefixes the header when given.
    Prints nothing at all for an empty hit list.  Offending lines are
    truncated to 120 characters.
    """
    if not hits:
        return
    header = f"{label} " if label else ""
    print(f"=== {header}Anti-pattern findings ===")
    for path, lineno, text, message in hits:
        print(f" {path}:{lineno}: {message}")
        print(f" > {text[:120]}")
    print()
def print_duplicates(groups, label=""):
    """Pretty-print duplicate *groups* under a section header.

    *label* prefixes the header when given.  Each group shows its short
    digest, the number of distinct files it spans, every location, and a
    preview of the first location's opening lines.  Prints nothing for
    an empty group list.
    """
    if not groups:
        return
    header = f"{label} " if label else ""
    print(f"=== {header}Duplicate code blocks (window={WINDOW} lines) ===")
    for digest, locs in groups:
        distinct = {f for f, _, _ in locs}
        print(f"\n [{digest[:8]}] appears in {len(distinct)} file(s):")
        for path, lineno, _ in locs:
            print(f" {path}:{lineno}")
        # Preview comes from the group's first recorded location.
        for snippet in locs[0][2].splitlines()[:3]:
            print(f" | {snippet}")
    print()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
    """Entry point: scan the working tree, optionally diff against DIFF_BASE.

    Returns 1 only when DIFF_BASE names a usable base ref and the working
    tree introduces findings absent from that baseline; otherwise 0
    (informational mode, or no regressions).
    """
    # Skip architect scripts for duplicate detection (stub formulas, see #99)
    EXCLUDED_SUFFIXES = ("architect/architect-run.sh",)

    def is_excluded(p):
        """True for paths inside .git or matching an excluded suffix.

        BUGFIX: the previous expression excluded a file only when it was
        *outside* .git AND matched an excluded suffix, so .sh files under
        .git/ were counted here.  Skip .git contents unconditionally.
        """
        if ".git" in p.parts:
            return True
        return any(str(p).endswith(suffix) for suffix in EXCLUDED_SUFFIXES)

    sh_files = sorted(p for p in Path(".").rglob("*.sh") if not is_excluded(p))
    if not sh_files:
        print("No .sh files found.")
        return 0
    print(f"Scanning {len(sh_files)} shell files "
          f"(window={WINDOW} lines, min_files={MIN_FILES})...\n")

    # --- Collect current findings (paths relative to ".") ---
    cur_ap, cur_dups = collect_findings(".")

    # --- Baseline comparison mode ---
    diff_base = os.environ.get("DIFF_BASE", "").strip()
    if diff_base:
        print(f"Baseline comparison: diffing against {diff_base}\n")
        baseline_dir = prepare_baseline(diff_base)
        if baseline_dir is None:
            print(f"Warning: could not prepare baseline from {diff_base}, "
                  f"falling back to informational mode.\n", file=sys.stderr)
            diff_base = ""  # fall through to informational mode
        else:
            try:
                base_ap, base_dups = collect_findings(baseline_dir)
            finally:
                # Always remove the temp checkout, even if scanning raises.
                shutil.rmtree(baseline_dir)

            # Anti-pattern diff: key by (relative_path, stripped_line, message)
            # so pure line-number shifts are not reported as new findings.
            def ap_key(hit):
                return (hit[0], hit[2].strip(), hit[3])

            base_ap_keys = {ap_key(h) for h in base_ap}
            new_ap = [h for h in cur_ap if ap_key(h) not in base_ap_keys]
            pre_ap = [h for h in cur_ap if ap_key(h) in base_ap_keys]

            # Duplicate diff: key by content hash.
            base_dup_hashes = {g[0] for g in base_dups}
            new_dups = [g for g in cur_dups if g[0] not in base_dup_hashes]
            pre_dups = [g for g in cur_dups if g[0] in base_dup_hashes]

            # Report pre-existing findings as informational only.
            if pre_ap or pre_dups:
                print(f"Pre-existing (not introduced by this PR): "
                      f"{len(pre_ap)} anti-pattern(s), "
                      f"{len(pre_dups)} duplicate block(s).")
                print_anti_patterns(pre_ap, "Pre-existing")
                print_duplicates(pre_dups, "Pre-existing")

            # Report and fail on findings introduced by this PR.
            if new_ap or new_dups:
                print(f"NEW findings introduced by this PR: "
                      f"{len(new_ap)} anti-pattern(s), "
                      f"{len(new_dups)} duplicate block(s).")
                print_anti_patterns(new_ap, "NEW")
                print_duplicates(new_dups, "NEW")
                return 1

            total = len(cur_ap) + len(cur_dups)
            if total > 0:
                print(f"Total findings: {len(cur_ap)} anti-pattern(s), "
                      f"{len(cur_dups)} duplicate block(s) — "
                      f"all pre-existing, no regressions.")
            else:
                print("No duplicate code or anti-pattern findings.")
            return 0

    # --- Informational mode (no baseline available) ---
    print_anti_patterns(cur_ap)
    print_duplicates(cur_dups)
    total_issues = len(cur_ap) + len(cur_dups)
    if total_issues == 0:
        print("No duplicate code or anti-pattern findings.")
    else:
        print(f"Summary: {len(cur_ap)} anti-pattern hit(s), "
              f"{len(cur_dups)} duplicate block(s).")
        print("Consider extracting shared patterns to lib/ helpers.")
    return 0
# Allow use both as a CI step script and as an importable module.
if __name__ == "__main__":
    sys.exit(main())