#!/usr/bin/env python3
"""FastAPI application exposing the ActivityPub-facing HTTP endpoints.

Routes defined here: the actor document (``/``), ``/inbox``, ``/outbox``,
``/tail/{public_id}``, WebFinger and NodeInfo discovery.
"""
import logging
import secrets
import sys
import fastapi
import httpx
import json
from fastapi import FastAPI
from fastapi import Depends
from fastapi import Request
from fastapi import Response
from fastapi.exceptions import HTTPException
from sqlalchemy.util import monkeypatch_proxied_specials
from starlette.responses import JSONResponse
from typing import Any
from typing import Optional
from loguru import logger
from app.utils import precheck
from app.database import get_db_session
from app.config import DEBUG
from app.activitypub import ME
from app.config import BASE_URL, POST_TOKEN
from app.config import DOMAIN
from app.config import ID
from app.config import USERNAME
from app.config import VERSION
from app.database import AsyncSession
from app.database import get_db_session
from app.boxes import save_incoming
from sqlalchemy import select
from app import models


def _check_0rtt_early_data(request: Request) -> None:
    """Disable TLS1.3 0-RTT requests for non-GET.

    Raises:
        fastapi.HTTPException: 425 ("Too early") when the ``Early-Data``
            header equals ``"1"`` and the request method is not ``GET``.
    """
    if request.headers.get("Early-Data", None) == "1" and request.method != "GET":
        raise fastapi.HTTPException(status_code=425, detail="Too early")


# Interactive docs are disabled; the early-data guard runs on every route.
app = FastAPI(
    docs_url=None,
    redoc_url=None,
    dependencies=[Depends(_check_0rtt_early_data)],
)

# Replace loguru's default sink: stdout honours DEBUG, the file sink always
# records at DEBUG level.
logger.remove()
logger.add(sys.stdout, level="DEBUG" if DEBUG else "INFO")
logger.add("output.log", level="DEBUG")


class ActivityPubResponse(JSONResponse):
    """JSON response served with the ``application/activity+json`` media type."""

    media_type = "application/activity+json"


def is_ap_requested(req: Request) -> bool:
    """Return True when the ``Accept`` header starts with an ActivityPub type.

    Only the beginning of the header value is inspected, so an ActivityPub
    media type listed after another type is not detected.
    """
    accept_value = req.headers.get("accept")
    if not accept_value:
        return False
    # str.startswith accepts a tuple: one call instead of a manual loop.
    return accept_value.startswith(
        ("application/activity+json", "application/ld+json")
    )


def _is_valid_post_token(authorization: Optional[str]) -> bool:
    """Check an ``Authorization`` header value against ``POST_TOKEN``.

    The header is split on single spaces and its second field is taken as
    the token (e.g. ``"Bearer <token>"``).

    Returns:
        False when no token is configured (``None`` or empty), when the
        header is missing, when it has no second field, or when the token
        does not match; True otherwise.
    """
    # An empty configured token is treated as "publishing disabled";
    # otherwise the header "Bearer " would authenticate.
    if not POST_TOKEN or not authorization:
        return False
    fields = authorization.split(" ")
    if len(fields) < 2:
        return False
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    return secrets.compare_digest(
        fields[1].encode("utf-8"),
        str(POST_TOKEN).encode("utf-8"),
    )


@app.get("/")
async def index(
    request: Request
):
    """Serve ``ME``, as ActivityPub JSON when the client asks for it."""
    if is_ap_requested(request):
        return ActivityPubResponse(ME)
    return ME


@app.post("/inbox")
async def inbox(
    request: Request,
    db_session: AsyncSession = Depends(get_db_session),
    httpsig_checker = Depends(precheck.inbox_prechecker),
) -> Response:
    """Receive an incoming activity.

    Returns:
        202 when ``save_incoming`` accepts the payload; 406 with
        ``"invalid http-sig"`` when the prechecker result is falsy; 406 with
        ``"invalid activitypub object"`` when the body is not valid JSON or
        ``save_incoming`` rejects it.
    """
    # Reject unsigned/badly signed requests before touching the body.
    if not httpsig_checker:
        return Response(
            status_code=406,
            content="invalid http-sig"
        )

    try:
        payload = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; a body that is not JSON cannot be a valid object.
        logger.warning("Inbox payload is not valid JSON")
        return Response(
            status_code=406,
            content="invalid activitypub object"
        )

    if await save_incoming(db_session, payload):
        return Response(status_code=202)
    return Response(
        status_code=406,
        content="invalid activitypub object"
    )


