#!/usr/bin/env python3 import base64 import httpx import hashlib from datetime import datetime from app.config import KEY_PATH, ID from loguru import logger from Crypto.Hash import SHA256 from Crypto.Signature import PKCS1_v1_5 from Crypto.PublicKey import RSA class HttpSignature: """ calculation and verification of HTTP signatures """ @classmethod def calculation_digest( cls, body : bytes, algorithm : str ="sha-256" )-> str : """ Calculates the digest header value for a given HTTP body """ if "sha-256" == algorithm: h = SHA256.new() h.update(body) return "SHA-256=" + \ base64.b64encode(h.digest()).decode("utf-8") else: raise ValueError(f"No support algorithm {algorithm}") @classmethod def verify_signature( cls, signature_string : str, signature : bytes, pubkey, ) -> bool : pubkey = RSA.importKey(pubkey) signer = PKCS1_v1_5.new(pubkey) digest = SHA256.new() digest.update(signature_string.encode("utf-8")) return signer.verify(digest, signature) @classmethod def parse_signature(cls, signature): _detail = {} for item in signature.split(","): name, value = item.split("=", 1) value = value.strip('"') _detail[name.lower()] = value signature_details = { "headers": _detail["headers"].split(), "signature": base64.b64decode(_detail["signature"]), "algorithm": _detail["algorithm"], "keyid": _detail["keyid"], } return signature_details @classmethod def build_signature_string( cls, method : str, path : str, signed_headers : list, body_digest : str | None, headers, ) -> str : signed_string = [] for signed_header in signed_headers: if signed_header == "(request-target)": signed_string.append("(request-target): " + method.lower() + " " + path) elif signed_header == "digest" and body_digest: signed_string.append("digest: " + body_digest) else: signed_string.append(signed_header + ": " + headers[signed_header]) return "\n".join(signed_string) class HTTPXSigAuth(httpx.Auth): def __init__(self, key) -> None: self.key = key def auth_flow( self, r: httpx.Request ): bodydigest = None if r.content: bh = hashlib.new("sha256") bh.update(r.content) bodydigest = "SHA-256=" + base64.b64encode(bh.digest()).decode("utf-8") date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT") r.headers["Date"] = date sigheaders = {} if bodydigest: r.headers["digest"] = bodydigest sigheaders = "(request-target) user-agent host date digest content-type" else: sigheaders = "(request-target) user-agent host date accept" logger.warning(r.headers) to_be_signed = HttpSignature.build_signature_string( r.method, r.url.path, sigheaders.split(), bodydigest, r.headers ) if not self.key: raise ValueError("Should never happen") signer = PKCS1_v1_5.new(self.key) digest = SHA256.new() digest.update(to_be_signed.encode("utf-8")) sig = base64.b64encode(signer.sign(digest)).decode() key_id = f"{ID}#main-key" sig_value = f'keyId="{key_id}",algorithm="rsa-sha256",headers="{sigheaders}",signature="{sig}"' # noqa: E501 logger.debug(f"signed request {sig_value=}") r.headers["signature"] = sig_value yield r k = KEY_PATH.read_text() k = RSA.importKey(k) auth = HTTPXSigAuth(k)