fix: duplicate-detection CI step fails on pre-existing duplicates unrelated to PR (#296)

Add baseline comparison to detect-duplicates.py: when DIFF_BASE is set
(via CI_COMMIT_TARGET_BRANCH for PRs), the script compares findings
against the base branch and only fails on new duplicates introduced by
the PR. Pre-existing duplicates are reported as informational.

For push events (no DIFF_BASE), the script reports all findings but
exits 0 (informational only). Removes failure:ignore from the CI step
so new duplicates properly block PRs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
openhands 2026-03-19 22:03:18 +00:00
parent 8d39ce79e7
commit 5e4b00f9a3
2 changed files with 183 additions and 34 deletions

View file

@ -3,7 +3,7 @@
#
# Steps:
# 1. shellcheck — lint all .sh files (warnings+errors)
# 2. duplicate-detection — report copy-pasted code blocks (non-blocking)
# 2. duplicate-detection — report copy-pasted code blocks (fails only on new duplicates for PRs)
when:
event: [push, pull_request]
@ -23,5 +23,7 @@ steps:
- name: duplicate-detection
image: python:3-alpine
commands:
- apk add --no-cache git
- python3 .woodpecker/detect-duplicates.py
failure: ignore
environment:
DIFF_BASE: ${CI_COMMIT_TARGET_BRANCH}

View file

@ -7,14 +7,21 @@ Two detection passes:
2. Sliding-window hash: finds N-line blocks that appear verbatim in
multiple files (catches structural copy-paste).
Exit 0 = clean. Exit 1 = findings (CI step is set to failure: ignore,
so overall CI stays green while findings are visible in logs).
When DIFF_BASE is set (e.g. "main"), compares findings against that base
branch and only fails (exit 1) when new duplicates are introduced by the
PR. Pre-existing findings are reported as informational.
Without DIFF_BASE the script reports all findings and exits 0
(informational only — no base to compare against).
"""
import sys
import os
import hashlib
import re
import subprocess
import tempfile
import shutil
from pathlib import Path
from collections import defaultdict
@ -120,6 +127,112 @@ def check_duplicates(sh_files):
return groups
# ---------------------------------------------------------------------------
# Baseline comparison helpers
# ---------------------------------------------------------------------------
def prepare_baseline(base_ref):
    """Materialise the base branch's .sh files in a throwaway directory.

    The ref is fetched first because CI checkouts are usually shallow;
    each shell file is then extracted via ``git show``.  Returns the
    temp-directory Path, or None when the base ref cannot be listed.
    """
    # Best-effort fetch — if it fails, the ls-tree below reports the error.
    subprocess.run(
        ["git", "fetch", "origin", base_ref, "--depth=1"],
        capture_output=True,
    )
    remote_ref = f"origin/{base_ref}"
    listing = subprocess.run(
        ["git", "ls-tree", "-r", "--name-only", remote_ref],
        capture_output=True, text=True,
    )
    if listing.returncode != 0:
        print(f"Warning: cannot list files in {remote_ref}: "
              f"{listing.stderr.strip()}", file=sys.stderr)
        return None
    workdir = Path(tempfile.mkdtemp(prefix="dup-baseline-"))
    for name in listing.stdout.splitlines():
        # Same filter as the original list comprehension: shell files only,
        # and nothing that lives under a .git/ path component.
        if not name.endswith(".sh") or ".git/" in name:
            continue
        blob = subprocess.run(
            ["git", "show", f"{remote_ref}:{name}"],
            capture_output=True, text=True,
        )
        if blob.returncode != 0:
            continue  # file unreadable at the base ref — skip it silently
        dest = workdir / name
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text(blob.stdout)
    return workdir
def collect_findings(root):
    """Run both detection passes on .sh files under *root*.

    Returns ``(ap_hits, dup_groups)`` with file paths expressed as
    strings relative to *root* (falling back to the plain string form
    when a path lies outside *root*).
    """
    root = Path(root)
    sh_files = sorted(
        p for p in root.rglob("*.sh") if ".git" not in p.parts
    )
    ap_hits = check_anti_patterns(sh_files)
    dup_groups = check_duplicates(sh_files)

    def rel(p):
        # Normalise to str in BOTH branches.  The original returned the
        # raw Path object on ValueError, so baseline and current findings
        # for the same file could compare unequal (str vs Path keys) in
        # the diff logic downstream.
        try:
            return str(Path(p).relative_to(root))
        except ValueError:
            return str(p)

    ap_hits = [(rel(f), ln, line, msg) for f, ln, line, msg in ap_hits]
    dup_groups = [
        (h, [(rel(f), ln, prev) for f, ln, prev in locs])
        for h, locs in dup_groups
    ]
    return ap_hits, dup_groups
# ---------------------------------------------------------------------------
# Reporting helpers
# ---------------------------------------------------------------------------
def print_anti_patterns(hits, label=""):
    """Emit one report section for anti-pattern *hits*; no-op when empty."""
    if not hits:
        return
    tag = f"{label} " if label else ""
    # Assemble the section as a list of lines and print it in one call;
    # the trailing "" reproduces the blank separator line.
    section = [f"=== {tag}Anti-pattern findings ==="]
    for path, lineno, text, message in hits:
        section.append(f" {path}:{lineno}: {message}")
        section.append(f" > {text[:120]}")
    section.append("")
    print("\n".join(section))
def print_duplicates(groups, label=""):
    """Emit one report section for duplicate-block *groups*; no-op when empty."""
    if not groups:
        return
    tag = f"{label} " if label else ""
    print(f"=== {tag}Duplicate code blocks (window={WINDOW} lines) ===")
    for digest, locations in groups:
        distinct_files = {where[0] for where in locations}
        print(f"\n [{digest[:8]}] appears in {len(distinct_files)} file(s):")
        for path, lineno, _preview in locations:
            print(f" {path}:{lineno}")
        # Show the first 3 lines of the duplicated block as a preview.
        for snippet in locations[0][2].splitlines()[:3]:
            print(f" | {snippet}")
    print()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
@ -136,40 +249,74 @@ def main() -> int:
print(f"Scanning {len(sh_files)} shell files "
f"(window={WINDOW} lines, min_files={MIN_FILES})...\n")
# --- Pass 1: anti-patterns ---
ap_hits = check_anti_patterns(sh_files)
if ap_hits:
print("=== Anti-pattern findings ===")
for file, lineno, line, message in ap_hits:
print(f" {file}:{lineno}: {message}")
print(f" > {line[:120]}")
print()
# --- Collect current findings (paths relative to ".") ---
cur_ap, cur_dups = collect_findings(".")
# --- Pass 2: sliding-window duplicates ---
dup_groups = check_duplicates(sh_files)
if dup_groups:
print(f"=== Duplicate code blocks (window={WINDOW} lines) ===")
for h, locs in dup_groups:
files = {loc[0] for loc in locs}
print(f"\n [{h[:8]}] appears in {len(files)} file(s):")
for file, lineno, preview in locs:
print(f" {file}:{lineno}")
# Show first 3 lines of the duplicated block
first_preview = locs[0][2]
for ln in first_preview.splitlines()[:3]:
print(f" | {ln}")
print()
# --- Baseline comparison mode ---
diff_base = os.environ.get("DIFF_BASE", "").strip()
if diff_base:
print(f"Baseline comparison: diffing against {diff_base}\n")
# --- Summary ---
total_issues = len(ap_hits) + len(dup_groups)
baseline_dir = prepare_baseline(diff_base)
if baseline_dir is None:
print(f"Warning: could not prepare baseline from {diff_base}, "
f"falling back to informational mode.\n", file=sys.stderr)
diff_base = "" # fall through to informational mode
else:
base_ap, base_dups = collect_findings(baseline_dir)
shutil.rmtree(baseline_dir)
# Anti-pattern diff: key by (relative_path, stripped_line, message)
def ap_key(hit):
return (hit[0], hit[2].strip(), hit[3])
base_ap_keys = {ap_key(h) for h in base_ap}
new_ap = [h for h in cur_ap if ap_key(h) not in base_ap_keys]
pre_ap = [h for h in cur_ap if ap_key(h) in base_ap_keys]
# Duplicate diff: key by content hash
base_dup_hashes = {g[0] for g in base_dups}
new_dups = [g for g in cur_dups if g[0] not in base_dup_hashes]
pre_dups = [g for g in cur_dups if g[0] in base_dup_hashes]
# Report pre-existing as info
if pre_ap or pre_dups:
print(f"Pre-existing (not introduced by this PR): "
f"{len(pre_ap)} anti-pattern(s), "
f"{len(pre_dups)} duplicate block(s).")
print_anti_patterns(pre_ap, "Pre-existing")
print_duplicates(pre_dups, "Pre-existing")
# Report and fail on new findings
if new_ap or new_dups:
print(f"NEW findings introduced by this PR: "
f"{len(new_ap)} anti-pattern(s), "
f"{len(new_dups)} duplicate block(s).")
print_anti_patterns(new_ap, "NEW")
print_duplicates(new_dups, "NEW")
return 1
total = len(cur_ap) + len(cur_dups)
if total > 0:
print(f"Total findings: {len(cur_ap)} anti-pattern(s), "
f"{len(cur_dups)} duplicate block(s) — "
f"all pre-existing, no regressions.")
else:
print("No duplicate code or anti-pattern findings.")
return 0
# --- Informational mode (no baseline available) ---
print_anti_patterns(cur_ap)
print_duplicates(cur_dups)
total_issues = len(cur_ap) + len(cur_dups)
if total_issues == 0:
print("No duplicate code or anti-pattern findings.")
return 0
print(f"Summary: {len(ap_hits)} anti-pattern hit(s), "
f"{len(dup_groups)} duplicate block(s).")
print("Consider extracting shared patterns to lib/ helpers.")
return 1
else:
print(f"Summary: {len(cur_ap)} anti-pattern hit(s), "
f"{len(cur_dups)} duplicate block(s).")
print("Consider extracting shared patterns to lib/ helpers.")
return 0
if __name__ == "__main__":