foxhole/app/actor.py

82 lines
1.9 KiB
Python
Raw Normal View History

2023-03-18 14:51:36 +01:00
#!/usr/bin/env python3
2023-03-21 07:24:57 +01:00
import typing
2023-03-18 14:51:36 +01:00
from loguru import logger
from app.database import AsyncSession
2023-03-21 07:24:57 +01:00
from app import models
2023-03-18 14:51:36 +01:00
from sqlalchemy import select
from urllib.parse import urlparse
2023-03-21 07:24:57 +01:00
if typing.TYPE_CHECKING:
from app.models import Actor as ActorModel
2023-03-18 14:51:36 +01:00
import app.activitypub as ap
2023-03-21 07:24:57 +01:00
2023-03-20 10:54:54 +01:00
class BaseActor:
    """Thin read-only wrapper around a raw ActivityPub actor document.

    The wrapped object is accessed with ``.get("type")`` and ``["inbox"]``,
    so it is presumably a dict-like mapping (``ap.RawObject``) -- confirm
    against ``app.activitypub``.
    """

    def __init__(self, ap_actor: ap.RawObject) -> None:
        """Validate and store *ap_actor*.

        Raises:
            ValueError: if the document's ``type`` (or ``None`` when the key
                is absent) is not listed in ``ap.ACTOR_TYPES``.
        """
        declared_type = ap_actor.get("type")
        if declared_type not in ap.ACTOR_TYPES:
            raise ValueError(f"Unexpected actor type: {declared_type}")

        self._ap_actor = ap_actor
        # The membership check above is what justifies treating the value as
        # a ``str``; the type checker cannot see that, hence the ignore.
        self._ap_type: str = declared_type  # type: ignore

    @property
    def ap_actor(self) -> ap.RawObject:
        """The raw actor document passed to the constructor."""
        return self._ap_actor

    @property
    def inbox_url(self) -> str:
        """The ``inbox`` entry of the actor document (``KeyError`` if absent)."""
        # Go through the public property rather than the private attribute so
        # that a subclass overriding ``ap_actor`` is honoured.
        document = self.ap_actor
        return document["inbox"]

    @property
    def ap_type(self) -> str:
        """The actor type that was validated at construction time."""
        return self._ap_type
2023-03-18 14:51:36 +01:00
async def fetch_actor(
    db_session: AsyncSession,
    actor_id: str,
) -> "ActorModel":
    """Return the actor whose ``ap_id`` equals *actor_id*.

    The database is consulted first. When no row matches, the actor document
    is retrieved with ``ap.fetch`` and stored through ``save_actor``; the
    newly stored row is returned.

    Args:
        db_session: session used for the lookup and, if needed, the insert.
        actor_id: ActivityPub ID of the actor to look up.

    Returns:
        The existing or freshly saved ``models.Actor`` row.
    """
    lookup = select(models.Actor).where(models.Actor.ap_id == actor_id)
    rows = await db_session.scalars(lookup)
    # one_or_none() yields None for "no match"; more than one matching row
    # is reported by SQLAlchemy as an error rather than silently picking one.
    known_actor = rows.one_or_none()
    if known_actor:
        return known_actor

    # Not stored yet: retrieve the actor document and persist it.
    fetched_document = await ap.fetch(actor_id)
    return await save_actor(fetched_document, db_session)
async def save_actor(
    ap_object: dict,
    db_session: AsyncSession,
) -> "ActorModel":
    """Create a ``models.Actor`` row from an actor document and return it.

    Args:
        ap_object: actor document; ``id``, ``type`` and ``preferredUsername``
            are read from it and the whole document is stored as ``ap_actor``.
        db_session: session the new row is added to.

    Returns:
        The new row after it has been flushed and refreshed. No commit is
        issued here; that is left to the caller.

    Raises:
        KeyError: if a required key is missing from *ap_object*.
        ValueError: propagated from ``_handle`` when the ID has no hostname.
    """
    actor_ap_id = ap_object["id"]
    logger.info("save actor " + actor_ap_id)

    # Evaluated in the same order as the model's keyword arguments so that
    # the first failing lookup is the one reported.
    column_values = {
        "ap_id": actor_ap_id,
        "ap_actor": ap_object,
        "ap_type": ap_object["type"],
        "handle": _handle(ap_object),
    }
    stored_actor = models.Actor(**column_values)

    db_session.add(stored_actor)
    # Flush, then refresh, so the returned instance reflects what the
    # database holds for the new row.
    await db_session.flush()
    await db_session.refresh(stored_actor)
    return stored_actor
def _handle(
    ap_object: dict
) -> str:
    """Build the ``@username@host`` handle for an actor document.

    The host part is the hostname parsed from the document's ``id`` URL
    (``urlparse(...).hostname``, which is lower-cased and carries no port);
    the user part is the document's ``preferredUsername``.

    Args:
        ap_object: actor document with ``id`` and ``preferredUsername`` keys.

    Returns:
        The handle string, e.g. ``@alice@example.com``.

    Raises:
        ValueError: if the ``id`` does not contain a hostname.
        KeyError: if ``id`` or ``preferredUsername`` is missing.
    """
    raw_id = ap_object["id"]
    hostname = urlparse(raw_id).hostname
    if not hostname:
        # Report the ID exactly as received; interpolating the parsed result
        # would print a ``ParseResult(...)`` repr instead of the offending ID.
        raise ValueError(f"Invalid actor ID {raw_id}")
    return "@" + ap_object["preferredUsername"] + "@" + hostname