Files
Velocity-OS/core/api/api/routes_inventory.py
Sagnik Ghosh 2666b59327
Some checks failed
Velocity-OS Deployment Pipeline / lint (push) Has been cancelled
Velocity-OS Deployment Pipeline / build-and-push (agents) (push) Has been cancelled
Velocity-OS Deployment Pipeline / build-and-push (core) (push) Has been cancelled
Velocity-OS Deployment Pipeline / build-and-push (media-engine) (push) Has been cancelled
Velocity-OS Deployment Pipeline / build-and-push (webos) (push) Has been cancelled
Velocity-OS Deployment Pipeline / sign-images (agents) (push) Has been cancelled
Velocity-OS Deployment Pipeline / sign-images (core) (push) Has been cancelled
Velocity-OS Deployment Pipeline / sign-images (media-engine) (push) Has been cancelled
Velocity-OS Deployment Pipeline / sign-images (webos) (push) Has been cancelled
Velocity-OS Deployment Pipeline / notify-ingress (push) Has been cancelled
fix: wire Velocity-OS live CRM and inventory data
2026-05-01 18:54:00 +05:30

497 lines
18 KiB
Python

"""
routes_inventory.py
───────────────────
Inventory Pipeline API
Endpoints:
POST /inventory/import-batches — create a new import batch
GET /inventory/import-batches — list import batches
GET /inventory/import-batches/{batch_id} — get batch status
POST /inventory/properties — create a single property
GET /inventory/properties — list properties
GET /inventory/properties/{property_id} — get a property
PATCH /inventory/properties/{property_id} — update a property
DELETE /inventory/properties/{property_id} — archive a property
POST /inventory/properties/{property_id}/media — attach media to a property
GET /inventory/properties/{property_id}/media — list media for a property
DELETE /inventory/media/{media_asset_id} — remove a media asset
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, Field
from backend.auth.dependencies import get_current_user
logger = logging.getLogger("velocity.inventory")
router = APIRouter()
# ── Helpers ───────────────────────────────────────────────────────────────────
def _pool(request: Request):
pool = request.app.state.db_pool
if pool is None:
raise HTTPException(status_code=503, detail="Database unavailable.")
return pool
def _tenant_scope(user) -> str:
return user.tenant_id
def _as_float(value: Any) -> float | None:
return float(value) if value is not None else None
def _canonical_project_payload(row: dict[str, Any]) -> dict[str, Any]:
location = row.get("location_json")
amenities = row.get("amenities_json")
unit_mix = row.get("unit_mix")
price_min = row.get("price_min")
price_max = row.get("price_max")
return {
"property_id": str(row["project_id"]),
"project_name": row["project_name"],
"developer_name": row["developer_name"],
"property_type": "project",
"location": {
"city": row.get("city"),
"district": row.get("micro_market"),
"area": row.get("micro_market"),
"address": row.get("address"),
**(location if isinstance(location, dict) else {}),
},
"price_bands": [
{
"label": "Current inventory",
"min": _as_float(price_min),
"max": _as_float(price_max),
"currency": "INR lakh",
}
] if price_min is not None or price_max is not None else [],
"unit_mix": unit_mix if isinstance(unit_mix, list) else [],
"amenities": amenities if isinstance(amenities, list) else [],
"status": row.get("project_status"),
"ingested_at": row.get("created_at"),
"created_at": row.get("created_at"),
}
async def _fetch_canonical_project_rows(conn: Any, limit: int, offset: int, project_id: str | None = None) -> list[dict[str, Any]]:
filters = ""
params: list[Any] = []
if project_id is not None:
filters = "WHERE p.project_id = $1"
params.append(project_id)
rows = await conn.fetch(
f"""
SELECT
p.project_id,
p.project_name,
p.developer_name,
p.city,
p.micro_market,
p.address,
p.project_status,
p.location_json,
p.amenities_json,
p.created_at,
MIN(u.price_current) AS price_min,
MAX(u.price_current) AS price_max,
COALESCE(
jsonb_agg(
DISTINCT jsonb_build_object(
'configuration', u.configuration,
'count', 1,
'available', CASE WHEN u.status = 'available' THEN 1 ELSE 0 END,
'area', CASE WHEN u.area_sqft IS NULL THEN NULL ELSE concat(u.area_sqft::text, ' sqft') END,
'price', CASE WHEN u.price_current IS NULL THEN NULL ELSE concat('INR ', u.price_current::text, ' lakh') END
)
) FILTER (WHERE u.unit_id IS NOT NULL),
'[]'::jsonb
) AS unit_mix
FROM inventory_projects p
LEFT JOIN inventory_units u ON u.project_id = p.project_id
{filters}
GROUP BY p.project_id
ORDER BY p.project_name ASC
LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
""",
*params,
limit,
offset,
)
return [_canonical_project_payload(dict(row)) for row in rows]
# ── Pydantic Models ───────────────────────────────────────────────────────────
VALID_SOURCE_TYPES = {"csv", "json", "api_push", "manual"}
VALID_PROPERTY_STATUSES = {"active", "archived", "draft", "under_review"}
VALID_MEDIA_TYPES = {"image", "video", "floorplan", "brochure", "360", "vr"}
class ImportBatchCreate(BaseModel):
source_type: str
source_file_ref: Optional[str] = None
total_rows: int = 0
class PropertyCreate(BaseModel):
batch_id: Optional[str] = None
source_id: Optional[str] = None
project_name: str
developer_name: str
location: dict = Field(default_factory=dict) # {city, district, lat, lng}
property_type: str
price_bands: list[dict] = Field(default_factory=list)
unit_mix: list[dict] = Field(default_factory=list)
amenities: list[str] = Field(default_factory=list)
status: str = "draft"
validation_state: dict = Field(default_factory=dict)
class PropertyUpdate(BaseModel):
project_name: Optional[str] = None
developer_name: Optional[str] = None
location: Optional[dict] = None
property_type: Optional[str] = None
price_bands: Optional[list[dict]] = None
unit_mix: Optional[list[dict]] = None
amenities: Optional[list[str]] = None
status: Optional[str] = None
validation_state: Optional[dict] = None
class MediaAssetCreate(BaseModel):
media_type: str
url: str
thumbnail_url: Optional[str] = None
sort_order: int = 0
metadata: dict = Field(default_factory=dict)
# ── Import Batches ────────────────────────────────────────────────────────────
@router.post("/import-batches", status_code=status.HTTP_201_CREATED,
summary="Create an inventory import batch")
async def create_import_batch(
request: Request,
body: ImportBatchCreate,
user=Depends(get_current_user),
):
if body.source_type not in VALID_SOURCE_TYPES:
raise HTTPException(400, f"Invalid source_type. Valid: {sorted(VALID_SOURCE_TYPES)}")
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO inventory_import_batches
(tenant_id, source_type, submitted_by, total_rows, source_file_ref)
VALUES ($1, $2, $3, $4, $5)
RETURNING batch_id, status, created_at
""",
_tenant_scope(user), body.source_type, user.user_id, body.total_rows, body.source_file_ref,
)
return dict(row)
@router.get("/import-batches", summary="List import batches")
async def list_import_batches(
request: Request,
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT batch_id, source_type, submitted_by, status, total_rows,
accepted_rows, rejected_rows, created_at, completed_at
FROM inventory_import_batches
WHERE tenant_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
""",
_tenant_scope(user), limit, offset,
)
total = await conn.fetchval(
"SELECT COUNT(*) FROM inventory_import_batches WHERE tenant_id=$1", _tenant_scope(user),
)
return {"total": total, "limit": limit, "offset": offset, "batches": [dict(r) for r in rows]}
@router.get("/import-batches/{batch_id}", summary="Get import batch status")
async def get_import_batch(
batch_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT * FROM inventory_import_batches WHERE batch_id=$1 AND tenant_id=$2
""",
batch_id, _tenant_scope(user),
)
if not row:
raise HTTPException(404, "Batch not found")
return dict(row)
# ── Properties ────────────────────────────────────────────────────────────────
@router.post("/properties", status_code=status.HTTP_201_CREATED, summary="Create a property")
async def create_property(
request: Request,
body: PropertyCreate,
user=Depends(get_current_user),
):
if body.status not in VALID_PROPERTY_STATUSES:
raise HTTPException(400, f"Invalid status. Valid: {sorted(VALID_PROPERTY_STATUSES)}")
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO inventory_properties (
tenant_id, batch_id, source_id, project_name, developer_name,
location, property_type, price_bands, unit_mix, amenities,
status, validation_state
) VALUES (
$1, $2, $3, $4, $5,
$6::jsonb, $7, $8::jsonb, $9::jsonb, $10,
$11, $12::jsonb
)
RETURNING property_id, created_at
""",
_tenant_scope(user), body.batch_id, body.source_id, body.project_name, body.developer_name,
json.dumps(body.location), body.property_type, json.dumps(body.price_bands),
json.dumps(body.unit_mix), body.amenities,
body.status, json.dumps(body.validation_state),
)
return {"property_id": str(row["property_id"]), "created_at": str(row["created_at"])}
@router.get("/properties", summary="List inventory properties")
async def list_properties(
request: Request,
status_filter: Optional[str] = Query(None, alias="status"),
property_type: Optional[str] = Query(None),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
where_clause = "WHERE tenant_id = $1"
params: list[Any] = [_tenant_scope(user)]
idx = 2
if status_filter:
where_clause += f" AND status = ${idx}"
params.append(status_filter)
idx += 1
if property_type:
where_clause += f" AND property_type = ${idx}"
params.append(property_type)
idx += 1
rows = await conn.fetch(
f"""
SELECT property_id, project_name, developer_name, property_type,
location, price_bands, unit_mix, status, ingested_at, created_at
FROM inventory_properties
{where_clause}
ORDER BY created_at DESC
LIMIT ${idx} OFFSET ${idx+1}
""",
*params, limit, offset,
)
total = await conn.fetchval(
f"SELECT COUNT(*) FROM inventory_properties {where_clause}", *params,
)
if total == 0 and not status_filter and not property_type:
canonical_rows = await _fetch_canonical_project_rows(conn, limit, offset)
canonical_total = await conn.fetchval("SELECT COUNT(*) FROM inventory_projects")
return {"total": canonical_total, "limit": limit, "offset": offset, "properties": canonical_rows}
return {"total": total, "limit": limit, "offset": offset, "properties": [dict(r) for r in rows]}
@router.get("/properties/{property_id}", summary="Get a property")
async def get_property(
property_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT * FROM inventory_properties WHERE property_id=$1 AND tenant_id=$2",
property_id, _tenant_scope(user),
)
if not row:
canonical_rows = await _fetch_canonical_project_rows(conn, limit=1, offset=0, project_id=property_id)
if canonical_rows:
return canonical_rows[0]
if not row:
raise HTTPException(404, "Property not found")
return dict(row)
@router.patch("/properties/{property_id}", summary="Update a property")
async def update_property(
property_id: str,
request: Request,
body: PropertyUpdate,
user=Depends(get_current_user),
):
updates: list[str] = []
values: list[Any] = []
idx = 1
def _add(col: str, val: Any, cast: str = ""):
nonlocal idx
updates.append(f"{col} = ${idx}{cast}")
values.append(val)
idx += 1
if body.project_name is not None: _add("project_name", body.project_name)
if body.developer_name is not None: _add("developer_name", body.developer_name)
if body.location is not None: _add("location", json.dumps(body.location), "::jsonb")
if body.property_type is not None: _add("property_type", body.property_type)
if body.price_bands is not None: _add("price_bands", json.dumps(body.price_bands), "::jsonb")
if body.unit_mix is not None: _add("unit_mix", json.dumps(body.unit_mix), "::jsonb")
if body.amenities is not None: _add("amenities", body.amenities)
if body.status is not None:
if body.status not in VALID_PROPERTY_STATUSES:
raise HTTPException(400, f"Invalid status. Valid: {sorted(VALID_PROPERTY_STATUSES)}")
_add("status", body.status)
if body.validation_state is not None:
_add("validation_state", json.dumps(body.validation_state), "::jsonb")
if not updates:
raise HTTPException(400, "No fields to update")
_add("updated_at", datetime.now(timezone.utc))
values.extend([property_id, _tenant_scope(user)])
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
f"""
UPDATE inventory_properties
SET {', '.join(updates)}
WHERE property_id=${idx} AND tenant_id=${idx+1}
""",
*values,
)
if result == "UPDATE 0":
raise HTTPException(404, "Property not found")
return {"status": "updated"}
@router.delete("/properties/{property_id}", summary="Archive a property")
async def archive_property(
property_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"""
UPDATE inventory_properties
SET status='archived', updated_at=NOW()
WHERE property_id=$1 AND tenant_id=$2
""",
property_id, _tenant_scope(user),
)
if result == "UPDATE 0":
raise HTTPException(404, "Property not found")
return {"status": "archived"}
# ── Media Assets ──────────────────────────────────────────────────────────────
@router.post("/properties/{property_id}/media", status_code=status.HTTP_201_CREATED,
summary="Attach media to a property")
async def add_media(
property_id: str,
request: Request,
body: MediaAssetCreate,
user=Depends(get_current_user),
):
if body.media_type not in VALID_MEDIA_TYPES:
raise HTTPException(400, f"Invalid media_type. Valid: {sorted(VALID_MEDIA_TYPES)}")
pool = _pool(request)
async with pool.acquire() as conn:
# Verify property belongs to tenant
exists = await conn.fetchval(
"SELECT 1 FROM inventory_properties WHERE property_id=$1 AND tenant_id=$2",
property_id, _tenant_scope(user),
)
if not exists:
raise HTTPException(404, "Property not found")
row = await conn.fetchrow(
"""
INSERT INTO inventory_media_assets
(property_id, tenant_id, media_type, url, thumbnail_url, sort_order, metadata, uploaded_by)
VALUES ($1,$2,$3,$4,$5,$6,$7::jsonb,$8)
RETURNING media_asset_id, created_at
""",
property_id, _tenant_scope(user), body.media_type, body.url, body.thumbnail_url,
body.sort_order, json.dumps(body.metadata), user.user_id,
)
return {"media_asset_id": str(row["media_asset_id"]), "created_at": str(row["created_at"])}
@router.get("/properties/{property_id}/media", summary="List media for a property")
async def list_media(
property_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT media_asset_id, media_type, url, thumbnail_url, sort_order, metadata, created_at
FROM inventory_media_assets
WHERE property_id=$1 AND tenant_id=$2
ORDER BY sort_order ASC, created_at ASC
""",
property_id, _tenant_scope(user),
)
return {"media": [dict(r) for r in rows]}
@router.delete("/media/{media_asset_id}", summary="Remove a media asset")
async def delete_media(
media_asset_id: str,
request: Request,
user=Depends(get_current_user),
):
pool = _pool(request)
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM inventory_media_assets WHERE media_asset_id=$1 AND tenant_id=$2",
media_asset_id, _tenant_scope(user),
)
if result == "DELETE 0":
raise HTTPException(404, "Media asset not found")
return {"status": "deleted"}