foxhole/tests/test_outbox.py
SouthFox b11a3ce322
Some checks failed
ci/woodpecker/push/lint Pipeline was successful
ci/woodpecker/push/test Pipeline failed
[test] add test special config
2023-07-30 20:50:08 +08:00

85 lines
2.4 KiB
Python

#!/usr/bin/env python3
import pytest
import fastapi
import respx
import httpx
from uuid import uuid4
from tests import factories
from tests.utils import build_remote_actor
from unittest import mock
from app.main import app
from app.utils import precheck
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app import activitypub as ap, httpsig
from app.config import AP_CONTENT_TYPE
from app import models
from sqlalchemy import select
@pytest.mark.asyncio
async def test_outbox_send_follow_request(
    db: Session,
    async_db_session,
    client: TestClient,
    respx_mock: respx.MockRouter,
) -> None:
    """Check that ``send_follow`` stores a ``Follow`` activity in the outbox.

    The remote actor document (GET) and the remote inbox (POST) are
    registered as ``respx`` mock routes before the follow is sent.
    """
    # Remote actor that is going to be followed.
    remote_actor = build_remote_actor()
    actor_url = remote_actor.ap_id  # type: ignore

    # Canned responses for the two remote endpoints.
    actor_response = httpx.Response(200, json=remote_actor.ap_actor)
    inbox_response = httpx.Response(202)
    respx_mock.get(actor_url).mock(return_value=actor_response)
    respx_mock.post(actor_url + "/inbox").mock(return_value=inbox_response)

    # NOTE(review): imported inside the test body, presumably so that
    # app.boxes is loaded only once the fixtures are in place -- confirm.
    from app.boxes import send_follow

    async with async_db_session as session:  # type: ignore
        await send_follow(session, actor_url)

    # scalar_one() requires exactly one outbox row; it must be the Follow
    # targeting the remote actor.
    stored = db.execute(select(models.OutboxObject)).scalar_one()
    assert stored.ap_type == "Follow"
    assert stored.activity_object_ap_id == actor_url
@pytest.mark.asyncio
async def test_outbox_send_create_activity(
    db: Session,
    async_db_session,
    client: TestClient,
    respx_mock: respx.MockRouter,
) -> None:
    """Check that ``_send_create`` stores a public ``Note`` in the outbox.

    Org-style markup is converted with ``to_html`` and the result is sent
    as the note content; the stored object's ``content`` must match that
    HTML exactly.
    """
    # build test actor
    ra = build_remote_actor()
    remote_ap_id = ra.ap_id  # type: ignore

    # mock the remote actor document (GET) and its inbox (POST)
    respx_mock.get(remote_ap_id).mock(
        return_value=httpx.Response(200, json=ra.ap_actor))
    respx_mock.post(remote_ap_id + "/inbox").mock(
        return_value=httpx.Response(202))

    # NOTE(review): imported inside the test body, presumably so that the
    # app modules are loaded only once the fixtures are in place -- confirm.
    from app.boxes import _send_create
    from app.activitypub import VisibilityEnum
    from app.orgpython import to_html

    async with async_db_session as db_session:  # type: ignore
        # Org-style source text, rendered to HTML before being sent.
        content = "*Blod Text* =code Text= \n"
        content = to_html(content)

        await _send_create(
            db_session,
            "Note",
            content,
            VisibilityEnum.PUBLIC,
        )

    # And the Note object was created in the outbox; scalar_one() requires
    # exactly one row. No debug print is needed: pytest shows both operands
    # of a failing assert.
    outbox_object = db.execute(select(models.OutboxObject)).scalar_one()
    assert outbox_object.ap_type == "Note"
    assert outbox_object.ap_object["content"] == content