From 5e4b00f9a3a06c4fd94d9ea44f7c3bd044bae29b Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 19 Mar 2026 22:03:18 +0000 Subject: [PATCH] fix: duplicate-detection CI step fails on pre-existing duplicates unrelated to PR (#296) Add baseline comparison to detect-duplicates.py: when DIFF_BASE is set (via CI_COMMIT_TARGET_BRANCH for PRs), the script compares findings against the base branch and only fails on new duplicates introduced by the PR. Pre-existing duplicates are reported as informational. For push events (no DIFF_BASE), the script reports all findings but exits 0 (informational only). Removes failure:ignore from the CI step so new duplicates properly block PRs. Co-Authored-By: Claude Opus 4.6 (1M context) --- .woodpecker/ci.yml | 6 +- .woodpecker/detect-duplicates.py | 211 ++++++++++++++++++++++++++----- 2 files changed, 183 insertions(+), 34 deletions(-) diff --git a/.woodpecker/ci.yml b/.woodpecker/ci.yml index f42f07c..61b8586 100644 --- a/.woodpecker/ci.yml +++ b/.woodpecker/ci.yml @@ -3,7 +3,7 @@ # # Steps: # 1. shellcheck — lint all .sh files (warnings+errors) -# 2. duplicate-detection — report copy-pasted code blocks (non-blocking) +# 2. duplicate-detection — report copy-pasted code blocks (fails only on new duplicates for PRs) when: event: [push, pull_request] @@ -23,5 +23,7 @@ steps: - name: duplicate-detection image: python:3-alpine commands: + - apk add --no-cache git - python3 .woodpecker/detect-duplicates.py - failure: ignore + environment: + DIFF_BASE: ${CI_COMMIT_TARGET_BRANCH} diff --git a/.woodpecker/detect-duplicates.py b/.woodpecker/detect-duplicates.py index caf6012..c43fd1f 100644 --- a/.woodpecker/detect-duplicates.py +++ b/.woodpecker/detect-duplicates.py @@ -7,14 +7,21 @@ Two detection passes: 2. Sliding-window hash: finds N-line blocks that appear verbatim in multiple files (catches structural copy-paste). -Exit 0 = clean. Exit 1 = findings (CI step is set to failure: ignore, -so overall CI stays green while findings are visible in logs). +When DIFF_BASE is set (e.g. "main"), compares findings against that base +branch and only fails (exit 1) when new duplicates are introduced by the +PR. Pre-existing findings are reported as informational. + +Without DIFF_BASE the script reports all findings and exits 0 +(informational only — no base to compare against). """ import sys import os import hashlib import re +import subprocess +import tempfile +import shutil from pathlib import Path from collections import defaultdict @@ -120,6 +127,112 @@ def check_duplicates(sh_files): return groups +# --------------------------------------------------------------------------- +# Baseline comparison helpers +# --------------------------------------------------------------------------- + +def prepare_baseline(base_ref): + """Extract .sh files from base_ref into a temp directory. + + Fetches the ref first (needed in shallow CI clones), then copies each + file via ``git show``. Returns the temp directory Path, or None on + failure. + """ + # Fetch the base branch (CI clones are typically shallow) + subprocess.run( + ["git", "fetch", "origin", base_ref, "--depth=1"], + capture_output=True, + ) + + ref = f"origin/{base_ref}" + result = subprocess.run( + ["git", "ls-tree", "-r", "--name-only", ref], + capture_output=True, text=True, + ) + if result.returncode != 0: + print(f"Warning: cannot list files in {ref}: " + f"{result.stderr.strip()}", file=sys.stderr) + return None + + sh_paths = [ + f for f in result.stdout.splitlines() + if f.endswith(".sh") and ".git/" not in f + ] + + tmpdir = Path(tempfile.mkdtemp(prefix="dup-baseline-")) + for f in sh_paths: + r = subprocess.run( + ["git", "show", f"{ref}:{f}"], + capture_output=True, text=True, + ) + if r.returncode == 0: + target = tmpdir / f + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(r.stdout) + + return tmpdir + + +def collect_findings(root): + """Run both detection passes on .sh files under *root*. + + Returns ``(ap_hits, dup_groups)`` with file paths relative to *root*. + """ + root = Path(root) + sh_files = sorted( + p for p in root.rglob("*.sh") if ".git" not in p.parts + ) + + ap_hits = check_anti_patterns(sh_files) + dup_groups = check_duplicates(sh_files) + + def rel(p): + try: + return str(Path(p).relative_to(root)) + except ValueError: + return p + + ap_hits = [(rel(f), ln, line, msg) for f, ln, line, msg in ap_hits] + dup_groups = [ + (h, [(rel(f), ln, prev) for f, ln, prev in locs]) + for h, locs in dup_groups + ] + return ap_hits, dup_groups + + +# --------------------------------------------------------------------------- +# Reporting helpers +# --------------------------------------------------------------------------- + +def print_anti_patterns(hits, label=""): + """Print anti-pattern hits with an optional label prefix.""" + if not hits: + return + prefix = f"{label} " if label else "" + print(f"=== {prefix}Anti-pattern findings ===") + for file, lineno, line, message in hits: + print(f" {file}:{lineno}: {message}") + print(f" > {line[:120]}") + print() + + +def print_duplicates(groups, label=""): + """Print duplicate groups with an optional label prefix.""" + if not groups: + return + prefix = f"{label} " if label else "" + print(f"=== {prefix}Duplicate code blocks (window={WINDOW} lines) ===") + for h, locs in groups: + files = {loc[0] for loc in locs} + print(f"\n [{h[:8]}] appears in {len(files)} file(s):") + for file, lineno, preview in locs: + print(f" {file}:{lineno}") + first_preview = locs[0][2] + for ln in first_preview.splitlines()[:3]: + print(f" | {ln}") + print() + + # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- @@ -136,40 +249,74 @@ def main() -> int: print(f"Scanning {len(sh_files)} shell files " f"(window={WINDOW} lines, min_files={MIN_FILES})...\n") - # --- Pass 1: anti-patterns --- - ap_hits = check_anti_patterns(sh_files) - if ap_hits: - print("=== Anti-pattern findings ===") - for file, lineno, line, message in ap_hits: - print(f" {file}:{lineno}: {message}") - print(f" > {line[:120]}") - print() + # --- Collect current findings (paths relative to ".") --- + cur_ap, cur_dups = collect_findings(".") - # --- Pass 2: sliding-window duplicates --- - dup_groups = check_duplicates(sh_files) - if dup_groups: - print(f"=== Duplicate code blocks (window={WINDOW} lines) ===") - for h, locs in dup_groups: - files = {loc[0] for loc in locs} - print(f"\n [{h[:8]}] appears in {len(files)} file(s):") - for file, lineno, preview in locs: - print(f" {file}:{lineno}") - # Show first 3 lines of the duplicated block - first_preview = locs[0][2] - for ln in first_preview.splitlines()[:3]: - print(f" | {ln}") - print() + # --- Baseline comparison mode --- + diff_base = os.environ.get("DIFF_BASE", "").strip() + if diff_base: + print(f"Baseline comparison: diffing against {diff_base}\n") - # --- Summary --- - total_issues = len(ap_hits) + len(dup_groups) + baseline_dir = prepare_baseline(diff_base) + if baseline_dir is None: + print(f"Warning: could not prepare baseline from {diff_base}, " + f"falling back to informational mode.\n", file=sys.stderr) + diff_base = "" # fall through to informational mode + else: + base_ap, base_dups = collect_findings(baseline_dir) + shutil.rmtree(baseline_dir) + + # Anti-pattern diff: key by (relative_path, stripped_line, message) + def ap_key(hit): + return (hit[0], hit[2].strip(), hit[3]) + + base_ap_keys = {ap_key(h) for h in base_ap} + new_ap = [h for h in cur_ap if ap_key(h) not in base_ap_keys] + pre_ap = [h for h in cur_ap if ap_key(h) in base_ap_keys] + + # Duplicate diff: key by content hash + base_dup_hashes = {g[0] for g in base_dups} + new_dups = [g for g in cur_dups if g[0] not in base_dup_hashes] + pre_dups = [g for g in cur_dups if g[0] in base_dup_hashes] + + # Report pre-existing as info + if pre_ap or pre_dups: + print(f"Pre-existing (not introduced by this PR): " + f"{len(pre_ap)} anti-pattern(s), " + f"{len(pre_dups)} duplicate block(s).") + print_anti_patterns(pre_ap, "Pre-existing") + print_duplicates(pre_dups, "Pre-existing") + + # Report and fail on new findings + if new_ap or new_dups: + print(f"NEW findings introduced by this PR: " + f"{len(new_ap)} anti-pattern(s), " + f"{len(new_dups)} duplicate block(s).") + print_anti_patterns(new_ap, "NEW") + print_duplicates(new_dups, "NEW") + return 1 + + total = len(cur_ap) + len(cur_dups) + if total > 0: + print(f"Total findings: {len(cur_ap)} anti-pattern(s), " + f"{len(cur_dups)} duplicate block(s) — " + f"all pre-existing, no regressions.") + else: + print("No duplicate code or anti-pattern findings.") + return 0 + + # --- Informational mode (no baseline available) --- + print_anti_patterns(cur_ap) + print_duplicates(cur_dups) + + total_issues = len(cur_ap) + len(cur_dups) if total_issues == 0: print("No duplicate code or anti-pattern findings.") - return 0 - - print(f"Summary: {len(ap_hits)} anti-pattern hit(s), " - f"{len(dup_groups)} duplicate block(s).") - print("Consider extracting shared patterns to lib/ helpers.") - return 1 + else: + print(f"Summary: {len(cur_ap)} anti-pattern hit(s), " + f"{len(cur_dups)} duplicate block(s).") + print("Consider extracting shared patterns to lib/ helpers.") + return 0 if __name__ == "__main__":