From 6b27db30b254f0cd36bef5d7c1513fd8166e29f8 Mon Sep 17 00:00:00 2001 From: SouthFox Date: Thu, 27 Jul 2023 15:16:34 +0800 Subject: [PATCH] [feat] add post sign flow --- demo/httpsig.py | 61 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/demo/httpsig.py b/demo/httpsig.py index 1d0fa29..98c0647 100644 --- a/demo/httpsig.py +++ b/demo/httpsig.py @@ -3,12 +3,19 @@ Verify and build HTTP signatures """ import base64 +import httpx +import hashlib + +from datetime import datetime +from app import logger +from demo.config import ID from dataclasses import dataclass from Crypto.Hash import SHA256 from Crypto.Signature import PKCS1_v1_5 from Crypto.PublicKey import RSA from werkzeug.datastructures import Headers +from httpx import Headers as Httpx_headers @dataclass @@ -20,7 +27,7 @@ class SignedData: path: str signed_list: list body_digest: str | None - headers: Headers + headers: Headers | Httpx_headers class HttpSignature: @@ -103,3 +110,55 @@ class HttpSignature: + signed_data.headers[signed_str]) return "\n".join(signed_string) + + +class HTTPXSigAuth(httpx.Auth): + def __init__(self, key) -> None: + self.key = key + + def auth_flow( + self, r: httpx.Request + ): + bodydigest = None + if r.content: + bh = hashlib.new("sha256") + bh.update(r.content) + bodydigest = "SHA-256=" + base64.b64encode(bh.digest()).decode("utf-8") + + date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT") + r.headers["Date"] = date + sigheaders = {} + if bodydigest: + r.headers["digest"] = bodydigest + sigheaders = "(request-target) user-agent host date digest content-type" + else: + sigheaders = "(request-target) user-agent host date accept" + + logger.warning(r.headers) + + sigdate = SignedData( + method = r.method, + path = r.url.path, + signed_list = sigheaders.split(), + body_digest = bodydigest, + headers = r.headers, + ) + + to_be_signed = HttpSignature.build_signature_string( + sigdate + ) + + if not self.key: + raise ValueError("Should never happen") + signer = PKCS1_v1_5.new(self.key) + digest = SHA256.new() + digest.update(to_be_signed.encode("utf-8")) + sig = base64.b64encode(signer.sign(digest)) + logger.debug(f"{sig=}") + + + key_id = f"{ID}#main-key" + sig_value = f'keyId="{key_id}",algorithm="rsa-sha256",headers="{sigheaders}",signature="{sig.decode()}"' # noqa: E501 + logger.debug(f"signed request {sig_value=}") + r.headers["signature"] = sig_value + yield r