gsparc-mezzouna-api/app/baserow.py

95 lines
3.6 KiB
Python

"""Client Baserow — authentification JWT et opérations CRUD."""
import json
import logging
from urllib.parse import urlencode

import httpx

from app.config import BASEROW_URL, BASEROW_HOST, BASEROW_USER, BASEROW_PASSWORD, BASEROW_TOKEN
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Last JWT obtained from Baserow, shared by every call in this process.
# None until a login has stored a token, or after a 401 response has reset it.
_jwt_cache: str | None = None
async def get_jwt() -> str:
    """Return a Baserow JWT, reusing the cached one when it still looks usable.

    When a token is cached, a probe request carrying it is sent to
    ``/api/_health/``; the cached token is returned if the answer is a 200
    whose body contains "healthy". In every other case (nothing cached,
    unexpected answer, transport failure) a new token is requested from
    ``/api/user/token-auth/`` with the configured credentials and stored in
    the module-level cache.

    Returns:
        The JWT string to place in an ``Authorization: JWT ...`` header.

    Raises:
        httpx.HTTPStatusError: The login endpoint answered with an error status.
        httpx.HTTPError: The login request itself failed (timeout, connection).
        RuntimeError: The login response contained neither ``access_token``
            nor ``token``.
    """
    global _jwt_cache

    if _jwt_cache:
        # NOTE(review): confirm that /api/_health/ really validates the
        # Authorization header. If the endpoint is unauthenticated, this probe
        # never detects an expired token and only costs an extra round-trip;
        # the 401 retry in baserow_request would then be the real safeguard.
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                probe = await client.get(
                    f"{BASEROW_URL}/api/_health/",
                    # Presumably BASEROW_URL is an internal address and the
                    # Host header carries the public hostname -- TODO confirm.
                    headers={"Host": BASEROW_HOST, "Authorization": f"JWT {_jwt_cache}"},
                )
        except httpx.HTTPError as exc:
            # Best effort: a failed probe is not fatal, we simply log in again.
            logger.warning("Baserow JWT probe failed (%s); logging in again", exc)
        else:
            if probe.status_code == 200 and "healthy" in probe.text:
                return _jwt_cache

    # No usable cached token: authenticate with e-mail and password.
    async with httpx.AsyncClient(timeout=10) as client:
        r = await client.post(
            f"{BASEROW_URL}/api/user/token-auth/",
            headers={"Host": BASEROW_HOST, "Content-Type": "application/json"},
            json={"email": BASEROW_USER, "password": BASEROW_PASSWORD},
        )
    r.raise_for_status()
    data = r.json()

    # Either key is accepted (presumably to cover different Baserow versions).
    token = data.get("access_token") or data.get("token")
    if not token:
        # Fail loudly instead of caching None and sending "JWT None" later on.
        raise RuntimeError("Baserow login response contained no 'access_token' or 'token'")

    _jwt_cache = token
    logger.info("JWT Baserow régénéré")
    return token
async def baserow_request(method: str, path: str, json_data: dict | None = None) -> dict:
    """Send one authenticated request to the Baserow API and decode the answer.

    The request carries the JWT returned by ``get_jwt``. If Baserow answers
    401, the cached token is discarded, a new one is obtained and the request
    is sent exactly once more.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"PATCH"``.
        path: Path (and query string) appended verbatim to ``BASEROW_URL``.
        json_data: Optional JSON body; ``None`` sends no body.

    Returns:
        The decoded JSON body, or ``{}`` when the response body is empty.

    Raises:
        httpx.HTTPStatusError: The final response has an error status.
        Exception: Whatever ``get_jwt`` raises while obtaining a token.
    """
    global _jwt_cache

    url = f"{BASEROW_URL}{path}"
    jwt = await get_jwt()
    headers = {"Host": BASEROW_HOST, "Authorization": f"JWT {jwt}"}
    # Test against None so that an explicit empty body ({}) is still declared
    # as JSON, consistently with what is passed as json= below.
    if json_data is not None:
        headers["Content-Type"] = "application/json"

    async with httpx.AsyncClient(timeout=30) as client:
        r = await client.request(method=method, url=url, headers=headers, json=json_data)
        if r.status_code == 401:
            # Token rejected: drop the cache, log in again and retry once.
            _jwt_cache = None
            jwt = await get_jwt()
            headers["Authorization"] = f"JWT {jwt}"
            r = await client.request(method=method, url=url, headers=headers, json=json_data)
        r.raise_for_status()
        # Some answers have no body at all; return an empty dict for them.
        return r.json() if r.text else {}
# ── Opérations CRUD simplifiées ──────────────────────────────────────────────
async def list_rows(table_id: int, filters: dict | None = None, size: int = 100) -> list[dict]:
    """Return one page of rows from a Baserow table.

    A single request is made; no follow-up requests are issued for further
    pages, so at most one page of ``size`` rows is returned.

    Args:
        table_id: Identifier of the table to read.
        filters: Optional extra query parameters. They are URL-encoded and
            appended to the request as given; a list value produces one
            parameter per item.
            NOTE(review): keys are assumed to be complete Baserow query
            parameter names (e.g. ``"filter__Nom__equal"``, ``"search"``,
            ``"order_by"``) -- confirm against callers.
        size: Page size requested from Baserow.

    Returns:
        The ``"results"`` list of the response, or ``[]`` when it is absent.
    """
    path = f"/api/database/rows/table/{table_id}/?user_field_names=true&size={size}"
    if filters:
        # Previously this argument was silently ignored. Encoding it here also
        # protects the URL from spaces, '&' and non-ASCII characters in values.
        path += "&" + urlencode(filters, doseq=True)
    result = await baserow_request("GET", path)
    return result.get("results", [])
async def get_row(table_id: int, row_id: int) -> dict:
    """Fetch a single row of a table by its identifier.

    Issues a GET on the row endpoint with ``user_field_names=true`` and
    returns whatever ``baserow_request`` returns for it.
    """
    row_path = f"/api/database/rows/table/{table_id}/{row_id}/?user_field_names=true"
    row = await baserow_request("GET", row_path)
    return row
async def create_row(table_id: int, data: dict) -> dict:
    """Create a row in a table.

    POSTs ``data`` as the JSON body to the table's rows endpoint with
    ``user_field_names=true`` and returns the result of ``baserow_request``.
    """
    endpoint = f"/api/database/rows/table/{table_id}/?user_field_names=true"
    created = await baserow_request("POST", endpoint, json_data=data)
    return created
async def update_row(table_id: int, row_id: int, data: dict) -> dict:
    """Update an existing row.

    PATCHes ``data`` as the JSON body to the row endpoint with
    ``user_field_names=true`` and returns the result of ``baserow_request``.
    """
    endpoint = f"/api/database/rows/table/{table_id}/{row_id}/?user_field_names=true"
    updated = await baserow_request("PATCH", endpoint, json_data=data)
    return updated
async def delete_row(table_id: int, row_id: int) -> None:
    """Delete a row.

    Sends a DELETE to the row endpoint; the response body is discarded.
    """
    target = f"/api/database/rows/table/{table_id}/{row_id}/"
    await baserow_request("DELETE", target)
    return None