@app.post("/outbox")
async def outbox(
    request: Request,
    db_session: AsyncSession = Depends(get_db_session),
) -> Response:
    """Publish the request body as a public ``Note``.

    The body is decoded as UTF-8 and logged. If the ``Authorization`` header
    carries the configured ``POST_TOKEN``, the text is passed through
    ``to_html``, newlines are removed, and the result is handed to
    ``_send_create``.

    Returns:
        200 after ``_send_create`` completes; 406 when the token is missing,
        malformed, or wrong.
    """
    payload = await request.body()
    content = payload.decode("utf-8")
    logger.info(content)

    if not _is_valid_post_token(request.headers.get("Authorization")):
        logger.warning("Non-valid post token!")
        return Response(status_code=406)

    logger.info("True token")
    # Imported lazily, as in the original (presumably to avoid import
    # cycles or start-up cost -- TODO confirm).
    from app.activitypub import VisibilityEnum
    from app.boxes import _send_create
    from app.orgpython import to_html

    content = to_html(content).replace("\n", "")

    await _send_create(
        db_session,
        "Note",
        content,
        VisibilityEnum.PUBLIC
    )
    return Response(status_code=200)


@app.get("/tail/{public_id}")
async def outbox_activity_by_public_id(
    public_id: str,
    request: Request,
    db_session: AsyncSession = Depends(get_db_session),
) -> ActivityPubResponse:
    """Return the ``ap_object`` of a non-deleted outbox object by public id.

    Raises:
        HTTPException: 404 when no matching, non-deleted object exists.
    """
    outbox_object = (
        await db_session.execute(
            select(models.OutboxObject).where(
                models.OutboxObject.public_id == public_id,
                models.OutboxObject.is_deleted.is_(False),
            )
        )
    ).scalar_one_or_none()
    if not outbox_object:
        raise HTTPException(status_code=404)

    return ActivityPubResponse(outbox_object.ap_object)


@app.get("/.well-known/webfinger")
async def wellknown_webfinger(resource: str) -> JSONResponse:
    """Exposes/serves WebFinger data.

    Raises:
        HTTPException: 404 when ``resource`` is neither the
            ``acct:USERNAME@DOMAIN`` handle nor ``ID``.
    """
    if resource not in [f"acct:{USERNAME}@{DOMAIN}", ID]:
        logger.info(f"Got invalid req for {resource}")
        raise HTTPException(status_code=404)

    out = {
        "subject": f"acct:{USERNAME}@{DOMAIN}",
        "aliases": [ID],
        "links": [
            {
                "rel": "http://webfinger.net/rel/profile-page",
                "type": "text/html",
                "href": ID + "/",
            },
            {"rel": "self", "type": "application/activity+json", "href": ID},
            {
                "rel": "http://ostatus.org/schema/1.0/subscribe",
                "template": BASE_URL + "/admin/lookup?query={uri}",
            },
        ],
    }

    return JSONResponse(
        out,
        media_type="application/jrd+json; charset=utf-8",
        headers={"Access-Control-Allow-Origin": "*"},
    )


@app.get("/.well-known/nodeinfo")
async def well_known_nodeinfo() -> dict[str, Any]:
    """Return the NodeInfo discovery document linking to ``/nodeinfo/2.0``."""
    return {
        "links": [
            {
                "rel": "http://nodeinfo.diaspora.software/ns/schema/2.0",
                "href": f"{BASE_URL}/nodeinfo/2.0",
            }
        ]
    }


@app.get("/nodeinfo/2.0")
async def nodeinfo(
    db_session: AsyncSession = Depends(get_db_session),
):
    """Return the NodeInfo 2.0 document for this instance.

    ``db_session`` is injected but not used by the current body.
    """
    return JSONResponse(
        {
            "version": "2.0",
            "software": {
                "name": "foxhole",
                "version": VERSION,
            },
            "protocols": ["activitypub"],
            "services": {"inbound": [], "outbound": []},
            # TODO (carried over from the original): usage figures are
            # hard-coded.
            "usage": {"users": {"total": 1}, "localPosts": 0},
            "openRegistrations": False,
            "metadata": {},
        },
    )