109 lines
4.0 KiB
Python

from __future__ import annotations
from typing import Annotated
from uuid import uuid4
from fastapi import APIRouter, Header, HTTPException, status
from app.adapters.identity.demo_adapter import DemoIdentityAdapter
from app.core.constants import ERROR_CODE_OK
from app.core.time import format_now
from app.schemas.common import ApiResponse
from app.schemas.identity import (
IdentityUser,
LoginData,
LoginRequest,
PermissionsData,
TokenIntrospectData,
TokenIntrospectRequest,
)
from app.services.identity_service import IdentityService
router = APIRouter(prefix="/api/demo/identity", tags=["demo-identity"])
def build_request_id(header_value: str | None) -> str:
return header_value or f"req-{uuid4().hex[:12]}"
@router.post("/login", response_model=ApiResponse[LoginData])
def login(
    payload: LoginRequest,
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[LoginData]:
    """Authenticate against the demo identity adapter and return a token.

    Args:
        payload: Login request carrying ``username`` and ``password``.
        x_request_id: Optional ``X-Request-Id`` header; a request id is
            generated by ``build_request_id`` when it is absent or empty.

    Returns:
        A success envelope whose data holds the access token, a fixed
        ``expires_in_seconds`` of 7200, and the user converted through
        ``IdentityService.to_identity_user``.

    Raises:
        HTTPException: 401 when the adapter returns a falsy login result.
    """
    request_id = build_request_id(x_request_id)

    outcome = DemoIdentityAdapter().login(payload.username, payload.password)
    if not outcome:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={"code": "UNAUTHORIZED", "message": "invalid username or password"},
        )

    # A successful login result unpacks into (access token, user record).
    token, account = outcome
    login_data = LoginData(
        access_token=token,
        expires_in_seconds=7200,
        user=IdentityService.to_identity_user(account),
    )
    return ApiResponse[LoginData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="login success",
        data=login_data,
        timestamp=format_now("Asia/Shanghai"),
    )
@router.get("/me", response_model=ApiResponse[IdentityUser])
def me(
    authorization: Annotated[str | None, Header(alias="Authorization")] = None,
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[IdentityUser]:
    """Return the identity of the user that owns the presented access token.

    The token is taken from the ``Authorization`` header. A ``Bearer``
    scheme prefix is recognised case-insensitively and removed; a header
    value without that scheme is treated as the bare token.

    Args:
        authorization: Optional ``Authorization`` header value.
        x_request_id: Optional ``X-Request-Id`` header; a request id is
            generated by ``build_request_id`` when it is absent or empty.

    Returns:
        A success envelope whose data is the user converted through
        ``IdentityService.to_identity_user``.

    Raises:
        HTTPException: 401 when no token is supplied or the token does not
            resolve to a user.
    """
    request_id = build_request_id(x_request_id)

    header_value = (authorization or "").strip()
    scheme, _, credentials = header_value.partition(" ")
    # HTTP auth scheme names are case-insensitive, so "bearer" and "BEARER"
    # must be accepted just like "Bearer". Anything that is not a Bearer
    # header is still passed through as a bare token, as before.
    token = credentials.strip() if scheme.lower() == "bearer" else header_value

    # An empty token can never identify a user, so skip the lookup entirely.
    user = IdentityService().get_user_by_token(token) if token else None
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={"code": "UNAUTHORIZED", "message": "invalid token"},
        )

    return ApiResponse[IdentityUser](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=IdentityService.to_identity_user(user),
        timestamp=format_now("Asia/Shanghai"),
    )
@router.get("/users/{user_id}/permissions", response_model=ApiResponse[PermissionsData])
def get_permissions(
    user_id: str,
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[PermissionsData]:
    """Return the permission record for ``user_id``.

    Args:
        user_id: Path parameter passed to ``IdentityService.get_permissions``.
        x_request_id: Optional ``X-Request-Id`` header; a request id is
            generated by ``build_request_id`` when it is absent or empty.

    Returns:
        A success envelope whose data copies ``user_id``, ``roles``,
        ``permissions``, ``allowed_envs`` and ``allowed_apps`` from the
        record returned by the service.

    Raises:
        HTTPException: 404 when the service returns a falsy result.
    """
    request_id = build_request_id(x_request_id)

    record = IdentityService().get_permissions(user_id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail={"code": "NOT_FOUND", "message": "user not found"},
        )

    # Copy exactly the fields exposed by PermissionsData, read by key.
    exposed_keys = ("user_id", "roles", "permissions", "allowed_envs", "allowed_apps")
    permissions_data = PermissionsData(**{key: record[key] for key in exposed_keys})

    return ApiResponse[PermissionsData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=permissions_data,
        timestamp=format_now("Asia/Shanghai"),
    )
@router.post("/token/introspect", response_model=ApiResponse[TokenIntrospectData])
def introspect_token(
    payload: TokenIntrospectRequest,
    x_request_id: Annotated[str | None, Header(alias="X-Request-Id")] = None,
) -> ApiResponse[TokenIntrospectData]:
    """Report whether an access token resolves to a user.

    Unlike the other token-based endpoint in this router, an unknown token
    is not an error here: the call still succeeds, with ``active`` false
    and ``user`` set to ``None``.

    Args:
        payload: Request body carrying the ``access_token`` to inspect.
        x_request_id: Optional ``X-Request-Id`` header; a request id is
            generated by ``build_request_id`` when it is absent or empty.

    Returns:
        A success envelope whose data holds the ``active`` flag and, when
        the token resolved, the user converted through
        ``IdentityService.to_identity_user``.
    """
    request_id = build_request_id(x_request_id)

    account = IdentityService().get_user_by_token(payload.access_token)
    is_active = bool(account)
    identity = IdentityService.to_identity_user(account) if is_active else None
    introspection = TokenIntrospectData(active=is_active, user=identity)

    return ApiResponse[TokenIntrospectData](
        request_id=request_id,
        success=True,
        code=ERROR_CODE_OK,
        message="success",
        data=introspection,
        timestamp=format_now("Asia/Shanghai"),
    )