foxhole/app/ldsig.py

101 lines
3 KiB
Python
Raw Permalink Normal View History

2023-04-01 18:12:36 +02:00
#!/usr/bin/env python3
2023-07-29 11:01:03 +02:00
"""Ld+json signature."""
2023-04-01 18:12:36 +02:00
import base64
import hashlib
from datetime import datetime, timezone

import pyld # type: ignore
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.PublicKey import RSA
from loguru import logger
from pyld import jsonld # type: ignore

from app import activitypub as ap
from app.database import AsyncSession
from app.actor import get_public_key
2023-07-29 11:01:03 +02:00
# Requests-based pyld document loader, built once at import time;
# ``_loader`` delegates the actual fetch to it.
requests_loader = pyld.documentloader.requests.requests_document_loader() # type: ignore
2023-04-01 18:12:36 +02:00
2023-07-29 11:01:03 +02:00
def _loader(url, options):
    """Load a JSON-LD document through ``requests_loader``.

    Forces an ``Accept: application/ld+json`` request header and fetches the
    ``https://w3id.org/identity/v1`` context from its copy on
    raw.githubusercontent.com instead.

    Args:
        url: URL of the document to load.
        options: Loader options dict; ``None`` is treated as an empty dict.
            The dict is updated in place: its ``headers`` entry (created when
            absent) receives the ``Accept`` header.

    Returns:
        Whatever ``requests_loader`` returns for the (possibly rewritten) URL.
    """
    if options is None:
        options = {}

    # See https://github.com/digitalbazaar/pyld/issues/133
    # ``setdefault`` avoids a KeyError when the options carry no "headers"
    # entry yet (which is always the case when ``options`` was ``None``).
    options.setdefault("headers", {})["Accept"] = "application/ld+json"

    if url == "https://w3id.org/identity/v1":
        url = (
            "https://raw.githubusercontent.com/web-payments/web-payments.org"
            "/master/contexts/identity-v1.jsonld"
        )
    return requests_loader(url, options)
# Register ``_loader`` as the document loader pyld uses (import-time side effect).
pyld.jsonld.set_document_loader(_loader)
def _options_hash(doc: ap.RawObject) -> str:
    """Return the hex SHA-256 digest of the normalized signature options.

    The options are the entries of ``doc["signature"]`` without ``type``,
    ``id`` and ``signatureValue``, with ``@context`` set to the security
    context, normalized to N-Quads with URDNA2015.
    """
    excluded = ("type", "id", "signatureValue")
    options = {
        name: value
        for name, value in dict(doc["signature"]).items()
        if name not in excluded
    }
    options["@context"] = "https://w3id.org/security/v1"

    normalize_opts = {"algorithm": "URDNA2015", "format": "application/nquads"}
    nquads = jsonld.normalize(options, normalize_opts)

    hasher = hashlib.new("sha256", nquads.encode("utf-8"))  # type: ignore
    return hasher.hexdigest()
2023-04-01 18:12:36 +02:00
def _doc_hash(doc: ap.RawObject) -> str:
    """Return the hex SHA-256 digest of the normalized document.

    A shallow copy of ``doc`` without its ``signature`` entry is normalized
    to N-Quads with URDNA2015 before hashing; ``doc`` itself is not modified.
    """
    unsigned = {
        name: value
        for name, value in dict(doc).items()
        if name != "signature"
    }

    normalize_opts = {"algorithm": "URDNA2015", "format": "application/nquads"}
    nquads = jsonld.normalize(unsigned, normalize_opts)

    hasher = hashlib.new("sha256", nquads.encode("utf-8"))  # type: ignore
    return hasher.hexdigest()
2023-04-01 18:12:36 +02:00
async def verify_signature(
    db_session: AsyncSession,
    doc: ap.RawObject,
) -> bool:
    """Verify doc ld signature.

    Args:
        db_session: Database session handed to ``get_public_key``.
        doc: Object whose ``signature`` entry is checked.

    Returns:
        The result of the PKCS#1 v1.5 verification, or ``False`` when the
        object has no signature, the signature block lacks ``creator`` or
        ``signatureValue``, or ``signatureValue`` is not valid base64.
    """
    if "signature" not in doc:
        logger.warning("The object does not contain a signature")
        return False

    # The document is untrusted input: reject a malformed signature block
    # instead of letting KeyError / decoding errors escape to the caller.
    signature_block = doc["signature"]
    try:
        key_id = signature_block["creator"]
        signature = signature_block["signatureValue"]
    except (KeyError, TypeError):
        logger.warning("The signature is missing creator or signatureValue")
        return False

    try:
        # binascii.Error (bad padding) is a ValueError subclass.
        raw_signature = base64.b64decode(signature)
    except (TypeError, ValueError):
        logger.warning("The signatureValue is not valid base64")
        return False

    key = await get_public_key(db_session, key_id)

    # The signed payload is the options hash followed by the document hash.
    to_be_signed = _options_hash(doc) + _doc_hash(doc)

    pubkey = RSA.importKey(key)
    signer = PKCS1_v1_5.new(pubkey)
    digest = SHA256.new()
    digest.update(to_be_signed.encode("utf-8"))
    return signer.verify(digest, raw_signature)  # pylint: disable=not-callable
2023-04-01 18:12:36 +02:00
def generate_signature(doc: ap.RawObject, key) -> None:
    """Generate doc ld signature.

    Adds a ``signature`` entry of type ``RsaSignature2017`` to ``doc`` in
    place; nothing is returned.

    Args:
        doc: Object to sign. Its ``actor`` entry is used to build the
            ``creator`` key id (``<actor>#main-key``).
        key: Key passed to ``PKCS1_v1_5.new`` for signing (presumably the
            actor's RSA private key; confirm against callers).
    """
    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
    # UTC timestamp and drop tzinfo so ``isoformat()`` still yields the same
    # "YYYY-MM-DDTHH:MM:SS" text, to which the "Z" suffix is appended.
    created = (
        datetime.now(timezone.utc)
        .replace(microsecond=0, tzinfo=None)
        .isoformat()
        + "Z"
    )
    options = {
        "type": "RsaSignature2017",
        "creator": doc["actor"] + "#main-key",
        "created": created,
    }
    # Attach the options first: ``_options_hash`` reads them from
    # ``doc["signature"]``.
    doc["signature"] = options
    to_be_signed = _options_hash(doc) + _doc_hash(doc)

    signer = PKCS1_v1_5.new(key)
    digest = SHA256.new()
    digest.update(to_be_signed.encode("utf-8"))
    sig = base64.b64encode(signer.sign(digest))  # type: ignore
    # ``options`` is the same dict stored in ``doc["signature"]``, so this
    # completes the signature on the document itself.
    options["signatureValue"] = sig.decode("utf-8")