Fetch builds from Jenkins API
This commit is contained in:
@@ -1,7 +1,12 @@
|
||||
import base64
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
|
||||
import requests
|
||||
|
||||
from app.settings import settings
|
||||
|
||||
DATA_DIR = Path(__file__).resolve().parent.parent / "data"
|
||||
|
||||
|
||||
@@ -14,9 +19,50 @@ def _load_json(filename: str) -> Dict:
|
||||
def _sort_builds(builds: List[Dict]) -> List[Dict]:
|
||||
return sorted(builds, key=lambda build: build.get("number", 0), reverse=True)
|
||||
|
||||
def normalize_build(build: Dict) -> Dict:
|
||||
changes = build.get("changeSets", [])
|
||||
commits = []
|
||||
|
||||
for cs in changes:
|
||||
for item in cs.get("items", []):
|
||||
commits.append({
|
||||
"commit": item.get("commitId", "")[:7],
|
||||
"message": item.get("msg", ""),
|
||||
"author": item.get("author", {}).get("fullName", "unknown"),
|
||||
})
|
||||
|
||||
return {
|
||||
"number": build.get("number"),
|
||||
"status": (build.get("result") or "RUNNING").lower(),
|
||||
"finished_at": build.get("timestamp"),
|
||||
"duration_seconds": build.get("duration", 0) // 1000,
|
||||
"url": build.get("url"),
|
||||
"commits": commits,
|
||||
}
|
||||
|
||||
|
||||
def _auth_header() -> Dict[str, str]:
|
||||
token = f"{settings.jenkins_user}:{settings.jenkins_token}"
|
||||
encoded = base64.b64encode(token.encode()).decode()
|
||||
return {"Authorization": f"Basic {encoded}"}
|
||||
|
||||
def fetch_builds(limit: int = 5) -> List[Dict]:
|
||||
url = (
|
||||
f"{settings.jenkins_base_url}/job/{settings.jenkins_job_name}/api/json"
|
||||
"?tree=builds[number,url,result,timestamp,duration,"
|
||||
"changesets[items[commitId,msg,author[fullName]]]]"
|
||||
)
|
||||
|
||||
resp = requests.get(url, headers = _auth_header(), timeout=5)
|
||||
resp.raise_for_status()
|
||||
|
||||
builds = resp.json().get("builds", [])
|
||||
return builds[:limit]
|
||||
|
||||
def build_history() -> Dict:
|
||||
"""Return Jenkins build history data."""
|
||||
history = _load_json("build_history.json")
|
||||
builds = history.get("builds", [])
|
||||
return {"builds": _sort_builds(builds)}
|
||||
return {
|
||||
"builds": [normalize_build(b) for b in builds]
|
||||
}
|
||||
|
||||
@@ -8,6 +8,10 @@ class RuntimeConfig:
|
||||
git_commit: str = os.getenv("GIT_COMMIT", "local")
|
||||
build_number: str = os.getenv("BUILD_NUMBER", "-")
|
||||
commit_author: str = os.getenv("COMMIT_AUTHOR", "local")
|
||||
jenkins_base_url: str = os.getenv("JENKINS_BASE_URL", "localhost:8080")
|
||||
jenkins_job_name: str = os.getenv("JENKINS_JOB_NAME", "")
|
||||
jenkins_user: str = os.getenv("JENKINS_USER", "")
|
||||
jenkins_token: str = os.getenv("JENKINS_TOKEN", "")
|
||||
|
||||
|
||||
settings = RuntimeConfig()
|
||||
|
||||
Reference in New Issue
Block a user