2023-03-16 09:28:35 +01:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
import base64
|
2023-03-17 10:59:29 +01:00
|
|
|
import httpx
|
|
|
|
import json
|
|
|
|
import fastapi
|
2023-03-16 09:28:35 +01:00
|
|
|
from typing import Literal, TypedDict, cast, Any
|
2023-03-17 09:49:09 +01:00
|
|
|
from typing import Optional
|
2023-03-17 10:59:29 +01:00
|
|
|
from app.config import AP_CONTENT_TYPE, USER_AGENT
|
|
|
|
|
|
|
|
from loguru import logger
|
2023-03-16 09:28:35 +01:00
|
|
|
|
|
|
|
from Crypto.Hash import SHA256
|
|
|
|
from Crypto.Signature import PKCS1_v1_5
|
2023-03-17 09:49:09 +01:00
|
|
|
from Crypto.PublicKey import RSA
|
2023-03-16 09:28:35 +01:00
|
|
|
|
2023-03-17 10:59:29 +01:00
|
|
|
async def httpsig_checker(
|
|
|
|
request : fastapi.Request,
|
|
|
|
) -> bool :
|
|
|
|
"""
|
|
|
|
Check http signature
|
|
|
|
"""
|
|
|
|
payload = await request.json()
|
|
|
|
logger.info(f"headers={request.headers}")
|
|
|
|
logger.info(f"{payload=}")
|
|
|
|
|
|
|
|
parsec_signature = HttpSignature.parse_signature(
|
|
|
|
request.headers.get("signature")
|
|
|
|
)
|
|
|
|
|
|
|
|
actor_url = payload["actor"]
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
|
|
resp = await client.get(
|
|
|
|
actor_url,
|
|
|
|
headers={
|
|
|
|
"User-Agent": USER_AGENT,
|
|
|
|
"Accept": AP_CONTENT_TYPE,
|
|
|
|
},
|
|
|
|
follow_redirects=True,
|
|
|
|
)
|
|
|
|
|
|
|
|
try:
|
|
|
|
_actor = resp.json()
|
|
|
|
pubkey = _actor["publicKey"]["publicKeyPem"]
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
body = await request.body()
|
|
|
|
signture_string = HttpSignature.build_signature_string(
|
|
|
|
request.method,
|
|
|
|
request.url.path,
|
|
|
|
parsec_signature["headers"],
|
|
|
|
HttpSignature.calculation_digest(body),
|
|
|
|
request.headers,
|
|
|
|
)
|
|
|
|
is_verify = HttpSignature.verify_signature(
|
|
|
|
signture_string,
|
|
|
|
parsec_signature["signature"],
|
|
|
|
pubkey)
|
|
|
|
|
|
|
|
logger.info(signture_string)
|
|
|
|
logger.info(f"verify? {is_verify}")
|
|
|
|
|
|
|
|
return is_verify
|
|
|
|
|
|
|
|
|
2023-03-16 09:28:35 +01:00
|
|
|
class HttpSignature:
|
|
|
|
"""
|
|
|
|
calculation and verification of HTTP signatures
|
|
|
|
"""
|
|
|
|
|
|
|
|
@classmethod
|
2023-03-17 09:49:09 +01:00
|
|
|
def calculation_digest(
|
|
|
|
cls,
|
|
|
|
body : bytes,
|
|
|
|
algorithm : str ="sha-256"
|
|
|
|
)-> str :
|
2023-03-16 09:28:35 +01:00
|
|
|
"""
|
|
|
|
Calculates the digest header value for a given HTTP body
|
|
|
|
"""
|
|
|
|
if "sha-256" == algorithm:
|
|
|
|
h = SHA256.new()
|
|
|
|
h.update(body)
|
|
|
|
return "SHA-256=" + \
|
|
|
|
base64.b64encode(h.digest()).decode("utf-8")
|
|
|
|
else:
|
2023-03-17 09:49:09 +01:00
|
|
|
raise ValueError(f"No support algorithm {algorithm}")
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def verify_signature(
|
|
|
|
cls,
|
|
|
|
signature_string : str,
|
|
|
|
signature : bytes,
|
|
|
|
pubkey,
|
|
|
|
) -> bool :
|
|
|
|
pubkey = RSA.importKey(pubkey)
|
|
|
|
signer = PKCS1_v1_5.new(pubkey)
|
|
|
|
digest = SHA256.new()
|
|
|
|
digest.update(signature_string.encode("utf-8"))
|
|
|
|
return signer.verify(digest, signature)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def parse_signature(cls, signature):
|
|
|
|
_detail = {}
|
|
|
|
for item in signature.split(","):
|
|
|
|
name, value = item.split("=", 1)
|
|
|
|
value = value.strip('"')
|
|
|
|
_detail[name.lower()] = value
|
|
|
|
signature_details = {
|
|
|
|
"headers": _detail["headers"].split(),
|
|
|
|
"signature": base64.b64decode(_detail["signature"]),
|
|
|
|
"algorithm": _detail["algorithm"],
|
|
|
|
"keyid": _detail["keyid"],
|
|
|
|
}
|
|
|
|
return signature_details
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def build_signature_string(
|
|
|
|
cls,
|
|
|
|
method : str,
|
|
|
|
path : str,
|
|
|
|
signed_headers : dict,
|
|
|
|
body_digest : str,
|
|
|
|
headers,
|
|
|
|
) -> str :
|
|
|
|
signed_string = []
|
|
|
|
for signed_header in signed_headers:
|
|
|
|
if signed_header == "(request-target)":
|
|
|
|
signed_string.append("(request-target): " + method.lower() + " " + path)
|
|
|
|
elif signed_header == "digest" and body_digest:
|
|
|
|
signed_string.append("digest: " + body_digest)
|
|
|
|
else:
|
|
|
|
signed_string.append(signed_header + ": " + headers[signed_header])
|
|
|
|
return "\n".join(signed_string)
|