"""Service layer for "inspirations": remote media saved as assets.

An inspiration is created by downloading the media behind a URL with yt-dlp,
uploading the bytes through the S3 adapter, and persisting an ``Asset`` plus an
``Inspiration`` record through the DAO.
"""

import asyncio
import logging
import os
import tempfile
import uuid
from datetime import datetime
from typing import List, Optional, Tuple

import httpx
from fastapi import HTTPException

from models.Asset import Asset, AssetType, AssetContentType
from models.Inspiration import Inspiration
from repos.dao import DAO
from adapters.s3_adapter import S3Adapter

# Try to import yt_dlp, but don't crash if it's missing (though we added it to requirements)
try:
    import yt_dlp
except ImportError:
    yt_dlp = None

logger = logging.getLogger(__name__)

# Media types for the extensions we expect yt-dlp to produce. Every video entry
# keeps the "video/" prefix because create_inspiration() classifies the asset
# with content_type.startswith("video").
_CONTENT_TYPE_BY_EXT = {
    "mp4": "video/mp4",
    "mov": "video/quicktime",
    "avi": "video/x-msvideo",
    "mkv": "video/x-matroska",
    "webm": "video/webm",
    "jpg": "image/jpeg",
    "jpeg": "image/jpeg",
    "png": "image/png",
    "webp": "image/webp",
}
_FALLBACK_CONTENT_TYPE = "application/octet-stream"
_FALLBACK_EXT = "bin"


def _content_type_for(ext: str) -> str:
    """Return the media type for a lower-case extension (without the dot)."""
    return _CONTENT_TYPE_BY_EXT.get(ext, _FALLBACK_CONTENT_TYPE)


def _select_downloaded_file(directory: str) -> str:
    """Return the path of the largest regular file in *directory*.

    ``os.listdir`` yields entries in arbitrary order, so the names are sorted
    first; ``max`` keeps the first maximum, which makes ties resolve to the
    lexicographically smallest name.

    Raises:
        RuntimeError: if the directory contains no regular file.
    """
    candidates = [
        path
        for path in (os.path.join(directory, name) for name in sorted(os.listdir(directory)))
        if os.path.isfile(path)
    ]
    if not candidates:
        raise RuntimeError("No files downloaded")
    return max(candidates, key=os.path.getsize)


