#!/usr/bin/env python3
"""Inbox or Outbox.

Handles incoming ActivityPub payloads (``process_inbox``) and builds and
delivers outgoing ones (``send_note``). Objects are persisted as JSON files
under ``data/<kind>/`` relative to the current working directory.
"""
import hashlib
import json
import os
import uuid
from datetime import datetime as dtime
from datetime import timezone

from loguru import logger

from demo import activitypub as ap
from demo.actor import fetch_actor
from demo.config import BASE_URL, ID


def process_inbox(payload) -> None:
    """Dispatch an incoming payload according to its ``type``.

    A ``Follow`` is answered with an ``Accept`` sent to the follower's inbox,
    and both the request and the follower's actor document are stored. A
    ``STUB`` triggers a ``"STUB"`` note. Any other type is ignored.

    Args:
        payload: The decoded request body; expected to be a dict containing
            ``"@context"``.
    """
    # Reject anything that is not a JSON object carrying "@context" instead of
    # failing later with a TypeError/KeyError.
    if not isinstance(payload, dict) or "@context" not in payload:
        logger.warning(f"invalid object: {payload}")
        return None

    # .get(): a payload without "type" is simply not handled.
    activity_type = payload.get("type")

    if activity_type == "Follow":
        ap_actor = fetch_actor(payload["actor"])
        _send_accept(payload, ap_actor["inbox"])
        _save_ap_object(payload, "inbox")
        _save_ap_object(ap_actor, "actor")

    if activity_type == "STUB":
        send_note("STUB")


def _save_ap_object(ap_object: dict, path: str):
    """Write *ap_object* as JSON to ``data/<path>/<digest>.json``.

    The file name is the SHA-256 hex digest of the object's ``"id"``. The
    builtin ``hash()`` is salted per process for strings, so it would give the
    same object a different file name on every run.

    Args:
        ap_object: Object to store; must contain an ``"id"`` key.
        path: Sub-directory of ``data/`` (e.g. ``"inbox"``, ``"outbox"``,
            ``"actor"``).

    Raises:
        KeyError: If *ap_object* has no ``"id"``.
        OSError: If the directory or file cannot be created.
    """
    digest = hashlib.sha256(str(ap_object["id"]).encode("utf-8")).hexdigest()
    directory = os.path.join("data", path)
    # Create the target directory on first use rather than failing in open().
    os.makedirs(directory, exist_ok=True)
    with open(
        os.path.join(directory, f"{digest}.json"), "w", encoding="utf-8"
    ) as f:
        json.dump(ap_object, f)


def _send_accept(ap_object, inbox_url: str) -> None:
    """Build an ``Accept`` for *ap_object*, store it, and post it.

    Best-effort: any exception raised while building, saving or posting is
    logged and swallowed so the caller can continue.

    Args:
        ap_object: The activity being accepted; its ``"id"`` becomes the
            ``"object"`` of the ``Accept``.
        inbox_url: URL handed to ``ap.post`` as the destination.
    """
    try:
        payload = {
            "@context": ap.AS_CTX,
            "id": f"{BASE_URL}/" + str(uuid.uuid4()),
            "type": "Accept",
            "actor": ap.ME["id"],
            "object": ap_object["id"],
        }
        _save_ap_object(payload, "outbox")
        ap.post(inbox_url, payload)
    except Exception:
        # Deliberate catch-all at this boundary; keep the traceback in the log.
        logger.exception("failed to send Accept")


def send_note(content: str):
    """Send Note!

    Builds a ``Note`` addressed to ``ap.AS_PUBLIC`` and cc'ing
    ``{BASE_URL}/followers``, wraps it with ``_wrap_ap_object`` and posts the
    result to every inbox returned by ``_compute_recipients``.

    Args:
        content: Value of the note's ``"content"`` field.
    """
    # UTC timestamp at second precision with a "Z" suffix.
    published = (
        dtime.now(timezone.utc)
        .replace(microsecond=0)
        .isoformat()
        .replace("+00:00", "Z")
    )
    to = [ap.AS_PUBLIC]
    cc = [f"{BASE_URL}/followers"]

    object_id = f"{BASE_URL}/" + str(uuid.uuid4())
    ap_object = {
        "@context": ap.AS_EXTENDED_CTX,
        "type": "Note",
        "id": object_id,
        "attributedTo": ID,
        "content": content,
        "to": to,
        "cc": cc,
        "published": published,
        "url": object_id,
        "tag": [],
        "summary": None,
        "inReplyTo": None,
        "sensitive": False,
        "attachment": [],
    }

    recipients = _compute_recipients()
    ap_object = _wrap_ap_object(ap_object)

    for inbox_url in recipients:
        ap.post(
            inbox_url,
            ap_object,
        )


def _compute_recipients() -> set[str]:
    """Collect the inbox URLs of every actor stored under ``data/actor``.

    Each ``*.json`` file below ``<cwd>/data/actor`` is loaded. The actor's
    ``endpoints.sharedInbox`` is used when present and non-empty, otherwise
    its ``inbox``. Actors that provide neither are skipped.

    Returns:
        The set of distinct inbox URLs.
    """
    recipients: set[str] = set()
    actor_dir = os.path.join(os.getcwd(), "data", "actor")

    for root, _, files in os.walk(actor_dir):
        for name in files:
            if not name.endswith(".json"):
                continue
            with open(os.path.join(root, name), "r", encoding="utf-8") as f:
                ap_actor = json.load(f)
            # "endpoints" may be missing or null; treat both as empty.
            endpoints = ap_actor.get("endpoints") or {}
            inbox_url = endpoints.get("sharedInbox") or ap_actor.get("inbox")
            # Never hand a None/empty URL to the delivery loop.
            if inbox_url:
                recipients.add(inbox_url)

    return recipients


def _wrap_ap_object(ap_object: dict) -> dict:
    """Wrap a ``Note`` in a ``Create`` activity.

    Objects of any other type are returned unchanged. For a ``Note`` the
    embedded object is a shallow copy without ``"@context"`` (the context is
    set on the wrapping activity), so the caller's dict is not modified.

    Args:
        ap_object: Object with at least ``"type"``; a ``Note`` must also have
            ``"id"`` and ``"published"``.

    Returns:
        The ``Create`` activity for a ``Note``, otherwise *ap_object* itself.
    """
    if ap_object["type"] not in ["Note"]:
        return ap_object

    inner = {key: value for key, value in ap_object.items() if key != "@context"}
    return {
        "@context": ap.AS_EXTENDED_CTX,
        "actor": ID,
        "to": inner.get("to", []),
        "cc": inner.get("cc", []),
        "id": inner["id"] + "/activity",
        "object": inner,
        "published": inner["published"],
        "type": "Create",
    }