foxhole/app/httpsig.py
2023-03-18 11:57:20 +08:00

180 lines
5.3 KiB
Python

#!/usr/bin/env python3
import base64
from urllib.request import HTTPSHandler
import httpx
import json
import fastapi
import hashlib
from datetime import datetime
from typing import Literal, TypedDict, cast, Any
from typing import Optional
from app.config import AP_CONTENT_TYPE, USER_AGENT, KEY_PATH, ID
from loguru import logger
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.PublicKey import RSA
async def httpsig_checker(
request : fastapi.Request,
) -> bool :
"""
Check http signature
"""
payload = await request.json()
logger.info(f"headers={request.headers}")
logger.info(f"{payload=}")
parsec_signature = HttpSignature.parse_signature(
request.headers.get("signature")
)
actor_url = payload["actor"]
async with httpx.AsyncClient() as client:
resp = await client.get(
actor_url,
headers={
"User-Agent": USER_AGENT,
"Accept": AP_CONTENT_TYPE,
},
follow_redirects=True,
)
try:
_actor = resp.json()
pubkey = _actor["publicKey"]["publicKeyPem"]
except json.JSONDecodeError:
raise ValueError
body = await request.body()
signture_string = HttpSignature.build_signature_string(
request.method,
request.url.path,
parsec_signature["headers"],
HttpSignature.calculation_digest(body),
request.headers,
)
is_verify = HttpSignature.verify_signature(
signture_string,
parsec_signature["signature"],
pubkey)
logger.info(signture_string)
logger.info(f"verify? {is_verify}")
return is_verify
class HttpSignature:
"""
calculation and verification of HTTP signatures
"""
@classmethod
def calculation_digest(
cls,
body : bytes,
algorithm : str ="sha-256"
)-> str :
"""
Calculates the digest header value for a given HTTP body
"""
if "sha-256" == algorithm:
h = SHA256.new()
h.update(body)
return "SHA-256=" + \
base64.b64encode(h.digest()).decode("utf-8")
else:
raise ValueError(f"No support algorithm {algorithm}")
@classmethod
def verify_signature(
cls,
signature_string : str,
signature : bytes,
pubkey,
) -> bool :
pubkey = RSA.importKey(pubkey)
signer = PKCS1_v1_5.new(pubkey)
digest = SHA256.new()
digest.update(signature_string.encode("utf-8"))
return signer.verify(digest, signature)
@classmethod
def parse_signature(cls, signature):
_detail = {}
for item in signature.split(","):
name, value = item.split("=", 1)
value = value.strip('"')
_detail[name.lower()] = value
signature_details = {
"headers": _detail["headers"].split(),
"signature": base64.b64decode(_detail["signature"]),
"algorithm": _detail["algorithm"],
"keyid": _detail["keyid"],
}
return signature_details
@classmethod
def build_signature_string(
cls,
method : str,
path : str,
signed_headers : list,
body_digest : str | None,
headers,
) -> str :
signed_string = []
for signed_header in signed_headers:
if signed_header == "(request-target)":
signed_string.append("(request-target): " + method.lower() + " " + path)
elif signed_header == "digest" and body_digest:
signed_string.append("digest: " + body_digest)
else:
signed_string.append(signed_header + ": " + headers[signed_header])
return "\n".join(signed_string)
class HTTPXSigAuth(httpx.Auth):
def __init__(self, key) -> None:
self.key = key
def auth_flow(
self, r: httpx.Request
):
bodydigest = None
if r.content:
bh = hashlib.new("sha256")
bh.update(r.content)
bodydigest = "SHA-256=" + base64.b64encode(bh.digest()).decode("utf-8")
date = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S GMT")
r.headers["Date"] = date
sigheaders = {}
if bodydigest:
r.headers["digest"] = bodydigest
sigheaders = "(request-target) user-agent host date digest content-type"
else:
sigheaders = "(request-target) user-agent host date accept"
logger.warning(r.headers)
to_be_signed = HttpSignature.build_signature_string(
r.method, r.url.path, sigheaders.split(), bodydigest, r.headers
)
if not self.key:
raise ValueError("Should never happen")
signer = PKCS1_v1_5.new(self.key)
digest = SHA256.new()
digest.update(to_be_signed.encode("utf-8"))
sig = base64.b64encode(signer.sign(digest)).decode()
key_id = f"{ID}#main-key"
sig_value = f'keyId="{key_id}",algorithm="rsa-sha256",headers="{sigheaders}",signature="{sig}"' # noqa: E501
logger.debug(f"signed request {sig_value=}")
r.headers["signature"] = sig_value
yield r
k = KEY_PATH.read_text()
k = RSA.importKey(k)
auth = HTTPXSigAuth(k)