foxhole/app/utils/precheck.py

88 lines
2.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import fastapi
import json
from loguru import logger
from app.httpsig import HttpSignature
2023-08-02 12:16:27 +02:00
from app import ldsig
from app.database import AsyncSession
from app.database import get_db_session
2023-04-01 11:17:01 +02:00
from app.actor import fetch_actor
from sqlalchemy import select
async def inbox_prechecker(
2024-02-28 07:39:24 +01:00
request: fastapi.Request, db_session: AsyncSession = fastapi.Depends(get_db_session)
2023-08-02 12:16:27 +02:00
) -> bool:
"""
Check http request
"""
payload = await request.json()
try:
logger.info(f"headers={request.headers}")
logger.info(f"{payload=}")
parsec_signature = HttpSignature.parse_signature(
request.headers.get("signature")
)
except KeyError:
logger.warning("Not signature headers")
raise KeyError
try:
if request.method == "POST" and request.url.path.endswith("/inbox"):
from app import models
if (
payload["type"] == "Delete"
and payload["actor"] == payload["object"]
and not (
await db_session.scalars(
select(models.Actor).where(
2023-03-19 06:05:16 +01:00
models.Actor.ap_id == payload["actor"],
)
)
).one_or_none()
):
2024-02-28 07:39:24 +01:00
logger.info(f"Dropping unnecessary delete activity " + payload["actor"])
raise fastapi.HTTPException(status_code=202)
except fastapi.HTTPException as http_exc:
raise http_exc
except Exception:
logger.exception("Failed to precheck delete activity")
2023-04-01 11:17:01 +02:00
actor_id = payload["actor"]
2024-02-28 07:39:24 +01:00
send_actor_id = parsec_signature["keyid"].split("#")[0]
2023-08-02 12:16:27 +02:00
2023-04-01 11:17:01 +02:00
_actor = await fetch_actor(db_session, actor_id)
try:
2023-04-01 18:11:30 +02:00
pubkey = _actor.ap_actor["publicKey"]["publicKeyPem"]
except json.JSONDecodeError:
raise ValueError
except KeyError:
logger.warning("actor gone? ")
raise KeyError
2024-02-28 07:39:24 +01:00
if actor_id != send_actor_id:
return await ldsig.verify_signature(db_session, payload)
body = await request.body()
signture_string = HttpSignature.build_signature_string(
request.method,
request.url.path,
parsec_signature["headers"],
HttpSignature.calculation_digest(body),
request.headers,
)
is_verify = HttpSignature.verify_signature(
2024-02-28 07:39:24 +01:00
signture_string, parsec_signature["signature"], pubkey
)
logger.info(signture_string)
logger.info(f"verify? {is_verify}")
return is_verify