import base64 import json from pathlib import Path from typing import Dict, List import requests from app.settings import settings DATA_DIR = Path(__file__).resolve().parent.parent / "data" def _load_json(filename: str) -> Dict: path = DATA_DIR / filename with open(path, encoding="utf-8") as file: return json.load(file) def _sort_builds(builds: List[Dict]) -> List[Dict]: return sorted(builds, key=lambda build: build.get("number", 0), reverse=True) def normalize_build(build: Dict) -> Dict: changes = build.get("changeSets", []) commits = [] for cs in changes: for item in cs.get("items", []): commits.append({ "commit": item.get("commitId", "")[:7], "message": item.get("msg", ""), "author": item.get("author", {}).get("fullName", "unknown"), }) return { "number": build.get("number"), "status": (build.get("result") or "RUNNING").lower(), "finished_at": build.get("timestamp"), "duration_seconds": build.get("duration", 0) // 1000, "url": build.get("url"), "commits": commits, } def _auth_header() -> Dict[str, str]: token = f"{settings.jenkins_user}:{settings.jenkins_token}" encoded = base64.b64encode(token.encode()).decode() return {"Authorization": f"Basic {encoded}"} def fetch_builds(limit: int = 5) -> List[Dict]: url = ( f"{settings.jenkins_base_url}/job/{settings.jenkins_job_name}/api/json" "?tree=builds[number,url,result,timestamp,duration," "changesets[items[commitId,msg,author[fullName]]]]" ) resp = requests.get(url, headers = _auth_header(), timeout=5) resp.raise_for_status() builds = resp.json().get("builds", []) return builds[:limit] def build_history() -> Dict: """Return Jenkins build history data.""" history = _load_json("build_history.json") builds = history.get("builds", []) return { "builds": [normalize_build(b) for b in builds] }