90 lines
2.5 KiB
Python
90 lines
2.5 KiB
Python
#!/usr/bin/env python3
|
|
import fastapi
|
|
import json
|
|
|
|
from loguru import logger
|
|
from app.httpsig import HttpSignature
|
|
from app import ldsig
|
|
|
|
from app.database import AsyncSession
|
|
from app.database import get_db_session
|
|
from app.actor import fetch_actor
|
|
from sqlalchemy import select
|
|
|
|
|
|
async def inbox_prechecker(
|
|
request: fastapi.Request,
|
|
db_session: AsyncSession = fastapi.Depends(get_db_session)
|
|
) -> bool:
|
|
"""
|
|
Check http request
|
|
"""
|
|
payload = await request.json()
|
|
|
|
try:
|
|
logger.info(f"headers={request.headers}")
|
|
logger.info(f"{payload=}")
|
|
|
|
parsec_signature = HttpSignature.parse_signature(
|
|
request.headers.get("signature")
|
|
)
|
|
except KeyError:
|
|
logger.warning("Not signature headers")
|
|
raise KeyError
|
|
|
|
try:
|
|
if request.method == "POST" and request.url.path.endswith("/inbox"):
|
|
from app import models
|
|
|
|
if (
|
|
payload["type"] == "Delete"
|
|
and payload["actor"] == payload["object"]
|
|
and not (
|
|
await db_session.scalars(
|
|
select(models.Actor).where(
|
|
models.Actor.ap_id == payload["actor"],
|
|
)
|
|
)
|
|
).one_or_none()
|
|
):
|
|
logger.info(f"Dropping unnecessary delete activity " +
|
|
payload["actor"])
|
|
raise fastapi.HTTPException(status_code=202)
|
|
except fastapi.HTTPException as http_exc:
|
|
raise http_exc
|
|
except Exception:
|
|
logger.exception("Failed to precheck delete activity")
|
|
|
|
actor_id = payload["actor"]
|
|
send_actor_id = parsec_signature["keyid"].split('#')[0]
|
|
|
|
if actor_id != send_actor_id:
|
|
return await ldsig.verify_signature(db_session, payload)
|
|
|
|
_actor = await fetch_actor(db_session, actor_id)
|
|
|
|
try:
|
|
pubkey = _actor.ap_actor["publicKey"]["publicKeyPem"]
|
|
except json.JSONDecodeError:
|
|
raise ValueError
|
|
except KeyError:
|
|
logger.warning("actor gone? ")
|
|
raise KeyError
|
|
|
|
body = await request.body()
|
|
signture_string = HttpSignature.build_signature_string(
|
|
request.method,
|
|
request.url.path,
|
|
parsec_signature["headers"],
|
|
HttpSignature.calculation_digest(body),
|
|
request.headers,
|
|
)
|
|
is_verify = HttpSignature.verify_signature(
|
|
signture_string,
|
|
parsec_signature["signature"],
|
|
pubkey)
|
|
|
|
logger.info(signture_string)
|
|
logger.info(f"verify? {is_verify}")
|
|
|
|
return is_verify
|