Files
TallerCiCd/backend/app/services/builds.py
2025-12-16 09:32:14 +01:00

84 lines
2.6 KiB
Python

# CI/CD Workshop
# Copyright (C) 2025 OpenBokeron
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import base64
import json
from pathlib import Path
from typing import Dict, List
import requests
from app.settings import settings
# Absolute path of the "data" directory located one level above the directory
# that contains this module (i.e. <this file's dir>/../data).
DATA_DIR = Path(__file__).resolve().parent.parent.joinpath("data")
def _load_json(filename: str) -> Dict:
    """Parse and return the JSON document stored in ``DATA_DIR / filename``.

    The file is read as UTF-8 text. Errors from opening the file or from
    decoding the JSON propagate to the caller unchanged.
    """
    raw_text = (DATA_DIR / filename).read_text(encoding="utf-8")
    return json.loads(raw_text)
def _sort_builds(builds: List[Dict]) -> List[Dict]:
return sorted(builds, key=lambda build: build.get("number", 0), reverse=True)
def normalize_build(build: Dict) -> Dict:
    """Flatten a raw build record into the shape returned by this service.

    Args:
        build: A build dict as produced by ``fetch_builds``. The keys read
            are ``number``, ``result``, ``timestamp``, ``duration``, ``url``
            and ``changeSets`` (a list of ``{"items": [...]}`` dicts).

    Returns:
        A dict with the keys ``number``, ``status``, ``finished_at``,
        ``duration_seconds``, ``url`` and ``commits``. ``commits`` is a list
        of ``{"commit", "message", "author"}`` dicts, one per change-set item.

    Any of the optional fields may be absent *or* explicitly null in the
    payload; both cases fall back to the same defaults instead of raising.
    """
    commits = []
    # ``or []`` / ``or {}`` cover keys that are present with a null value,
    # which a plain ``.get(key, default)`` does not.
    for change_set in build.get("changeSets") or []:
        for item in change_set.get("items") or []:
            author = item.get("author") or {}
            commits.append({
                # Abbreviate the commit id to its first 7 characters.
                "commit": (item.get("commitId") or "")[:7],
                "message": item.get("msg", ""),
                "author": author.get("fullName") or "unknown",
            })

    return {
        "number": build.get("number"),
        # No result is reported as "running".
        "status": (build.get("result") or "RUNNING").lower(),
        # NOTE(review): this passes ``timestamp`` through unchanged. In the
        # Jenkins API that field appears to be the build *start* time, not
        # its completion -- confirm whether start + duration is intended.
        "finished_at": build.get("timestamp"),
        # ``duration`` is divided by 1000 to yield whole seconds.
        "duration_seconds": (build.get("duration") or 0) // 1000,
        "url": build.get("url"),
        "commits": commits,
    }
def _auth_header() -> Dict[str, str]:
    """Build the HTTP Basic ``Authorization`` header for Jenkins requests.

    The credentials are ``settings.jenkins_user`` and
    ``settings.jenkins_token`` joined with a colon and Base64-encoded.
    """
    raw_credentials = f"{settings.jenkins_user}:{settings.jenkins_token}".encode()
    return {"Authorization": "Basic " + base64.b64encode(raw_credentials).decode()}
def fetch_builds(limit: int = 5) -> List[Dict]:
    """Fetch raw build records for the configured Jenkins job.

    Args:
        limit: Maximum number of build records to return.

    Returns:
        At most ``limit`` build dicts, in the order Jenkins returned them.
        An empty list is returned when the response has no ``builds``.

    Raises:
        requests.HTTPError: If Jenkins answers with an error status
            (raised by ``raise_for_status``).
    """
    # Drop a trailing slash so the joined URL never contains "//job/...".
    base_url = str(settings.jenkins_base_url).rstrip("/")
    # The requested property must be spelled ``changeSets``: that is the key
    # ``normalize_build`` reads, so any other spelling leaves the commit
    # list permanently empty.
    url = (
        f"{base_url}/job/{settings.jenkins_job_name}/api/json"
        "?tree=builds[number,url,result,timestamp,duration,"
        "changeSets[items[commitId,msg,author[fullName]]]]"
    )
    resp = requests.get(url, headers=_auth_header(), timeout=5)
    resp.raise_for_status()
    # ``or []`` also covers a payload where "builds" is present but null.
    builds = resp.json().get("builds") or []
    return builds[:limit]
def build_history() -> Dict:
    """Fetch the latest Jenkins builds and return them in normalized form.

    The result is a dict with a single ``"builds"`` key holding one
    ``normalize_build`` result per build returned by ``fetch_builds``.
    """
    normalized = list(map(normalize_build, fetch_builds()))
    return {"builds": normalized}