#!/usr/bin/env python3
"""
Verify and build HTTP signatures
"""
import base64
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone

import httpx
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from httpx import Headers as Httpx_headers
from loguru import logger
from werkzeug.datastructures import Headers

from demo.config import ID, KEY_PATH


@dataclass
class SignedData:
    """
    Signature data

    Bundles everything ``HttpSignature.build_signature_string`` reads
    when it assembles the string to be signed.
    """
    # HTTP method; lower-cased when the "(request-target)" line is built.
    method: str
    # Request path placed after the method in the "(request-target)" line.
    path: str
    # Ordered names of the pseudo-headers/headers covered by the signature.
    signed_list: list[str]
    # Value used for the "digest" line, or None when no digest is available.
    body_digest: str | None
    # Header mapping that all remaining names in signed_list are looked up in.
    headers: Headers | Httpx_headers


class HttpSignature:
    """
    calculation and verification of HTTP signatures
    """

    @classmethod
    def calculation_digest(
        cls,
        body: bytes,
        algorithm: str = "sha-256",
    ) -> str:
        """
        Calculates the digest header value for a given HTTP body

        :param body: raw body bytes to hash.
        :param algorithm: digest algorithm name; only "sha-256" is handled.
        :return: ``"SHA-256="`` followed by the base64 encoded digest.
        :raises ValueError: for any other algorithm name.
        """
        if algorithm == "sha-256":
            body_digest = SHA256.new(body)
            encoded = base64.b64encode(body_digest.digest()).decode("utf-8")
            return "SHA-256=" + encoded
        raise ValueError(f"No support algorithm {algorithm}")

    @classmethod
    def verify_signature(
        cls,
        signature_string: str,
        signature: bytes,
        pubkey,
    ) -> bool:
        """
        Verify signature that returns bool

        :param signature_string: the string that was signed.
        :param signature: raw (already base64-decoded) signature bytes.
        :param pubkey: key material in a form ``RSA.importKey`` accepts.
        :return: result of the PKCS#1 v1.5 check over the SHA-256 hash of
            ``signature_string``.
        """
        pubkey = RSA.importKey(pubkey)
        signer = PKCS1_v1_5.new(pubkey)
        digest = SHA256.new()
        digest.update(signature_string.encode("utf-8"))
        return signer.verify(digest, signature)  # pylint: disable=E1102

    @classmethod
    def parse_signature(
        cls,
        signature: str,
    ) -> dict:
        """
        Parse signature string in headers

        The value is split on ``,`` into ``name="value"`` pairs. Names are
        lower-cased; whitespace around names and values is ignored so that
        ``keyId="a", algorithm="b"`` parses the same as the form without
        spaces.

        :param signature: value of the ``Signature`` header.
        :return: dict with ``headers`` (list of names), ``signature``
            (decoded bytes), ``algorithm`` and ``keyid``.
        :raises KeyError: if one of those four parameters is missing.
        :raises ValueError: if a non-empty segment contains no ``=``.
        """
        detail = {}
        for item in signature.split(','):
            item = item.strip()
            if not item:
                # Tolerate a trailing or doubled comma.
                continue
            # Split only on the first "=": base64 padding also uses "=".
            name, value = item.split('=', 1)
            detail[name.strip().lower()] = value.strip().strip('"')

        signature_details = {
            "headers": detail["headers"].split(),
            "signature": base64.b64decode(detail["signature"]),
            "algorithm": detail["algorithm"],
            "keyid": detail["keyid"],
        }
        return signature_details

    @classmethod
    def build_signature_string(
        cls,
        signed_data: SignedData,
    ) -> str:
        """
        Build signature string

        Emits one ``name: value`` line per entry of
        ``signed_data.signed_list``, in order, joined with ``\\n``.

        :param signed_data: request details to build the string from.
        :return: the string to be signed or verified.
        :raises KeyError: if a listed header is absent and the header
            mapping raises on missing keys.
        """
        signed_string = []
        for signed_str in signed_data.signed_list:
            if signed_str == "(request-target)":
                signed_string.append(
                    "(request-target): "
                    + signed_data.method.lower()
                    + ' '
                    + signed_data.path
                )
            elif signed_str == "digest" and signed_data.body_digest:
                signed_string.append("digest: " + signed_data.body_digest)
            else:
                # Also reached for "digest" when no body_digest was given:
                # the value is then taken from the headers themselves.
                signed_string.append(
                    signed_str + ": " + signed_data.headers[signed_str]
                )
        return "\n".join(signed_string)


class HTTPXSigAuth(httpx.Auth):
    """
    httpx auth hook that adds Date, digest and signature headers
    """

    def __init__(self, key) -> None:
        """
        :param key: private key object handed to ``PKCS1_v1_5.new`` when
            signing.
        """
        self.key = key

    def auth_flow(
        self,
        request: httpx.Request,
    ):
        """
        Sign the outgoing request and yield it

        Sets the ``Date`` header, sets ``digest`` when the request has
        content, then signs the selected headers with RSA-SHA256 and stores
        the result in the ``signature`` header.

        :param request: request to sign; its headers are modified in place.
        :raises ValueError: if no key is configured.
        """
        bodydigest = None
        if request.content:
            body_hash = hashlib.sha256(request.content).digest()
            bodydigest = "SHA-256=" + base64.b64encode(body_hash).decode("utf-8")

        # Timezone-aware replacement for the deprecated datetime.utcnow().
        date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")
        request.headers["Date"] = date

        if bodydigest:
            request.headers["digest"] = bodydigest
            sigheaders = "(request-target) user-agent host date digest content-type"
        else:
            sigheaders = "(request-target) user-agent host date accept"

        logger.info(request.headers)

        sigdate = SignedData(
            method=request.method,
            path=request.url.path,
            signed_list=sigheaders.split(),
            body_digest=bodydigest,
            headers=request.headers,
        )

        to_be_signed = HttpSignature.build_signature_string(sigdate)
        logger.debug(f"{to_be_signed}")

        if not self.key:
            raise ValueError("Should never happen")

        signer = PKCS1_v1_5.new(self.key)
        digest = SHA256.new()
        digest.update(to_be_signed.encode("utf-8"))
        sig = base64.b64encode(signer.sign(digest))

        key_id = f"{ID}#main-key"
        # Joined explicitly so no stray whitespace or backslash from a
        # source-level line continuation ends up in the header value.
        sig_value = ",".join((
            f'keyId="{key_id}"',
            'algorithm="rsa-sha256"',
            f'headers="{sigheaders}"',
            f'signature="{sig.decode()}"',
        ))
        logger.debug(f"signed request {sig_value=}")
        request.headers["signature"] = sig_value
        yield request


# Module-level signer: the key is read from KEY_PATH at import time.
k = KEY_PATH.read_text()
k = RSA.importKey(k)
auth = HTTPXSigAuth(k)