#!/usr/bin/env python3 """Actor stuff.""" import typing from urllib.parse import urlparse from sqlalchemy import select from loguru import logger import app.activitypub as ap from app import models from app.database import AsyncSession if typing.TYPE_CHECKING: from app.models import Actor as ActorModel async def fetch_actor( db_session: AsyncSession, actor_id: str, ) -> "ActorModel": """Fetch actor on db, if not exist will be grabed and stored in db.""" exist_actor = ( await db_session.scalars( select(models.Actor).where( models.Actor.ap_id == actor_id ) ) ).one_or_none() if exist_actor: return exist_actor ap_object = await ap.fetch(actor_id) exist_actor = await save_actor(ap_object, db_session) return exist_actor async def save_actor( ap_object: dict, db_session: AsyncSession ) -> "ActorModel": """Save actor to db.""" logger.info("save actor " + ap_object["id"]) actor = models.Actor( ap_id=ap_object["id"], ap_actor=ap_object, ap_type=ap_object["type"], handle=_handle(ap_object), ) db_session.add(actor) await db_session.flush() await db_session.refresh(actor) return actor def _handle( ap_object: dict, ) -> str: ap_id = urlparse(ap_object["id"]) if not ap_id.hostname: raise ValueError(f"Invalid actor ID {ap_id}") handle = '@' + ap_object["preferredUsername"] + '@' + ap_id.hostname return handle async def get_public_key( db_session: AsyncSession, key_id: str ) -> str: """Give key id and reutrn public key.""" existing_actor = ( await db_session.scalars( select(models.Actor).where( models.Actor.ap_id == key_id.split("#")[0]) ) ).one_or_none() public_key = existing_actor.ap_object["publicKey"]["publicKeyPem"] return public_key