#!/usr/bin/env python3 import pytest import fastapi import respx import httpx from uuid import uuid4 from tests import factories from tests.utils import build_remote_actor from unittest import mock from app.main import app from app.utils import precheck from fastapi.testclient import TestClient from sqlalchemy.orm import Session from app import activitypub as ap, httpsig from app.config import AP_CONTENT_TYPE from app import models from sqlalchemy import select def test_inbox_follow_request( db: Session, client: TestClient, respx_mock: respx.MockRouter, ) -> None: # build test actor ra = build_remote_actor() ap_id = ra.ap_id # type: ignore # mock request respx_mock.get(ap_id).mock(return_value=httpx.Response(200,json=ra.ap_actor)) respx_mock.post(ap_id + "/inbox").mock(return_value=httpx.Response(202)) # send follower request with mock.patch("app.boxes.MANUALLY_APPROVES_FOLLOWERS", False): follow_id = ap_id + "/follow/" + (uuid4().hex) response = client.post( "/inbox", headers={"Content-Type": AP_CONTENT_TYPE}, json={ "@context": ap.AS_CTX, "type": "Follow", "id": follow_id, "actor": ap_id, "object": ap.ME["id"], }, ) assert response.status_code == 202 # actor was saved in actor table saved_actor = db.execute(select(models.Actor)).scalar_one() assert saved_actor.ap_id == ap_id # follower request was saved in outbox table outbox_object = db.execute(select(models.OutboxObject)).scalar_one() assert outbox_object.ap_type == "Accept" # follower was saved in follower table follower_actor = db.execute(select(models.Follower)).scalar_one() assert follower_actor.ap_actor_id == ap_id # send undo follower request with mock.patch("app.boxes.MANUALLY_APPROVES_FOLLOWERS", False): undo_id = ap_id + "/undo/" + (uuid4().hex) response = client.post( "/inbox", headers={"Content-Type": AP_CONTENT_TYPE}, json={ "@context": ap.AS_CTX, "type": "Undo", "id": undo_id, "actor": ap_id, "object": { "type": "Follow", "id": follow_id, "actor": ap_id, "object": ap.ME["id"], }, }, ) assert response.status_code == 202 # follower was not saved in follower table follower_actor = db.execute(select(models.Follower)).scalar_one_or_none() assert follower_actor == None def test_inbox_announce_activity( db: Session, client: TestClient, respx_mock: respx.MockRouter, ) -> None: # build test actor ra = build_remote_actor() ap_id = ra.ap_id # type: ignore app.dependency_overrides[precheck.inbox_prechecker] = factories.inbox_prechecker # mock request respx_mock.get(ap_id).mock(return_value=httpx.Response(200,json=ra.ap_actor)) respx_mock.post(ap_id + "/inbox").mock(return_value=httpx.Response(202)) from app.models import now from app.activitypub import VisibilityEnum note_id = ap.ME["id"] + (uuid4().hex) published = now().replace(microsecond=0).isoformat().replace("+00:00", "Z") note_object = { "@context": ap.AS_EXTENDED_CTX, "type": "Note", "id": note_id, "attributedTo": ap.ME["id"], "content": "
Test!
", "to": [ap.AS_PUBLIC], "cc": [], "published": published, # "context": context, # "conversation": context, "url": note_id, "tag": [], "summary": None, "inReplyTo": None, "sensitive": False, "attachment": [], } outbox_object = models.OutboxObject( public_id=note_id, ap_object=note_object, ap_id=note_id, ap_type="Note", visibility =VisibilityEnum.PUBLIC, ) db.add(outbox_object) db.commit() # send announce request response = client.post( "/inbox", headers={"Content-Type": AP_CONTENT_TYPE}, json={ "@context": ap.AS_CTX, "type": "Announce", "id": ap_id + "/announce/" + (uuid4().hex), "actor": ap_id, "object": note_id, }, ) assert response.status_code == 202 saved_note = db.execute(select(models.OutboxObject)).scalar_one() print(saved_note.ap_id) assert saved_note.announces_count == 1 def test_inbox_like_activity( db: Session, client: TestClient, respx_mock: respx.MockRouter, ) -> None: # build test actor ra = build_remote_actor() ap_id = ra.ap_id # type: ignore app.dependency_overrides[precheck.inbox_prechecker] = factories.inbox_prechecker # mock request respx_mock.get(ap_id).mock(return_value=httpx.Response(200,json=ra.ap_actor)) respx_mock.post(ap_id + "/inbox").mock(return_value=httpx.Response(202)) from app.models import now from app.activitypub import VisibilityEnum note_id = ap.ME["id"] + (uuid4().hex) published = now().replace(microsecond=0).isoformat().replace("+00:00", "Z") note_object = { "@context": ap.AS_EXTENDED_CTX, "type": "Note", "id": note_id, "attributedTo": ap.ME["id"], "content": "Test!
", "to": [ap.AS_PUBLIC], "cc": [], "published": published, # "context": context, # "conversation": context, "url": note_id, "tag": [], "summary": None, "inReplyTo": None, "sensitive": False, "attachment": [], } outbox_object = models.OutboxObject( public_id=note_id, ap_object=note_object, ap_id=note_id, ap_type="Note", visibility =VisibilityEnum.PUBLIC, ) db.add(outbox_object) db.commit() # send announce request response = client.post( "/inbox", headers={"Content-Type": AP_CONTENT_TYPE}, json={ "@context": ap.AS_CTX, "type": "Like", "id": ap_id + "/like/" + (uuid4().hex), "actor": ap_id, "object": note_id, }, ) assert response.status_code == 202 saved_note = db.execute(select(models.OutboxObject)).scalar_one() print(saved_note.ap_id) assert saved_note.likes_count == 1