class InspirationService:
    """Create, query, update and delete inspirations and their backing assets."""

    def __init__(self, dao: DAO, s3_adapter: S3Adapter):
        """Store the DAO and the S3 adapter used by every operation."""
        self.dao = dao
        self.s3_adapter = s3_adapter

    async def create_inspiration(
        self,
        source_url: str,
        created_by: str,
        project_id: Optional[str] = None,
        caption: Optional[str] = None,
    ) -> Inspiration:
        """Download the media at *source_url* and persist it as an inspiration.

        Args:
            source_url: URL handed to yt-dlp.
            created_by: Identifier stored on both the asset and the inspiration.
            project_id: Optional project the records are attached to.
            caption: Optional caption stored on the inspiration.

        Returns:
            The new ``Inspiration`` with ``id`` set to the value returned by the DAO.

        Raises:
            HTTPException: status 400 when the download fails for any reason.
        """
        # 1. Download content from Instagram
        try:
            content_bytes, content_type, ext = await self._download_content(source_url)
        except Exception as e:
            logger.error("Failed to download content from %s: %s", source_url, e)
            raise HTTPException(
                status_code=400,
                detail=f"Failed to download content: {str(e)}",
            ) from e

        # 2. Save as Asset
        # The timestamp only has one-second resolution, so a random suffix is
        # added to keep two uploads in the same second from overwriting each other.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"inspirations/{timestamp}_{uuid.uuid4().hex[:8]}_insta.{ext}"

        await self.s3_adapter.upload_file(filename, content_bytes, content_type=content_type)

        # NOTE(review): anything that is not "video/*" (including the
        # octet-stream fallback) is classified as IMAGE - confirm that is intended.
        is_video = content_type.startswith("video")
        asset = Asset(
            name=f"Inspiration from {source_url}",
            type=AssetType.INSPIRATION,
            content_type=AssetContentType.VIDEO if is_video else AssetContentType.IMAGE,
            minio_object_name=filename,
            minio_bucket=self.s3_adapter.bucket_name,
            created_by=created_by,
            project_id=project_id,
        )
        asset_id = await self.dao.assets.create_asset(asset)

        # 3. Create Inspiration object
        inspiration = Inspiration(
            source_url=source_url,
            caption=caption,
            asset_id=str(asset_id),
            created_by=created_by,
            project_id=project_id,
        )
        insp_id = await self.dao.inspirations.create_inspiration(inspiration)
        inspiration.id = insp_id
        return inspiration

    async def get_inspirations(
        self,
        project_id: Optional[str],
        created_by: str,
        limit: int = 20,
        offset: int = 0,
    ) -> List[Inspiration]:
        """Return inspirations from the DAO; all arguments are forwarded unchanged."""
        return await self.dao.inspirations.get_inspirations(project_id, created_by, limit, offset)

    async def get_inspiration(self, inspiration_id: str) -> Optional[Inspiration]:
        """Return whatever the DAO yields for *inspiration_id*."""
        return await self.dao.inspirations.get_inspiration(inspiration_id)

    async def mark_as_completed(
        self,
        inspiration_id: str,
        is_completed: bool = True,
    ) -> Optional[Inspiration]:
        """Set the ``is_completed`` flag of an inspiration and persist it.

        ``updated_at`` is refreshed with ``datetime.now()`` before the update.

        Returns:
            The updated inspiration, or ``None`` when the DAO finds no record.
        """
        inspiration = await self.dao.inspirations.get_inspiration(inspiration_id)
        if not inspiration:
            return None

        inspiration.is_completed = is_completed
        inspiration.updated_at = datetime.now()
        await self.dao.inspirations.update_inspiration(inspiration)
        return inspiration

    async def delete_inspiration(self, inspiration_id: str) -> bool:
        """Delete an inspiration and, if it has one, its linked asset record.

        Returns:
            ``False`` when the inspiration does not exist, otherwise the result
            of the DAO's ``delete_inspiration`` call.
        """
        inspiration = await self.dao.inspirations.get_inspiration(inspiration_id)
        if not inspiration:
            return False

        # Delete associated asset
        # NOTE(review): only the DAO record is removed here; whether the stored
        # object is deleted too depends on delete_asset - confirm.
        if inspiration.asset_id:
            await self.dao.assets.delete_asset(inspiration.asset_id)

        return await self.dao.inspirations.delete_inspiration(inspiration_id)

    async def _download_content(self, url: str) -> Tuple[bytes, str, str]:
        """
        Downloads content using yt-dlp.

        The blocking download runs in a worker thread via ``asyncio.to_thread``
        and writes into a temporary directory that is removed afterwards.

        Returns (content_bytes, content_type, extension)

        Raises:
            RuntimeError: if yt-dlp is not installed or produced no file.
        """
        if not yt_dlp:
            raise RuntimeError("yt-dlp is not installed")

        logger.info("Downloading from %s using yt-dlp...", url)

        def run_yt_dlp() -> Tuple[bytes, str, str]:
            with tempfile.TemporaryDirectory() as tmpdirname:
                ydl_opts = {
                    'outtmpl': f'{tmpdirname}/%(id)s.%(ext)s',
                    'quiet': True,
                    'no_warnings': True,
                    'format': 'best',  # Best quality single file
                    'noplaylist': True,  # Only single video if it's a playlist/profile
                    'writethumbnail': False,
                    'writesubtitles': False,
                }

                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    ydl.download([url])

                # With 'format': 'best' there should be a single file, but a
                # carousel may yield several: take the largest one so the
                # choice does not depend on directory listing order.
                filepath = _select_downloaded_file(tmpdirname)

                with open(filepath, 'rb') as f:
                    data = f.read()

                # splitext() returns an empty suffix for names without a dot,
                # so the extension can never end up being the whole file name.
                suffix = os.path.splitext(filepath)[1]
                ext = suffix[1:].lower() or _FALLBACK_EXT

                return data, _content_type_for(ext), ext

        return await asyncio.to_thread(run_yt_dlp)