COSCUP-ap-demo/demo/boxes.py

137 lines
3.3 KiB
Python

#!/usr/bin/env python3
"""Inbox or Outbox."""
import hashlib
import json
import os
import uuid
from datetime import datetime as dtime
from datetime import timezone

from loguru import logger

from demo.config import BASE_URL, ID
from demo.actor import fetch_actor
from demo import activitypub as ap
def process_inbox(
    payload
) -> None:
    """Handle one activity delivered to the inbox.

    Objects lacking "@context" or "type" are logged as invalid and dropped.
    A "Follow" is answered with an Accept sent to the inbox of the fetched
    follower actor, then the activity is stored under "inbox" and the actor
    document under "actor". A "STUB" activity triggers a "STUB" note. Every
    other type is ignored.

    Args:
        payload: The decoded activity, indexed by key like a dict.

    Raises:
        KeyError: If a "Follow" is missing a key this handler reads
            ("actor", "id") or the fetched actor has no "inbox".
    """
    # Inbound data is untrusted: a missing "type" is as invalid as a missing
    # "@context" and must not surface as a KeyError.
    if "@context" not in payload or "type" not in payload:
        logger.warning(f"invalid object: {payload}")
        return None

    activity_type = payload["type"]
    if activity_type == "Follow":
        ap_actor = fetch_actor(payload["actor"])
        _send_accept(payload, ap_actor["inbox"])
        _save_ap_object(payload, "inbox")
        _save_ap_object(ap_actor, "actor")
    elif activity_type == "STUB":
        send_note("STUB")
def _save_ap_object(
    ap_object: dict,
    path: str,
):
    """Write an ActivityPub object to ``data/<path>/<digest>.json``.

    The file name is the SHA-256 hex digest of the object's "id". The
    built-in ``hash()`` is salted per interpreter process for strings, so it
    produced a different name for the same id after every restart and piled
    up duplicate files; a digest keeps exactly one file per id.

    Args:
        ap_object: JSON-serializable object; must contain an "id" key.
        path: Sub-directory of ``data`` (relative to the current working
            directory) to store the file in; created when missing.

    Raises:
        KeyError: If ``ap_object`` has no "id".
    """
    # Resolve the name first so a missing "id" fails before touching the disk.
    digest = hashlib.sha256(str(ap_object["id"]).encode("utf-8")).hexdigest()
    directory = os.path.join("data", path)
    os.makedirs(directory, exist_ok=True)
    with open(os.path.join(directory, f"{digest}.json"), "w", encoding="utf-8") as f:
        json.dump(ap_object, f)
def _send_accept(
    ap_object,
    inbox_url: str,
) -> None:
    """Answer an incoming activity with an Accept.

    Builds an Accept activity whose "object" is the id of ``ap_object``,
    stores it under "outbox" and posts it to ``inbox_url``. Any exception
    raised while building, storing or posting is logged and not re-raised.

    Args:
        ap_object: The activity being accepted; its "id" is referenced.
        inbox_url: Inbox the Accept is posted to.
    """
    try:
        accept_activity = {
            "@context": ap.AS_CTX,
            "id": f"{BASE_URL}/{uuid.uuid4()}",
            "type": "Accept",
            "actor": ap.ME["id"],
            "object": ap_object["id"],
        }
        # Persist before delivery, as the outbox copy is written first.
        _save_ap_object(accept_activity, "outbox")
        ap.post(inbox_url, accept_activity)
    except Exception as exc:
        logger.error(exc)
def send_note(
    content: str,
):
    """Publish ``content`` as a Note to every known recipient inbox.

    Builds a Note addressed to ``ap.AS_PUBLIC`` with the ``/followers``
    collection under ``BASE_URL`` in "cc", passes it through
    ``_wrap_ap_object`` and posts the result to each inbox returned by
    ``_compute_recipients``.

    Delivery is best-effort per recipient: a failure posting to one inbox is
    logged and the remaining inboxes are still attempted, so one failing
    inbox cannot block everyone else.

    Args:
        content: Body of the note, stored in the Note's "content" field.
    """
    # UTC timestamp at second precision with a "Z" suffix instead of "+00:00".
    published = dtime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
    object_id = f"{BASE_URL}/{uuid.uuid4()}"
    note = {
        "@context": ap.AS_EXTENDED_CTX,
        "type": "Note",
        "id": object_id,
        "attributedTo": ID,
        "content": content,
        "to": [ap.AS_PUBLIC],
        "cc": [f"{BASE_URL}/followers"],
        "published": published,
        "url": object_id,
        "tag": [],
        "summary": None,
        "inReplyTo": None,
        "sensitive": False,
        "attachment": [],
    }
    recipients = _compute_recipients()
    activity = _wrap_ap_object(note)
    for inbox_url in recipients:
        try:
            ap.post(
                inbox_url,
                activity,
            )
        except Exception as e:
            # Same log-and-continue policy as _send_accept.
            logger.error(f"delivery to {inbox_url} failed: {e}")
def _compute_recipients() -> set[str]:
    """Collect the delivery inbox of every actor stored under ``data/actor``.

    Walks ``<cwd>/data/actor`` recursively and reads each ``*.json`` file as
    an actor document. For each actor the "sharedInbox" entry of "endpoints"
    is preferred, falling back to the actor's own "inbox".

    Files that cannot be read or parsed, documents that are not JSON objects,
    and actors without any inbox are skipped rather than aborting the scan or
    putting ``None`` into the result.

    Returns:
        The set of distinct inbox URLs; empty when the directory is missing
        or holds no usable actor.
    """
    recipients = set()
    actor_dir = os.path.join(os.getcwd(), "data", "actor")
    for root, _, files in os.walk(actor_dir):
        for name in files:
            if not name.endswith(".json"):
                continue
            filename = os.path.join(root, name)
            try:
                with open(filename, "r", encoding="utf-8") as f:
                    ap_actor = json.load(f)
            except (OSError, ValueError) as e:
                # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
                logger.warning(f"skipping unreadable actor file {filename}: {e}")
                continue
            if not isinstance(ap_actor, dict):
                continue
            # "endpoints" may be absent or explicitly null in actor documents.
            endpoints = ap_actor.get("endpoints")
            shared_inbox = endpoints.get("sharedInbox") if isinstance(endpoints, dict) else None
            inbox_url = shared_inbox or ap_actor.get("inbox")
            if inbox_url:
                recipients.add(inbox_url)
    return recipients
def _wrap_ap_object(ap_object: dict) -> dict:
    """Wrap a Note in a Create activity; return anything else untouched.

    For a Note, "@context" is removed from ``ap_object`` in place and the
    same dict becomes the "object" of a new Create activity that carries the
    extended context, this actor's ``ID``, the Note's "to"/"cc" (empty lists
    when absent), its "published" value and an id of ``<note id>/activity``.

    Args:
        ap_object: Object to wrap; must contain "type" (and, for a Note,
            "id" and "published").

    Returns:
        The Create activity for a Note, otherwise ``ap_object`` itself.
    """
    if ap_object["type"] != "Note":
        return ap_object

    # The context lives on the enclosing activity, not on the embedded object.
    ap_object.pop("@context", None)
    create_activity = {
        "@context": ap.AS_EXTENDED_CTX,
        "actor": ID,
        "to": ap_object.get("to", []),
        "cc": ap_object.get("cc", []),
        "id": ap_object["id"] + "/activity",
        "object": ap_object,
        "published": ap_object["published"],
        "type": "Create",
    }
    return create_activity