COSCUP-ap-demo/demo/httpsig.py
SouthFox 6b27db30b2
Some checks failed
ci/woodpecker/push/lint Pipeline failed
ci/woodpecker/push/test Pipeline was successful
[feat] add post sign flow
2023-07-27 15:16:51 +08:00

164 lines
4.6 KiB
Python

#!/usr/bin/env python3
"""
Verify and build HTTP signatures
"""
import base64
import httpx
import hashlib
from datetime import datetime
from app import logger
from demo.config import ID
from dataclasses import dataclass
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.PublicKey import RSA
from werkzeug.datastructures import Headers
from httpx import Headers as Httpx_headers
@dataclass
class SignedData:
"""
Signature data
"""
method: str
path: str
signed_list: list
body_digest: str | None
headers: Headers | Httpx_headers
class HttpSignature:
"""
calculation and verification of HTTP signatures
"""
@classmethod
def calculation_digest(
cls,
body: bytes,
algorithm: str ="sha-256"
) -> str:
"""
Calculates the digest header value for a given HTTP body
"""
if "sha-256" == algorithm:
body_digest = SHA256.new()
body_digest.update(body)
return "SHA-256=" + \
base64.b64encode(body_digest.digest()
).decode("utf-8")
raise ValueError(f"No support algorithm {algorithm}")
@classmethod
def verify_signature(
cls,
signature_string: str,
signature: bytes,
pubkey,
) -> bool:
"""
Verify signatur that returns bool
"""
pubkey = RSA.importKey(pubkey)
signer = PKCS1_v1_5.new(pubkey)
digest = SHA256.new()
digest.update(signature_string.encode("utf-8"))
return signer.verify(digest, signature) # pylint: disable=E1102
@classmethod
def parse_signature(
cls,
signature: str
) -> dict:
"""
Parse signature string in headers
"""
detail = {}
for item in signature.split(','):
name, value = item.split('=', 1)
value = value.strip('"')
detail[name.lower()] = value
signature_details = {
"headers": detail["headers"].split(),
"signature": base64.b64decode(detail["signature"]),
"algorithm": detail["algorithm"],
"keyid": detail["keyid"],
}
return signature_details
@classmethod
def build_signature_string(
cls,
signed_data: SignedData,
) -> str:
"""
Build signature string
"""
signed_string = []
for signed_str in signed_data.signed_list:
if signed_str == "(request-target)":
signed_string.append("(request-target): "
+ signed_data.method.lower() + ' ' + signed_data.path)
elif signed_str == "digest" and signed_data.body_digest:
signed_string.append("digest: " + signed_data.body_digest)
else:
signed_string.append(signed_str + ": "
+ signed_data.headers[signed_str])
return "\n".join(signed_string)
class HTTPXSigAuth(httpx.Auth):
def __init__(self, key) -> None:
self.key = key
def auth_flow(
self, r: httpx.Request
):
bodydigest = None
if r.content:
bh = hashlib.new("sha256")
bh.update(r.content)
bodydigest = "SHA-256=" + base64.b64encode(bh.digest()).decode("utf-8")
date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
r.headers["Date"] = date
sigheaders = {}
if bodydigest:
r.headers["digest"] = bodydigest
sigheaders = "(request-target) user-agent host date digest content-type"
else:
sigheaders = "(request-target) user-agent host date accept"
logger.warning(r.headers)
sigdate = SignedData(
method = r.method,
path = r.url.path,
signed_list = sigheaders.split(),
body_digest = bodydigest,
headers = r.headers,
)
to_be_signed = HttpSignature.build_signature_string(
sigdate
)
if not self.key:
raise ValueError("Should never happen")
signer = PKCS1_v1_5.new(self.key)
digest = SHA256.new()
digest.update(to_be_signed.encode("utf-8"))
sig = base64.b64encode(signer.sign(digest))
logger.debug(f"{sig=}")
key_id = f"{ID}#main-key"
sig_value = f'keyId="{key_id}",algorithm="rsa-sha256",headers="{sigheaders}",signature="{sig.decode()}"' # noqa: E501
logger.debug(f"signed request {sig_value=}")
r.headers["signature"] = sig_value
yield r