From 1389ba47fbd8a1c085c59029fe9a096d9a41c6ea Mon Sep 17 00:00:00 2001 From: SouthFox Date: Sat, 18 Mar 2023 01:07:07 +0800 Subject: [PATCH] feat/build http signature --- app/httpsig.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/app/httpsig.py b/app/httpsig.py index 6eb070d..d240e6f 100644 --- a/app/httpsig.py +++ b/app/httpsig.py @@ -1,11 +1,14 @@ #!/usr/bin/env python3 import base64 +from urllib.request import HTTPSHandler import httpx import json import fastapi +import hashlib +from datetime import datetime from typing import Literal, TypedDict, cast, Any from typing import Optional -from app.config import AP_CONTENT_TYPE, USER_AGENT +from app.config import AP_CONTENT_TYPE, USER_AGENT, KEY_PATH, ID from loguru import logger @@ -118,8 +121,8 @@ class HttpSignature: cls, method : str, path : str, - signed_headers : dict, - body_digest : str, + signed_headers : list, + body_digest : str | None, headers, ) -> str : signed_string = [] @@ -131,3 +134,47 @@ class HttpSignature: else: signed_string.append(signed_header + ": " + headers[signed_header]) return "\n".join(signed_string) + + +class HTTPXSigAuth(httpx.Auth): + def __init__(self, key) -> None: + self.key = key + + def auth_flow( + self, r: httpx.Request + ): + bodydigest = None + if r.content: + bh = hashlib.new("sha256") + bh.update(r.content) + bodydigest = "SHA-256=" + base64.b64encode(bh.digest()).decode("utf-8") + + date = datetime.utcnow().strftime("%Y %m %d %H:%M:%S GMT") + r.headers["Date"] = date + sigheaders = {} + if bodydigest: + r.headers["digest"] = bodydigest + sigheaders = "(request-target) user-agent host date digest content-type" + else: + sigheaders = "(request-target) user-agent host date accept" + + logger.warning(r.headers) + to_be_signed = HttpSignature.build_signature_string( + r.method, r.url.path, sigheaders.split(), bodydigest, r.headers + ) + if not self.key: + raise ValueError("Should never happen") + signer = PKCS1_v1_5.new(self.key) + digest = SHA256.new() + digest.update(to_be_signed.encode("utf-8")) + sig = base64.b64encode(signer.sign(digest)).decode() + + key_id = f"{ID}#main-key" + sig_value = f'keyId="{key_id}",algorithm="rsa-sha256",headers="{sigheaders}",signature="{sig}"' # noqa: E501 + logger.debug(f"signed request {sig_value=}") + r.headers["signature"] = sig_value + yield r + +k = KEY_PATH.read_text() +k = RSA.importKey(k) +auth = HTTPXSigAuth(k)