foxhole/tests/test_outbox.py
SouthFox a1f801c1d2
All checks were successful
ci/woodpecker/push/lint Pipeline was successful
ci/woodpecker/push/test Pipeline was successful
[test] add follow in/outbox object tests
2023-08-03 20:59:58 +08:00

160 lines
4.7 KiB
Python

#!/usr/bin/env python3
import pytest
import fastapi
import respx
import httpx
from uuid import uuid4
from tests import factories
from tests.utils import build_remote_actor
from unittest import mock
from app.main import app
from app.utils import precheck
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app import activitypub as ap, httpsig
from app.config import AP_CONTENT_TYPE
from app import models
from sqlalchemy import select
@pytest.mark.asyncio
async def test_outbox_send_follow_request(
db: Session,
async_db_session,
client: TestClient,
respx_mock: respx.MockRouter,
) -> None:
# build test actor
ra = build_remote_actor()
remote_ap_id = ra.ap_id # type: ignore
# mock request
respx_mock.get(remote_ap_id).mock(
return_value=httpx.Response(200,json=ra.ap_actor))
respx_mock.post(remote_ap_id + "/inbox").mock(
return_value=httpx.Response(202))
from app.boxes import send_follow
async with async_db_session as db_session: #type: ignore
await send_follow(db_session, remote_ap_id)
# And the Follow activity was created in the outbox
outbox_object = db.execute(select(models.OutboxObject)).scalar_one()
assert outbox_object.ap_type == "Follow"
assert outbox_object.activity_object_ap_id == remote_ap_id
response = client.post(
"/inbox",
headers={"Content-Type": AP_CONTENT_TYPE},
json={
"@context": ap.AS_CTX,
"type": "Accept",
"id": "https://accept-test",
"actor": remote_ap_id,
"object": outbox_object.ap_id,
},
)
assert response.status_code == 202
# follower was saved in follower table
following_actor = db.execute(select(models.Following)).scalar_one()
assert following_actor.ap_actor_id == remote_ap_id
assert following_actor.outbox_object == outbox_object
inbox_object = db.execute(select(models.InboxObject)).scalar_one()
assert inbox_object.ap_type == "Accept"
@pytest.mark.asyncio
async def test_outbox_send_follow_request_nest_accept(
db: Session,
async_db_session,
client: TestClient,
respx_mock: respx.MockRouter,
) -> None:
# build test actor
ra = build_remote_actor()
remote_ap_id = ra.ap_id # type: ignore
# mock request
respx_mock.get(remote_ap_id).mock(
return_value=httpx.Response(200,json=ra.ap_actor))
respx_mock.post(remote_ap_id + "/inbox").mock(
return_value=httpx.Response(202))
from app.boxes import send_follow
async with async_db_session as db_session: #type: ignore
await send_follow(db_session, remote_ap_id)
# And the Follow activity was created in the outbox
outbox_object = db.execute(select(models.OutboxObject)).scalar_one()
assert outbox_object.ap_type == "Follow"
assert outbox_object.activity_object_ap_id == remote_ap_id
response = client.post(
"/inbox",
headers={"Content-Type": AP_CONTENT_TYPE},
json={
"@context": ap.AS_CTX,
"type": "Accept",
"id": "https://accept-test",
"actor": remote_ap_id,
"object": {
"id": outbox_object.ap_id,
"type": "Accept",
"actor": ap.ME["id"],
"object": remote_ap_id,
},
},
)
assert response.status_code == 202
# follower was saved in follower table
following_actor = db.execute(select(models.Following)).scalar_one()
assert following_actor.ap_actor_id == remote_ap_id
@pytest.mark.asyncio
async def test_outbox_send_create_activity(
db: Session,
async_db_session,
client: TestClient,
respx_mock: respx.MockRouter,
) -> None:
# build test actor
ra = build_remote_actor()
remote_ap_id = ra.ap_id # type: ignore
# mock request
respx_mock.get(remote_ap_id).mock(
return_value=httpx.Response(200,json=ra.ap_actor))
respx_mock.post(remote_ap_id + "/inbox").mock(
return_value=httpx.Response(202))
from app.boxes import _send_create
from app.activitypub import VisibilityEnum
from app.orgpython import to_html
async with async_db_session as db_session: #type: ignore
content = "*Blod Text* =code Text= \n"
content = to_html(content)
await _send_create(
db_session,
"Note",
content,
VisibilityEnum.PUBLIC
)
# And the Follow activity was created in the outbox
outbox_object = db.execute(select(models.OutboxObject)).scalar_one()
assert outbox_object.ap_type == "Note"
print(outbox_object.ap_object["content"])
assert outbox_object.ap_object["content"] == content