Merge pull request 'fix: duplicate-detection CI step fails on pre-existing duplicates unrelated to PR (#296)' (#316) from fix/issue-296 into main

This commit is contained in:
johba 2026-03-19 23:09:55 +01:00
commit eb6353eb21
2 changed files with 183 additions and 34 deletions

View file

@ -3,7 +3,7 @@
# #
# Steps: # Steps:
# 1. shellcheck — lint all .sh files (warnings+errors) # 1. shellcheck — lint all .sh files (warnings+errors)
# 2. duplicate-detection — report copy-pasted code blocks (non-blocking) # 2. duplicate-detection — report copy-pasted code blocks (fails only on new duplicates for PRs)
when: when:
event: [push, pull_request] event: [push, pull_request]
@ -23,5 +23,7 @@ steps:
- name: duplicate-detection - name: duplicate-detection
image: python:3-alpine image: python:3-alpine
commands: commands:
- apk add --no-cache git
- python3 .woodpecker/detect-duplicates.py - python3 .woodpecker/detect-duplicates.py
failure: ignore environment:
DIFF_BASE: ${CI_COMMIT_TARGET_BRANCH}

View file

@ -7,14 +7,21 @@ Two detection passes:
2. Sliding-window hash: finds N-line blocks that appear verbatim in 2. Sliding-window hash: finds N-line blocks that appear verbatim in
multiple files (catches structural copy-paste). multiple files (catches structural copy-paste).
Exit 0 = clean. Exit 1 = findings (CI step is set to failure: ignore, When DIFF_BASE is set (e.g. "main"), compares findings against that base
so overall CI stays green while findings are visible in logs). branch and only fails (exit 1) when new duplicates are introduced by the
PR. Pre-existing findings are reported as informational.
Without DIFF_BASE the script reports all findings and exits 0
(informational only — no base to compare against).
""" """
import sys import sys
import os import os
import hashlib import hashlib
import re import re
import subprocess
import tempfile
import shutil
from pathlib import Path from pathlib import Path
from collections import defaultdict from collections import defaultdict
@ -120,6 +127,112 @@ def check_duplicates(sh_files):
return groups return groups
# ---------------------------------------------------------------------------
# Baseline comparison helpers
# ---------------------------------------------------------------------------
def prepare_baseline(base_ref):
    """Materialize the .sh files of *base_ref* in a fresh temp directory.

    The ref is fetched first because CI checkouts are usually shallow
    clones that do not contain the base branch locally.  Each shell file
    is then copied out of the git object store via ``git show``.

    Returns the temp directory as a Path, or None when the ref cannot
    be listed (e.g. the fetch failed and the ref is unknown).
    """
    # Best-effort fetch; if it fails, the ls-tree below decides
    # whether the ref is usable at all.
    subprocess.run(
        ["git", "fetch", "origin", base_ref, "--depth=1"],
        capture_output=True,
    )

    remote_ref = f"origin/{base_ref}"
    listing = subprocess.run(
        ["git", "ls-tree", "-r", "--name-only", remote_ref],
        capture_output=True, text=True,
    )
    if listing.returncode != 0:
        print(f"Warning: cannot list files in {remote_ref}: "
              f"{listing.stderr.strip()}", file=sys.stderr)
        return None

    wanted = [
        name for name in listing.stdout.splitlines()
        if name.endswith(".sh") and ".git/" not in name
    ]

    staging = Path(tempfile.mkdtemp(prefix="dup-baseline-"))
    for name in wanted:
        shown = subprocess.run(
            ["git", "show", f"{remote_ref}:{name}"],
            capture_output=True, text=True,
        )
        if shown.returncode != 0:
            # Unreadable entry: skip it rather than abort the baseline.
            continue
        dest = staging / name
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text(shown.stdout)
    return staging
def collect_findings(root):
    """Run both detection passes over the .sh files below *root*.

    Returns ``(ap_hits, dup_groups)`` with every file path rewritten
    relative to *root*, so findings from different checkouts (working
    tree vs. baseline snapshot) are directly comparable.
    """
    base = Path(root)
    scripts = sorted(
        p for p in base.rglob("*.sh") if ".git" not in p.parts
    )

    hits = check_anti_patterns(scripts)
    groups = check_duplicates(scripts)

    def _relative(path):
        # A path outside *base* (should not happen) is left untouched.
        try:
            return str(Path(path).relative_to(base))
        except ValueError:
            return path

    hits = [(_relative(f), ln, text, msg) for f, ln, text, msg in hits]
    groups = [
        (digest, [(_relative(f), ln, preview) for f, ln, preview in locs])
        for digest, locs in groups
    ]
    return hits, groups
# ---------------------------------------------------------------------------
# Reporting helpers
# ---------------------------------------------------------------------------
def print_anti_patterns(hits, label=""):
    """Report anti-pattern *hits*, optionally prefixed with *label*.

    Prints nothing when there are no hits.  Each hit is shown as
    ``file:line: message`` followed by the offending source line
    (truncated to 120 characters).
    """
    if not hits:
        return
    heading = f"{label} " if label else ""
    print(f"=== {heading}Anti-pattern findings ===")
    for path, lineno, text, message in hits:
        print(f" {path}:{lineno}: {message}")
        print(f" > {text[:120]}")
    print()
def print_duplicates(groups, label=""):
    """Report duplicate code-block *groups*, optionally prefixed with *label*.

    Prints nothing when there are no groups.  For each group, lists
    every location, then shows up to the first three lines of one
    representative copy of the duplicated block.
    """
    if not groups:
        return
    heading = f"{label} " if label else ""
    print(f"=== {heading}Duplicate code blocks (window={WINDOW} lines) ===")
    for digest, locations in groups:
        distinct_files = {loc[0] for loc in locations}
        print(f"\n [{digest[:8]}] appears in {len(distinct_files)} file(s):")
        for path, lineno, _preview in locations:
            print(f" {path}:{lineno}")
        for ln in locations[0][2].splitlines()[:3]:
            print(f" | {ln}")
    print()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Main # Main
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -136,40 +249,74 @@ def main() -> int:
print(f"Scanning {len(sh_files)} shell files " print(f"Scanning {len(sh_files)} shell files "
f"(window={WINDOW} lines, min_files={MIN_FILES})...\n") f"(window={WINDOW} lines, min_files={MIN_FILES})...\n")
# --- Pass 1: anti-patterns --- # --- Collect current findings (paths relative to ".") ---
ap_hits = check_anti_patterns(sh_files) cur_ap, cur_dups = collect_findings(".")
if ap_hits:
print("=== Anti-pattern findings ===")
for file, lineno, line, message in ap_hits:
print(f" {file}:{lineno}: {message}")
print(f" > {line[:120]}")
print()
# --- Pass 2: sliding-window duplicates --- # --- Baseline comparison mode ---
dup_groups = check_duplicates(sh_files) diff_base = os.environ.get("DIFF_BASE", "").strip()
if dup_groups: if diff_base:
print(f"=== Duplicate code blocks (window={WINDOW} lines) ===") print(f"Baseline comparison: diffing against {diff_base}\n")
for h, locs in dup_groups:
files = {loc[0] for loc in locs}
print(f"\n [{h[:8]}] appears in {len(files)} file(s):")
for file, lineno, preview in locs:
print(f" {file}:{lineno}")
# Show first 3 lines of the duplicated block
first_preview = locs[0][2]
for ln in first_preview.splitlines()[:3]:
print(f" | {ln}")
print()
# --- Summary --- baseline_dir = prepare_baseline(diff_base)
total_issues = len(ap_hits) + len(dup_groups) if baseline_dir is None:
if total_issues == 0: print(f"Warning: could not prepare baseline from {diff_base}, "
f"falling back to informational mode.\n", file=sys.stderr)
diff_base = "" # fall through to informational mode
else:
base_ap, base_dups = collect_findings(baseline_dir)
shutil.rmtree(baseline_dir)
# Anti-pattern diff: key by (relative_path, stripped_line, message)
def ap_key(hit):
return (hit[0], hit[2].strip(), hit[3])
base_ap_keys = {ap_key(h) for h in base_ap}
new_ap = [h for h in cur_ap if ap_key(h) not in base_ap_keys]
pre_ap = [h for h in cur_ap if ap_key(h) in base_ap_keys]
# Duplicate diff: key by content hash
base_dup_hashes = {g[0] for g in base_dups}
new_dups = [g for g in cur_dups if g[0] not in base_dup_hashes]
pre_dups = [g for g in cur_dups if g[0] in base_dup_hashes]
# Report pre-existing as info
if pre_ap or pre_dups:
print(f"Pre-existing (not introduced by this PR): "
f"{len(pre_ap)} anti-pattern(s), "
f"{len(pre_dups)} duplicate block(s).")
print_anti_patterns(pre_ap, "Pre-existing")
print_duplicates(pre_dups, "Pre-existing")
# Report and fail on new findings
if new_ap or new_dups:
print(f"NEW findings introduced by this PR: "
f"{len(new_ap)} anti-pattern(s), "
f"{len(new_dups)} duplicate block(s).")
print_anti_patterns(new_ap, "NEW")
print_duplicates(new_dups, "NEW")
return 1
total = len(cur_ap) + len(cur_dups)
if total > 0:
print(f"Total findings: {len(cur_ap)} anti-pattern(s), "
f"{len(cur_dups)} duplicate block(s) — "
f"all pre-existing, no regressions.")
else:
print("No duplicate code or anti-pattern findings.") print("No duplicate code or anti-pattern findings.")
return 0 return 0
print(f"Summary: {len(ap_hits)} anti-pattern hit(s), " # --- Informational mode (no baseline available) ---
f"{len(dup_groups)} duplicate block(s).") print_anti_patterns(cur_ap)
print_duplicates(cur_dups)
total_issues = len(cur_ap) + len(cur_dups)
if total_issues == 0:
print("No duplicate code or anti-pattern findings.")
else:
print(f"Summary: {len(cur_ap)} anti-pattern hit(s), "
f"{len(cur_dups)} duplicate block(s).")
print("Consider extracting shared patterns to lib/ helpers.") print("Consider extracting shared patterns to lib/ helpers.")
return 1 return 0
if __name__ == "__main__": if __name__ == "__main__":