147 lines
5.9 KiB
Python
147 lines
5.9 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import tempfile
|
|
from datetime import datetime
|
|
from typing import List, Optional, Tuple
|
|
|
|
import httpx
|
|
from fastapi import HTTPException
|
|
|
|
from models.Asset import Asset, AssetType, AssetContentType
|
|
from models.Inspiration import Inspiration
|
|
from repos.dao import DAO
|
|
from adapters.s3_adapter import S3Adapter
|
|
|
|
# yt-dlp is treated as an optional dependency at import time: if it is missing
# the module still loads, `yt_dlp` is set to None, and code that needs it is
# expected to check for None before use.
try:
    import yt_dlp
except ImportError:
    yt_dlp = None

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
class InspirationService:
    """Create, query, update and delete inspirations.

    An inspiration is a piece of media downloaded from a source URL with
    yt-dlp, uploaded through the S3 adapter, recorded as an ``Asset`` and then
    referenced from an ``Inspiration`` record.
    """

    # Extension -> MIME type for the media files this service recognises.
    # Anything else is reported as ``_FALLBACK_CONTENT_TYPE``.
    _CONTENT_TYPES = {
        "mp4": "video/mp4",
        "mov": "video/quicktime",
        "avi": "video/x-msvideo",
        "mkv": "video/x-matroska",
        "webm": "video/webm",
        "jpg": "image/jpeg",
        "jpeg": "image/jpeg",
        "png": "image/png",
        "webp": "image/webp",
    }
    _FALLBACK_CONTENT_TYPE = "application/octet-stream"
    # Extension used when the downloaded file name has none.
    _FALLBACK_EXTENSION = "bin"

    def __init__(self, dao: DAO, s3_adapter: S3Adapter):
        """Store the collaborators used by every operation.

        Args:
            dao: Data-access object; its ``assets`` and ``inspirations``
                repositories are used by this service.
            s3_adapter: Storage adapter; ``upload_file`` and ``bucket_name``
                are used by this service.
        """
        self.dao = dao
        self.s3_adapter = s3_adapter

    async def create_inspiration(
        self,
        source_url: str,
        created_by: str,
        project_id: Optional[str] = None,
        caption: Optional[str] = None,
    ) -> Inspiration:
        """Download media from ``source_url`` and persist it as an inspiration.

        Args:
            source_url: URL handed to yt-dlp.
            created_by: Identifier stored on both the asset and the inspiration.
            project_id: Optional project to attach both records to.
            caption: Optional caption stored on the inspiration.

        Returns:
            The created ``Inspiration`` with ``id`` set to the value returned
            by the inspirations repository.

        Raises:
            HTTPException: Status 400 when the download fails for any reason;
                the original exception is chained as ``__cause__``.
        """
        # 1. Download the media.
        try:
            content_bytes, content_type, ext = await self._download_content(source_url)
        except Exception as e:
            logger.error("Failed to download content from %s: %s", source_url, e)
            raise HTTPException(
                status_code=400,
                detail=f"Failed to download content: {str(e)}",
            ) from e

        # 2. Upload the bytes and record them as an Asset.
        filename = self._build_object_name(ext)
        await self.s3_adapter.upload_file(filename, content_bytes, content_type=content_type)

        # Anything that is not a video (including unknown types) is stored as
        # an image asset.
        if content_type.startswith("video/"):
            asset_content_type = AssetContentType.VIDEO
        else:
            asset_content_type = AssetContentType.IMAGE

        asset = Asset(
            name=f"Inspiration from {source_url}",
            type=AssetType.INSPIRATION,
            content_type=asset_content_type,
            minio_object_name=filename,
            minio_bucket=self.s3_adapter.bucket_name,
            created_by=created_by,
            project_id=project_id,
        )
        asset_id = await self.dao.assets.create_asset(asset)

        # 3. Create the Inspiration that points at the asset.
        inspiration = Inspiration(
            source_url=source_url,
            caption=caption,
            asset_id=str(asset_id),
            created_by=created_by,
            project_id=project_id,
        )
        inspiration.id = await self.dao.inspirations.create_inspiration(inspiration)
        return inspiration

    async def get_inspirations(
        self,
        project_id: Optional[str],
        created_by: str,
        limit: int = 20,
        offset: int = 0,
    ) -> List[Inspiration]:
        """Return a page of inspirations; all arguments are forwarded to the repository."""
        return await self.dao.inspirations.get_inspirations(project_id, created_by, limit, offset)

    async def get_inspiration(self, inspiration_id: str) -> Optional[Inspiration]:
        """Return whatever the repository yields for ``inspiration_id``."""
        return await self.dao.inspirations.get_inspiration(inspiration_id)

    async def mark_as_completed(
        self, inspiration_id: str, is_completed: bool = True
    ) -> Optional[Inspiration]:
        """Set the completion flag on an inspiration and persist it.

        Returns:
            The updated inspiration, or ``None`` when the lookup is falsy.
        """
        inspiration = await self.dao.inspirations.get_inspiration(inspiration_id)
        if not inspiration:
            return None

        inspiration.is_completed = is_completed
        inspiration.updated_at = datetime.now()
        await self.dao.inspirations.update_inspiration(inspiration)
        return inspiration

    async def delete_inspiration(self, inspiration_id: str) -> bool:
        """Delete an inspiration and its associated asset record.

        Returns:
            ``False`` when the inspiration is not found, otherwise the result
            of the repository's ``delete_inspiration`` call.
        """
        inspiration = await self.dao.inspirations.get_inspiration(inspiration_id)
        if not inspiration:
            return False

        # NOTE(review): only the asset record is deleted here; whether the
        # stored object is removed too depends on delete_asset - confirm.
        if inspiration.asset_id:
            await self.dao.assets.delete_asset(inspiration.asset_id)

        return await self.dao.inspirations.delete_inspiration(inspiration_id)

    async def _download_content(self, url: str) -> Tuple[bytes, str, str]:
        """Download the media behind ``url`` using yt-dlp.

        The blocking download runs in a worker thread via
        ``asyncio.to_thread``.

        Returns:
            ``(content_bytes, content_type, extension)``.

        Raises:
            RuntimeError: If yt-dlp is not installed or nothing was downloaded.
        """
        if not yt_dlp:
            raise RuntimeError("yt-dlp is not installed")

        logger.info("Downloading from %s using yt-dlp...", url)
        return await asyncio.to_thread(self._download_sync, url)

    @classmethod
    def _download_sync(cls, url: str) -> Tuple[bytes, str, str]:
        """Blocking part of the download: fetch into a temp dir and read it back."""
        with tempfile.TemporaryDirectory() as tmpdirname:
            ydl_opts = {
                'outtmpl': os.path.join(tmpdirname, '%(id)s.%(ext)s'),
                'quiet': True,
                'no_warnings': True,
                'format': 'best',  # Best quality single file
                'noplaylist': True,  # Only single video if it's a playlist/profile
                'writethumbnail': False,
                'writesubtitles': False,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            filepath = cls._pick_downloaded_file(tmpdirname)
            # The whole file is held in memory because it is passed on as bytes.
            with open(filepath, 'rb') as f:
                data = f.read()

        ext = cls._file_extension(filepath)
        return data, cls._content_type_for_extension(ext), ext

    @staticmethod
    def _pick_downloaded_file(directory: str) -> str:
        """Return the path of the largest regular file in ``directory``.

        Directory listing order is arbitrary, so the choice is made by size
        (ties broken by path) to keep it deterministic and to prefer the media
        file over any smaller sidecar files.

        Raises:
            RuntimeError: If the directory contains no regular files.
        """
        with os.scandir(directory) as entries:
            candidates = [entry.path for entry in entries if entry.is_file()]
        if not candidates:
            raise RuntimeError("No files downloaded")
        return min(candidates, key=lambda path: (-os.path.getsize(path), path))

    @classmethod
    def _file_extension(cls, path: str) -> str:
        """Return the lower-cased extension of ``path`` without the dot."""
        ext = os.path.splitext(path)[1].lstrip('.').lower()
        return ext or cls._FALLBACK_EXTENSION

    @classmethod
    def _content_type_for_extension(cls, ext: str) -> str:
        """Map a file extension (case-insensitive) to a MIME type."""
        return cls._CONTENT_TYPES.get(ext.lower(), cls._FALLBACK_CONTENT_TYPE)

    @staticmethod
    def _build_object_name(ext: str) -> str:
        """Build the storage object name for a new inspiration file.

        A random suffix is added because the timestamp only has one-second
        resolution, so names built in the same second would otherwise collide.
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        unique = os.urandom(4).hex()
        return f"inspirations/{timestamp}_{unique}_insta.{ext}"
|