#!/usr/bin/env python3
"""Helpers for looking up, fetching and persisting ActivityPub actors."""
import typing
import json
from loguru import logger
from app.database import AsyncSession
from app import models
from sqlalchemy import select
from urllib.parse import urlparse

if typing.TYPE_CHECKING:
    from app.models import Actor as ActorModel

# Needed at runtime (``ap.fetch``), so it must stay outside the TYPE_CHECKING guard.
import app.activitypub as ap


async def fetch_actor(
    db_session: AsyncSession,
    actor_id: str,
) -> "ActorModel":
    """Return the actor stored under *actor_id*, fetching and saving it if absent.

    Args:
        db_session: Session used for the lookup and, when needed, the insert.
        actor_id: ActivityPub ID of the actor; matched against ``Actor.ap_id``.

    Returns:
        The existing ``Actor`` row, or a newly saved one built from the object
        returned by ``ap.fetch(actor_id)``.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: If more than one row matches
            (raised by ``one_or_none``).
    """
    existing_actor = (
        await db_session.scalars(
            select(models.Actor).where(models.Actor.ap_id == actor_id)
        )
    ).one_or_none()
    if existing_actor is not None:
        return existing_actor

    # Unknown actor: retrieve its ActivityPub document and persist it.
    ap_object = await ap.fetch(actor_id)
    return await save_actor(ap_object, db_session)


async def save_actor(
    ap_object: dict,
    db_session: AsyncSession,
) -> "ActorModel":
    """Create an ``Actor`` row from an ActivityPub actor object.

    The row is added to the session, flushed and refreshed. This function does
    not commit; that is left to the caller.

    Args:
        ap_object: Actor document; must contain ``"id"``, ``"type"`` and
            ``"preferredUsername"``.
        db_session: Session the new row is added to.

    Returns:
        The refreshed ``Actor`` instance.

    Raises:
        KeyError: If a required key is missing from *ap_object*.
        ValueError: If the actor ID has no hostname (see ``_handle``).
    """
    logger.info("save actor {}", ap_object["id"])
    actor = models.Actor(
        ap_id=ap_object["id"],
        ap_actor=ap_object,
        ap_type=ap_object["type"],
        handle=_handle(ap_object),
    )
    db_session.add(actor)
    await db_session.flush()
    await db_session.refresh(actor)
    return actor


def _handle(ap_object: dict) -> str:
    """Build the ``@username@host`` handle for an actor object.

    Args:
        ap_object: Actor document containing ``"id"`` (a URL) and
            ``"preferredUsername"``.

    Returns:
        ``"@" + preferredUsername + "@" + hostname``, where the hostname is
        taken from the parsed actor ID (``urlparse`` lower-cases it).

    Raises:
        ValueError: If the actor ID has no hostname component.
        KeyError: If ``"id"`` or ``"preferredUsername"`` is missing.
    """
    actor_id = ap_object["id"]
    hostname = urlparse(actor_id).hostname
    if not hostname:
        # Report the raw ID rather than the ParseResult repr.
        raise ValueError(f"Invalid actor ID {actor_id}")
    username = ap_object["preferredUsername"]
    return f"@{username}@{hostname}"


async def get_public_key(
    db_session: AsyncSession,
    key_id: str,
) -> str:
    """Return the PEM public key of the stored actor that owns *key_id*.

    The actor ID is the part of *key_id* before the first ``#``. Only the
    database is consulted; an actor that has not been saved yet is not fetched
    (call ``fetch_actor`` first if that is required).

    Args:
        db_session: Session used for the lookup.
        key_id: Key identifier, e.g. ``https://host/users/a#main-key``.

    Returns:
        The value of ``publicKey.publicKeyPem`` in the stored actor document.

    Raises:
        ValueError: If no actor with that ID is stored.
        KeyError: If the stored document has no ``publicKey``/``publicKeyPem``.
    """
    actor_id = key_id.split("#")[0]
    existing_actor = (
        await db_session.scalars(
            select(models.Actor).where(models.Actor.ap_id == actor_id)
        )
    ).one_or_none()
    if existing_actor is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise ValueError(f"No stored actor for key ID {key_id}")

    # save_actor() stores the actor document in ``ap_actor``, so read it back
    # from the same attribute.
    return existing_actor.ap_actor["publicKey"]["publicKeyPem"]