80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
from typing import List, Optional
|
|
from datetime import datetime, UTC
|
|
|
|
from repos.dao import DAO
|
|
from models.Post import Post
|
|
|
|
|
|
class PostService:
|
|
def __init__(self, dao: DAO):
|
|
self.dao = dao
|
|
|
|
async def create_post(
|
|
self,
|
|
date: datetime,
|
|
topic: str,
|
|
generation_ids: List[str],
|
|
project_id: Optional[str],
|
|
user_id: str,
|
|
) -> Post:
|
|
post = Post(
|
|
date=date,
|
|
topic=topic,
|
|
generation_ids=generation_ids,
|
|
project_id=project_id,
|
|
created_by=user_id,
|
|
)
|
|
post_id = await self.dao.posts.create_post(post)
|
|
post.id = post_id
|
|
return post
|
|
|
|
async def get_post(self, post_id: str) -> Optional[Post]:
|
|
return await self.dao.posts.get_post(post_id)
|
|
|
|
async def get_posts(
|
|
self,
|
|
project_id: Optional[str],
|
|
user_id: str,
|
|
limit: int = 20,
|
|
offset: int = 0,
|
|
date_from: Optional[datetime] = None,
|
|
date_to: Optional[datetime] = None,
|
|
) -> List[Post]:
|
|
return await self.dao.posts.get_posts(project_id, user_id, limit, offset, date_from, date_to)
|
|
|
|
async def update_post(
|
|
self,
|
|
post_id: str,
|
|
date: Optional[datetime] = None,
|
|
topic: Optional[str] = None,
|
|
) -> Optional[Post]:
|
|
post = await self.dao.posts.get_post(post_id)
|
|
if not post:
|
|
return None
|
|
|
|
updates: dict = {"updated_at": datetime.now(UTC)}
|
|
if date is not None:
|
|
updates["date"] = date
|
|
if topic is not None:
|
|
updates["topic"] = topic
|
|
|
|
await self.dao.posts.update_post(post_id, updates)
|
|
|
|
# Return refreshed post
|
|
return await self.dao.posts.get_post(post_id)
|
|
|
|
async def delete_post(self, post_id: str) -> bool:
|
|
return await self.dao.posts.delete_post(post_id)
|
|
|
|
async def add_generations(self, post_id: str, generation_ids: List[str]) -> bool:
|
|
post = await self.dao.posts.get_post(post_id)
|
|
if not post:
|
|
return False
|
|
return await self.dao.posts.add_generations(post_id, generation_ids)
|
|
|
|
async def remove_generation(self, post_id: str, generation_id: str) -> bool:
|
|
post = await self.dao.posts.get_post(post_id)
|
|
if not post:
|
|
return False
|
|
return await self.dao.posts.remove_generation(post_id, generation_id)
|