foxhole/app/main.py

149 lines
4.1 KiB
Python

#!/usr/bin/env python3
import logging
import sys
import fastapi
import httpx
import json
from fastapi import FastAPI
from fastapi import Depends
from fastapi import Request
from fastapi import Response
from fastapi.exceptions import HTTPException
from sqlalchemy.util import monkeypatch_proxied_specials
from starlette.responses import JSONResponse
from uuid import uuid4
from loguru import logger
from app import models
from app.database import get_db_session
from app.config import AP_CONTENT_TYPE, DEBUG, USER_AGENT
from app.activitypub import ME
from app.config import BASE_URL
from app.config import DEBUG
from app.config import DOMAIN
from app.config import ID
from app.config import USERNAME
from app.database import AsyncSession
from app.database import get_db_session
from app.httpsig import HttpSignature
def _check_0rtt_early_data(request: Request) -> None:
    """Reject TLS 1.3 0-RTT (early data) requests unless they are GETs.

    Raises:
        fastapi.HTTPException: 425 ("Too early") when the request carries
            ``Early-Data: 1`` and its method is anything other than GET.
    """
    if request.method == "GET":
        # GET requests are allowed through even when sent as early data.
        return
    early_data_flag = request.headers.get("Early-Data", None)
    if early_data_flag == "1":
        raise fastapi.HTTPException(status_code=425, detail="Too early")
# The interactive docs routes are disabled, and every request first runs
# through the 0-RTT early-data guard.
app = FastAPI(
    docs_url=None,
    redoc_url=None,
    dependencies=[Depends(_check_0rtt_early_data)],
)

# Remove the handlers currently registered on the loguru logger, then log to
# stdout at DEBUG or INFO depending on the DEBUG config flag.
logger.remove()
logger.add(
    sys.stdout,
    level="DEBUG" if DEBUG else "INFO",
)
@app.get("/")
async def index():
    """Return the ``ME`` object from ``app.activitypub`` at the site root."""
    # No return annotation on purpose: FastAPI would treat one as a response
    # model and could change how ``ME`` is serialized.
    return ME
async def _fetch_actor_public_key(actor_url: str) -> str:
    """Download the actor document at *actor_url* and return its PEM public key.

    The document is requested with the configured ``USER_AGENT`` and
    ``AP_CONTENT_TYPE`` headers, following redirects, and the key is read from
    ``publicKey.publicKeyPem``.

    Raises:
        HTTPException: 401 when the actor cannot be fetched, the response is
            not JSON, or the document has no ``publicKey.publicKeyPem``.
    """
    # NOTE(review): actor_url comes from the not-yet-authenticated request
    # body, so this is an outbound request to a caller-chosen URL (SSRF risk).
    # Consider restricting scheme/host before fetching.
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                actor_url,
                headers={
                    "User-Agent": USER_AGENT,
                    "Accept": AP_CONTENT_TYPE,
                },
                follow_redirects=True,
            )
        # resp.json() raises a ValueError subclass on non-JSON bodies; the
        # lookups raise KeyError/TypeError when the document has another shape.
        return resp.json()["publicKey"]["publicKeyPem"]
    except (
        httpx.HTTPError,
        httpx.InvalidURL,
        ValueError,
        KeyError,
        TypeError,
    ) as exc:
        logger.warning(f"could not load public key for {actor_url}: {exc!r}")
        raise HTTPException(
            status_code=401,
            detail="Unable to fetch actor public key",
        ) from exc


