foxhole/tests/test_inbox.py

224 lines
6.4 KiB
Python

#!/usr/bin/env python3
import pytest
import fastapi
import respx
import httpx
from uuid import uuid4
from tests import factories
from tests.utils import build_remote_actor
from unittest import mock
from app.main import app
from app.utils import precheck
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app import activitypub as ap, httpsig
from app.config import AP_CONTENT_TYPE
from app import models
from sqlalchemy import select
def test_inbox_follow_request(
db: Session,
client: TestClient,
respx_mock: respx.MockRouter,
) -> None:
# build test actor
ra = build_remote_actor()
ap_id = ra.ap_id # type: ignore
# mock request
respx_mock.get(ap_id).mock(return_value=httpx.Response(200,json=ra.ap_actor))
respx_mock.post(ap_id + "/inbox").mock(return_value=httpx.Response(202))
# send follower request
with mock.patch("app.boxes.MANUALLY_APPROVES_FOLLOWERS", False):
follow_id = ap_id + "/follow/" + (uuid4().hex)
response = client.post(
"/inbox",
headers={"Content-Type": AP_CONTENT_TYPE},
json={
"@context": ap.AS_CTX,
"type": "Follow",
"id": follow_id,
"actor": ap_id,
"object": ap.ME["id"],
},
)
assert response.status_code == 202
# actor was saved in actor table
saved_actor = db.execute(select(models.Actor)).scalar_one()
assert saved_actor.ap_id == ap_id
# follower request was saved in outbox table
outbox_object = db.execute(select(models.OutboxObject)).scalar_one()
assert outbox_object.ap_type == "Accept"
# follower was saved in follower table
follower_actor = db.execute(select(models.Follower)).scalar_one()
assert follower_actor.ap_actor_id == ap_id
# send undo follower request
with mock.patch("app.boxes.MANUALLY_APPROVES_FOLLOWERS", False):
undo_id = ap_id + "/undo/" + (uuid4().hex)
response = client.post(
"/inbox",
headers={"Content-Type": AP_CONTENT_TYPE},
json={
"@context": ap.AS_CTX,
"type": "Undo",
"id": undo_id,
"actor": ap_id,
"object": {
"type": "Follow",
"id": follow_id,
"actor": ap_id,
"object": ap.ME["id"],
},
},
)
assert response.status_code == 202
# follower was not saved in follower table
follower_actor = db.execute(select(models.Follower)).scalar_one_or_none()
assert follower_actor == None
def test_inbox_announce_activity(
db: Session,
client: TestClient,
respx_mock: respx.MockRouter,
) -> None:
# build test actor
ra = build_remote_actor()
ap_id = ra.ap_id # type: ignore
app.dependency_overrides[precheck.inbox_prechecker] = factories.inbox_prechecker
# mock request
respx_mock.get(ap_id).mock(return_value=httpx.Response(200,json=ra.ap_actor))
respx_mock.post(ap_id + "/inbox").mock(return_value=httpx.Response(202))
from app.models import now
from app.activitypub import VisibilityEnum
note_id = ap.ME["id"] + (uuid4().hex)
published = now().replace(microsecond=0).isoformat().replace("+00:00", "Z")
note_object = {
"@context": ap.AS_EXTENDED_CTX,
"type": "Note",
"id": note_id,
"attributedTo": ap.ME["id"],
"content": "<p>Test!</p>",
"to": [ap.AS_PUBLIC],
"cc": [],
"published": published,
# "context": context,
# "conversation": context,
"url": note_id,
"tag": [],
"summary": None,
"inReplyTo": None,
"sensitive": False,
"attachment": [],
}
outbox_object = models.OutboxObject(
public_id=note_id,
ap_object=note_object,
ap_id=note_id,
ap_type="Note",
visibility =VisibilityEnum.PUBLIC,
)
db.add(outbox_object)
db.commit()
# send announce request
response = client.post(
"/inbox",
headers={"Content-Type": AP_CONTENT_TYPE},
json={
"@context": ap.AS_CTX,
"type": "Announce",
"id": ap_id + "/announce/" + (uuid4().hex),
"actor": ap_id,
"object": note_id,
},
)
assert response.status_code == 202
saved_note = db.execute(select(models.OutboxObject)).scalar_one()
print(saved_note.ap_id)
assert saved_note.announces_count == 1
def test_inbox_like_activity(
db: Session,
client: TestClient,
respx_mock: respx.MockRouter,
) -> None:
# build test actor
ra = build_remote_actor()
ap_id = ra.ap_id # type: ignore
app.dependency_overrides[precheck.inbox_prechecker] = factories.inbox_prechecker
# mock request
respx_mock.get(ap_id).mock(return_value=httpx.Response(200,json=ra.ap_actor))
respx_mock.post(ap_id + "/inbox").mock(return_value=httpx.Response(202))
from app.models import now
from app.activitypub import VisibilityEnum
note_id = ap.ME["id"] + (uuid4().hex)
published = now().replace(microsecond=0).isoformat().replace("+00:00", "Z")
note_object = {
"@context": ap.AS_EXTENDED_CTX,
"type": "Note",
"id": note_id,
"attributedTo": ap.ME["id"],
"content": "<p>Test!</p>",
"to": [ap.AS_PUBLIC],
"cc": [],
"published": published,
# "context": context,
# "conversation": context,
"url": note_id,
"tag": [],
"summary": None,
"inReplyTo": None,
"sensitive": False,
"attachment": [],
}
outbox_object = models.OutboxObject(
public_id=note_id,
ap_object=note_object,
ap_id=note_id,
ap_type="Note",
visibility =VisibilityEnum.PUBLIC,
)
db.add(outbox_object)
db.commit()
# send announce request
response = client.post(
"/inbox",
headers={"Content-Type": AP_CONTENT_TYPE},
json={
"@context": ap.AS_CTX,
"type": "Like",
"id": ap_id + "/like/" + (uuid4().hex),
"actor": ap_id,
"object": note_id,
},
)
assert response.status_code == 202
saved_note = db.execute(select(models.OutboxObject)).scalar_one()
print(saved_note.ap_id)
assert saved_note.likes_count == 1