Compare commits

..

No commits in common. "280d74b4e93825ff1c2091393522c6fa9bfa2975" and "e32310da73c64575609e1b76e8f4b4ec8f8698b5" have entirely different histories.

14 changed files with 584 additions and 2547 deletions

View file

@ -75,32 +75,6 @@ ME = {
"tag": [] # TODO tag support
}
class BaseActor:
def __init__(self, ap_actor: RawObject) -> None:
if (ap_type := ap_actor.get("type")) not in ACTOR_TYPES:
raise ValueError(f"Unexpected actor type: {ap_type}")
self._ap_actor = ap_actor
self._ap_type : str = ap_type # type: ignore
@property
def ap_actor(self) -> RawObject:
return self._ap_actor
@property
def inbox_url(self) -> str:
return self.ap_actor["inbox"]
@property
def ap_type(self) -> str:
return self._ap_type
@property
def share_inbox_url(self) -> str:
return self.ap_actor.get("endpoints", {}).get("sharedInbox") \
or self.inbox_url
class VisibilityEnum(str, enum.Enum):
PUBLIC = "public"
UNLISTED = "unlisted"
@ -121,26 +95,6 @@ def handle_visibility(
return VisibilityEnum.DIRECT
def wrap_ap_object(ap_object: dict) -> dict:
if ap_object["type"] in ["Note"]:
if "@context" in ap_object:
del ap_object["@context"]
return {
"@context": AS_EXTENDED_CTX,
"actor": config.ID,
"to": ap_object.get("to", []),
"cc": ap_object.get("cc", []),
"id": ap_object["id"] + "/activity",
"object": ap_object,
"published": ap_object["published"],
"type": "Create",
}
return ap_object
async def post(
url: str,
payload : dict,
@ -158,7 +112,6 @@ async def post(
auth=auth,
)
resp.raise_for_status()
return resp

View file

@ -1,6 +1,5 @@
#!/usr/bin/env python3
import typing
import json
from loguru import logger
from app.database import AsyncSession
from app import models
@ -13,6 +12,26 @@ if typing.TYPE_CHECKING:
import app.activitypub as ap
class BaseActor:
def __init__(self, ap_actor: ap.RawObject) -> None:
if (ap_type := ap_actor.get("type")) not in ap.ACTOR_TYPES:
raise ValueError(f"Unexpected actor type: {ap_type}")
self._ap_actor = ap_actor
self._ap_type : str = ap_type # type: ignore
@property
def ap_actor(self) -> ap.RawObject:
return self._ap_actor
@property
def inbox_url(self) -> str:
return self.ap_actor["inbox"]
@property
def ap_type(self) -> str:
return self._ap_type
async def fetch_actor(
db_session : AsyncSession,
@ -30,8 +49,8 @@ async def fetch_actor(
ap_object = await ap.fetch(actor_id)
exist_actor = await save_actor(ap_object, db_session)
return exist_actor
else:
return exist_actor
return exist_actor
async def save_actor(
ap_object : dict,
@ -60,18 +79,3 @@ def _handle (
handle = '@' + ap_object["preferredUsername"] + '@' + ap_id.hostname
return handle
async def get_public_key(
db_session: AsyncSession,
key_id: str
) -> str:
existing_actor = (
await db_session.scalars(
select(models.Actor).where(models.Actor.ap_id == key_id.split("#")[0])
)
).one_or_none()
public_key = existing_actor.ap_object["publicKey"]["publicKeyPem"]
return public_key

View file

@ -5,16 +5,14 @@ import uuid
from sqlalchemy.orm import session
from app import models
from app import ldsig
from app.database import AsyncSession
from app.models import InboxObject, OutboxObject, now
from app.activitypub import ME
from app.activitypub import handle_visibility
from app.config import MANUALLY_APPROVES_FOLLOWERS
from app.config import BASE_URL, ID
from app.config import BASE_URL
from app.models import Actor
from app.actor import fetch_actor
from app.httpsig import k
import app.activitypub as ap
@ -25,7 +23,6 @@ from sqlalchemy.orm import joinedload
from sqlalchemy.exc import IntegrityError
from loguru import logger
from uuid import uuid4
from datetime import datetime
@ -136,8 +133,6 @@ async def process_incoming(
await db_session.flush()
await db_session.refresh(following)
return True
# elif "Creat" == ap_object["type"]:
return False
@ -178,21 +173,18 @@ async def _send_accept(
await db_session.rollback()
logger.warning("existing follower in db!")
try:
reply_id = allocate_outbox_id()
reply_id = allocate_outbox_id()
url = actor.inbox_url # type: ignore
out = {
"@context": ap.AS_CTX,
"id": build_object_id(reply_id),
"type": "Accept",
"actor": ME["id"],
"object": inbox_object.ap_object["id"], #type: ignore
}
#TODO outcoming
await ap.post(url, out) # type: ignore
except Exception as e:
logger.error(e)
url = actor.inbox_url # type: ignore
out = {
"@context": ap.AS_CTX,
"id": build_object_id(reply_id),
"type": "Accept",
"actor": ME["id"],
"object": inbox_object.ap_object["id"], #type: ignore
}
#TODO outcoming
await ap.post(url, out) # type: ignore
async def _handle_undo(
@ -268,112 +260,6 @@ async def _send_follow(
)
async def _send_create(
db_session: AsyncSession,
ap_type: str,
content: str,
visibility: ap.VisibilityEnum,
published: str | None = None,
) -> bool:
object_id = build_object_id(allocate_outbox_id())
if not published:
published = now().replace(microsecond=0).isoformat().replace("+00:00", "Z")
to = []
cc = []
if visibility == ap.VisibilityEnum.PUBLIC:
to = [ap.AS_PUBLIC]
cc = [f"{BASE_URL}/followers"]
else:
raise ValueError(f"Unsupport visibility {visibility}")
ap_object = {
"@context": ap.AS_EXTENDED_CTX,
"type": ap_type,
"id": object_id,
"attributedTo": ID,
"content": content,
"to": to,
"cc": cc,
"published": published,
# "context": context,
# "conversation": context,
"url": object_id,
"tag": [],
"summary": None,
"inReplyTo": None,
"sensitive": False,
"attachment": [],
}
outbox_object = await save_to_outbox(
db_session,
object_id,
ap_object,
)
recipients = await _compute_recipients(db_session, ap_object)
ap_object = ap.wrap_ap_object(ap_object)
if ap_object["type"] == "Create":
if ap.VisibilityEnum.PUBLIC == outbox_object.visibility:
ldsig.generate_signature(ap_object, k)
for r in recipients:
await ap.post(
r,
ap_object,
)
return True
async def _compute_recipients(
db_session: AsyncSession,
ap_object: dict,
) -> set[str]:
async def process_collection(
db_session,
url) -> list[Actor]:
if url == BASE_URL + "/followers":
followers = (
(
await db_session.scalars(
select(models.Follower).options(
joinedload(models.Follower.actor)
)
)
)
.unique()
.all()
)
else:
raise ValueError(f"{url}) not supported")
return [follower.actor for follower in followers]
_recipients = []
for field in ["to", "cc", "bcc", "bto"]:
if field in ap_object:
_recipients.extend(ap_object[field])
recipients = set()
logger.info(f"{_recipients}")
for r in _recipients:
if r in [ap.AS_PUBLIC, ID]:
continue
if r.startswith(BASE_URL):
for actor in await process_collection(db_session, r):
recipients.add(actor.share_inbox_url)
continue
return recipients
async def save_to_inbox(
db_session : AsyncSession,
inbox_id : str,

View file

@ -1,96 +0,0 @@
#!/usr/bin/env python3
import base64
import hashlib
import typing
from datetime import datetime
import pyld # type: ignore
from Crypto.Hash import SHA256
from Crypto.Signature import PKCS1_v1_5
from Crypto.PublicKey import RSA
from loguru import logger
from pyld import jsonld # type: ignore
from app import activitypub as ap
from app.database import AsyncSession
from app.actor import get_public_key
requests_loader = pyld.documentloader.requests.requests_document_loader()
def _loader(url, options={}):
# See https://github.com/digitalbazaar/pyld/issues/133
options["headers"]["Accept"] = "application/ld+json"
if url == "https://w3id.org/identity/v1":
url = (
"https://raw.githubusercontent.com/web-payments/web-payments.org"
"/master/contexts/identity-v1.jsonld"
)
return requests_loader(url, options)
pyld.jsonld.set_document_loader(_loader)
def _options_hash(doc: ap.RawObject) -> str:
doc = dict(doc["signature"])
for k in ["type", "id", "signatureValue"]:
if k in doc:
del doc[k]
doc["@context"] = "https://w3id.org/security/v1"
normalized = jsonld.normalize(
doc, {"algorithm": "URDNA2015", "format": "application/nquads"}
)
h = hashlib.new("sha256")
h.update(normalized.encode("utf-8"))
return h.hexdigest()
def _doc_hash(doc: ap.RawObject) -> str:
doc = dict(doc)
if "signature" in doc:
del doc["signature"]
normalized = jsonld.normalize(
doc, {"algorithm": "URDNA2015", "format": "application/nquads"}
)
h = hashlib.new("sha256")
h.update(normalized.encode("utf-8"))
return h.hexdigest()
async def verify_signature(
db_session: AsyncSession,
doc: ap.RawObject,
) -> bool:
if "signature" not in doc:
logger.warning("The object does contain a signature")
return False
key_id = doc["signature"]["creator"]
key = await get_public_key(db_session, key_id)
to_be_signed = _options_hash(doc) + _doc_hash(doc)
signature = doc["signature"]["signatureValue"]
pubkey = RSA.importKey(key)
signer = PKCS1_v1_5.new(pubkey)
digest = SHA256.new()
digest.update(to_be_signed.encode("utf-8"))
return signer.verify(digest, base64.b64decode(signature)) # type: ignore
def generate_signature(doc: ap.RawObject, key) -> None:
options = {
"type": "RsaSignature2017",
"creator": doc["actor"] + "#main-key",
"created": datetime.utcnow().replace(microsecond=0).isoformat() + "Z",
}
doc["signature"] = options
to_be_signed = _options_hash(doc) + _doc_hash(doc)
signer = PKCS1_v1_5.new(key)
digest = SHA256.new()
digest.update(to_be_signed.encode("utf-8"))
sig = base64.b64encode(signer.sign(digest)) # type: ignore
options["signatureValue"] = sig.decode("utf-8")

View file

@ -7,7 +7,7 @@ from typing import Union
from app import activitypub as ap
from app.database import Base
from app.database import metadata_obj
from app.activitypub import BaseActor
from app.actor import BaseActor
from sqlalchemy import Column
from sqlalchemy import Boolean

View file

@ -1,29 +0,0 @@
BSD 3-Clause License
Copyright (c) 2017-2020, honmaple
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View file

@ -1,25 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ********************************************************************************
# Copyright © 2017-2020 jianglin
# File Name: __init__.py
# Author: jianglin
# Email: mail@honmaple.com
# Created: 2019-05-29 18:06:22 (CST)
# Last Update: Sunday 2020-08-16 19:45:09 (CST)
# By:
# Description:
# ********************************************************************************
from .document import Document
def to_text(content, **kwargs):
return Document(content, **kwargs).to_text()
def to_html(content, **kwargs):
return Document(content, **kwargs).to_html()
def to_markdown(content, **kwargs):
return Document(content, **kwargs).to_markdown()

View file

@ -1,879 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ********************************************************************************
# Copyright © 2017-2020 jianglin
# File Name: document.py
# Author: jianglin
# Email: mail@honmaple.com
# Created: 2018-02-26 11:44:43 (CST)
# Last Update: Wednesday 2020-08-19 12:00:03 (CST)
# Description:
# ********************************************************************************
import re
from hashlib import sha1
from textwrap import dedent
from .inline import Blankline, Hr, InlineText
from .src import highlight as src_highlight
DRAWER_BEGIN_REGEXP = re.compile(r"^(\s*):(\S+):\s*$")
DRAWER_END_REGEXP = re.compile(r"^(\s*):END:\s*$")
DRAWER_PROPERTY_REGEXP = re.compile(r"^(\s*):(\S+):(\s+(.*)$|$)")
BLOCK_BEGIN_REGEXP = re.compile(r"(?i)^(\s*)#\+BEGIN_(\w+)(.*)")
BLOCK_END_REGEXP = re.compile(r"(?i)^(\s*)#\+END_(\w+)")
BLOCK_RESULT_REGEXP = re.compile(r"(?i)^(\s*)#\+RESULTS:")
BLOCK_RESULT_CONTENT_REGEXP = re.compile(r"(?:^|\s+):(\s+(.*)|$)")
TABLE_SEP_REGEXP = re.compile(r"^(\s*)(\|[+-|]*)\s*$")
TABLE_ROW_REGEXP = re.compile(r"^(\s*)(\|.*)")
TABLE_ALIGN_REGEXP = re.compile(r"^<(l|c|r)>$")
LIST_DESCRIPTIVE_REGEXP = re.compile(r"^(\s*)([+*-])\s+(.*)::(\s|$)")
LIST_UNORDER_REGEXP = re.compile(r"^(\s*)([+*-])(\s+(.*)|$)")
LIST_ORDER_REGEXP = re.compile(r"^(\s*)(([0-9]+|[a-zA-Z])[.)])(\s+(.*)|$)")
LIST_STATUS_REGEXP = re.compile(r"\[( |X|-)\]\s")
LIST_LEVEL_REGEXP = re.compile(r"(\s*)(.+)$")
HEADLINE_REGEXP = re.compile(
r"^(\*+)(?:\s+(.+?))?(?:\s+\[#(.+)\])?(\s+.*?)(?:\s+:(.+):)?$")
KEYWORD_REGEXP = re.compile(r"^(\s*)#\+([^:]+):(\s+(.*)|$)")
COMMENT_REGEXP = re.compile(r"^(\s*)#(.*)")
ATTRIBUTE_REGEXP = re.compile(r"(?:^|\s+)(:[-\w]+)\s+(.*)$")
TODO_KEYWORDS = ("DONE", "TODO")
def string_split(s, sep):
if not s:
return []
return s.split(sep)
class Parser(object):
def __init__(self, content=""):
self.lines = content.splitlines()
self.level = 0
self.element = ""
self.children = []
self.escape = True
self.needparse = True
self.parsed_nodes = (
"blankline",
"headline",
"table",
"list",
"drawer",
"block",
"block_result",
"keyword",
"hr",
)
def first_child(self):
if len(self.children) == 0:
return
return self.children[0]
def last_child(self):
if len(self.children) == 0:
return
return self.children[-1]
def add_child(self, node):
last = self.last_child()
if self.is_headline(last):
if self.is_properties(node):
last.properties = node
return
if not self.is_headline(node):
last.add_child(node)
return
if self.is_headline(node) and node.stars > last.stars:
last.add_child(node)
return
if self.is_table(last):
if self.is_table(node):
last.add_child(node)
return
if self.is_list(last):
if self.is_blankline(node):
last.add_child(node)
return
if node.level > last.level:
last.add_child(node)
return
if self.is_list(node) and node.level == last.level:
last.add_child(node)
return
if self.is_keyword(last):
if self.is_table(node):
node.keyword = last
if self.is_paragraph(last):
if self.is_inlinetext(node):
last.add_child(node)
return
if self.is_inlinetext(node):
self.children.append(self.paragraph(node))
return
self.children.append(node)
def is_keyword(self, child):
return child and isinstance(child, Keyword)
def is_headline(self, child):
return child and isinstance(child, Headline)
def is_list(self, child):
return child and isinstance(child, List)
def is_table(self, child):
return child and isinstance(child, Table)
def is_src(self, child):
return child and isinstance(child, (Src, Example))
def is_inlinetext(self, child):
return child and isinstance(child, InlineText)
def is_blankline(self, child):
return child and isinstance(child, Blankline)
def is_paragraph(self, child):
return child and isinstance(child, Paragraph)
def is_properties(self, child):
return child and isinstance(child, Properties)
def inlinetext(self, text):
return InlineText(text, self.needparse, self.escape)
def paragraph(self, node):
n = Paragraph()
n.add_child(node)
return n
def _parse_paired(self, cls, index, lines):
node = cls.match(lines[index])
if not node:
return None, index
end = len(lines)
num = index + 1
while num < end:
if node.matchend(num, lines):
node.preparse(lines[index + 1:num])
return node, num
num += 1
return None, index
def _parse_nopaired(self, cls, index, lines):
node = cls.match(lines[index])
if not node:
return None, index
end = len(lines)
num = index + 1
while num < end:
if node.matchend(num, lines):
break
num += 1
node.preparse(lines[index + 1:num])
return node, num
def parse_headline(self, index, lines):
return Headline.match(lines[index]), index
def parse_list(self, index, lines):
return List.match(lines[index]), index
def parse_table(self, index, lines):
return self._parse_nopaired(Table, index, lines)
def parse_drawer(self, index, lines):
return self._parse_paired(Drawer, index, lines)
def parse_block(self, index, lines):
return self._parse_paired(Block, index, lines)
def parse_block_result(self, index, lines):
return self._parse_paired(BlockResult, index, lines)
def parse_blankline(self, index, lines):
return Blankline.match(lines[index]), index
def parse_keyword(self, index, lines):
return Keyword.match(lines[index]), index
def parse_hr(self, index, lines):
return Hr.match(lines[index]), index
def parse_inlinetext(self, index, lines):
return self.inlinetext(lines[index]), index
def parse(self, index, lines):
for b in self.parsed_nodes:
func = "parse_" + b
if not hasattr(self, func):
continue
block, num = getattr(self, func)(index, lines)
if not block:
continue
return block, num
return self.parse_inlinetext(index, lines)
def preparse(self, lines):
index = 0
while index < len(lines):
line = lines[index]
node, index = self.parse(index, lines)
if node:
node.level = len(line) - len(line.strip())
self.add_child(node)
index += 1
def to_html(self):
if len(self.children) == 0 and len(self.lines) > 0:
self.preparse(self.lines)
children = []
for child in self.children:
content = child.to_html()
if not content:
continue
children.append(content)
text = "\n".join(children)
if self.element:
return self.element.format(text)
return text
def __str__(self):
str_children = [str(child) for child in self.children]
return self.__class__.__name__ + '(' + ','.join(str_children) + ')'
def __repr__(self):
return self.__str__()
class Headline(Parser):
def __init__(
self,
title,
stars=1,
keyword=None,
priority=None,
tags=[],
todo_keywords=TODO_KEYWORDS):
super(Headline, self).__init__()
self.title = title
self.stars = stars
self.keyword = keyword
self.priority = priority
self.tags = tags
self.properties = None
self.todo_keywords = todo_keywords
@classmethod
def match(cls, line):
match = HEADLINE_REGEXP.match(line)
if not match:
return
stars = len(match[1])
keyword = match[2] or ""
priority = match[3] or ""
if keyword and not priority:
if len(keyword) >= 4 and keyword[0:2] == "[#":
priority = keyword[2:-1]
keyword = ""
title = keyword + match[4]
keyword = ""
return cls(
title,
stars,
keyword,
priority,
string_split(match[5], ":"),
)
def id(self):
hid = 'org-{0}'.format(sha1(self.title.encode()).hexdigest()[:10])
if self.properties:
return self.properties.get("CUSTOM_ID", hid)
return hid
def toc(self):
b = ""
if self.keyword:
b = b + "<span class=\"todo\">{0}</span>".format(self.keyword)
if self.priority:
b = b + "<span class=\"priority\">{0}</span>".format(self.priority)
b = b + self.inlinetext(self.title).to_html()
for tag in self.tags:
b = b + "<span class=\"tag\">{0}</span>".format(tag)
return b.strip()
def to_html(self):
b = "<h{0} id=\"{1}\">{2}</h{0}>".format(
self.stars,
self.id(),
self.toc(),
)
return b + super(Headline, self).to_html()
class Drawer(Parser):
def __init__(self, name):
super(Drawer, self).__init__()
self.name = name
@classmethod
def match(cls, line):
match = DRAWER_BEGIN_REGEXP.match(line)
if not match:
return
name = match[2]
if name.upper() == "PROPERTIES":
return Properties(name)
return Drawer(name)
def matchend(self, index, lines):
return DRAWER_END_REGEXP.match(lines[index])
def to_html(self):
return ""
class Properties(Drawer):
def __init__(self, name):
super(Properties, self).__init__(name)
self.properties = {}
def parse(self, index, lines):
match = DRAWER_PROPERTY_REGEXP.match(lines[index])
if match:
self.properties[match[2].upper()] = match[4]
return None, index
def get(self, key, default=None):
return self.properties.get(key, default)
def to_html(self):
return ""
class Block(Parser):
def __init__(self, name, params=""):
super(Block, self).__init__()
self.name = name
self.params = params
@classmethod
def match(cls, line):
match = BLOCK_BEGIN_REGEXP.match(line)
if not match:
return
name = match[2].lower()
if name == "src":
return Src(*match[3].strip().split(" ", 1))
if name == "example":
return Example(match[3])
if name == "center":
return Center(match[3])
if name == "verse":
return Verse(match[3])
if name == "quote":
return Quote(match[3])
if name == "export":
return Export(*match[3].strip().split(" ", 1))
return cls(name, match[3])
def matchend(self, index, lines):
match = BLOCK_END_REGEXP.match(lines[index])
return match and match[2].lower() == self.name
class Center(Block):
def __init__(self, params=""):
super(Center, self).__init__("center", params)
self.element = "<div style=\"text-align: center;\">\n{0}\n</div>"
class Verse(Block):
def __init__(self, params=""):
super(Verse, self).__init__("verse", params)
self.element = "<p class=\"verse\">\n{0}\n</p>"
def add_child(self, node):
self.children.append(node)
def to_html(self):
children = [child.to_html() for child in self.children]
return self.element.format("<br />".join(children))
class Quote(Block):
def __init__(self, params=""):
super(Quote, self).__init__("quote", params)
self.element = "<blockquote>\n{0}\n</blockquote>"
class Export(Block):
def __init__(self, language="", params=""):
super(Export, self).__init__("export", params)
self.language = language
self.escape = self.language.upper() != "HTML"
self.parsed_nodes = ()
def to_html(self):
if not self.escape:
return super(Export, self).to_html()
return ""
class Src(Block):
def __init__(self, language="", params="", highlight=False):
super(Src, self).__init__("src", params)
self.language = language
self.highlight_code = highlight
self.element = "<pre class=\"src src-{0}\">\n{1}\n</pre>"
self.needparse = False
self.escape = False
self.parsed_nodes = ()
def add_child(self, node):
self.children.append(node)
def highlight(self, language, text):
return src_highlight(language, text)
def to_html(self):
text = "\n".join([child.to_html() for child in self.children])
if self.highlight_code:
return self.highlight(self.language, dedent(text))
if not self.language:
return "<pre>\n{0}\n</pre>".format(dedent(text))
return self.element.format(self.language, dedent(text))
class Example(Src):
def __init__(self, params="", highlight=False):
super(Example, self).__init__("example", params, highlight)
self.name = "example"
class BlockResult(Parser):
def __init__(self):
super(BlockResult, self).__init__()
self.element = "<pre class=\"example\">\n{0}\n</pre>"
@classmethod
def match(cls, line):
match = BLOCK_RESULT_REGEXP.match(line)
if not match:
return
return cls()
def matchend(self, index, lines):
return not BLOCK_RESULT_CONTENT_REGEXP.match(lines[index])
def parse(self, index, lines):
match = BLOCK_RESULT_CONTENT_REGEXP.match(lines[index])
return self.inlinetext(match[2]), index
class ListItem(Parser):
def __init__(self, status=None, checkbox="HTML"):
super(ListItem, self).__init__()
self.status = status
self.checkbox = checkbox
self.element = "<li>\n{0}\n</li>"
@classmethod
def match(cls, line):
status = None
content = line
status_match = LIST_STATUS_REGEXP.match(line)
if status_match:
status, content = status_match[1], content[len("[ ] "):]
node = cls(status)
node.add_child(node.inlinetext(content))
return node
def set_status(self):
if not self.checkbox:
return
if self.checkbox == "HTML":
if self.status == "X":
node = self.inlinetext(
'<input type="checkbox" checked="checked" />')
else:
node = self.inlinetext('<input type="checkbox" />')
node.needparse = False
node.escape = False
else:
node = self.inlinetext("=[{0}]=".format(self.status))
if not self.children:
self.children.append(node)
return
self.children[0].children = [node] + self.children[0].children
def to_html(self):
if self.status is not None:
self.set_status()
return super(ListItem, self).to_html()
class DescriptiveItem(ListItem):
def __init__(self, title="", status=""):
super(DescriptiveItem, self).__init__(title, status)
self.element = "<dt>\n{0}\n</dt>"
class List(Parser):
def __init__(self, items=[]):
super(List, self).__init__()
self.children = items
@classmethod
def match(cls, line):
match = UnorderList.match(line)
if match:
return match
match = OrderList.match(line)
if match:
return match
return Descriptive.match(line)
def add_child(self, node):
if self.is_list(node) and node.level == self.level:
self.children.append(node.children[0])
return
last = self.last_child()
last.add_child(node)
class Descriptive(List):
def __init__(self, items=[]):
super(Descriptive, self).__init__(items)
self.element = "<dd>\n{0}\n</dd>"
@classmethod
def match(cls, line):
match = LIST_DESCRIPTIVE_REGEXP.match(line)
if not match:
return
title = DescriptiveItem.match(match[3])
return cls([title])
class UnorderList(List):
def __init__(self, items=[]):
super(UnorderList, self).__init__(items)
self.element = "<ul>\n{0}\n</ul>"
@classmethod
def match(cls, line):
match = LIST_UNORDER_REGEXP.match(line)
if not match:
return
title = ListItem.match(match[4])
return cls([title])
class OrderList(List):
def __init__(self, items=[]):
super(OrderList, self).__init__(items)
self.element = "<ol>\n{0}\n</ol>"
@classmethod
def match(cls, line):
match = LIST_ORDER_REGEXP.match(line)
if not match:
return
title = ListItem.match(match[4])
return cls([title])
class TableColumn(Parser):
def __init__(self, content="", header=False):
super(TableColumn, self).__init__(content)
self.header = header
self.parsed_nodes = ()
def add_child(self, child):
self.children.append(child)
def reset(self):
self.header = True
def to_html(self):
self.element = "<th>{0}</th>" if self.header else "<td>{0}</td>"
return super(TableColumn, self).to_html()
class TableRow(Parser):
def __init__(self, header=False):
super(TableRow, self).__init__()
self.is_sep = False
self.header = header
self.element = "<tr>\n{0}\n</tr>"
self.parsed_nodes = ("tablecolumn", )
@classmethod
def match(cls, line):
match = TABLE_ROW_REGEXP.match(line)
if not match:
return
row = cls()
row.is_sep = bool(TABLE_SEP_REGEXP.match(line))
row.preparse(match[2].strip("|").split("|"))
return row
def add_child(self, child):
self.children.append(child)
def parse_tablecolumn(self, index, lines):
return TableColumn(lines[index].strip(), self.header), index
def reset(self):
self.header = True
for column in self.children:
column.reset()
class Table(Parser):
def __init__(self, keyword=None):
super(Table, self).__init__()
self.element = "<table>\n{0}\n</table>"
self.keyword = keyword
self.parsed_nodes = ("tablerow", )
@classmethod
def match(cls, line):
row = TableRow.match(line)
if not row:
return
table = cls()
if row.is_sep:
return table
table.add_child(row)
return table
def matchend(self, index, lines):
return not TABLE_ROW_REGEXP.match(lines[index])
def reset(self):
first = self.first_child()
if first and first.header:
return
for row in self.children:
row.reset()
def add_child(self, child):
if child.is_sep:
return self.reset()
self.children.append(child)
def parse_tablerow(self, index, lines):
return TableRow.match(lines[index]), index
class Keyword(Parser):
def __init__(self, key, value=""):
super(Keyword, self).__init__()
self.key = key
self.value = value
def options(self):
results = {}
for line in self.value.split(" "):
if not line:
continue
m = line.split(":", 1)
k = m[0]
if not k:
continue
results[k] = "" if len(m) == 1 else m[1]
return results
def properties(self):
results = {}
line = self.value.strip()
if not line:
return results
m = line.split(" ", 1)
k = m[0]
if not k:
return results
results[k] = "" if len(m) == 1 else m[1]
return results
@classmethod
def match(cls, line):
match = KEYWORD_REGEXP.match(line)
if not match:
return
return cls(match[2], match[4])
def to_html(self):
return ""
class Paragraph(Parser):
def __init__(self, content=""):
super(Paragraph, self).__init__(content)
self.element = "<p>\n{0}\n</p>"
self.parsed_nodes = ()
def add_child(self, node):
self.children.append(node)
class Section(Parser):
def __init__(self, headline):
super(Section, self).__init__()
self.headline = headline
@property
def stars(self):
return self.headline.stars
def add_child(self, node):
last = self.last_child()
if not last:
self.children.append(node)
return
if node.stars > last.stars:
last.add_child(node)
return
self.children.append(node)
def to_html(self):
text = "<li>"
text += "<a href=\"#{0}\">{1}</a>".format(
self.headline.id(),
self.headline.toc(),
)
if not self.children:
return text + "</li>"
text += "\n<ul>\n{0}\n</ul>\n</li>".format(
"\n".join([child.to_html() for child in self.children]))
return text
class Toc(Parser):
def __init__(self):
super(Toc, self).__init__()
self.element = (
'<div id="table-of-contents">'
'<h2>Table of Contents</h2>'
'<div id="text-table-of-contents">'
'\n<ul>\n{0}\n</ul>\n</div></div>')
def add_child(self, node):
last = self.last_child()
if not last:
self.children.append(node)
return
if node.stars > last.stars:
last.add_child(node)
return
if node.stars < last.stars:
last.add_child(node)
return
self.children.append(node)
def to_html(self):
if not self.children:
return ""
return super(Toc, self).to_html()
class Document(Parser):
def __init__(self, content, offset=0, highlight=False, **options):
super(Document, self).__init__(content)
self.offset = offset
self.highlight = highlight
self.options = options
self.properties = {}
self.toc = Toc()
def _is_true(self, value):
return value in ("true", "t", "1", True, 1)
def section(self, node):
return Section(node)
def parse_keyword(self, index, lines):
block, index = super(Document, self).parse_keyword(index, lines)
if not block:
return block, index
if block.key == "OPTIONS":
self.options.update(**block.options())
elif block.key == "PROPERTY":
self.properties.update(**block.properties())
else:
self.properties[block.key] = block.value
return block, index
def parse_headline(self, index, lines):
block, index = super(Document, self).parse_headline(index, lines)
if not block:
return block, index
block.stars = block.stars + self.offset
todo_keywords = self.properties.get("TODO")
if todo_keywords:
block.todo_keywords = todo_keywords.split(" ")
s = block.title.split(" ", 1)
if len(s) > 1 and s[0] in block.todo_keywords:
block.keyword = s[0]
block.title = s[1]
self.toc.add_child(self.section(block))
return block, index
def parse_block(self, index, lines):
block, index = super(Document, self).parse_block(index, lines)
if not block:
return block, index
if self.is_src(block):
block.highlight_code = self.highlight
return block, index
def to_html(self):
text = super(Document, self).to_html()
if self._is_true(self.options.get("toc")):
return self.toc.to_html() + "\n" + text
return text

View file

@ -1,432 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ********************************************************************************
# Copyright © 2017-2020 jianglin
# File Name: inline.py
# Author: jianglin
# Email: mail@honmaple.com
# Created: 2018-02-26 11:41:22 (CST)
# Last Update: Tuesday 2020-08-18 17:21:40 (CST)
# By:
# Description:
# ********************************************************************************
import re
import os
# _inline_regexp = r"(^|.*?(?<![/\\])){0}(.+?(?<![/\\])){0}(.*?|$)"
_inline_regexp = r"(^|.*?(?<![/\\])){0}(.+?(?<![/\\])){0}(.*?|$)"
BOLD_REGEXP = re.compile(_inline_regexp.format('\\*'))
CODE_REGEXP = re.compile(_inline_regexp.format('(?:\\=|`)'))
ITALIC_REGEXP = re.compile(_inline_regexp.format('(?:\\*\\*|\\/)'))
DELETE_REGEXP = re.compile(_inline_regexp.format('\\+'))
VERBATIM_REGEXP = re.compile(_inline_regexp.format('~'))
UNDERLINE_REGEXP = re.compile(_inline_regexp.format('_'))
PERCENT_REGEXP = re.compile(r"\[(\d+/\d+|\d+%)\]")
HR_REGEXP = re.compile(r"^\s*\-{5,}\s*")
FN_REGEXP = re.compile(r"(^|.*?(?<![/\\]))(\[fn:(.+?)\])(.*?|$)")
IMG_REGEXP = re.compile(r"^[.](png|gif|jpe?g|svg|tiff?)$")
LINK_REGEXP = re.compile(r'\[\[(.+?)\](?:\[(.+?)\])?\]')
VIDEO_REGEXP = re.compile(r"^[.](webm|mp4)$")
NEWLINE_REGEXP = re.compile(r"(^|.*?(?<![/\\]))(\\\\(\s*)$)")
BLANKLINE_REGEXP = re.compile(r"^(\s*)$")
TIMESTAMP_REGEXP = re.compile(
r"^<(\d{4}-\d{2}-\d{2})( [A-Za-z]+)?( \d{2}:\d{2})?( \+\d+[dwmy])?>")
_html_escape = (
("&", "&amp;"),
("'", "&#39;"),
("<", "&lt;"),
(">", "&gt;"),
("\"", "&#34;"),
)
# https://github.com/tsroten/zhon/blob/develop/zhon/hanzi.py
_chinese_non_stops = (
# Fullwidth ASCII variants
'\uFF02\uFF03\uFF04\uFF05\uFF06\uFF07\uFF08\uFF09\uFF0A\uFF0B\uFF0C\uFF0D'
'\uFF0F\uFF1A\uFF1B\uFF1C\uFF1D\uFF1E\uFF20\uFF3B\uFF3C\uFF3D\uFF3E\uFF3F'
'\uFF40\uFF5B\uFF5C\uFF5D\uFF5E\uFF5F\uFF60'
# Halfwidth CJK punctuation
'\uFF62\uFF63\uFF64'
# CJK symbols and punctuation
'\u3000\u3001\u3003'
# CJK angle and corner brackets
'\u3008\u3009\u300A\u300B\u300C\u300D\u300E\u300F\u3010\u3011'
# CJK brackets and symbols/punctuation
'\u3014\u3015\u3016\u3017\u3018\u3019\u301A\u301B\u301C\u301D\u301E\u301F'
# Other CJK symbols
'\u3030'
# Special CJK indicators
'\u303E\u303F'
# Dashes
'\u2013\u2014'
# Quotation marks and apostrophe
'\u2018\u2019\u201B\u201C\u201D\u201E\u201F'
# General punctuation
'\u2026\u2027'
# Overscores and underscores
'\uFE4F'
# Small form variants
'\uFE51\uFE54'
# Latin punctuation
'\u00B7')
_chinese_stops = (
'\uFF01' # Fullwidth exclamation mark
'\uFF1F' # Fullwidth question mark
'\uFF61' # Halfwidth ideographic full stop
'\u3002' # Ideographic full stop
)
def html_escape(text):
for e in _html_escape:
text = text.replace(e[0], e[1])
return text
def match_chinese(ch):
if '\u4e00' <= ch <= '\u9fff':
return True
if ch in _chinese_stops:
return True
return ch in _chinese_non_stops
def match_emphasis(cls, regexp, line, index):
match = regexp.match(line, index)
if not match:
return None, index
end = match.end()
if index != 0:
prechar = line[index - 1]
border = prechar != " " and prechar not in "-({'\""
if border and not match_chinese(prechar):
return None, index
if end < len(line):
endchar = line[end]
border = endchar != " " and endchar not in "-.,:!?;'\")}["
if border and not match_chinese(endchar):
return None, index
return cls(match[2]), end - 1
class InlineParser(object):
def __init__(self, content=""):
self.content = content
self.children = []
self.element = ""
def add_child(self, child):
self.children.append(child)
def parse_code(self, index, lines):
return Code.match(lines, index)
def parse_bold(self, index, lines):
return Bold.match(lines, index)
def parse_italic(self, index, lines):
return Italic.match(lines, index)
def parse_delete(self, index, lines):
return Delete.match(lines, index)
def parse_verbatim(self, index, lines):
return Verbatim.match(lines, index)
def parse_underline(self, index, lines):
return Underline.match(lines, index)
def parse_percent(self, index, lines):
return Percent.match(lines, index)
def parse_link(self, index, lines):
return Link.match(lines, index)
def parse_fn(self, index, lines):
return Fn.match(lines, index)
def parse_newline(self, index, lines):
return Newline.match(lines, index)
def parse(self, index, lines):
chars = (
("=", "code"),
("`", "code"),
("~", "verbatim"),
("_", "underline"),
("+", "delete"),
("/", "italic"),
("**", "italic"),
("*", "bold"),
("[[", "link"),
("[", "percent"),
("\\", "newline"),
)
char_map = dict(chars)
single_char = lines[index]
double_char = lines[index:index + 2]
for char in chars:
c1 = len(char[0]) == 1 and char[0] == single_char
c2 = len(char[0]) == 2 and char[0] == double_char
if c1 or c2:
node, num = getattr(self, "parse_" + char_map[char[0]])(
index, lines)
if node:
return node, num
if lines[index:index + 3] == "[fn":
node, num = self.parse_fn(index, lines)
if node:
return node, num
child = self.last_child()
if child and isinstance(child, Text):
child.content += single_char
return None, index
return Text(single_char), index
def last_child(self):
if len(self.children) == 0:
return
return self.children[-1]
def preparse(self, lines):
index = 0
while index < len(lines):
block, index = self.parse(index, lines)
index += 1
if not block:
continue
self.add_child(block)
def to_html(self):
if len(self.children) == 0 and self.content:
self.preparse(self.content)
text = "".join([child.to_html() for child in self.children])
if self.element:
return self.element.format(text)
return text
def __str__(self):
return '{}({})'.format(self.__class__.__name__, self.content.strip())
def __repr__(self):
return self.__str__()
class Text(InlineParser):
def to_html(self):
return self.content
class Newline(InlineParser):
@classmethod
def match(cls, line, index):
match = NEWLINE_REGEXP.match(line, index)
if not match:
return None, index
return cls(), match.end() - 1
def to_html(self):
return "<br/>"
class Bold(InlineParser):
def __init__(self, content):
super(Bold, self).__init__(content)
self.element = "<b>{0}</b>"
@classmethod
def match(cls, line, index):
return match_emphasis(cls, BOLD_REGEXP, line, index)
class Code(InlineParser):
def __init__(self, content):
super(Code, self).__init__(content)
self.element = "<code>{0}</code>"
@classmethod
def match(cls, line, index):
return match_emphasis(cls, CODE_REGEXP, line, index)
class Italic(InlineParser):
def __init__(self, content):
super(Italic, self).__init__(content)
self.element = "<i>{0}</i>"
@classmethod
def match(cls, line, index):
return match_emphasis(cls, ITALIC_REGEXP, line, index)
class Delete(InlineParser):
def __init__(self, content):
super(Delete, self).__init__(content)
self.element = "<del>{0}</del>"
@classmethod
def match(cls, line, index):
return match_emphasis(cls, DELETE_REGEXP, line, index)
class Verbatim(InlineParser):
def __init__(self, content):
super(Verbatim, self).__init__(content)
self.element = "<code>{0}</code>"
@classmethod
def match(cls, line, index):
return match_emphasis(cls, VERBATIM_REGEXP, line, index)
class Underline(InlineParser):
def __init__(self, content):
super(Underline, self).__init__(content)
self.element = "<span style=\"text-decoration:underline\">{0}</span>"
@classmethod
def match(cls, line, index):
return match_emphasis(cls, UNDERLINE_REGEXP, line, index)
class Percent(InlineParser):
def __init__(self, content):
super(Percent, self).__init__(content)
self.element = "<code>[{0}]</code>"
@classmethod
def match(cls, line, index):
match = PERCENT_REGEXP.match(line, index)
if not match:
return None, index
return cls(match[1]), match.end()
class Link(InlineParser):
def __init__(self, url, desc=None):
super(Link, self).__init__(url)
self.desc = desc
@classmethod
def match(cls, line, index):
match = LINK_REGEXP.match(line, index)
if not match:
return None, index
return cls(match[1], match[2]), match.end()
def is_img(self):
_, ext = os.path.splitext(self.content)
return not self.desc and IMG_REGEXP.match(ext)
def is_vedio(self):
_, ext = os.path.splitext(self.content)
return not self.desc and VIDEO_REGEXP.match(ext)
def to_html(self):
if self.is_img():
return "<img src=\"{0}\"/>".format(self.content)
if self.is_vedio():
return "<video src=\"{0}\">{0}</video>".format(self.content)
if self.desc:
return '<a href="{0}">{1}</a>'.format(self.content, self.desc)
return '<a href="{0}">{1}</a>'.format(self.content, self.content)
class Fn(InlineParser):
def __init__(self, content):
super(Fn, self).__init__(content)
self.element = '<sup><a id="fnr:{0}" class="footref" href="#fn.{0}">{0}</a></sup>'
@classmethod
def match(cls, line, index):
match = FN_REGEXP.match(line, index)
if not match:
return None, index
return cls(match[3]), match.end()
def to_html(self):
return self.element.format(self.content)
class Timestamp(InlineParser):
def __init__(self, date="", time="", interval=None):
super(Timestamp, self).__init__()
self.date = date
self.time = time
self.interval = interval
@classmethod
def match(cls, line, index):
match = TIMESTAMP_REGEXP.match(line, index)
if not match:
return None, index
return cls(match[1], match[3], match[4]), match.end()
class Blankline(InlineParser):
def __init__(self):
super(Blankline, self).__init__()
@classmethod
def match(cls, line):
match = BLANKLINE_REGEXP.match(line)
if not match:
return
return cls()
def to_html(self):
return ""
class Hr(InlineParser):
def __init__(self):
super(Hr, self).__init__()
@classmethod
def match(cls, line):
if HR_REGEXP.match(line):
return cls()
return
def to_html(self):
return ""
class InlineText(InlineParser):
def __init__(self, content="", needparse=True, escape=True):
super(InlineText, self).__init__(content)
self.needparse = needparse
self.escape = escape
def to_html(self):
if self.escape:
self.content = html_escape(self.content)
if not self.needparse:
return self.content
return super(InlineText, self).to_html()

View file

@ -1,30 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ********************************************************************************
# Copyright © 2017-2020 jianglin
# File Name: src.py
# Author: jianglin
# Email: mail@honmaple.com
# Created: 2018-02-26 12:41:22 (CST)
# Last Update: Sunday 2020-08-16 19:45:32 (CST)
# By:
# Description:
# ********************************************************************************
try:
import pygments
from pygments import lexers
from pygments import formatters
except ImportError:
pygments = None
def highlight(language, text):
if pygments is None:
return text
try:
lexer = lexers.get_lexer_by_name(language)
except pygments.util.ClassNotFound:
lexer = lexers.guess_lexer(text)
formatter = formatters.HtmlFormatter()
return pygments.highlight(text, lexer, formatter)

View file

@ -9,7 +9,6 @@ from app.config import AP_CONTENT_TYPE, USER_AGENT
from app.database import AsyncSession
from app.database import get_db_session
from app.actor import fetch_actor
from sqlalchemy import select
@ -57,11 +56,20 @@ async def inbox_prechecker(
except Exception:
logger.exception("Failed to precheck delete activity")
actor_id = payload["actor"]
_actor = await fetch_actor(db_session, actor_id)
actor_url = payload["actor"]
async with httpx.AsyncClient() as client:
resp = await client.get(
actor_url,
headers={
"User-Agent": USER_AGENT,
"Accept": AP_CONTENT_TYPE,
},
follow_redirects=True,
)
try:
pubkey = _actor.ap_actor["publicKey"]["publicKeyPem"]
_actor = resp.json()
pubkey = _actor["publicKey"]["publicKeyPem"]
except json.JSONDecodeError:
raise ValueError
except KeyError:

1352
poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -21,8 +21,6 @@ pydantic = "^1.10.2"
tomli-w = "^1.0.0"
invoke = "^2.0.0"
cffi = "^1.15.1"
pyld = "^2.0.3"
requests = "^2.28.2"
[tool.poetry.group.dev.dependencies]

View file

@ -6,19 +6,10 @@ import asyncio
from pathlib import Path
from prompt_toolkit import prompt
from prompt_toolkit.key_binding import KeyBindings
from invoke import Context # type: ignore
from invoke import run # type: ignore
from invoke import task # type: ignore
_kb = KeyBindings()
@_kb.add("c-@")
def _(event):
"""Save multi-line buffer on CTRL + space"""
event.current_buffer.validate_and_handle()
@task
def config(ctx):
from Crypto.PublicKey import RSA
@ -101,6 +92,7 @@ def accept_follow(ctx):
async def _do():
async with async_session() as db_session: #type: ignore
try:
exist_request = (
await db_session.scalars(
select(models.IncomingActivity).where(
@ -135,36 +127,5 @@ def accept_follow(ctx):
print("Done!")
asyncio.run(_do())
@task
def send_note(ctx):
from app.database import async_session
from app.activitypub import VisibilityEnum
from app.boxes import _send_create
from app.orgpython import to_html
content = prompt(
(
"note contents, in org mode, "
"use [CTRL] + [SPACE] to submit:\n"
),
key_bindings=_kb,
multiline=True,
)
content = to_html(content)
async def _dodo():
async with async_session() as db_session: #type: ignore
await _send_create(
db_session,
"Note",
content,
VisibilityEnum.PUBLIC
)
print("Done!")
asyncio.run(_dodo())