Initial Switch to V2. Completely Overhauled Backend, Frontend and General Structure.
This commit is contained in:
93
backend/notes/service.py
Normal file
93
backend/notes/service.py
Normal file
@@ -0,0 +1,93 @@
|
||||
import uuid
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select, delete, func
|
||||
from sqlalchemy.orm import selectinload
|
||||
from notes.orm import Entry, EntryLink
|
||||
from notes.models import EntryCreate, EntryUpdate, EntryLinkIn
|
||||
from shared.exceptions import NotFoundError
|
||||
|
||||
|
||||
async def create_entry(db: AsyncSession, data: EntryCreate, author_id: str, author_name: str) -> Entry:
|
||||
entry = Entry(
|
||||
type=data.type,
|
||||
title=data.title,
|
||||
body=data.body,
|
||||
status=data.status if data.type == "issue" else None,
|
||||
severity=data.severity if data.type == "issue" else None,
|
||||
category=data.category if data.type == "issue" else None,
|
||||
author_id=author_id,
|
||||
author_name=author_name,
|
||||
)
|
||||
db.add(entry)
|
||||
await db.flush() # get the ID before inserting links
|
||||
|
||||
for link_in in data.links:
|
||||
db.add(EntryLink(entry_id=entry.id, entity_type=link_in.entity_type, entity_id=link_in.entity_id))
|
||||
|
||||
await db.commit()
|
||||
await db.refresh(entry)
|
||||
return await _get_entry_with_links(db, entry.id)
|
||||
|
||||
|
||||
async def get_entry(db: AsyncSession, entry_id: uuid.UUID) -> Entry:
|
||||
return await _get_entry_with_links(db, entry_id)
|
||||
|
||||
|
||||
async def list_entries(
|
||||
db: AsyncSession,
|
||||
type: str | None, status: str | None, severity: str | None, category: str | None,
|
||||
page: int, limit: int,
|
||||
) -> tuple[list[Entry], int]:
|
||||
limit = min(100, max(1, limit))
|
||||
offset = (max(1, page) - 1) * limit
|
||||
|
||||
q = select(Entry).options(selectinload(Entry.links))
|
||||
if type: q = q.where(Entry.type == type)
|
||||
if status: q = q.where(Entry.status == status)
|
||||
if severity: q = q.where(Entry.severity == severity)
|
||||
if category: q = q.where(Entry.category == category)
|
||||
|
||||
total_q = select(func.count()).select_from(q.subquery())
|
||||
total = (await db.execute(total_q)).scalar()
|
||||
rows = (await db.execute(q.order_by(Entry.created_at.desc()).limit(limit).offset(offset))).scalars().all()
|
||||
return rows, total
|
||||
|
||||
|
||||
async def update_entry(db: AsyncSession, entry_id: uuid.UUID, data: EntryUpdate) -> Entry:
|
||||
entry = await _get_entry_with_links(db, entry_id)
|
||||
for field, value in data.model_dump(exclude_unset=True).items():
|
||||
setattr(entry, field, value)
|
||||
await db.commit()
|
||||
return await _get_entry_with_links(db, entry_id)
|
||||
|
||||
|
||||
async def delete_entry(db: AsyncSession, entry_id: uuid.UUID):
|
||||
entry = await _get_entry_with_links(db, entry_id)
|
||||
await db.delete(entry)
|
||||
await db.commit()
|
||||
|
||||
|
||||
async def replace_links(db: AsyncSession, entry_id: uuid.UUID, links: list[EntryLinkIn]) -> Entry:
|
||||
await _get_entry_with_links(db, entry_id) # raises 404 if not found
|
||||
await db.execute(delete(EntryLink).where(EntryLink.entry_id == entry_id))
|
||||
for link_in in links:
|
||||
db.add(EntryLink(entry_id=entry_id, entity_type=link_in.entity_type, entity_id=link_in.entity_id))
|
||||
await db.commit()
|
||||
return await _get_entry_with_links(db, entry_id)
|
||||
|
||||
|
||||
async def list_entries_for_entity(db: AsyncSession, entity_type: str, entity_id: str) -> list[Entry]:
|
||||
link_sq = select(EntryLink.entry_id).where(
|
||||
EntryLink.entity_type == entity_type,
|
||||
EntryLink.entity_id == entity_id,
|
||||
).subquery()
|
||||
q = select(Entry).options(selectinload(Entry.links)).where(Entry.id.in_(select(link_sq)))
|
||||
return (await db.execute(q.order_by(Entry.created_at.desc()))).scalars().all()
|
||||
|
||||
|
||||
async def _get_entry_with_links(db: AsyncSession, entry_id: uuid.UUID) -> Entry:
|
||||
q = select(Entry).options(selectinload(Entry.links)).where(Entry.id == entry_id)
|
||||
result = (await db.execute(q)).scalar_one_or_none()
|
||||
if not result:
|
||||
raise NotFoundError("Entry")
|
||||
return result
|
||||
Reference in New Issue
Block a user