#!/usr/bin/env python3
"""ActivityStreams constants, the local actor document and an outgoing POST helper.

This module defines:

* the JSON-LD context values used when serializing objects,
* ``ME``, the actor document built from the values in ``app.config``,
* ``post``, a coroutine that sends a JSON payload to a remote URL.
"""
import httpx
from app import config
from app.key import get_pubkey_as_pem
from app.httpsig import auth
from loguru import logger

# Alias used for untyped (raw) objects represented as plain dicts.
RawObject = dict

AS_CTX = "https://www.w3.org/ns/activitystreams"
AS_PUBLIC = "https://www.w3.org/ns/activitystreams#Public"

# Values of the "type" field that identify an actor object.
ACTOR_TYPES = ["Application", "Group", "Organization", "Person", "Service"]

# Extended JSON-LD context: the base vocabularies plus the extension terms
# used by this module's documents.
AS_EXTENDED_CTX = [
    "https://www.w3.org/ns/activitystreams",
    "https://w3id.org/security/v1",
    {
        # AS ext
        "Hashtag": "as:Hashtag",
        "sensitive": "as:sensitive",
        "manuallyApprovesFollowers": "as:manuallyApprovesFollowers",
        "alsoKnownAs": {"@id": "as:alsoKnownAs", "@type": "@id"},
        "movedTo": {"@id": "as:movedTo", "@type": "@id"},
        # toot
        "toot": "http://joinmastodon.org/ns#",
        "featured": {"@id": "toot:featured", "@type": "@id"},
        "Emoji": "toot:Emoji",
        "blurhash": "toot:blurhash",
        "votersCount": "toot:votersCount",
        # schema
        "schema": "http://schema.org#",
        "PropertyValue": "schema:PropertyValue",
        "value": "schema:value",
        # ostatus
        "ostatus": "http://ostatus.org#",
        "conversation": "ostatus:conversation",
    },
]

_LOCAL_ACTOR_SUMMARY = config.CONFIG.summary

# The local actor document, assembled once at import time from configuration.
# NOTE: get_pubkey_as_pem() is called while this literal is evaluated, i.e.
# when the module is imported.
ME = {
    "@context": AS_EXTENDED_CTX,
    "type": "Person",
    "id": config.ID,
    "following": config.BASE_URL + "/following",
    "followers": config.BASE_URL + "/followers",
    "featured": config.BASE_URL + "/featured",
    "inbox": config.BASE_URL + "/inbox",
    "outbox": config.BASE_URL + "/outbox",
    "preferredUsername": config.USERNAME,
    "name": config.CONFIG.name,
    "summary": _LOCAL_ACTOR_SUMMARY,
    "endpoints": {
        # For compat with servers expecting a sharedInbox...
        "sharedInbox": config.BASE_URL + "/inbox",
    },
    "url": config.ID + "/",  # XXX: the path is important for Mastodon compat
    "manuallyApprovesFollowers": config.CONFIG.manually_approves_followers,
    "attachment": [],  # TODO
    "icon": {
        "mediaType": "image/png",  # TODO
        "type": "Image",
        "url": config.CONFIG.icon_url,
    },
    "publicKey": {
        "id": f"{config.ID}#main-key",
        "owner": config.ID,
        "publicKeyPem": get_pubkey_as_pem(config.KEY_PATH),
    },
    "tag": []  # TODO
}


async def post(
    url: str,
    payload: dict,
) -> httpx.Response:
    """Send ``payload`` as a JSON body in an HTTP POST to ``url``.

    The payload is logged at INFO level before the request is made. A new
    ``httpx.AsyncClient`` is opened for the call and closed afterwards.

    Args:
        url: Destination URL of the request.
        payload: Dict passed to httpx as the ``json`` body.

    Returns:
        The ``httpx.Response`` produced by the request. The status code is
        not checked here; callers decide how to treat error responses.
    """
    logger.info(f"send post, {payload=}")

    async with httpx.AsyncClient() as http_client:
        request_headers = {
            "User-Agent": config.USER_AGENT,
            "Content-Type": config.AP_CONTENT_TYPE,
        }
        # `auth` comes from app.httpsig; presumably it signs the outgoing
        # request (HTTP Signatures) -- confirm in that module.
        response = await http_client.post(
            url,
            headers=request_headers,
            json=payload,
            auth=auth,
        )

    return response