"""Baserow client — JWT authentication and CRUD operations."""

import json
import logging
from urllib.parse import urlencode

import httpx

from app.config import BASEROW_URL, BASEROW_HOST, BASEROW_USER, BASEROW_PASSWORD, BASEROW_TOKEN

logger = logging.getLogger(__name__)

# Most recently issued JWT, shared by every call in this module.
# ``None`` until the first successful login, and reset on a 401.
_jwt_cache: str | None = None


async def get_jwt() -> str:
    """Return a JWT for the Baserow API, reusing the cached one when possible.

    When a token is cached, a probe request is sent to ``/api/_health/``;
    the cached token is returned if the probe answers 200 with "healthy"
    in its body. Otherwise (or when nothing is cached) a fresh login is
    performed with ``BASEROW_USER`` / ``BASEROW_PASSWORD`` and the result
    is stored in the module-level cache.

    Returns:
        The token read from the ``access_token`` key of the login
        response, falling back to the ``token`` key.

    Raises:
        httpx.HTTPStatusError: if the login request returns an error status.
    """
    global _jwt_cache

    if _jwt_cache:
        # NOTE(review): this only proves the server answers the health
        # endpoint; whether that endpoint actually validates the JWT is not
        # visible from here — confirm. An expired token is still recovered
        # by the 401 retry in ``baserow_request``.
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                r = await client.get(
                    f"{BASEROW_URL}/api/_health/",
                    headers={"Host": BASEROW_HOST, "Authorization": f"JWT {_jwt_cache}"},
                )
                if r.status_code == 200 and "healthy" in r.text:
                    return _jwt_cache
        except Exception:
            # Best effort: any failure of the probe falls through to a fresh
            # login, but leave a trace instead of swallowing it silently.
            logger.debug("Baserow JWT probe failed; logging in again", exc_info=True)

    # Login
    async with httpx.AsyncClient(timeout=10) as client:
        r = await client.post(
            f"{BASEROW_URL}/api/user/token-auth/",
            headers={"Host": BASEROW_HOST, "Content-Type": "application/json"},
            json={"email": BASEROW_USER, "password": BASEROW_PASSWORD},
        )
        r.raise_for_status()
        data = r.json()
        # NOTE(review): if the response carries neither key, this is None and
        # the function returns None despite the ``-> str`` annotation.
        _jwt_cache = data.get("access_token") or data.get("token")
        logger.info("JWT Baserow régénéré")
        return _jwt_cache


async def baserow_request(method: str, path: str, json_data: dict | None = None) -> dict:
    """Send an authenticated request to the Baserow API.

    Args:
        method: HTTP method name (e.g. ``"GET"``, ``"POST"``).
        path: Path (including any query string) appended to ``BASEROW_URL``.
        json_data: Optional JSON body.

    Returns:
        The decoded JSON response, or ``{}`` when the response body is empty.

    Raises:
        httpx.HTTPStatusError: if the final response has an error status.
    """
    global _jwt_cache

    jwt = await get_jwt()
    headers = {"Host": BASEROW_HOST, "Authorization": f"JWT {jwt}"}
    if json_data is not None:
        headers["Content-Type"] = "application/json"

    url = f"{BASEROW_URL}{path}"
    async with httpx.AsyncClient(timeout=30) as client:
        r = await client.request(method=method, url=url, headers=headers, json=json_data)
        if r.status_code == 401:
            # Token rejected: drop the cache, log in again and retry once.
            _jwt_cache = None
            jwt = await get_jwt()
            headers["Authorization"] = f"JWT {jwt}"
            r = await client.request(method=method, url=url, headers=headers, json=json_data)
        r.raise_for_status()
        return r.json() if r.text else {}


# ── Simplified CRUD operations ───────────────────────────────────────────────


async def list_rows(table_id: int, filters: dict | None = None, size: int = 100) -> list[dict]:
    """List the rows of a table, optionally filtered.

    Args:
        table_id: Identifier of the table to read.
        filters: Optional extra query parameters. Keys and values are
            URL-encoded and appended to the query string verbatim; they are
            presumably Baserow filter parameters such as
            ``filter__<field>__equal`` — verify against callers.
        size: Value sent as the ``size`` query parameter.

    Returns:
        The ``results`` list of the response, or ``[]`` when that key is absent.
    """
    path = f"/api/database/rows/table/{table_id}/?user_field_names=true&size={size}"
    if filters:
        # Previously this argument was accepted but never sent to the API.
        path = f"{path}&{urlencode(filters, doseq=True)}"
    result = await baserow_request("GET", path)
    return result.get("results", [])


async def get_row(table_id: int, row_id: int) -> dict:
    """Fetch a single row by its ID and return the decoded response."""
    return await baserow_request("GET", f"/api/database/rows/table/{table_id}/{row_id}/?user_field_names=true")


async def create_row(table_id: int, data: dict) -> dict:
    """Create a new row from ``data`` and return the decoded response."""
    return await baserow_request("POST", f"/api/database/rows/table/{table_id}/?user_field_names=true", data)


async def update_row(table_id: int, row_id: int, data: dict) -> dict:
    """Update a row with ``data`` (sent as PATCH) and return the decoded response."""
    return await baserow_request("PATCH", f"/api/database/rows/table/{table_id}/{row_id}/?user_field_names=true", data)


async def delete_row(table_id: int, row_id: int) -> None:
    """Delete a row; the response body is discarded."""
    await baserow_request("DELETE", f"/api/database/rows/table/{table_id}/{row_id}/")