#!/usr/bin/env python3 from typing import Any import uuid from sqlalchemy.orm import session from app import models from app import ldsig from app.database import AsyncSession from app.models import InboxObject, OutboxObject, now from app.activitypub import ME from app.activitypub import handle_visibility from app.config import MANUALLY_APPROVES_FOLLOWERS from app.config import BASE_URL, ID from app.models import Actor from app.actor import fetch_actor from app.httpsig import k import app.activitypub as ap from app.hyap import post # type: ignore from urllib.parse import urlparse from sqlalchemy import select from sqlalchemy import delete from sqlalchemy.orm import joinedload from sqlalchemy.exc import IntegrityError from loguru import logger from uuid import uuid4 from datetime import datetime def allocate_outbox_id() -> str: return str(uuid.uuid4()) def build_object_id(id) -> str: return f"{BASE_URL}/tail/{id}" async def get_outbox_object( db_session: AsyncSession, ap_id: str, ) -> OutboxObject | None: return ( ( await db_session.execute( select(OutboxObject) .where(OutboxObject.ap_id == ap_id) .options( joinedload(OutboxObject.relates_to_inbox_object).options( joinedload(InboxObject.actor), ), ) ) ) .unique() .scalar_one_or_none() ) async def get_inbox_object( db_session: AsyncSession, ap_id: str, ) -> InboxObject | None: return ( await db_session.execute( select(InboxObject) .where(InboxObject.ap_id == ap_id) .options( joinedload(InboxObject.actor), joinedload(InboxObject.relates_to_inbox_object), joinedload(InboxObject.relates_to_outbox_object), ) ) ).scalar_one_or_none() async def save_incoming( db_session: AsyncSession, payload: dict, ) -> models.IncomingActivity | None: ap_id: str if "@context" not in payload: logger.warning(f"invalid object: {payload}") return None if "id" in payload: ap_id = payload["id"] else: ap_id = str(uuid4()) incoming_activity = models.IncomingActivity( ap_id=ap_id, ap_object=payload, ) if await process_incoming(db_session, payload): await db_session.commit() await db_session.flush() return incoming_activity db_session.add(incoming_activity) await db_session.commit() await db_session.refresh(incoming_activity) return incoming_activity async def process_incoming( db_session: AsyncSession, ap_object: dict[str, Any], ) -> bool: actor = await fetch_actor(db_session, ap_object["actor"]) relates_to_inbox_object = None relates_to_outbox_object = None if isinstance(ap_object["object"], dict): if ap_object["object"]["id"].startswith(BASE_URL): relates_to_outbox_object = await get_outbox_object( db_session, ap_object["object"]["id"], ) relates_to_inbox_object = await get_inbox_object( db_session, ap_object["object"]["id"] ) else: if ap_object["object"].startswith(BASE_URL): relates_to_outbox_object = await get_outbox_object( db_session, ap_object["object"], ) relates_to_inbox_object = await get_inbox_object( db_session, ap_object["object"] ) def build_object( object, relates_to_inbox_object = None, relates_to_outbox_object = None, ) -> InboxObject: inbox_object = models.InboxObject( actor_id=actor.id, server=urlparse(object["id"]).hostname, is_hidden_from_stream=True, ap_actor_id=actor.ap_id, ap_type=object["type"], ap_id=object["id"], ap_published_at=now(), ap_object=object, visibility=handle_visibility(object), activity_object_ap_id=object["actor"], relates_to_inbox_object_id=relates_to_inbox_object.id if relates_to_inbox_object else None, relates_to_outbox_object_id=relates_to_outbox_object.id if relates_to_outbox_object else None, ) return inbox_object if "Follow" == ap_object["type"]: inbox_object = build_object(ap_object) db_session.add(inbox_object) await db_session.flush() await db_session.refresh(inbox_object) if await _handle_follow(db_session, actor, inbox_object): return True return False elif "Undo" == ap_object["type"]: #inbox_object = build_object(ap_object) #db_session.add(inbox_object) if await _handle_undo(db_session, ap_object): return True elif ap_object["type"] in ["Accept", "Rejact"]: follow_id = ap_object["object"] if isinstance(follow_id, dict): follow_id = follow_id["id"] relate_following_object = (await db_session.execute( select(models.OutboxObject) .where(models.OutboxObject.ap_id == follow_id) .options( joinedload(models.OutboxObject.relates_to_inbox_object).options( joinedload(models.InboxObject.actor) ) ) ) ).unique().scalar_one_or_none() if "Accept" == ap_object["type"]: try: inbox_object = build_object(ap_object) db_session.add(inbox_object) await db_session.flush() except IntegrityError: logger.warning("Duplicate accept respond?") await db_session.rollback() return True await db_session.refresh(inbox_object) following = models.Following( actor_id=relate_following_object.relates_to_actor.id, outbox_object_id=relate_following_object.id, ap_actor_id=ap_object["actor"], ) db_session.add(following) await db_session.flush() await db_session.refresh(following) return True elif "Announce" == ap_object["type"]: inbox_object = build_object( ap_object, relates_to_inbox_object=relates_to_inbox_object, relates_to_outbox_object=relates_to_outbox_object, ) db_session.add(inbox_object) await db_session.flush() await db_session.refresh(inbox_object) if await _handle_announce( db_session, inbox_object, relates_to_outbox_object=relates_to_outbox_object, relates_to_inbox_object=relates_to_inbox_object, ): return True elif "Like" == ap_object["type"]: inbox_object = build_object( ap_object, relates_to_inbox_object=relates_to_inbox_object, relates_to_outbox_object=relates_to_outbox_object, ) db_session.add(inbox_object) await db_session.flush() await db_session.refresh(inbox_object) if await _handle_like( db_session, inbox_object, relates_to_outbox_object=relates_to_outbox_object, relates_to_inbox_object=relates_to_inbox_object, ): return True return False async def _handle_follow( db_session: AsyncSession, actor: Actor, inbox_object: InboxObject, ) -> bool: if ME["id"] != inbox_object.ap_object["object"]: #type: ignore # await db_session.delete(ap_object) logger.warning("no match follow object!" + inbox_object.ap_object["id"]) #type: ignore return False if MANUALLY_APPROVES_FOLLOWERS: return False await _send_accept(db_session, actor, inbox_object) return True async def _send_accept( db_session: AsyncSession, actor: Actor, inbox_object: InboxObject, ) -> None: follower = models.Follower( actor_id=inbox_object.actor_id, inbox_object_id=inbox_object.id, ap_actor_id=inbox_object.ap_object["actor"], #type: ignore ) try: db_session.add(follower) await db_session.flush() except IntegrityError: await db_session.rollback() logger.warning("existing follower in db!") try: reply_id = allocate_outbox_id() url = actor.inbox_url # type: ignore out = { "@context": ap.AS_CTX, "id": build_object_id(reply_id), "type": "Accept", "actor": ME["id"], "object": inbox_object.ap_object["id"], #type: ignore } #TODO outcoming await save_to_outbox( db_session, reply_id, out, relates_to_actor_id=actor.id, # type: ignore ) await post(url, out) # type: ignore except Exception as e: logger.error(e) async def _handle_undo( db_session: AsyncSession, inbox_object: dict ) -> bool: if inbox_object["object"]["object"] != ME["id"]: logger.warning("Wrong undo object! " + inbox_object["object"]["actor"]) return False if "Follow" == inbox_object["object"]["type"]: relate_object = (await db_session.execute( select(models.InboxObject) .where(models.InboxObject.ap_id == inbox_object["object"]["id"]) .options( joinedload(models.InboxObject.actor), joinedload(models.InboxObject.relates_to_inbox_object), joinedload(models.InboxObject.relates_to_outbox_object), ) ) ).scalar_one_or_none() # type: ignore if relate_object: relate_object.undo_id=inbox_object["object"]["id"] relate_object.is_deleted=True await db_session.execute( delete(models.Follower).where( models.Follower.ap_actor_id == inbox_object["actor"] ) ) logger.info("undo follow " + inbox_object["actor"]) return True return False async def send_follow( db_session : AsyncSession, acct : str ): await _send_follow(db_session, acct) await db_session.commit() await db_session.flush() async def _handle_announce( db_session: AsyncSession, inbox_object: InboxObject, relates_to_outbox_object: models.OutboxObject | None, relates_to_inbox_object: models.InboxObject | None, ) -> bool : if relates_to_outbox_object: relates_to_outbox_object.announces_count = ( # type: ignore models.OutboxObject.announces_count + 1 ) logger.info(f"announces +1 {relates_to_outbox_object.ap_id}") return True else: return False async def _handle_like( db_session: AsyncSession, inbox_object: InboxObject, relates_to_outbox_object: models.OutboxObject | None, relates_to_inbox_object: models.InboxObject | None, ) -> bool : if relates_to_outbox_object: relates_to_outbox_object.likes_count = ( # type: ignore models.OutboxObject.likes_count + 1 ) logger.info(f"likes +1 {relates_to_outbox_object.ap_id}") return True else: return False async def _send_follow( db_session : AsyncSession, actor_url : str, ): actor = await fetch_actor(db_session, actor_url) follow_id = build_object_id(allocate_outbox_id()) out = { "@context": ap.AS_CTX, "id": follow_id, "type": "Follow", "actor": ME["id"], "object": actor.ap_id, } await save_to_outbox( db_session, follow_id, out, relates_to_actor_id=actor.id, # type: ignore activity_object_ap_id=actor.ap_id ) await post( actor.inbox_url, out, ) async def _send_create( db_session: AsyncSession, ap_type: str, content: str, visibility: ap.VisibilityEnum, published: str | None = None, ) -> bool: object_id = allocate_outbox_id() if not published: published = now().replace(microsecond=0).isoformat().replace("+00:00", "Z") to = [] cc = [] if visibility == ap.VisibilityEnum.PUBLIC: to = [ap.AS_PUBLIC] cc = [f"{BASE_URL}/followers"] elif visibility == ap.VisibilityEnum.UNLISTED: to = [f"{BASE_URL}/followers"] cc = [ap.AS_PUBLIC] elif visibility == ap.VisibilityEnum.FOLLOWERS_ONLY: to = [f"{BASE_URL}/followers"] cc = [] else: raise ValueError(f"Unsupport visibility {visibility}") ap_object = { "@context": ap.AS_EXTENDED_CTX, "type": ap_type, "id": build_object_id(object_id), "attributedTo": ID, "content": content, "to": to, "cc": cc, "published": published, # "context": context, # "conversation": context, "url": build_object_id(object_id), "tag": [], "summary": None, "inReplyTo": None, "sensitive": False, "attachment": [], } outbox_object = await save_to_outbox( db_session, object_id, ap_object, ) recipients = await _compute_recipients(db_session, ap_object) ap_object = ap.wrap_ap_object(ap_object) if ap_object["type"] == "Create": if ap.VisibilityEnum.PUBLIC == outbox_object.visibility: ldsig.generate_signature(ap_object, k) try: for r in recipients: await post( r, ap_object, ) except Exception as e: logger.error(e) return True async def _compute_recipients( db_session: AsyncSession, ap_object: dict, ) -> set[str]: async def process_collection( db_session, url) -> list[Actor]: if url == BASE_URL + "/followers": followers = ( ( await db_session.scalars( select(models.Follower).options( joinedload(models.Follower.actor) ) ) ) .unique() .all() ) else: raise ValueError(f"{url}) not supported") return [follower.actor for follower in followers] _recipients = [] for field in ["to", "cc", "bcc", "bto"]: if field in ap_object: _recipients.extend(ap_object[field]) recipients = set() logger.info(f"{_recipients}") for r in _recipients: if r in [ap.AS_PUBLIC, ID]: continue if r.startswith(BASE_URL): for actor in await process_collection(db_session, r): recipients.add(actor.share_inbox_url) continue return recipients async def save_to_inbox( db_session : AsyncSession, inbox_id : str, ap_object : dict, relates_to_inbox_object_id: int | None = None, relates_to_outbox_object_id: int | None = None, relates_to_actor_id: int | None = None, ) -> InboxObject: ap_type = ap_object["type"] ap_id = ap_object["id"] visibility = handle_visibility(ap_object) inbox_object = OutboxObject( public_id=inbox_id, ap_object=ap_object, ap_id=ap_id, ap_type=ap_type, visibility =visibility, relates_to_inbox_object_id=relates_to_inbox_object_id, relates_to_outbox_object_id=relates_to_outbox_object_id, relates_to_actor_id=relates_to_actor_id, ) db_session.add(inbox_object) await db_session.flush() await db_session.refresh(inbox_object) return inbox_object async def save_to_outbox( db_session : AsyncSession, outbox_id : str, ap_object : dict, activity_object_ap_id: str | None = None, relates_to_inbox_object_id: int | None = None, relates_to_outbox_object_id: int | None = None, relates_to_actor_id: int | None = None, ) -> OutboxObject: ap_type = ap_object["type"] ap_id = ap_object["id"] visibility = handle_visibility(ap_object) outbox_object = OutboxObject( public_id=outbox_id, ap_object=ap_object, ap_id=ap_id, ap_type=ap_type, visibility =visibility, activity_object_ap_id=activity_object_ap_id, relates_to_inbox_object_id=relates_to_inbox_object_id, relates_to_outbox_object_id=relates_to_outbox_object_id, relates_to_actor_id=relates_to_actor_id, ) db_session.add(outbox_object) await db_session.flush() await db_session.refresh(outbox_object) await db_session.commit() return outbox_object