Files
Project_Velocity/backend/auth/dependencies.py
2026-04-12 02:02:58 +05:30

135 lines
4.5 KiB
Python

"""
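backend/auth/dependencies.py — FastAPI RBAC Dependency Injection

Provides:
  - hash_password / verify_password: bcrypt password helpers
  - create_access_token: issues a signed JWT carrying the user id and role
  - get_current_user: decodes the JWT and returns a UserPrincipal
  - require_role(minimum_role): returns a dependency that raises HTTP 403
    if the user's role is insufficient

Role hierarchy (ascending):
  JUNIOR_BROKER < SENIOR_BROKER < SALES_DIRECTOR < ADMIN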
"""
from __future__ import annotations
import os
from datetime import datetime, timedelta, timezone
from typing import Optional
from dataclasses import dataclass
from fastapi import Depends, Header, HTTPException, status
from jose import JWTError, jwt
from passlib.context import CryptContext
# ── Role hierarchy ────────────────────────────────────────────────────────────
# Maps each role name to its numeric level. Roles are listed from least to
# most privileged, so a role's position in the tuple is its level.
ROLE_HIERARCHY = {
    role_name: level
    for level, role_name in enumerate(
        ("JUNIOR_BROKER", "SENIOR_BROKER", "SALES_DIRECTOR", "ADMIN")
    )
}
# ── Password hashing ──────────────────────────────────────────────────────────
# Shared passlib context used by hash_password() and verify_password().
# bcrypt is the only scheme configured; deprecated="auto" is passlib's setting
# for treating every scheme other than the default as deprecated.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(plain: str) -> str:
    """Return the hash of ``plain`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(plain)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check ``plain`` against ``hashed`` using the module's ``pwd_context``.

    Returns whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain, hashed)
    return is_match
# ── JWT helpers ───────────────────────────────────────────────────────────────
# The signing secret is read from the environment and never hardcoded; a
# missing VELOCITY_JWT_SECRET raises KeyError when this module is imported.
JWT_SECRET = os.environ["VELOCITY_JWT_SECRET"]
# The signing algorithm and the token lifetime (in hours) are fixed constants.
JWT_ALGORITHM = "HS256"
JWT_EXPIRE_HOURS = 8
def create_access_token(user_id: str, role: str) -> str:
    """Build a signed JWT access token for ``user_id`` carrying ``role``.

    The payload holds ``sub`` (the user id), ``role``, ``iat`` (issue time,
    UTC) and ``exp`` (issue time plus ``JWT_EXPIRE_HOURS``). It is signed with
    ``JWT_SECRET`` using ``JWT_ALGORITHM``.

    Args:
        user_id: Value stored in the ``sub`` claim.
        role: Value stored in the ``role`` claim; it is not validated here.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Read the clock once so ``exp - iat`` is exactly JWT_EXPIRE_HOURS; two
    # separate reads can straddle a second boundary and skew the lifetime.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,
        "role": role,
        "exp": issued_at + timedelta(hours=JWT_EXPIRE_HOURS),
        "iat": issued_at,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
# ── UserPrincipal dataclass ───────────────────────────────────────────────────
@dataclass
class UserPrincipal:
    """Authenticated caller, identified by a user id and a role name."""

    user_id: str
    role: str

    @property
    def role_level(self) -> int:
        """Numeric level of ``role`` in ROLE_HIERARCHY, or -1 if not listed."""
        try:
            return ROLE_HIERARCHY[self.role]
        except KeyError:
            return -1
# ── Dependency: parse bearer token ────────────────────────────────────────────
def _unauthorized(detail: str) -> HTTPException:
    """Build the HTTP 401 error used for every authentication failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def get_current_user(
    authorization: Optional[str] = Header(default=None),
) -> UserPrincipal:
    """Resolve the caller from the ``Authorization: Bearer <token>`` header.

    The token is decoded with ``JWT_SECRET`` / ``JWT_ALGORITHM`` and must carry
    the ``sub``, ``role`` and ``exp`` claims.

    Args:
        authorization: Raw ``Authorization`` header value, or ``None`` when the
            request did not send one.

    Returns:
        A ``UserPrincipal`` built from the token's ``sub`` and ``role`` claims.

    Raises:
        HTTPException: 401 (with ``WWW-Authenticate: Bearer``) when the header
            is missing or is not a Bearer credential, when the token fails
            validation, or when a required claim is absent or not a string.
    """
    scheme, _, credentials = (authorization or "").partition(" ")
    token = credentials.strip()
    # Auth scheme names are case-insensitive (RFC 7235, section 2.1), so
    # "bearer" and "BEARER" are accepted alongside "Bearer".
    if scheme.lower() != "bearer" or not token:
        raise _unauthorized("Missing or malformed Authorization header.")

    try:
        payload = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM],
            # python-jose takes one ``require_<claim>`` flag per claim. It
            # silently ignores a PyJWT-style ``"require"`` list, which would
            # leave these claims unenforced.
            options={"require_sub": True, "require_exp": True},
        )
    except JWTError as exc:
        raise _unauthorized(f"Invalid token: {exc}") from exc

    # Checked explicitly so a token lacking a claim yields 401 rather than an
    # unhandled KeyError (HTTP 500), independent of the library's options.
    absent = [claim for claim in ("sub", "role", "exp") if claim not in payload]
    if absent:
        raise _unauthorized(
            f"Invalid token: missing required claim(s): {', '.join(absent)}"
        )

    user_id, role = payload["sub"], payload["role"]
    # A non-string role (e.g. a list) would fail later in the hierarchy lookup.
    if not isinstance(user_id, str) or not isinstance(role, str):
        raise _unauthorized("Invalid token: 'sub' and 'role' must be strings.")

    return UserPrincipal(user_id=user_id, role=role)
# ── Dependency factory: role gate ─────────────────────────────────────────────
def require_role(minimum_role: str):
    """
    Build a FastAPI dependency that admits only users at or above a role.

    The returned callable depends on ``get_current_user``. It hands back the
    authenticated ``UserPrincipal`` when the user's ``role_level`` is at least
    the level of ``minimum_role`` in ROLE_HIERARCHY, and raises HTTP 403
    otherwise.

    Raises:
        ValueError: immediately, if ``minimum_role`` is not in ROLE_HIERARCHY.

    Usage:
        @router.get("/protected")
        async def protected(user: UserPrincipal = Depends(require_role("SENIOR_BROKER"))):
            ...
    """
    # Reject unknown roles when the dependency is built, not per request.
    if minimum_role not in ROLE_HIERARCHY:
        raise ValueError(f"Unknown role: {minimum_role}")
    required_level = ROLE_HIERARCHY[minimum_role]

    def _enforce_minimum_role(
        user: UserPrincipal = Depends(get_current_user),
    ) -> UserPrincipal:
        if user.role_level >= required_level:
            return user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Insufficient role. Required: {minimum_role}, current: {user.role}.",
        )

    return _enforce_minimum_role