72 lines
2.0 KiB
Python
72 lines
2.0 KiB
Python
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
from urllib.parse import parse_qs, unquote
|
|
|
|
import jwt
|
|
|
|
|
|
def verify_telegram_login(data: dict, bot_token: str) -> bool:
|
|
"""Verify data from Telegram Login Widget."""
|
|
data = dict(data)
|
|
check_hash = data.pop("hash", "")
|
|
if not check_hash:
|
|
return False
|
|
|
|
data_check_string = "\n".join(
|
|
f"{k}={v}" for k, v in sorted(data.items())
|
|
)
|
|
secret_key = hashlib.sha256(bot_token.encode()).digest()
|
|
computed = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()
|
|
|
|
if int(data.get("auth_date", 0)) < time.time() - 86400:
|
|
return False
|
|
|
|
return hmac.compare_digest(computed, check_hash)
|
|
|
|
|
|
def verify_telegram_webapp(init_data: str, bot_token: str) -> dict | None:
|
|
"""Verify Telegram WebApp initData and return parsed user dict."""
|
|
parsed = parse_qs(init_data)
|
|
data = {k: v[0] for k, v in parsed.items()}
|
|
|
|
check_hash = data.pop("hash", "")
|
|
if not check_hash:
|
|
return None
|
|
|
|
data_check_string = "\n".join(
|
|
f"{k}={v}" for k, v in sorted(data.items())
|
|
)
|
|
secret_key = hmac.new(b"WebAppData", bot_token.encode(), hashlib.sha256).digest()
|
|
computed = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()
|
|
|
|
if not hmac.compare_digest(computed, check_hash):
|
|
return None
|
|
|
|
user_raw = data.get("user")
|
|
if not user_raw:
|
|
return None
|
|
|
|
return json.loads(unquote(user_raw))
|
|
|
|
|
|
def create_access_token(
|
|
rider_id: str,
|
|
telegram_id: int,
|
|
secret: str,
|
|
algorithm: str,
|
|
expires_minutes: int,
|
|
) -> str:
|
|
payload = {
|
|
"sub": rider_id,
|
|
"tg_id": telegram_id,
|
|
"exp": datetime.now(timezone.utc) + timedelta(minutes=expires_minutes),
|
|
}
|
|
return jwt.encode(payload, secret, algorithm=algorithm)
|
|
|
|
|
|
def decode_access_token(token: str, secret: str, algorithm: str) -> dict:
|
|
return jwt.decode(token, secret, algorithms=[algorithm])
|