COSCUP-ap-demo/demo/httpsig.py

174 lines
4.8 KiB
Python
Raw Permalink Normal View History

2023-06-09 04:52:07 +02:00
#!/usr/bin/env python3
"""
Verify and build HTTP signatures
"""
import base64
2023-07-27 09:16:34 +02:00
import httpx
import hashlib
2023-06-09 04:52:07 +02:00
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.PublicKey import RSA
2023-06-19 08:34:03 +02:00
from werkzeug.datastructures import Headers
2023-07-27 09:16:34 +02:00
from httpx import Headers as Httpx_headers
2023-07-27 09:49:14 +02:00
from datetime import datetime
from loguru import logger
from dataclasses import dataclass
2023-07-27 14:30:02 +02:00
from demo.config import ID, KEY_PATH
2023-07-27 09:49:14 +02:00
2023-06-09 05:07:02 +02:00
@dataclass
class SignedData:
2023-06-09 05:09:28 +02:00
"""
Signature data
"""
2023-06-09 05:07:02 +02:00
method: str
path: str
signed_list: list
body_digest: str | None
2023-07-27 09:16:34 +02:00
headers: Headers | Httpx_headers
2023-06-09 04:52:07 +02:00
class HttpSignature:
"""
calculation and verification of HTTP signatures
"""
@classmethod
def calculation_digest(
cls,
body: bytes,
algorithm: str ="sha-256"
2023-06-09 04:52:07 +02:00
) -> str:
"""
Calculates the digest header value for a given HTTP body
"""
if "sha-256" == algorithm:
body_digest = SHA256.new()
body_digest.update(body)
return "SHA-256=" + \
base64.b64encode(body_digest.digest()
).decode("utf-8")
raise ValueError(f"No support algorithm {algorithm}")
@classmethod
def verify_signature(
cls,
signature_string: str,
signature: bytes,
2023-06-09 04:52:07 +02:00
pubkey,
) -> bool:
"""
Verify signatur that returns bool
"""
pubkey = RSA.importKey(pubkey)
signer = PKCS1_v1_5.new(pubkey)
digest = SHA256.new()
digest.update(signature_string.encode("utf-8"))
return signer.verify(digest, signature) # pylint: disable=E1102
2023-06-09 04:52:07 +02:00
@classmethod
def parse_signature(
cls,
signature: str
2023-06-09 04:52:07 +02:00
) -> dict:
"""
Parse signature string in headers
"""
detail = {}
for item in signature.split(','):
name, value = item.split('=', 1)
value = value.strip('"')
detail[name.lower()] = value
signature_details = {
"headers": detail["headers"].split(),
"signature": base64.b64decode(detail["signature"]),
"algorithm": detail["algorithm"],
"keyid": detail["keyid"],
}
return signature_details
@classmethod
def build_signature_string(
cls,
2023-06-09 05:07:02 +02:00
signed_data: SignedData,
2023-06-09 04:52:07 +02:00
) -> str:
"""
Build signature string
"""
signed_string = []
2023-06-09 05:07:02 +02:00
for signed_str in signed_data.signed_list:
if signed_str == "(request-target)":
2023-06-09 04:52:07 +02:00
signed_string.append("(request-target): "
2023-06-09 05:07:02 +02:00
+ signed_data.method.lower() + ' ' + signed_data.path)
elif signed_str == "digest" and signed_data.body_digest:
signed_string.append("digest: " + signed_data.body_digest)
2023-06-09 04:52:07 +02:00
else:
2023-06-09 05:07:02 +02:00
signed_string.append(signed_str + ": "
+ signed_data.headers[signed_str])
2023-06-09 04:52:07 +02:00
return "\n".join(signed_string)
2023-07-27 09:16:34 +02:00
class HTTPXSigAuth(httpx.Auth):
def __init__(self, key) -> None:
self.key = key
def auth_flow(
2023-07-27 09:53:45 +02:00
self,
request: httpx.Request
2023-07-27 09:16:34 +02:00
):
bodydigest = None
2023-07-27 09:53:45 +02:00
if request.content:
2023-07-27 09:16:34 +02:00
bh = hashlib.new("sha256")
2023-07-27 09:53:45 +02:00
bh.update(request.content)
2023-07-27 09:16:34 +02:00
bodydigest = "SHA-256=" + base64.b64encode(bh.digest()).decode("utf-8")
date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
2023-07-27 09:53:45 +02:00
request.headers["Date"] = date
2023-07-27 09:16:34 +02:00
sigheaders = {}
if bodydigest:
2023-07-27 09:53:45 +02:00
request.headers["digest"] = bodydigest
2023-07-27 09:16:34 +02:00
sigheaders = "(request-target) user-agent host date digest content-type"
else:
sigheaders = "(request-target) user-agent host date accept"
2023-07-27 19:21:24 +02:00
logger.info(request.headers)
2023-07-27 09:16:34 +02:00
sigdate = SignedData(
2023-07-27 09:53:45 +02:00
method = request.method,
path = request.url.path,
2023-07-27 09:16:34 +02:00
signed_list = sigheaders.split(),
body_digest = bodydigest,
2023-07-27 09:53:45 +02:00
headers = request.headers,
2023-07-27 09:16:34 +02:00
)
to_be_signed = HttpSignature.build_signature_string(
sigdate
)
2023-07-27 19:21:24 +02:00
logger.debug(f"{to_be_signed}")
2023-07-27 09:16:34 +02:00
if not self.key:
raise ValueError("Should never happen")
signer = PKCS1_v1_5.new(self.key)
digest = SHA256.new()
digest.update(to_be_signed.encode("utf-8"))
sig = base64.b64encode(signer.sign(digest))
key_id = f"{ID}#main-key"
2023-07-27 09:49:14 +02:00
sig_value = f'keyId="{key_id}",algorithm="rsa-sha256",\
headers="{sigheaders}",signature="{sig.decode()}"'
2023-07-27 09:16:34 +02:00
logger.debug(f"signed request {sig_value=}")
2023-07-27 09:53:45 +02:00
request.headers["signature"] = sig_value
yield request
2023-07-27 14:30:02 +02:00
k = KEY_PATH.read_text()
k = RSA.importKey(k)
auth = HTTPXSigAuth(k)