85 lines
2.5 KiB
Python
85 lines
2.5 KiB
Python
#!/usr/bin/env python3
|
|
import pytest
|
|
import fastapi
|
|
import respx
|
|
import httpx
|
|
from uuid import uuid4
|
|
from tests import factories
|
|
from tests.utils import build_remote_actor
|
|
|
|
from unittest import mock
|
|
from app.main import app
|
|
from app.utils import precheck
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy.orm import Session
|
|
from app import activitypub as ap, httpsig
|
|
from app.config import AP_CONTENT_TYPE
|
|
from app import models
|
|
from sqlalchemy import select
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_outbox_send_follow_request(
    db: Session,
    client: TestClient,
    respx_mock: respx.MockRouter,
) -> None:
    """Send a Follow to a mocked remote actor and check the outbox row.

    The remote actor document and its inbox endpoint are both served by
    ``respx_mock``; the test then calls ``send_follow`` and inspects the
    single ``OutboxObject`` row visible through the ``db`` fixture.
    """
    # Imported locally, as in the rest of this module's tests.
    from app.boxes import send_follow
    from app.database import async_session

    # Remote actor the local instance is going to follow.
    remote_actor = build_remote_actor()
    remote_ap_id = remote_actor.ap_id  # type: ignore
    inbox_url = remote_ap_id + "/inbox"

    # Serve the actor document on GET and accept deliveries on its inbox.
    actor_response = httpx.Response(200, json=remote_actor.ap_actor)
    accepted_response = httpx.Response(202)
    respx_mock.get(remote_ap_id).mock(return_value=actor_response)
    respx_mock.post(inbox_url).mock(return_value=accepted_response)

    async with async_session() as db_session:  # type: ignore
        await send_follow(db_session, remote_ap_id)

    # Exactly one outbox object must exist, and it must be the Follow
    # activity targeting the remote actor.
    query = select(models.OutboxObject)
    outbox_object = db.execute(query).scalar_one()
    assert outbox_object.ap_type == "Follow"
    assert outbox_object.activity_object_ap_id == remote_ap_id
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_outbox_send_create_activity(
    db: Session,
    client: TestClient,
    respx_mock: respx.MockRouter,
) -> None:
    """Publish a public Note and check the object stored in the outbox.

    Org-style markup is converted with ``to_html`` and the result is passed
    to ``_send_create``. The test then expects exactly one ``OutboxObject``
    row of type ``"Note"`` whose ``content`` equals that converted HTML.
    """
    # build test actor
    ra = build_remote_actor()
    remote_ap_id = ra.ap_id  # type: ignore

    # mock request
    respx_mock.get(remote_ap_id).mock(
        return_value=httpx.Response(200, json=ra.ap_actor)
    )
    respx_mock.post(remote_ap_id + "/inbox").mock(
        return_value=httpx.Response(202)
    )

    from app.database import async_session
    from app.boxes import _send_create
    from app.activitypub import VisibilityEnum
    from app.orgpython import to_html

    async with async_session() as db_session:  # type: ignore
        # Source markup is converted first; the converted HTML is what gets
        # sent and what the stored object is compared against below.
        content = "*Blod Text* =code Text= \n"
        content = to_html(content)

        await _send_create(
            db_session,
            "Note",
            content,
            VisibilityEnum.PUBLIC,
        )

    # And the Note object was created in the outbox with the sent content.
    # (No debug print needed: pytest shows both operands when an assert fails.)
    outbox_object = db.execute(select(models.OutboxObject)).scalar_one()
    assert outbox_object.ap_type == "Note"
    assert outbox_object.ap_object["content"] == content
|