COSCUP-ap-demo/demo/activitypub.py

100 lines
2.6 KiB
Python

#!/usr/bin/env python3
"""ActivityPub settings, fetch & post"""
import httpx
from demo import config
from demo.utils.key import get_pubkey_as_pem
from demo.httpsig import auth
from loguru import logger
AS_CTX = "https://www.w3.org/ns/activitystreams"
AS_PUBLIC = "https://www.w3.org/ns/activitystreams#Public"
AS_EXTENDED_CTX = [
"https://www.w3.org/ns/activitystreams",
"https://w3id.org/security/v1",
{
# AS ext
"Hashtag": "as:Hashtag",
"sensitive": "as:sensitive",
"manuallyApprovesFollowers": "as:manuallyApprovesFollowers",
"alsoKnownAs": {"@id": "as:alsoKnownAs", "@type": "@id"},
"movedTo": {"@id": "as:movedTo", "@type": "@id"},
# toot
"toot": "http://joinmastodon.org/ns#",
"featured": {"@id": "toot:featured", "@type": "@id"},
"Emoji": "toot:Emoji",
"blurhash": "toot:blurhash",
"votersCount": "toot:votersCount",
},
]
ME = {
"@context": AS_EXTENDED_CTX,
"type": "Person",
"id": config.ID,
"following": config.BASE_URL + "/following",
"followers": config.BASE_URL + "/followers",
"featured": config.BASE_URL + "/featured",
"inbox": config.BASE_URL + "/inbox",
"outbox": config.BASE_URL + "/outbox",
"preferredUsername": config.USERNAME,
"name": config.NICKNAME,
"summary": config.ACTOR_SUMMARY,
"endpoints": {
"sharedInbox": config.BASE_URL + "/inbox",
},
"url": config.ID + "/", # the path is important for Mastodon compat
"manuallyApprovesFollowers": False,
"attachment": [],
"icon": {
"mediaType": "image/png",
"type": "Image",
"url": config.AVATAR_URL,
},
"publicKey": {
"id": f"{config.ID}#main-key",
"owner": config.ID,
"publicKeyPem": get_pubkey_as_pem(config.KEY_PATH),
},
"tag": [] # TODO tag support
}
def fetch(
url: str,
) -> dict:
"""Fetch url and return json"""
logger.debug(f"fetch {url}")
with httpx.Client() as client:
resp = client.get(
url,
headers={
"User-Agent": config.USER_AGENT,
"Accept": config.AP_CONTENT_TYPE,
}
)
return resp.json()
def post(
url: str,
payload : dict,
) -> httpx.Response :
logger.debug(f"send post, {payload=}")
with httpx.Client() as client:
resp = client.post(
url,
headers={
"User-Agent": config.USER_AGENT,
"Content-Type": config.AP_CONTENT_TYPE,
},
json=payload,
auth=auth,
)
resp.raise_for_status()
return resp