init
This commit is contained in:
71
backend/app/core/security.py
Normal file
71
backend/app/core/security.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from urllib.parse import parse_qs, unquote
|
||||
|
||||
import jwt
|
||||
|
||||
|
||||
def verify_telegram_login(data: dict, bot_token: str) -> bool:
|
||||
"""Verify data from Telegram Login Widget."""
|
||||
data = dict(data)
|
||||
check_hash = data.pop("hash", "")
|
||||
if not check_hash:
|
||||
return False
|
||||
|
||||
data_check_string = "\n".join(
|
||||
f"{k}={v}" for k, v in sorted(data.items())
|
||||
)
|
||||
secret_key = hashlib.sha256(bot_token.encode()).digest()
|
||||
computed = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()
|
||||
|
||||
if int(data.get("auth_date", 0)) < time.time() - 86400:
|
||||
return False
|
||||
|
||||
return hmac.compare_digest(computed, check_hash)
|
||||
|
||||
|
||||
def verify_telegram_webapp(init_data: str, bot_token: str) -> dict | None:
|
||||
"""Verify Telegram WebApp initData and return parsed user dict."""
|
||||
parsed = parse_qs(init_data)
|
||||
data = {k: v[0] for k, v in parsed.items()}
|
||||
|
||||
check_hash = data.pop("hash", "")
|
||||
if not check_hash:
|
||||
return None
|
||||
|
||||
data_check_string = "\n".join(
|
||||
f"{k}={v}" for k, v in sorted(data.items())
|
||||
)
|
||||
secret_key = hmac.new(b"WebAppData", bot_token.encode(), hashlib.sha256).digest()
|
||||
computed = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()
|
||||
|
||||
if not hmac.compare_digest(computed, check_hash):
|
||||
return None
|
||||
|
||||
user_raw = data.get("user")
|
||||
if not user_raw:
|
||||
return None
|
||||
|
||||
return json.loads(unquote(user_raw))
|
||||
|
||||
|
||||
def create_access_token(
|
||||
rider_id: str,
|
||||
telegram_id: int,
|
||||
secret: str,
|
||||
algorithm: str,
|
||||
expires_minutes: int,
|
||||
) -> str:
|
||||
payload = {
|
||||
"sub": rider_id,
|
||||
"tg_id": telegram_id,
|
||||
"exp": datetime.now(timezone.utc) + timedelta(minutes=expires_minutes),
|
||||
}
|
||||
return jwt.encode(payload, secret, algorithm=algorithm)
|
||||
|
||||
|
||||
def decode_access_token(token: str, secret: str, algorithm: str) -> dict:
|
||||
return jwt.decode(token, secret, algorithms=[algorithm])
|
||||
Reference in New Issue
Block a user