From 279cb5c6f6475e5fc5da39c43968bea14665deaa Mon Sep 17 00:00:00 2001 From: xds Date: Fri, 13 Feb 2026 17:30:11 +0300 Subject: [PATCH] feat: Implement cancellation of stale generations in the service and repository, along with a new test. --- aiws.py | 32 +++++++++-- .../generation_service.cpython-313.pyc | Bin 27060 -> 27911 bytes api/service/generation_service.py | 13 ++++- .../generation_repo.cpython-313.pyc | Bin 6234 -> 7177 bytes repos/generation_repo.py | 18 ++++++ tests/test_scheduler.py | 52 ++++++++++++++++++ 6 files changed, 109 insertions(+), 6 deletions(-) create mode 100644 tests/test_scheduler.py diff --git a/aiws.py b/aiws.py index 57a1c7e..00a5007 100644 --- a/aiws.py +++ b/aiws.py @@ -120,6 +120,17 @@ assets_router.message.middleware(AuthMiddleware(repo=users_repo, admin_id=ADMIN_ gen_router.message.middleware(AlbumMiddleware(latency=0.8)) +async def start_scheduler(service: GenerationService): + while True: + try: + logger.info("Running scheduler for stacked generation killing") + await service.cleanup_stale_generations() + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Scheduler error: {e}") + await asyncio.sleep(300) # Check every 5 minutes + # --- LIFESPAN (Запуск FastAPI + Bot) --- @asynccontextmanager async def lifespan(app: FastAPI): @@ -151,17 +162,28 @@ async def lifespan(app: FastAPI): # ) # print("🤖 Bot polling started") + # 3. ЗАПУСК ШЕДУЛЕРА + scheduler_task = asyncio.create_task(start_scheduler(generation_service)) + print("⏰ Scheduler started") + yield # --- SHUTDOWN --- print("🛑 Shutting down...") - - # 3. Остановка бота - polling_task.cancel() + + # 4. Остановка шедулера + scheduler_task.cancel() try: - await polling_task + await scheduler_task except asyncio.CancelledError: - print("🤖 Bot polling stopped") + print("⏰ Scheduler stopped") + + # 3. Остановка бота + # polling_task.cancel() + # try: + # await polling_task + # except asyncio.CancelledError: + # print("🤖 Bot polling stopped") # 4. Отключение БД # Обычно Motor закрывать не обязательно при выходе, но хорошим тоном считается diff --git a/api/service/__pycache__/generation_service.cpython-313.pyc b/api/service/__pycache__/generation_service.cpython-313.pyc index 1970192c370f5adde4006310cc1fba07cd7354a9..a0f1879d59e01ba62471ecf76a971f0164da9ec3 100644 GIT binary patch delta 925 zcmZWnT}YEr7(VZJ+~z*#oM!2kwd2;*A&6psYKhaP2`1s!=tsfkJo_R2W@nqxh$bae zbfNKOKUZ}rbrl9dVCYTP8xfNs-3t+x20>kP&MZrO;d|cqd(Znm&wGAee}FqVFmDtW z%ZS@MG85@J|H|BIEPn(D&7mR0KtW7WfMLa{4H=!_)TWGfj?axQIHbKd`oN`C$@Lsh z%WjTL4w`I;9aJ359+KQO3!mi9ixmM*w{1Ee-R9qhUYot#>rg$Y=6Rq{w7yyb%sL?^Kw1>qPDg1a!;DR=qR03C~w5LGGj$0JcS zM&e(7vJi0CtY5~xJE!U2tnSltGt!C4Xlk5 zvk?xt%0PbpVNsW&!~gAq)rg{60n-8&Ik++F5b>mdW)3la9f%OZj&h%~=9Lo@dA1#7 zx%!^%^HC_9j09-IuHw|+1I+O2i{0dmYiJ6s7wU}Nuwpy-tnO)D*4B`*HDql)8Cy^K z!lg7v+hDeQ@UC~IqPj4dor{;`!hC C@Ay#w delta 182 zcmZp_#kl1%Bj0CUUM>b8ICQlyvp#eqpB1Ob1V#pisSGg;MU24=MNGkrMa;oUlVdoQ z8I>kCaLNP81)Qpk%9D?AhBB&5mf_L{k{(=|K(dG{jjc#7ok>%5^Aj$o$jNEx`i%0E zr={ 0: + logger.info(f"Cleaned up {count} stale generations (timeout)") + except Exception as e: + logger.error(f"Error cleaning up stale generations: {e}") \ No newline at end of file diff --git a/repos/__pycache__/generation_repo.cpython-313.pyc b/repos/__pycache__/generation_repo.cpython-313.pyc index 34fb78ee00d29c6f06f9e6a6a106d024003dfc50..a36ef065d34ea1a6dbc954970959ccc52b454cbd 100644 GIT binary patch delta 2219 zcmZ`)U2GIp6ux(7cJ}|T-InbZ%eEBBRJxRH3#CBOB4BsxmUP&d*pkV1ciIl@&TQ^X zrA5dZO%xy0;KcBX#>5ACvN0hs(FiZbm`KAz$pk`TB(d?O6(dm}^qxC~7K}I9Z_dp9 zIp00sIcM#jcL%}`LLon*i4lr;Y9sBeftPWNWwXk#RO+v#Fc_~SB}eagv;`T z&$@}5edBUM)vsVMgh*^=uU`zTfGxUg?^w-Lb*)&j`BBv{Mb7q?i>6|#RmJuJmX(TGw7J5`BbPp( zj}KvBK8XmH5OGPEa8j$a4cXqc)iOHbko(%T!QcDt@=1*hNC?!dJ z#wTvG!^ag(Aw}>yuGI9EpjC3s;+XY=>pTuwBiygJ$?E5O63sw%D+QY)ieYfnN~KaH zVO`({geZM82CY;>eDOzqKR(n1>%7ydR+e>ox{tn+cKF8C^co3LOXLVUaVJc5QrL9j zQ6Yol*1TY0?_$vNH?Hr2MZExh6l{NqC^RT#xoEEZLkAIIste#b3ZkDd|2+7&c|#{A z#WQM+WMH?))a43|Eh9v_;ge9T)fBDl5DPfOdf+(bICB6@FTe%|VOp)23SrbqBg{c; zi~t21mPdnz=UZPN-n#gQ?<*|tgJn$91aO6JY@zY_Dp_uN5n5nRzCIaPA=hN1z$SWI-WA z@Qq33b-27_4Mh?HXhj;W$;dXGu+B$@@!`d9BER4R;PEfM(@;#&?cB*fSA*Q$$N;8L z3ifZOuu1TrVgq<=v9;lAT;B`R0~A(hYNMx>^={(KRC!GNY0hqk^DA(OLm4clk(*jf zAd;Y#ouD_CXV#0rdN1O!^>|pV-ehnBLe9o`*!nos zxW!d)^(`?xQ!!~yvLpo+h(zJsvnQP>45p6GE2TN;6)6Zzs-jj^Go{Rz6xx`|w5DOB9hxqx z6Xp_egx}^g{Vai?5CMSIYhXQJszq&qgkY6typYe20 zkG^P!t9n_TR_WQSUPRte`4lV;P-6P@v<$?Yj>4n5qx8DarJ`0+Dzae~E6QWVbc@DS!U}o=-Dz&ST=}r_Y45G(oma(cV@qA5OUC%cxDnWbdr_O{L|XKuD-*4$huyy{1>EqB_|%WeHjZT+`*ynLsl>$6>-?7C{) z>gZkb@GT9ioDd2>2n3t_4^c4UzsRj7QM7S6(!LaFzv*pXYo#CmG(g%nhq>VdzV61u z&HQyCMr{(OpQ)|9> zaR*q?=}8)ZX~LEzRZx5+ z@(2X*YLpk-nzOk6I}+TjYte|i<63DI(Z_ea58=UVXE$ujiw(A5F4R~G0#tm>vt(hTLIv7nVyPA7ir#+HuPp_|(= znsZ9UNN?nb6CsF(!yo>1HtuA557HfT6=jGl&UZzB4LmaqK6(pf_bwFPP&Jm-pqS5@ArG}&3?7b zH;RvDW=tYJ>*sHRM(J*`HarlkZ%K!AXp7hssGWmc6W6(xVymD^TWM*w4!X3}R^B$i zkha#EvQ03ht+%G_0u<~Sm}!z4StgzQBIy`gIm|~06#G+_IY!gUapUbc4WiH;=v>_k zlJxF_Pb+H_nS4lqb_nD;6qHV#_318MWZ%;kZLml5%X$sw#63^(;+uZxgNv8!`*Ad$ zW+(&!&` zpI$4Y!?+!Uo)d`#x>&+9xTnNX3{%3g6p)v$pleCMn zGafJDX_*I4frc>2sNW6-sSlDUTtXifBbgQ;>TvdD^(giI)ai@5vWmG%8>6e{Pjul` zJabWol@m0Jg@r{~he`fj49!xr-9nYlvv&*I^z!Jqa4$DRrJpG)L3lwf)|VSW;(X!` z0uQkI@H)a3gskv_l~fNqxF=g+WYXUd8(bHRoxtN})cx4+rNT@mYPsS?J)ZCT_0TzV z5e3-t|WeHX_62(k77@BZ88wDVNWBQrhs$(Un_6Z z_Q);Yr^8p!y(M5Q#+p3YXycsJZS?=I_erT*Cbfe>IBn;eh^>}lvi8-^+K11AeOK9{ zx7aV0AJ<@3#95T<-6(auuBErzWBdug`vTf+OKan2RoH3mwP79|RRk<_HhRlw;md&+ z5%4Tmf-nUYm)8((AiRZe3*kKheBN=AR8Z-OX-^^O9;JTbIC9XThEezkp>PH`k_p`# z-K*R5gU3YI8d($uw5<}?(>rkx_A?@wnHS%=xwn4v(Czy-4x%`bqtDfD5{2^J^0~t( zjbL4(<0f~)UJyZ^4I0bTV8@MxkEBFd3dsVW;Di}Bt{z6-(ZIhBec`|^CeI1@Q=ybT eBul4c>4elz$mS{8c}T82*7qoF{6!GuCI11B=`0EW diff --git a/repos/generation_repo.py b/repos/generation_repo.py index b561548..5f2d38e 100644 --- a/repos/generation_repo.py +++ b/repos/generation_repo.py @@ -1,4 +1,5 @@ from typing import Optional, List +from datetime import datetime, timedelta, UTC from PIL.ImageChops import offset from bson import ObjectId @@ -85,3 +86,20 @@ class GenerationRepo: generation["id"] = str(generation.pop("_id")) generations.append(Generation(**generation)) return generations + + async def cancel_stale_generations(self, timeout_minutes: int = 60) -> int: + cutoff_time = datetime.now(UTC) - timedelta(minutes=timeout_minutes) + res = await self.collection.update_many( + { + "status": GenerationStatus.RUNNING, + "created_at": {"$lt": cutoff_time} + }, + { + "$set": { + "status": GenerationStatus.FAILED, + "failed_reason": "Timeout: Execution time limit exceeded", + "updated_at": datetime.now(UTC) + } + } + ) + return res.modified_count diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..95376ef --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,52 @@ +import asyncio +import os +from datetime import datetime, timedelta, UTC +from motor.motor_asyncio import AsyncIOMotorClient +from models.Generation import Generation, GenerationStatus +from repos.generation_repo import GenerationRepo +from dotenv import load_dotenv + +load_dotenv() + +# Mock configs if not present in env +MONGO_HOST = os.getenv("MONGO_HOST", "mongodb://localhost:27017") +DB_NAME = os.getenv("DB_NAME", "bot_db") + +print(f"Connecting to MongoDB: {MONGO_HOST}, DB: {DB_NAME}") + +async def test_scheduler(): + client = AsyncIOMotorClient(MONGO_HOST) + repo = GenerationRepo(client, db_name=DB_NAME) + + # 1. Create a "stale" generation (2 hours ago) + stale_gen = Generation( + prompt="stale test", + status=GenerationStatus.RUNNING, + created_at=datetime.now(UTC) - timedelta(minutes=120), + assets_list=[], + aspect_ratio="NINESIXTEEN", + quality="ONEK" + ) + gen_id = await repo.create_generation(stale_gen) + print(f"Created stale generation: {gen_id}") + + # 2. Run cleanup + print("Running cleanup...") + count = await repo.cancel_stale_generations(timeout_minutes=60) + print(f"Cleaned up {count} generations") + + # 3. Verify status + updated_gen = await repo.get_generation(gen_id) + print(f"Generation status: {updated_gen.status}") + print(f"Failed reason: {updated_gen.failed_reason}") + + if updated_gen.status == GenerationStatus.FAILED: + print("✅ SUCCESS: Generation marked as FAILED") + else: + print("❌ FAILURE: Generation status not updated") + + # Cleanup + await repo.collection.delete_one({"_id": updated_gen.id}) # Remove test data + +if __name__ == "__main__": + asyncio.run(test_scheduler())