# CI/CD Workshop
# Copyright (C) 2025 OpenBokeron
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Fetch Jenkins build data and normalize it into plain dictionaries."""

import base64
import html
import re
from typing import Dict, List, Tuple

import requests

from app.settings import settings

# Patterns used to pull structured pieces out of a free-text trigger.
_AUTHOR_RE = re.compile(r"\(([^)]+)\)\.")
_PR_NUMBER_RE = re.compile(r"#(\d+)")
_PR_TITLE_RE = re.compile(r"Merge pull request '([^']+)'")

# Prefix (compared case-insensitively) of the line that carries the PR URL.
_REVIEWED_ON_PREFIX = "reviewed-on:"

# Fields requested from the Jenkins JSON API via its ``tree`` parameter.
_BUILDS_TREE = (
    "builds[number,url,result,timestamp,duration,building,"
    "changeSets[items[commitId,msg,author[fullName]]],"
    "actions[causes[shortDescription]]]"
)

# Seconds to wait for Jenkins before giving up on the request.
_REQUEST_TIMEOUT_SECONDS = 5


def _sort_builds(builds: List[Dict]) -> List[Dict]:
    """Return *builds* ordered by build number, highest first.

    A missing or null ``number`` is treated as 0 so that a partial record
    cannot make the comparison raise ``TypeError``.
    """
    return sorted(
        builds,
        key=lambda build: build.get("number") or 0,
        reverse=True,
    )


def _extract_commits(build: Dict) -> List[Dict]:
    """Flatten every change-set item of *build* into a list of commits.

    Each entry has the keys ``commit`` (first 7 characters of the commit
    id), ``message`` and ``author`` (``"unknown"`` when no name is given).
    """
    commits = []
    # ``or`` fallbacks cover keys that are present but null in the JSON,
    # which ``dict.get(key, default)`` alone does not protect against.
    for change_set in build.get("changeSets") or []:
        for item in change_set.get("items") or []:
            author = item.get("author") or {}
            commits.append(
                {
                    "commit": (item.get("commitId") or "")[:7],
                    "message": item.get("msg", ""),
                    "author": author.get("fullName") or "unknown",
                }
            )
    return commits


def _extract_trigger(build: Dict) -> str:
    """Return the first non-empty cause description of *build*.

    The description is HTML-unescaped and stripped. An empty string is
    returned when no action carries a cause with a description.
    """
    for action in build.get("actions") or []:
        # Skip entries that are not objects instead of crashing on them.
        if not isinstance(action, dict):
            continue
        for cause in action.get("causes") or []:
            description = cause.get("shortDescription")
            if description:
                return html.unescape(description).strip()
    return ""


def _summarize_trigger(trigger: str) -> Tuple[str, str, str]:
    """Derive a display label, PR URL and author from *trigger* text.

    Returns:
        A ``(label, pr_url, author)`` tuple. ``label`` is
        ``"Merge PR #<n>"`` (followed by a middle dot and the title when
        one is quoted after ``Merge pull request``) if the text contains
        ``#<digits>``; otherwise it is the first non-blank line.
        ``pr_url`` is whatever follows ``Reviewed-on:`` on the first line
        starting with that prefix, and ``author`` is the text inside
        parentheses that are immediately followed by a period. Parts that
        cannot be found are empty strings.
    """
    if not trigger:
        return "", "", ""

    lines = trigger.splitlines()

    pr_url = ""
    for line in lines:
        if line.strip().lower().startswith(_REVIEWED_ON_PREFIX):
            # Split on the first colon only: the URL contains colons too.
            pr_url = line.split(":", 1)[-1].strip()
            break

    author_match = _AUTHOR_RE.search(trigger)
    author = author_match.group(1).strip() if author_match else ""

    pr_match = _PR_NUMBER_RE.search(trigger)
    if pr_match:
        pr_number = pr_match.group(1)
        title_match = _PR_TITLE_RE.search(trigger)
        title = title_match.group(1).strip() if title_match else ""
        if title:
            # U+00B7 MIDDLE DOT, written as an escape so the separator
            # survives a wrong-codec round trip of this file.
            return f"Merge PR #{pr_number} \u00b7 {title}", pr_url, author
        return f"Merge PR #{pr_number}", pr_url, author

    # Use the first non-blank line so leading blank lines do not produce
    # an empty label.
    stripped_lines = (line.strip() for line in lines)
    first_line = next((line for line in stripped_lines if line), "")
    return first_line, pr_url, author


def normalize_build(build: Dict) -> Dict:
    """Convert one raw Jenkins build record into the module's flat shape.

    Returns:
        A dict with the keys ``number``, ``status``, ``finished_at``,
        ``duration_seconds``, ``url``, ``commits``, ``trigger``,
        ``trigger_label``, ``trigger_url`` and ``trigger_author``.
    """
    trigger = _extract_trigger(build)
    trigger_label, trigger_url, trigger_author = _summarize_trigger(trigger)

    # A build without a result yet is reported as running, and the
    # ``building`` flag overrides any result that is present.
    status = (build.get("result") or "RUNNING").lower()
    if build.get("building"):
        status = "running"

    return {
        "number": build.get("number"),
        "status": status,
        # NOTE(review): this is the raw Jenkins "timestamp" field, which
        # presumably marks the build start rather than its end - confirm
        # that consumers of "finished_at" expect that.
        "finished_at": build.get("timestamp"),
        # Integer division by 1000; a null duration counts as 0.
        # Presumably milliseconds to seconds - confirm against Jenkins.
        "duration_seconds": (build.get("duration") or 0) // 1000,
        "url": build.get("url"),
        "commits": _extract_commits(build),
        "trigger": trigger,
        "trigger_label": trigger_label,
        "trigger_url": trigger_url,
        "trigger_author": trigger_author,
    }


def _auth_header() -> Dict[str, str]:
    """Return an HTTP Basic ``Authorization`` header from the settings.

    An empty dict is returned when the Jenkins user or token is unset, so
    the request is then sent without credentials.
    """
    if not settings.jenkins_user or not settings.jenkins_token:
        return {}
    credentials = f"{settings.jenkins_user}:{settings.jenkins_token}"
    encoded = base64.b64encode(credentials.encode()).decode()
    return {"Authorization": f"Basic {encoded}"}


def fetch_builds(limit: int = 5) -> List[Dict]:
    """Fetch raw build records for the configured Jenkins job.

    Args:
        limit: Maximum number of builds to return; values below zero are
            treated as zero.

    Returns:
        At most *limit* raw build dicts, highest build number first.

    Raises:
        ValueError: If ``settings.jenkins_job_name`` is not set.
        requests.HTTPError: If Jenkins answers with an error status.
    """
    if not settings.jenkins_job_name:
        raise ValueError("JENKINS_JOB_NAME not configured")

    url = (
        f"{settings.jenkins_base_url}/job/{settings.jenkins_job_name}/api/json"
        f"?tree={_BUILDS_TREE}"
    )
    resp = requests.get(
        url,
        headers=_auth_header(),
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    resp.raise_for_status()

    # ``or []`` covers a null "builds" value as well as a missing key.
    builds = resp.json().get("builds") or []
    # Clamp so a negative limit cannot turn into "all but the last N".
    return _sort_builds(builds)[: max(limit, 0)]


def build_history() -> Dict:
    """Return Jenkins build history data.

    The result is ``{"builds": [...]}``, where each element is the
    normalized form of one build returned by ``fetch_builds()``.
    """
    builds = fetch_builds()
    return {"builds": [normalize_build(b) for b in builds]}