@app.post("/inbox")
async def inbox(
    request: Request,
    db_session: AsyncSession = Depends(get_db_session),
) -> Response:
    """Receive an activity, verify its HTTP signature and hand it on.

    The raw body is read once and used both for JSON decoding and for the
    digest that goes into the signature string, so both see the same bytes.

    Returns:
        An empty 202 response once the signature has been verified and the
        payload has been passed to ``new_incoming``.

    Raises:
        HTTPException: 400 when the body is not JSON or has no string
            ``actor``; 401 when the ``Signature`` header is missing, the
            actor's public key cannot be obtained, or verification fails.
    """
    logger.info(f"headers={request.headers}")

    body = await request.body()
    try:
        payload = json.loads(body)
    except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError
        raise HTTPException(
            status_code=400,
            detail="Request body is not valid JSON",
        ) from exc
    logger.info(f"{payload=}")

    signature_header = request.headers.get("signature")
    if not signature_header:
        raise HTTPException(status_code=401, detail="Missing Signature header")
    parsed_signature = HttpSignature.parse_signature(signature_header)

    actor_url = payload.get("actor") if isinstance(payload, dict) else None
    if not isinstance(actor_url, str):
        raise HTTPException(status_code=400, detail="Activity has no actor URL")

    pubkey = await _fetch_actor_public_key(actor_url)

    signature_string = HttpSignature.build_signature_string(
        request.method,
        request.url.path,
        parsed_signature["headers"],
        HttpSignature.calculation_digest(body),
        request.headers,
    )
    # NOTE(review): assumes verify_signature returns a truthy value only when
    # the signature is valid -- confirm against app.httpsig.
    is_verified = HttpSignature.verify_signature(
        signature_string,
        parsed_signature["signature"],
        pubkey,
    )
    logger.info(signature_string)
    logger.info(f"verify? {is_verified}")
    if not is_verified:
        # Previously the result was only logged, so unsigned or forged
        # activities were accepted; reject them instead.
        raise HTTPException(status_code=401, detail="Invalid HTTP signature")

    await new_incoming(db_session, payload)
    return Response(status_code=202)
async def new_incoming(
    db_session: AsyncSession,
    payload: dict,
) -> models.IncomingActivity | None:
    """Wrap an incoming payload in a ``models.IncomingActivity``.

    A payload without an ``"@context"`` key is logged as invalid and ``None``
    is returned. Otherwise the activity id is the payload's ``"id"`` when that
    key is present, or a freshly generated UUID4 string when it is not.

    The returned object is not persisted: ``db_session`` is currently unused
    because the database writes below are commented out.
    """
    if "@context" not in payload:
        logger.warning(f"invalid object: {payload}")
        return None

    ap_id: str = payload["id"] if "id" in payload else str(uuid4())
    activity = models.IncomingActivity(ap_id=ap_id, ap_object=payload)

    # TODO: persistence is disabled for now.
    # db_session.add(activity)
    # await db_session.commit()
    # await db_session.refresh(activity)
    return activity
@app.get("/.well-known/webfinger")
async def wellknown_webfinger(resource: str) -> JSONResponse:
    """Serve the WebFinger document for the configured account.

    ``resource`` must equal either the ``acct:`` URI built from ``USERNAME``
    and ``DOMAIN`` or ``ID``; any other value is logged and answered with a
    404. The response is sent as ``application/jrd+json`` with a permissive
    CORS header.
    """
    subject = f"acct:{USERNAME}@{DOMAIN}"
    if resource != subject and resource != ID:
        logger.info(f"Got invalid req for {resource}")
        raise HTTPException(status_code=404)

    profile_link = {
        "rel": "http://webfinger.net/rel/profile-page",
        "type": "text/html",
        "href": ID + "/",
    }
    self_link = {
        "rel": "self",
        "type": "application/activity+json",
        "href": ID,
    }
    subscribe_link = {
        "rel": "http://ostatus.org/schema/1.0/subscribe",
        # "{uri}" is a literal template placeholder, not an f-string field.
        "template": BASE_URL + "/admin/lookup?query={uri}",
    }
    jrd = {
        "subject": subject,
        "aliases": [ID],
        "links": [profile_link, self_link, subscribe_link],
    }
    return JSONResponse(
        jrd,
        media_type="application/jrd+json; charset=utf-8",
        headers={"Access-Control-Allow-Origin": "*"},
    )