foxhole/app/boxes.py
SouthFox da6b51fc5f
All checks were successful
ci/woodpecker/push/lint Pipeline was successful
ci/woodpecker/push/test Pipeline was successful
[chore] format code
2023-08-02 20:53:09 +08:00

562 lines
16 KiB
Python

#!/usr/bin/env python3
from typing import Any
import uuid
from sqlalchemy.orm import session
from app import models
from app import ldsig
from app.database import AsyncSession
from app.models import InboxObject, OutboxObject, now
from app.activitypub import ME
from app.activitypub import handle_visibility
from app.config import MANUALLY_APPROVES_FOLLOWERS
from app.config import BASE_URL, ID
from app.models import Actor
from app.actor import fetch_actor
from app.httpsig import k
import app.activitypub as ap
from urllib.parse import urlparse
from sqlalchemy import select
from sqlalchemy import delete
from sqlalchemy.orm import joinedload
from sqlalchemy.exc import IntegrityError
from loguru import logger
from uuid import uuid4
from datetime import datetime
def allocate_outbox_id() -> str:
    """Return a freshly generated random (version 4) UUID as a string."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
def build_object_id(id) -> str:
    """Return the URL made of ``BASE_URL`` followed by ``/tail/<id>``."""
    # The parameter keeps its original name so keyword callers still work.
    return "{}/tail/{}".format(BASE_URL, id)
async def get_outbox_object(
    db_session: AsyncSession,
    ap_id: str,
) -> OutboxObject | None:
    """Look up the outbox object whose ``ap_id`` equals the given value.

    The related inbox object, and that inbox object's actor, are
    joined-loaded by the same query.

    Returns:
        The matching ``OutboxObject``, or ``None`` when no row matches.
    """
    eager_related = joinedload(models.OutboxObject.relates_to_inbox_object).options(
        joinedload(models.InboxObject.actor),
    )
    statement = (
        select(OutboxObject)
        .where(OutboxObject.ap_id == ap_id)
        .options(eager_related)
    )
    result = await db_session.execute(statement)
    return result.unique().scalar_one_or_none()
async def save_incoming(
    db_session: AsyncSession,
    payload: dict,
) -> models.IncomingActivity | None:
    """Run an incoming payload through ``process_incoming`` and commit.

    A payload without an ``@context`` key is logged and rejected.  When
    ``process_incoming`` reports the payload as handled, the session is
    committed and the ``IncomingActivity`` is returned without having been
    added to the session.  Otherwise the ``IncomingActivity`` is added,
    committed and refreshed before it is returned.

    Returns:
        The ``IncomingActivity`` built for the payload, or ``None`` when the
        payload has no ``@context``.
    """
    if "@context" not in payload:
        logger.warning(f"invalid object: {payload}")
        return None

    # Payloads without an id get a random one.
    ap_id: str = payload["id"] if "id" in payload else str(uuid4())
    incoming_activity = models.IncomingActivity(
        ap_id=ap_id,
        ap_object=payload,
    )

    handled = await process_incoming(db_session, payload)
    if handled:
        await db_session.commit()
        await db_session.flush()
    else:
        db_session.add(incoming_activity)
        await db_session.commit()
        await db_session.refresh(incoming_activity)
    return incoming_activity
async def process_incoming(
    db_session: AsyncSession,
    ap_object: dict[str, Any],
) -> bool:
    """Handle an incoming activity according to its ``type``.

    * ``Follow``: stored as an inbox object, then passed to
      ``_handle_follow``.
    * ``Undo``: passed to ``_handle_undo``.
    * ``Accept``: stored as an inbox object and a ``Following`` row is
      created for the outbox object the activity refers to.
    * ``Reject``: recognised, but nothing is done for it yet.
    * ``Announce`` / ``Like``: stored as an inbox object, then passed to
      ``_handle_announce`` / ``_handle_like``.

    Args:
        db_session: Session used for every query and insert.
        ap_object: The activity; ``actor``, ``object`` and ``type`` are read
            unconditionally.

    Returns:
        ``True`` when the activity was handled, ``False`` otherwise
        (including every unlisted type).
    """
    actor = await fetch_actor(db_session, ap_object["actor"])
    relates_to_inbox_object = None
    relates_to_outbox_object = None

    # ``object`` is either an embedded object or a bare id string.
    target = ap_object["object"]
    target_id = target["id"] if isinstance(target, dict) else target
    if target_id.startswith(BASE_URL):
        relates_to_outbox_object = await get_outbox_object(db_session, target_id)

    def build_object(
        activity,
        relates_to_inbox_object=None,
        relates_to_outbox_object=None,
    ) -> InboxObject:
        """Build (without adding to the session) the inbox row for ``activity``."""
        return models.InboxObject(
            actor_id=actor.id,
            server=urlparse(activity["id"]).hostname,
            is_hidden_from_stream=True,
            ap_actor_id=actor.ap_id,
            ap_type=activity["type"],
            ap_id=activity["id"],
            ap_published_at=now(),
            ap_object=activity,
            visibility=handle_visibility(activity),
            activity_object_ap_id=activity["actor"],
            relates_to_inbox_object_id=relates_to_inbox_object.id
            if relates_to_inbox_object
            else None,
            relates_to_outbox_object_id=relates_to_outbox_object.id
            if relates_to_outbox_object
            else None,
        )

    activity_type = ap_object["type"]

    if activity_type == "Follow":
        inbox_object = build_object(ap_object)
        db_session.add(inbox_object)
        await db_session.flush()
        await db_session.refresh(inbox_object)

        if await _handle_follow(db_session, actor, inbox_object):
            return True
        return False

    elif activity_type == "Undo":
        if await _handle_undo(db_session, ap_object):
            return True

    elif activity_type in ("Accept", "Reject"):
        # Same lookup the block used to spell out inline.
        relate_following_object = await get_outbox_object(db_session, target_id)

        if activity_type == "Accept":
            if relate_following_object is None:
                # Nothing in the outbox matches; bail out before anything is
                # written instead of failing on a None dereference below.
                logger.warning(f"Accept for unknown outbox object {target_id}")
                return False

            # NOTE(review): nothing here checks that the accepting actor is
            # the one the stored activity was addressed to -- confirm whether
            # that is verified elsewhere.
            try:
                inbox_object = build_object(ap_object)
                db_session.add(inbox_object)
                await db_session.flush()
            except IntegrityError:
                logger.warning("Duplicate accept respond?")
                await db_session.rollback()
                return True
            await db_session.refresh(inbox_object)

            following = models.Following(
                actor_id=relate_following_object.relates_to_actor.id,
                outbox_object_id=relate_following_object.id,
                ap_actor_id=ap_object["actor"],
            )
            db_session.add(following)
            await db_session.flush()
            await db_session.refresh(following)
            return True
        # "Reject" falls through: it is recognised but not acted upon yet.

    elif activity_type in ("Announce", "Like"):
        handler = _handle_announce if activity_type == "Announce" else _handle_like
        inbox_object = build_object(
            ap_object,
            relates_to_inbox_object=relates_to_inbox_object,
            relates_to_outbox_object=relates_to_outbox_object,
        )
        db_session.add(inbox_object)
        await db_session.flush()
        await db_session.refresh(inbox_object)

        if await handler(
            db_session,
            inbox_object,
            relates_to_outbox_object=relates_to_outbox_object,
            relates_to_inbox_object=relates_to_inbox_object,
        ):
            return True

    return False
async def _handle_follow(
    db_session: AsyncSession,
    actor: Actor,
    inbox_object: InboxObject,
) -> bool:
    """Answer a stored Follow activity.

    The follow is refused (with a warning) when its ``object`` is not
    ``ME["id"]``.  When ``MANUALLY_APPROVES_FOLLOWERS`` is set nothing is
    sent.  Otherwise ``_send_accept`` is called.

    Returns:
        ``True`` only when ``_send_accept`` was called.
    """
    follow_activity = inbox_object.ap_object  # type: ignore
    if ME["id"] != follow_activity["object"]:
        logger.warning("no match follow object!" + follow_activity["id"])
        return False

    if not MANUALLY_APPROVES_FOLLOWERS:
        await _send_accept(db_session, actor, inbox_object)
        return True
    return False
async def _send_accept(
    db_session: AsyncSession,
    actor: Actor,
    inbox_object: InboxObject,
) -> None:
    """Record the follower and deliver an Accept for its Follow.

    A ``Follower`` row is flushed first; an ``IntegrityError`` there rolls
    the session back and is only logged.  Then an ``Accept`` activity is
    built, saved through ``save_to_outbox`` and posted to the actor's inbox.
    Any exception raised while building, saving or posting is logged and
    not propagated.
    """
    new_follower = models.Follower(
        actor_id=inbox_object.actor_id,
        inbox_object_id=inbox_object.id,
        ap_actor_id=inbox_object.ap_object["actor"],  # type: ignore
    )
    try:
        db_session.add(new_follower)
        await db_session.flush()
    except IntegrityError:
        await db_session.rollback()
        logger.warning("existing follower in db!")

    # NOTE(review): this second step is taken to run whether or not the
    # insert above succeeded -- confirm against the original indentation.
    try:
        accept_id = allocate_outbox_id()
        inbox_url = actor.inbox_url  # type: ignore
        accept_activity = {
            "@context": ap.AS_CTX,
            "id": build_object_id(accept_id),
            "type": "Accept",
            "actor": ME["id"],
            "object": inbox_object.ap_object["id"],  # type: ignore
        }
        # TODO outcoming
        await save_to_outbox(
            db_session,
            accept_id,
            accept_activity,
            relates_to_actor_id=actor.id,  # type: ignore
        )
        await ap.post(inbox_url, accept_activity)  # type: ignore
    except Exception as e:
        logger.error(e)
async def _handle_undo(
    db_session: AsyncSession,
    inbox_object: dict
) -> bool:
    """Apply an incoming Undo activity (only ``Undo`` of ``Follow``).

    ``inbox_object`` is the raw Undo activity; its ``object`` is indexed as
    an embedded activity.  The Undo is refused with a warning when the
    embedded activity's ``object`` is not ``ME["id"]``.

    Returns:
        ``True`` when an undone Follow was processed, ``False`` otherwise.
    """
    undone = inbox_object["object"]
    if undone["object"] != ME["id"]:
        logger.warning("Wrong undo object! "
                       + undone["actor"])
        return False

    if "Follow" != undone["type"]:
        return False

    lookup = (
        select(models.InboxObject)
        .where(models.InboxObject.ap_id == undone["id"])
        .options(
            joinedload(models.InboxObject.actor),
            joinedload(models.InboxObject.relates_to_inbox_object),
            joinedload(models.InboxObject.relates_to_outbox_object),
        )
    )
    relate_object = (await db_session.execute(lookup)).scalar_one_or_none()  # type: ignore
    if relate_object:
        relate_object.undo_id = undone["id"]
        relate_object.is_deleted = True

    # NOTE(review): the follower row is removed even when the stored Follow
    # was not found -- confirm this matches the original indentation.
    remove_follower = delete(models.Follower).where(
        models.Follower.ap_actor_id == inbox_object["actor"]
    )
    await db_session.execute(remove_follower)
    logger.info("undo follow " + inbox_object["actor"])
    return True
async def send_follow(
    db_session: AsyncSession,
    acct: str,
):
    """Send a Follow for ``acct`` via ``_send_follow``, then commit.

    ``acct`` is forwarded unchanged as the actor URL argument of
    ``_send_follow``.
    """
    await _send_follow(db_session, acct)
    await db_session.commit()
    await db_session.flush()
async def _handle_announce(
    db_session: AsyncSession,
    inbox_object: InboxObject,
    relates_to_outbox_object: models.OutboxObject | None,
    relates_to_inbox_object: models.InboxObject | None,
) -> bool:
    """Count an Announce against the outbox object it refers to.

    Returns:
        ``True`` when ``relates_to_outbox_object`` is set and its counter was
        bumped, ``False`` otherwise.
    """
    if not relates_to_outbox_object:
        return False

    # The class-level attribute expression is assigned, not a Python int;
    # presumably this renders as a database-side increment -- confirm
    # against the model definition.
    relates_to_outbox_object.announces_count = (  # type: ignore
        models.OutboxObject.announces_count + 1
    )
    logger.info(f"announces +1 {relates_to_outbox_object.ap_id}")
    return True
async def _handle_like(
    db_session: AsyncSession,
    inbox_object: InboxObject,
    relates_to_outbox_object: models.OutboxObject | None,
    relates_to_inbox_object: models.InboxObject | None,
) -> bool:
    """Count a Like against the outbox object it refers to.

    Returns:
        ``True`` when ``relates_to_outbox_object`` is set and its counter was
        bumped, ``False`` otherwise.
    """
    if not relates_to_outbox_object:
        return False

    # The class-level attribute expression is assigned, not a Python int;
    # presumably this renders as a database-side increment -- confirm
    # against the model definition.
    relates_to_outbox_object.likes_count = (  # type: ignore
        models.OutboxObject.likes_count + 1
    )
    logger.info(f"likes +1 {relates_to_outbox_object.ap_id}")
    return True
async def _send_follow(
    db_session: AsyncSession,
    actor_url: str,
):
    """Build, store and deliver a Follow activity for the actor at ``actor_url``.

    The actor is resolved with ``fetch_actor``, the activity is saved through
    ``save_to_outbox`` and then posted to the actor's inbox.

    Args:
        db_session: Session used to resolve the actor and store the activity.
        actor_url: Value handed to ``fetch_actor`` to resolve the actor.
    """
    actor = await fetch_actor(db_session, actor_url)

    # Keep the bare id and the URL apart: ``save_to_outbox`` takes the bare
    # id (as ``_send_accept`` and ``_send_create`` pass it), while the
    # activity itself carries the full URL.
    outbox_id = allocate_outbox_id()
    follow_id = build_object_id(outbox_id)

    out = {
        "@context": ap.AS_CTX,
        "id": follow_id,
        "type": "Follow",
        "actor": ME["id"],
        "object": actor.ap_id,
    }

    await save_to_outbox(
        db_session,
        outbox_id,
        out,
        relates_to_actor_id=actor.id,  # type: ignore
        activity_object_ap_id=actor.ap_id,
    )

    await ap.post(
        actor.inbox_url,
        out,
    )
async def _send_create(
    db_session: AsyncSession,
    ap_type: str,
    content: str,
    visibility: ap.VisibilityEnum,
    published: str | None = None,
) -> bool:
    """Build an object, store it in the outbox and deliver it.

    Args:
        db_session: Session used to store the object and find recipients.
        ap_type: Value of the object's ``type`` field.
        content: Value of the object's ``content`` field.
        visibility: Only ``ap.VisibilityEnum.PUBLIC`` is accepted.
        published: Timestamp string; when falsy, ``now()`` is used, truncated
            to whole seconds, with ``+00:00`` rewritten to ``Z``.

    Returns:
        Always ``True``.

    Raises:
        ValueError: If ``visibility`` is not ``ap.VisibilityEnum.PUBLIC``.
    """
    object_id = allocate_outbox_id()
    if not published:
        published = now().replace(microsecond=0).isoformat().replace("+00:00", "Z")

    if visibility != ap.VisibilityEnum.PUBLIC:
        raise ValueError(f"Unsupport visibility {visibility}")
    to = [ap.AS_PUBLIC]
    cc = [f"{BASE_URL}/followers"]

    object_url = build_object_id(object_id)
    ap_object = {
        "@context": ap.AS_EXTENDED_CTX,
        "type": ap_type,
        "id": object_url,
        "attributedTo": ID,
        "content": content,
        "to": to,
        "cc": cc,
        "published": published,
        # "context": context,
        # "conversation": context,
        "url": object_url,
        "tag": [],
        "summary": None,
        "inReplyTo": None,
        "sensitive": False,
        "attachment": [],
    }

    outbox_object = await save_to_outbox(
        db_session,
        object_id,
        ap_object,
    )
    recipients = await _compute_recipients(db_session, ap_object)

    ap_object = ap.wrap_ap_object(ap_object)
    is_public = (
        ap_object["type"] == "Create"
        and ap.VisibilityEnum.PUBLIC == outbox_object.visibility
    )
    if is_public:
        ldsig.generate_signature(ap_object, k)

    # NOTE(review): delivery is taken to happen for every wrapped object,
    # not only signed ones -- confirm against the original indentation.
    for inbox_url in recipients:
        await ap.post(
            inbox_url,
            ap_object,
        )
    return True
async def _compute_recipients(
    db_session: AsyncSession,
    ap_object: dict,
) -> set[str]:
    """Collect the inbox URLs an outgoing object has to be delivered to.

    The addressing fields ``to``, ``cc``, ``bcc`` and ``bto`` are read from
    ``ap_object``; each may be a list, a single string, or absent/empty.
    ``ap.AS_PUBLIC`` and ``ID`` are skipped.  An address under ``BASE_URL``
    is expanded as a local collection, and only the followers collection is
    supported.  Any other address is currently ignored.

    Returns:
        The distinct inbox URLs of the expanded actors.  An actor's shared
        inbox is used when it has one, its own inbox otherwise.

    Raises:
        ValueError: If an address under ``BASE_URL`` is not the followers
            collection.
    """

    async def process_collection(db_session, url) -> list[Actor]:
        """Return the actors behind the local collection at ``url``."""
        if url != BASE_URL + "/followers":
            raise ValueError(f"{url} not supported")

        followers = (
            (
                await db_session.scalars(
                    select(models.Follower).options(
                        joinedload(models.Follower.actor)
                    )
                )
            )
            .unique()
            .all()
        )
        return [follower.actor for follower in followers]

    _recipients = []
    for field in ("to", "cc", "bcc", "bto"):
        addresses = ap_object.get(field)
        if not addresses:
            continue
        if isinstance(addresses, str):
            # A lone string must not be split into characters by extend().
            _recipients.append(addresses)
        else:
            _recipients.extend(addresses)

    recipients = set()
    logger.info(f"{_recipients}")
    for r in _recipients:
        if r in [ap.AS_PUBLIC, ID]:
            continue

        if r.startswith(BASE_URL):
            for actor in await process_collection(db_session, r):
                # Fall back to the actor's own inbox so that a missing
                # shared inbox never puts None into the delivery set.
                inbox_url = actor.share_inbox_url or actor.inbox_url
                if inbox_url:
                    recipients.add(inbox_url)
            continue

    return recipients
async def save_to_inbox(
    db_session: AsyncSession,
    inbox_id: str,
    ap_object: dict,
    relates_to_inbox_object_id: int | None = None,
    relates_to_outbox_object_id: int | None = None,
    relates_to_actor_id: int | None = None,
) -> InboxObject:
    """Build a row from ``ap_object``, flush it and return it refreshed.

    ``type`` and ``id`` are read from ``ap_object``; the visibility comes
    from ``handle_visibility``.  The session is flushed but not committed.
    """
    ap_type = ap_object["type"]
    ap_id = ap_object["id"]

    # NOTE(review): despite the function name and the return annotation, the
    # row built here is an ``OutboxObject`` -- confirm which model is meant.
    row = OutboxObject(
        public_id=inbox_id,
        ap_object=ap_object,
        ap_id=ap_id,
        ap_type=ap_type,
        visibility=handle_visibility(ap_object),
        relates_to_inbox_object_id=relates_to_inbox_object_id,
        relates_to_outbox_object_id=relates_to_outbox_object_id,
        relates_to_actor_id=relates_to_actor_id,
    )

    db_session.add(row)
    await db_session.flush()
    await db_session.refresh(row)
    return row
async def save_to_outbox(
    db_session: AsyncSession,
    outbox_id: str,
    ap_object: dict,
    activity_object_ap_id: str | None = None,
    relates_to_inbox_object_id: int | None = None,
    relates_to_outbox_object_id: int | None = None,
    relates_to_actor_id: int | None = None,
) -> OutboxObject:
    """Store ``ap_object`` as an ``OutboxObject`` and commit.

    ``type`` and ``id`` are read from ``ap_object``; the visibility comes
    from ``handle_visibility``; ``outbox_id`` is stored as ``public_id``.
    The row is flushed and refreshed, then the session is committed.

    Returns:
        The stored ``OutboxObject``.
    """
    ap_type = ap_object["type"]
    ap_id = ap_object["id"]

    row = OutboxObject(
        public_id=outbox_id,
        ap_object=ap_object,
        ap_id=ap_id,
        ap_type=ap_type,
        visibility=handle_visibility(ap_object),
        activity_object_ap_id=activity_object_ap_id,
        relates_to_inbox_object_id=relates_to_inbox_object_id,
        relates_to_outbox_object_id=relates_to_outbox_object_id,
        relates_to_actor_id=relates_to_actor_id,
    )

    db_session.add(row)
    await db_session.flush()
    await db_session.refresh(row)
    await db_session.commit()
    return row