refactor/httpsig check

This commit is contained in:
SouthFox 2023-03-17 17:59:29 +08:00
parent 25bddf408f
commit 1e5999c309
3 changed files with 93 additions and 69 deletions

29
app/boxes.py Normal file
View file

@ -0,0 +1,29 @@
#!/usr/bin/env python3
from app import models
from app.database import AsyncSession
from loguru import logger
from uuid import uuid4
async def save_incoming(
db_session: AsyncSession,
payload: dict,
) -> models.IncomingActivity | None:
ap_id: str
if "@context" not in payload:
logger.warning(f"invalid object: {payload}")
return None
if "id" in payload:
ap_id = payload["id"]
else:
ap_id = str(uuid4())
incoming_activity = models.IncomingActivity(
ap_id=ap_id,
ap_object=payload,
)
db_session.add(incoming_activity)
await db_session.commit()
await db_session.refresh(incoming_activity)
return incoming_activity

View file

@ -1,12 +1,68 @@
#!/usr/bin/env python3
import base64
import httpx
import json
import fastapi
from typing import Literal, TypedDict, cast, Any
from typing import Optional
from app.config import AP_CONTENT_TYPE, USER_AGENT
from loguru import logger
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.PublicKey import RSA
async def httpsig_checker(
request : fastapi.Request,
) -> bool :
"""
Check http signature
"""
payload = await request.json()
logger.info(f"headers={request.headers}")
logger.info(f"{payload=}")
parsec_signature = HttpSignature.parse_signature(
request.headers.get("signature")
)
actor_url = payload["actor"]
async with httpx.AsyncClient() as client:
resp = await client.get(
actor_url,
headers={
"User-Agent": USER_AGENT,
"Accept": AP_CONTENT_TYPE,
},
follow_redirects=True,
)
try:
_actor = resp.json()
pubkey = _actor["publicKey"]["publicKeyPem"]
except json.JSONDecodeError:
raise ValueError
body = await request.body()
signture_string = HttpSignature.build_signature_string(
request.method,
request.url.path,
parsec_signature["headers"],
HttpSignature.calculation_digest(body),
request.headers,
)
is_verify = HttpSignature.verify_signature(
signture_string,
parsec_signature["signature"],
pubkey)
logger.info(signture_string)
logger.info(f"verify? {is_verify}")
return is_verify
class HttpSignature:
"""
calculation and verification of HTTP signatures

View file

@ -12,22 +12,20 @@ from fastapi import Response
from fastapi.exceptions import HTTPException
from sqlalchemy.util import monkeypatch_proxied_specials
from starlette.responses import JSONResponse
from uuid import uuid4
from loguru import logger
from app import models
from app import httpsig
from app.database import get_db_session
from app.config import AP_CONTENT_TYPE, DEBUG, USER_AGENT
from app.config import DEBUG
from app.activitypub import ME
from app.config import BASE_URL
from app.config import DEBUG
from app.config import DOMAIN
from app.config import ID
from app.config import USERNAME
from app.database import AsyncSession
from app.database import get_db_session
from app.httpsig import HttpSignature
from app.boxes import save_incoming
def _check_0rtt_early_data(request: Request) -> None:
"""Disable TLS1.3 0-RTT requests for non-GET."""
@ -49,74 +47,15 @@ async def index():
async def inbox(
request: Request,
db_session: AsyncSession = Depends(get_db_session),
httpsig_checker = Depends(httpsig.httpsig_checker),
) -> Response:
logger.info(f"headers={request.headers}")
payload = await request.json()
logger.info(f"{payload=}")
parsec_signature = HttpSignature.parse_signature(
request.headers.get("signature"))
actor_url = payload["actor"]
async with httpx.AsyncClient() as client:
resp = await client.get(
actor_url,
headers={
"User-Agent": USER_AGENT,
"Accept": AP_CONTENT_TYPE,
},
follow_redirects=True,
)
try:
_actor = resp.json()
pubkey = _actor["publicKey"]["publicKeyPem"]
except json.JSONDecodeError:
raise ValueError
body = await request.body()
signture_string = HttpSignature.build_signature_string(
request.method,
request.url.path,
parsec_signature["headers"],
HttpSignature.calculation_digest(body),
request.headers,
)
is_verify = HttpSignature.verify_signature(
signture_string,
parsec_signature["signature"],
pubkey)
logger.info(signture_string)
logger.info(f"verify? {is_verify}")
await new_incoming(db_session, payload)
return Response(status_code=202)
async def new_incoming(
db_session: AsyncSession,
payload: dict,
) -> models.IncomingActivity | None:
ap_id: str
if "@context" not in payload:
logger.warning(f"invalid object: {payload}")
return None
if "id" in payload:
ap_id = payload["id"]
if httpsig_checker:
await save_incoming(db_session, payload)
return Response(status_code=202)
else:
ap_id = str(uuid4())
incoming_activity = models.IncomingActivity(
ap_id=ap_id,
ap_object=payload,
)
# db_session.add(incoming_activity)
# await db_session.commit()
# await db_session.refresh(incoming_activity)
return incoming_activity
return Response(status_code=401)
@app.get("/.well-known/webfinger")
async def wellknown_webfinger(resource: str) -> JSONResponse: