#!/usr/bin/env python3
"""HTTP Signature helpers for ActivityPub traffic.

* ``httpsig_checker`` verifies the ``Signature`` header of an incoming
  FastAPI request against the public key published by the request's actor.
* ``HttpSignature`` groups the digest / parse / build / verify primitives.
* ``HTTPXSigAuth`` is an ``httpx.Auth`` implementation that signs outgoing
  requests with the local RSA key; a ready-made instance is exposed as
  ``auth``.
"""
import base64
import hashlib
import json
from datetime import datetime
from email.utils import formatdate
from typing import Any, Literal, Optional, TypedDict, cast
from urllib.request import HTTPSHandler

import fastapi
import httpx
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from loguru import logger

from app.config import AP_CONTENT_TYPE, ID, KEY_PATH, USER_AGENT


async def httpsig_checker(
    request: fastapi.Request,
) -> bool:
    """Check the HTTP signature of an incoming request.

    The JSON payload's ``actor`` URL is fetched to obtain the sender's
    ``publicKey.publicKeyPem``; the signature string is then rebuilt from
    the request and verified against that key.

    Args:
        request: The incoming FastAPI request; its body must be JSON with
            an ``actor`` key.

    Returns:
        ``True`` when the signature verifies, ``False`` when it does not
        or when the request carries no ``Signature`` header.

    Raises:
        ValueError: The actor document is not valid JSON.
        KeyError: The payload has no ``actor``, the actor document has no
            ``publicKey.publicKeyPem``, or the ``Signature`` header lacks a
            required parameter.
    """
    payload = await request.json()
    logger.info(f"headers={request.headers}")
    logger.info(f"{payload=}")

    signature_header = request.headers.get("signature")
    if signature_header is None:
        # An unsigned request can never verify; without this guard
        # parse_signature would fail with AttributeError on None.
        logger.info("request has no Signature header")
        return False
    parsed_signature = HttpSignature.parse_signature(signature_header)

    actor_url = payload["actor"]
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            actor_url,
            headers={
                "User-Agent": USER_AGENT,
                "Accept": AP_CONTENT_TYPE,
            },
            follow_redirects=True,
        )

    try:
        _actor = resp.json()
        pubkey = _actor["publicKey"]["publicKeyPem"]
    except json.JSONDecodeError as err:
        raise ValueError(
            f"actor document at {actor_url} is not valid JSON"
        ) from err

    body = await request.body()
    # The digest is recomputed from the received body rather than taken from
    # the Digest header, so a tampered body makes verification fail.
    signature_string = HttpSignature.build_signature_string(
        request.method,
        request.url.path,
        parsed_signature["headers"],
        HttpSignature.calculation_digest(body),
        request.headers,
    )
    is_verify = HttpSignature.verify_signature(
        signature_string,
        parsed_signature["signature"],
        pubkey,
    )
    logger.info(signature_string)
    logger.info(f"verify? {is_verify}")
    return is_verify


class HttpSignature:
    """Calculation and verification of HTTP signatures."""

    @classmethod
    def calculation_digest(
        cls,
        body: bytes,
        algorithm: str = "sha-256",
    ) -> str:
        """Calculate the ``Digest`` header value for an HTTP body.

        Args:
            body: Raw request body.
            algorithm: Digest algorithm name; only ``"sha-256"`` is supported.

        Returns:
            ``"SHA-256="`` followed by the base64-encoded SHA-256 of *body*.

        Raises:
            ValueError: *algorithm* is not ``"sha-256"``.
        """
        if algorithm != "sha-256":
            raise ValueError(f"No support algorithm {algorithm}")
        h = SHA256.new()
        h.update(body)
        return "SHA-256=" + base64.b64encode(h.digest()).decode("utf-8")

    @classmethod
    def verify_signature(
        cls,
        signature_string: str,
        signature: bytes,
        pubkey,
    ) -> bool:
        """Verify an RSA PKCS#1 v1.5 / SHA-256 signature.

        Args:
            signature_string: The reconstructed string that was signed.
            signature: Raw (already base64-decoded) signature bytes.
            pubkey: Public key material accepted by ``RSA.importKey``
                (the caller in this file passes a PEM string).

        Returns:
            The result of the PKCS#1 v1.5 verifier for the SHA-256 digest of
            *signature_string*.
        """
        pubkey = RSA.importKey(pubkey)
        signer = PKCS1_v1_5.new(pubkey)
        digest = SHA256.new()
        digest.update(signature_string.encode("utf-8"))
        return signer.verify(digest, signature)

    @classmethod
    def parse_signature(cls, signature):
        """Parse the value of a ``Signature`` header.

        Args:
            signature: Header value of the form
                ``keyId="...",algorithm="...",headers="...",signature="..."``.
                Whitespace around the comma-separated parameters is ignored.

        Returns:
            A dict with ``headers`` (list of signed header names),
            ``signature`` (decoded bytes), ``algorithm`` and ``keyid``.

        Raises:
            KeyError: A required parameter is missing.
            ValueError: A parameter has no ``=`` or the signature is not
                valid base64.
        """
        _detail = {}
        for item in signature.split(","):
            name, value = item.split("=", 1)
            # Senders may put whitespace after the separating commas; strip
            # it so the lookups below do not fail on keys like " algorithm".
            _detail[name.strip().lower()] = value.strip().strip('"')
        signature_details = {
            "headers": _detail["headers"].split(),
            "signature": base64.b64decode(_detail["signature"]),
            "algorithm": _detail["algorithm"],
            "keyid": _detail["keyid"],
        }
        return signature_details

    @classmethod
    def build_signature_string(
        cls,
        method: str,
        path: str,
        signed_headers: list,
        body_digest: str | None,
        headers,
    ) -> str:
        """Build the newline-joined string that is signed or verified.

        Args:
            method: HTTP method; lower-cased for ``(request-target)``.
            path: Request path used for ``(request-target)``.
            signed_headers: Header names, in signing order.
            body_digest: Digest value to use for the ``digest`` entry; when
                falsy the value is read from *headers* instead.
            headers: Mapping of header name to value.

        Returns:
            One ``name: value`` line per entry of *signed_headers*, joined
            with ``"\\n"``.

        Raises:
            KeyError: A signed header is absent from *headers*.
        """
        signed_string = []
        for signed_header in signed_headers:
            if signed_header == "(request-target)":
                signed_string.append(
                    "(request-target): " + method.lower() + " " + path
                )
            elif signed_header == "digest" and body_digest:
                signed_string.append("digest: " + body_digest)
            else:
                signed_string.append(
                    signed_header + ": " + headers[signed_header]
                )
        return "\n".join(signed_string)


class HTTPXSigAuth(httpx.Auth):
    """``httpx`` auth hook that adds an HTTP signature to outgoing requests."""

    def __init__(self, key) -> None:
        """Store the RSA private key object used for signing."""
        self.key = key

    def auth_flow(self, r: httpx.Request):
        """Add ``Date``, ``digest`` (if there is a body) and ``signature`` headers.

        Requests with a body sign
        ``(request-target) user-agent host date digest content-type``;
        body-less requests sign ``(request-target) user-agent host date accept``.

        Raises:
            ValueError: No signing key is configured.
            KeyError: A header that must be signed is not set on the request.
        """
        bodydigest = None
        if r.content:
            bodydigest = HttpSignature.calculation_digest(r.content)

        # HTTP requires the IMF-fixdate form ("Sun, 06 Nov 1994 08:49:37 GMT");
        # formatdate(usegmt=True) emits exactly that, independent of locale.
        r.headers["Date"] = formatdate(usegmt=True)

        if bodydigest:
            r.headers["digest"] = bodydigest
            sigheaders = "(request-target) user-agent host date digest content-type"
        else:
            sigheaders = "(request-target) user-agent host date accept"

        logger.warning(r.headers)
        to_be_signed = HttpSignature.build_signature_string(
            r.method,
            r.url.path,
            sigheaders.split(),
            bodydigest,
            r.headers,
        )
        if not self.key:
            raise ValueError("Should never happen")
        signer = PKCS1_v1_5.new(self.key)
        digest = SHA256.new()
        digest.update(to_be_signed.encode("utf-8"))
        sig = base64.b64encode(signer.sign(digest)).decode()

        key_id = f"{ID}#main-key"
        sig_value = f'keyId="{key_id}",algorithm="rsa-sha256",headers="{sigheaders}",signature="{sig}"'  # noqa: E501
        logger.debug(f"signed request {sig_value=}")
        r.headers["signature"] = sig_value
        yield r


# Load the local private key once at import time and expose a shared signer.
k = KEY_PATH.read_text()
k = RSA.importKey(k)
auth = HTTPXSigAuth(k)