"""Demo identity API routes: login, current user, permissions and token introspection."""

from __future__ import annotations

from typing import Annotated
from uuid import uuid4

from fastapi import APIRouter, Header, HTTPException, status

from app.adapters.identity.demo_adapter import DemoIdentityAdapter
from app.core.constants import ERROR_CODE_OK
from app.core.time import format_now
from app.schemas.common import ApiResponse
from app.schemas.identity import (
    IdentityUser,
    LoginData,
    LoginRequest,
    PermissionsData,
    TokenIntrospectData,
    TokenIntrospectRequest,
)
from app.services.identity_service import IdentityService

router = APIRouter(prefix="/api/demo/identity", tags=["demo-identity"])

# Timezone name handed to format_now() for every response timestamp.
_RESPONSE_TIMEZONE = "Asia/Shanghai"
# Value reported as `expires_in_seconds` in the login response.
_ACCESS_TOKEN_TTL_SECONDS = 7200
# Authorization scheme name, compared case-insensitively.
_BEARER_SCHEME = "bearer"


def build_request_id(header_value: str | None) -> str:
    """Return the caller-supplied request id, or generate one.

    Args:
        header_value: Value of the ``X-Request-Id`` header, if any.

    Returns:
        ``header_value`` when it is non-empty, otherwise ``"req-"`` followed
        by the first 12 hex characters of a fresh UUID4.
    """
    return header_value or f"req-{uuid4().hex[:12]}"


def _extract_bearer_token(authorization: str | None) -> str:
    """Extract the token from an ``Authorization`` header value.

    The scheme name is matched case-insensitively and surrounding whitespace
    is ignored, so ``"Bearer x"``, ``"bearer x"`` and ``"  Bearer   x "`` all
    yield ``"x"``. A value without a bearer scheme is returned stripped and
    unchanged, so clients that send the raw token keep working.

    Args:
        authorization: Raw header value, or ``None`` when the header is absent.

    Returns:
        The token, or ``""`` when no token is present.
    """
    value = (authorization or "").strip()
    scheme, _, credentials = value.partition(" ")
    if scheme.lower() == _BEARER_SCHEME:
        return credentials.strip()
    return value


def _timestamp() -> str:
    """Return the formatted current time used in every response envelope."""
    return format_now(_RESPONSE_TIMEZONE)


@router.post("/login", response_model=ApiResponse[LoginData])
def login(
    payload: LoginRequest,
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[LoginData]:
    """Authenticate with username and password via ``DemoIdentityAdapter``.

    Args:
        payload: Login request carrying ``username`` and ``password``.
        x_request_id: Optional ``X-Request-Id`` header echoed in the response.

    Returns:
        A success envelope containing the access token, its lifetime in
        seconds and the authenticated user.

    Raises:
        HTTPException: 401 when the adapter returns a falsy result.
    """
    request_id = build_request_id(x_request_id)
    result = DemoIdentityAdapter().login(payload.username, payload.password)
    if not result:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={"code": "UNAUTHORIZED", "message": "invalid username or password"},
        )

    access_token, user = result
    return ApiResponse[LoginData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="login success",
        data=LoginData(
            access_token=access_token,
            expires_in_seconds=_ACCESS_TOKEN_TTL_SECONDS,
            user=IdentityService.to_identity_user(user),
        ),
        timestamp=_timestamp(),
    )


@router.get("/me", response_model=ApiResponse[IdentityUser])
def me(
    authorization: Annotated[str | None, Header(alias="Authorization")] = None,
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[IdentityUser]:
    """Return the user that owns the token in the ``Authorization`` header.

    Args:
        authorization: ``Authorization`` header; ``Bearer <token>`` (scheme
            matched case-insensitively) or the bare token.
        x_request_id: Optional ``X-Request-Id`` header echoed in the response.

    Returns:
        A success envelope containing the identity user.

    Raises:
        HTTPException: 401 when the token is missing or does not resolve to
            a user.
    """
    request_id = build_request_id(x_request_id)
    token = _extract_bearer_token(authorization)
    # An empty token can never identify a user, so skip the lookup entirely.
    user = IdentityService().get_user_by_token(token) if token else None
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={"code": "UNAUTHORIZED", "message": "invalid token"},
            # Tell the client which authentication scheme to retry with.
            headers={"WWW-Authenticate": "Bearer"},
        )

    return ApiResponse[IdentityUser](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=IdentityService.to_identity_user(user),
        timestamp=_timestamp(),
    )


@router.get("/users/{user_id}/permissions", response_model=ApiResponse[PermissionsData])
def get_permissions(
    user_id: str,
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[PermissionsData]:
    """Return roles, permissions and allowed environments/apps for a user.

    Args:
        user_id: Identifier taken from the URL path.
        x_request_id: Optional ``X-Request-Id`` header echoed in the response.

    Returns:
        A success envelope containing the user's permission data.

    Raises:
        HTTPException: 404 when ``IdentityService.get_permissions`` returns a
            falsy value for ``user_id``.
    """
    request_id = build_request_id(x_request_id)
    record = IdentityService().get_permissions(user_id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail={"code": "NOT_FOUND", "message": "user not found"},
        )

    # The record is read by key; the service is expected to supply all five.
    return ApiResponse[PermissionsData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=PermissionsData(
            user_id=record["user_id"],
            roles=record["roles"],
            permissions=record["permissions"],
            allowed_envs=record["allowed_envs"],
            allowed_apps=record["allowed_apps"],
        ),
        timestamp=_timestamp(),
    )


@router.post("/token/introspect", response_model=ApiResponse[TokenIntrospectData])
def introspect_token(
    payload: TokenIntrospectRequest,
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[TokenIntrospectData]:
    """Report whether an access token currently resolves to a user.

    Unlike ``/me`` this never raises for an unknown token: the response is
    always a success envelope whose ``active`` flag carries the result.

    Args:
        payload: Request carrying the ``access_token`` to inspect.
        x_request_id: Optional ``X-Request-Id`` header echoed in the response.

    Returns:
        A success envelope with ``active`` set and, when active, the user.
    """
    request_id = build_request_id(x_request_id)
    user = IdentityService().get_user_by_token(payload.access_token)
    identity_user = IdentityService.to_identity_user(user) if user else None
    return ApiResponse[TokenIntrospectData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=TokenIntrospectData(active=bool(user), user=identity_user),
        timestamp=_timestamp(),
    )