[lint] ldsig.py
This commit is contained in:
parent
c43061aaf4
commit
76b4046ed4
1 changed files with 14 additions and 10 deletions
24
app/ldsig.py
24
app/ldsig.py
|
@ -1,7 +1,7 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
"""Ld+json signature."""
|
||||||
import base64
|
import base64
|
||||||
import hashlib
|
import hashlib
|
||||||
import typing
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
import pyld # type: ignore
|
import pyld # type: ignore
|
||||||
|
@ -17,10 +17,12 @@ from app.actor import get_public_key
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
requests_loader = pyld.documentloader.requests.requests_document_loader()
|
requests_loader = pyld.documentloader.requests.requests_document_loader() # type: ignore
|
||||||
|
|
||||||
|
def _loader(url, options):
|
||||||
|
if options is None:
|
||||||
|
options = {}
|
||||||
|
|
||||||
def _loader(url, options={}):
|
|
||||||
# See https://github.com/digitalbazaar/pyld/issues/133
|
# See https://github.com/digitalbazaar/pyld/issues/133
|
||||||
options["headers"]["Accept"] = "application/ld+json"
|
options["headers"]["Accept"] = "application/ld+json"
|
||||||
|
|
||||||
|
@ -44,9 +46,9 @@ def _options_hash(doc: ap.RawObject) -> str:
|
||||||
normalized = jsonld.normalize(
|
normalized = jsonld.normalize(
|
||||||
doc, {"algorithm": "URDNA2015", "format": "application/nquads"}
|
doc, {"algorithm": "URDNA2015", "format": "application/nquads"}
|
||||||
)
|
)
|
||||||
h = hashlib.new("sha256")
|
doc_hash = hashlib.new("sha256")
|
||||||
h.update(normalized.encode("utf-8"))
|
doc_hash.update(normalized.encode("utf-8")) # type: ignore
|
||||||
return h.hexdigest()
|
return doc_hash.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
def _doc_hash(doc: ap.RawObject) -> str:
|
def _doc_hash(doc: ap.RawObject) -> str:
|
||||||
|
@ -56,15 +58,16 @@ def _doc_hash(doc: ap.RawObject) -> str:
|
||||||
normalized = jsonld.normalize(
|
normalized = jsonld.normalize(
|
||||||
doc, {"algorithm": "URDNA2015", "format": "application/nquads"}
|
doc, {"algorithm": "URDNA2015", "format": "application/nquads"}
|
||||||
)
|
)
|
||||||
h = hashlib.new("sha256")
|
doc_hash = hashlib.new("sha256")
|
||||||
h.update(normalized.encode("utf-8"))
|
doc_hash.update(normalized.encode("utf-8")) # type: ignore
|
||||||
return h.hexdigest()
|
return doc_hash.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
async def verify_signature(
|
async def verify_signature(
|
||||||
db_session: AsyncSession,
|
db_session: AsyncSession,
|
||||||
doc: ap.RawObject,
|
doc: ap.RawObject,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
"""Verify doc ld signature."""
|
||||||
if "signature" not in doc:
|
if "signature" not in doc:
|
||||||
logger.warning("The object does contain a signature")
|
logger.warning("The object does contain a signature")
|
||||||
return False
|
return False
|
||||||
|
@ -77,10 +80,11 @@ async def verify_signature(
|
||||||
signer = PKCS1_v1_5.new(pubkey)
|
signer = PKCS1_v1_5.new(pubkey)
|
||||||
digest = SHA256.new()
|
digest = SHA256.new()
|
||||||
digest.update(to_be_signed.encode("utf-8"))
|
digest.update(to_be_signed.encode("utf-8"))
|
||||||
return signer.verify(digest, base64.b64decode(signature)) # type: ignore
|
return signer.verify(digest, base64.b64decode(signature)) # pylint: disable=not-callable
|
||||||
|
|
||||||
|
|
||||||
def generate_signature(doc: ap.RawObject, key) -> None:
|
def generate_signature(doc: ap.RawObject, key) -> None:
|
||||||
|
"""Generate doc ld signature."""
|
||||||
options = {
|
options = {
|
||||||
"type": "RsaSignature2017",
|
"type": "RsaSignature2017",
|
||||||
"creator": doc["actor"] + "#main-key",
|
"creator": doc["actor"] + "#main-key",
|
||||||
|
|
Loading…
Reference in a new issue