foxhole/app/httpsig.py

128 lines
3.9 KiB
Python
Raw Normal View History

2023-03-16 09:28:35 +01:00
#!/usr/bin/env python3
import base64
2023-03-17 10:59:29 +01:00
import httpx
2023-03-17 18:07:07 +01:00
import hashlib
2023-03-17 10:59:29 +01:00
from datetime import datetime
from app.config import KEY_PATH, ID
2023-03-17 10:59:29 +01:00
from loguru import logger
2023-03-16 09:28:35 +01:00
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
2023-03-17 09:49:09 +01:00
from Crypto.PublicKey import RSA
2023-03-16 09:28:35 +01:00
2023-03-17 10:59:29 +01:00
2023-03-16 09:28:35 +01:00
class HttpSignature:
"""
calculation and verification of HTTP signatures
"""
@classmethod
2023-03-17 09:49:09 +01:00
def calculation_digest(
cls,
body : bytes,
algorithm : str ="sha-256"
)-> str :
2023-03-16 09:28:35 +01:00
"""
Calculates the digest header value for a given HTTP body
"""
if "sha-256" == algorithm:
h = SHA256.new()
h.update(body)
return "SHA-256=" + \
base64.b64encode(h.digest()).decode("utf-8")
else:
2023-03-17 09:49:09 +01:00
raise ValueError(f"No support algorithm {algorithm}")
@classmethod
def verify_signature(
cls,
signature_string : str,
signature : bytes,
pubkey,
) -> bool :
pubkey = RSA.importKey(pubkey)
signer = PKCS1_v1_5.new(pubkey)
digest = SHA256.new()
digest.update(signature_string.encode("utf-8"))
return signer.verify(digest, signature)
@classmethod
def parse_signature(cls, signature):
_detail = {}
for item in signature.split(","):
name, value = item.split("=", 1)
value = value.strip('"')
_detail[name.lower()] = value
signature_details = {
"headers": _detail["headers"].split(),
"signature": base64.b64decode(_detail["signature"]),
"algorithm": _detail["algorithm"],
"keyid": _detail["keyid"],
}
return signature_details
@classmethod
def build_signature_string(
cls,
method : str,
path : str,
2023-03-17 18:07:07 +01:00
signed_headers : list,
body_digest : str | None,
2023-03-17 09:49:09 +01:00
headers,
) -> str :
signed_string = []
for signed_header in signed_headers:
if signed_header == "(request-target)":
signed_string.append("(request-target): " + method.lower() + " " + path)
elif signed_header == "digest" and body_digest:
signed_string.append("digest: " + body_digest)
else:
signed_string.append(signed_header + ": " + headers[signed_header])
return "\n".join(signed_string)
2023-03-17 18:07:07 +01:00
class HTTPXSigAuth(httpx.Auth):
def __init__(self, key) -> None:
self.key = key
def auth_flow(
self, r: httpx.Request
):
bodydigest = None
if r.content:
bh = hashlib.new("sha256")
bh.update(r.content)
bodydigest = "SHA-256=" + base64.b64encode(bh.digest()).decode("utf-8")
2023-03-28 19:51:15 +02:00
date = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
2023-03-17 18:07:07 +01:00
r.headers["Date"] = date
sigheaders = {}
if bodydigest:
r.headers["digest"] = bodydigest
sigheaders = "(request-target) user-agent host date digest content-type"
else:
sigheaders = "(request-target) user-agent host date accept"
logger.warning(r.headers)
to_be_signed = HttpSignature.build_signature_string(
r.method, r.url.path, sigheaders.split(), bodydigest, r.headers
)
if not self.key:
raise ValueError("Should never happen")
signer = PKCS1_v1_5.new(self.key)
digest = SHA256.new()
digest.update(to_be_signed.encode("utf-8"))
sig = base64.b64encode(signer.sign(digest)).decode()
key_id = f"{ID}#main-key"
sig_value = f'keyId="{key_id}",algorithm="rsa-sha256",headers="{sigheaders}",signature="{sig}"' # noqa: E501
logger.debug(f"signed request {sig_value=}")
r.headers["signature"] = sig_value
yield r
k = KEY_PATH.read_text()
k = RSA.importKey(k)
auth = HTTPXSigAuth(k)