#!/usr/bin/env python3
"""detect-duplicates.py — Find copy-pasted code blocks across shell files.

Two detection passes:

1. Known anti-patterns (grep-style): flags specific hardcoded patterns
   that should use shared helpers instead.
2. Sliding-window hash: finds N-line blocks that appear verbatim in
   multiple files (catches structural copy-paste).

When DIFF_BASE is set (e.g. "main"), compares findings against that base
branch and only fails (exit 1) when new duplicates are introduced by the
PR. Pre-existing findings are reported as informational.

Without DIFF_BASE the script reports all findings and exits 0
(informational only — no base to compare against).
"""
|
|
|
|
|
|
|
|
|
|
import sys
|
|
|
|
|
import os
|
|
|
|
|
import hashlib
|
|
|
|
|
import re
|
2026-03-19 22:03:18 +00:00
|
|
|
import subprocess
|
|
|
|
|
import tempfile
|
|
|
|
|
import shutil
|
2026-03-17 10:02:58 +00:00
|
|
|
from pathlib import Path
|
|
|
|
|
from collections import defaultdict
|
|
|
|
|
|
|
|
|
|
# Tunables, overridable via the environment:
#   DUP_WINDOW    — number of consecutive meaningful lines hashed per window.
#   DUP_MIN_FILES — minimum distinct files a block must appear in to report.
WINDOW = int(os.environ.get("DUP_WINDOW", "5"))
MIN_FILES = int(os.environ.get("DUP_MIN_FILES", "2"))
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Known anti-patterns — patterns that should use shared helpers instead
# ---------------------------------------------------------------------------
# Each entry is (regex, message). The regex is applied with re.search()
# to every non-comment line of every scanned shell file.
ANTI_PATTERNS = [
    (
        r'"\$CI_STATE"\s*=\s*"success"',
        'Hardcoded CI_STATE="success" check — extract ci_passed() to lib/ and call it here',
    ),
    (
        # Quotes are optional on either side of a shell != comparison.
        r'"?\$CI_STATE"?\s*!=\s*"success"',
        'Hardcoded CI_STATE!="success" check — extract ci_passed() to lib/ and call it here',
    ),
    (
        r'WOODPECKER_REPO_ID\s*=\s*[1-9][0-9]*',
        'Hardcoded WOODPECKER_REPO_ID — load from project TOML via load-project.sh instead',
    ),
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def check_anti_patterns(sh_files):
    """Scan *sh_files* for the known anti-patterns.

    Returns a list of (file, lineno, line, message) tuples, one per
    matching pattern on each non-comment source line. Unreadable files
    are skipped with a warning on stderr.
    """
    findings = []
    for script in sh_files:
        try:
            source = script.read_text(errors="replace")
        except OSError as err:
            print(f"Warning: cannot read {script}: {err}", file=sys.stderr)
            continue
        for num, raw in enumerate(source.splitlines(), start=1):
            # Comment-only lines are never flagged.
            if raw.strip().startswith("#"):
                continue
            findings.extend(
                (str(script), num, raw.rstrip(), msg)
                for rx, msg in ANTI_PATTERNS
                if re.search(rx, raw)
            )
    return findings
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Sliding-window duplicate detection
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def meaningful_lines(path):
    """Read *path* and return [(original_lineno, line)] pairs.

    Blank lines and comment-only lines are dropped; trailing whitespace
    is stripped from kept lines. An unreadable file yields an empty list
    (with a warning printed to stderr).
    """
    try:
        text = path.read_text(errors="replace")
    except OSError as exc:
        print(f"Warning: cannot read {path}: {exc}", file=sys.stderr)
        return []
    return [
        (num, raw.rstrip())
        for num, raw in enumerate(text.splitlines(), start=1)
        if raw.strip() and not raw.strip().startswith("#")
    ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sliding_windows(lines, window_size):
    """Yield (start_lineno, content_hash, window_text) for each window.

    *lines* is a list of (lineno, text) pairs; each window covers
    ``window_size`` consecutive entries. MD5 is used purely as a content
    fingerprint, not for security.
    """
    last_start = len(lines) - window_size
    for start in range(last_start + 1):
        chunk = lines[start : start + window_size]
        body = "\n".join(text for _, text in chunk)
        digest = hashlib.md5(body.encode()).hexdigest()
        yield chunk[0][0], digest, body
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def check_duplicates(sh_files):
    """Find N-line blocks repeated verbatim across files.

    Returns duplicate groups as [(hash, [(file, lineno, preview)])],
    keeping only blocks seen in at least MIN_FILES distinct files and
    ordering groups by how many files they touch (most first).
    """
    # content hash -> every (file, start_lineno, preview) where it was seen
    locations: dict[str, list] = defaultdict(list)

    for path in sh_files:
        lines = meaningful_lines(path)
        if len(lines) < WINDOW:
            continue  # too short to contain even one full window
        recorded: set[str] = set()
        for start, digest, body in sliding_windows(lines, WINDOW):
            # Record each hash at most once per file so intra-file
            # repetition cannot inflate the cross-file count.
            if digest in recorded:
                continue
            recorded.add(digest)
            snippet = "\n".join(body.splitlines()[:3])
            locations[digest].append((str(path), start, snippet))

    groups = [
        (digest, sorted(locs))
        for digest, locs in locations.items()
        if len({f for f, _, _ in locs}) >= MIN_FILES
    ]
    # Most-duplicated first (by count of distinct affected files).
    groups.sort(key=lambda g: -len({f for f, _, _ in g[1]}))
    return groups
|
|
|
|
|
|
|
|
|
|
|
2026-03-19 22:03:18 +00:00
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Baseline comparison helpers
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def prepare_baseline(base_ref):
    """Materialize the .sh files of *base_ref* in a temp directory.

    The ref is fetched first (CI clones are typically shallow), then each
    shell script is copied out via ``git show``. Returns the temp
    directory Path, or None if the ref cannot be listed.
    """
    # Shallow CI clones won't have the base branch locally — fetch it.
    subprocess.run(
        ["git", "fetch", "origin", base_ref, "--depth=1"],
        capture_output=True,
    )

    ref = f"origin/{base_ref}"
    listing = subprocess.run(
        ["git", "ls-tree", "-r", "--name-only", ref],
        capture_output=True, text=True,
    )
    if listing.returncode != 0:
        print(f"Warning: cannot list files in {ref}: "
              f"{listing.stderr.strip()}", file=sys.stderr)
        return None

    wanted = [
        name for name in listing.stdout.splitlines()
        if name.endswith(".sh") and ".git/" not in name
    ]

    staging = Path(tempfile.mkdtemp(prefix="dup-baseline-"))
    for name in wanted:
        shown = subprocess.run(
            ["git", "show", f"{ref}:{name}"],
            capture_output=True, text=True,
        )
        if shown.returncode != 0:
            continue  # skip anything git cannot show (e.g. gone between calls)
        dest = staging / name
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text(shown.stdout)

    return staging
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def collect_findings(root):
    """Run both detection passes on .sh files under *root*.

    Returns ``(ap_hits, dup_groups)`` with file paths relative to *root*.
    """
    root = Path(root)
    # Skip architect scripts for duplicate detection (stub formulas, see #99)
    EXCLUDED_FILES = {"architect/architect-run.sh"}

    def rel(p):
        """Best-effort root-relative path string (falls back to input)."""
        try:
            return str(Path(p).relative_to(root))
        except ValueError:
            return str(p)

    # BUG FIX: exclusions must be matched against the *root-relative* path.
    # The previous code compared str(p), which carries the root prefix, so
    # the exclusion only worked when root was "." — baseline scans in a
    # temp directory silently included the excluded files, making the
    # PR-vs-base comparison asymmetric.
    sh_files = sorted(
        p for p in root.rglob("*.sh")
        if ".git" not in p.parts and rel(p) not in EXCLUDED_FILES
    )

    ap_hits = check_anti_patterns(sh_files)
    dup_groups = check_duplicates(sh_files)

    # Normalize paths so current and baseline findings are comparable.
    ap_hits = [(rel(f), ln, line, msg) for f, ln, line, msg in ap_hits]
    dup_groups = [
        (h, [(rel(f), ln, prev) for f, ln, prev in locs])
        for h, locs in dup_groups
    ]
    return ap_hits, dup_groups
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Reporting helpers
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def print_anti_patterns(hits, label=""):
    """Print anti-pattern findings; *label* (if any) prefixes the header."""
    if not hits:
        return
    heading = f"{label} " if label else ""
    print(f"=== {heading}Anti-pattern findings ===")
    for path, lineno, text, message in hits:
        print(f"  {path}:{lineno}: {message}")
        print(f"    > {text[:120]}")
    print()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def print_duplicates(groups, label=""):
    """Print duplicate-block groups; *label* (if any) prefixes the header."""
    if not groups:
        return
    heading = f"{label} " if label else ""
    print(f"=== {heading}Duplicate code blocks (window={WINDOW} lines) ===")
    for digest, locs in groups:
        touched = {path for path, _, _ in locs}
        print(f"\n  [{digest[:8]}] appears in {len(touched)} file(s):")
        for path, lineno, _ in locs:
            print(f"    {path}:{lineno}")
        # Show the first few lines of the block once, from the first hit.
        for snippet_line in locs[0][2].splitlines()[:3]:
            print(f"      | {snippet_line}")
    print()
|
|
|
|
|
|
|
|
|
|
|
2026-03-17 10:02:58 +00:00
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Main
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def main() -> int:
    """Entry point. Returns the process exit code.

    Exit 1 only when DIFF_BASE comparison finds findings introduced by
    the current tree; otherwise 0 (informational).
    """
    # Skip architect scripts for duplicate detection (stub formulas, see #99)
    # NOTE(review): this scan duplicates the one inside collect_findings();
    # it is only used for the initial count / "no files" short-circuit.
    EXCLUDED_FILES = {"architect/architect-run.sh"}
    sh_files = sorted(
        p for p in Path(".").rglob("*.sh")
        if ".git" not in p.parts and str(p) not in EXCLUDED_FILES
    )

    if not sh_files:
        print("No .sh files found.")
        return 0

    print(f"Scanning {len(sh_files)} shell files "
          f"(window={WINDOW} lines, min_files={MIN_FILES})...\n")

    # --- Collect current findings (paths relative to ".") ---
    cur_ap, cur_dups = collect_findings(".")

    # --- Baseline comparison mode ---
    diff_base = os.environ.get("DIFF_BASE", "").strip()
    if diff_base:
        print(f"Baseline comparison: diffing against {diff_base}\n")

        baseline_dir = prepare_baseline(diff_base)
        if baseline_dir is None:
            # Baseline unavailable (e.g. fetch failed) — degrade gracefully.
            print(f"Warning: could not prepare baseline from {diff_base}, "
                  f"falling back to informational mode.\n", file=sys.stderr)
            diff_base = ""  # fall through to informational mode
        else:
            base_ap, base_dups = collect_findings(baseline_dir)
            shutil.rmtree(baseline_dir)

            # Anti-pattern diff: key by (relative_path, stripped_line, message)
            # so unrelated line-number shifts don't look like new findings.
            def ap_key(hit):
                return (hit[0], hit[2].strip(), hit[3])

            base_ap_keys = {ap_key(h) for h in base_ap}
            new_ap = [h for h in cur_ap if ap_key(h) not in base_ap_keys]
            pre_ap = [h for h in cur_ap if ap_key(h) in base_ap_keys]

            # Duplicate diff: key by content hash
            base_dup_hashes = {g[0] for g in base_dups}
            new_dups = [g for g in cur_dups if g[0] not in base_dup_hashes]
            pre_dups = [g for g in cur_dups if g[0] in base_dup_hashes]

            # Report pre-existing as info
            if pre_ap or pre_dups:
                print(f"Pre-existing (not introduced by this PR): "
                      f"{len(pre_ap)} anti-pattern(s), "
                      f"{len(pre_dups)} duplicate block(s).")
                print_anti_patterns(pre_ap, "Pre-existing")
                print_duplicates(pre_dups, "Pre-existing")

            # Report and fail on new findings
            if new_ap or new_dups:
                print(f"NEW findings introduced by this PR: "
                      f"{len(new_ap)} anti-pattern(s), "
                      f"{len(new_dups)} duplicate block(s).")
                print_anti_patterns(new_ap, "NEW")
                print_duplicates(new_dups, "NEW")
                return 1

            total = len(cur_ap) + len(cur_dups)
            if total > 0:
                print(f"Total findings: {len(cur_ap)} anti-pattern(s), "
                      f"{len(cur_dups)} duplicate block(s) — "
                      f"all pre-existing, no regressions.")
            else:
                print("No duplicate code or anti-pattern findings.")
            return 0

    # --- Informational mode (no baseline available) ---
    print_anti_patterns(cur_ap)
    print_duplicates(cur_dups)

    total_issues = len(cur_ap) + len(cur_dups)
    if total_issues == 0:
        print("No duplicate code or anti-pattern findings.")
    else:
        print(f"Summary: {len(cur_ap)} anti-pattern hit(s), "
              f"{len(cur_dups)} duplicate block(s).")
        print("Consider extracting shared patterns to lib/ helpers.")
    return 0
|
2026-03-17 10:02:58 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: propagate main()'s exit code to the shell.
if __name__ == "__main__":
    sys.exit(main())
|