322 lines
9.1 KiB
Python
322 lines
9.1 KiB
Python
import locale
|
|
import logging
|
|
import unicodedata
|
|
from collections import defaultdict
|
|
from contextlib import contextmanager
|
|
from dataclasses import fields
|
|
from datetime import datetime, timedelta, timezone
|
|
from html import escape as html_escape
|
|
from html.parser import HTMLParser
|
|
from io import StringIO
|
|
from typing import *
|
|
|
|
import nio
|
|
|
|
# Module-wide logger, named after this module.
log = logging.getLogger(__name__)

# Time zones known to this module, keyed by zone name.
# Each entry is a *fixed* UTC offset (given in whole hours below), so no
# daylight-saving transitions are applied to converted datetimes.
tzdb = {
    zone_name: timezone(timedelta(hours=utc_offset_hours), zone_name)
    for zone_name, utc_offset_hours in (("Europe/Berlin", 2),)
}
|
|
|
|
|
|
def html_nametag(uid, name):
    """Return an HTML anchor linking `name` to the matrix.to page of `uid`.

    Both values are HTML-escaped before being placed into the markup.
    """
    target = html_escape(uid)
    label = html_escape(name)
    return f'<a href="https://matrix.to/#/{target}">{label}</a>'
|
|
|
|
|
|
async def reply(
    message,
    plain: Optional[str] = None,
    *,
    html: Optional[str] = None,
    with_name: bool = False,
    in_thread: bool = False,
) -> nio.RoomSendResponse:
    """Answer `message` in its room with plain text, HTML, or both.

    At least one of `plain` and `html` must be given.

    with_name: prefix the reply with the name of the original message's
        sender; the HTML variant uses a name tag built by html_nametag.
    in_thread: pass the original event's ID along as the event being
        replied to, so clients can connect the two messages unambiguously.
    """
    assert plain or html

    if with_name and plain:
        plain = f"{message.sender_name}: {plain}"
    if with_name and html:
        nametag = html_nametag(message.event.sender, message.sender_name)
        html = f"{nametag}: {html}"

    # Forward only the variants that were actually provided.
    kwargs = {}
    if in_thread:
        kwargs["reply_to_event_id"] = message.event.event_id
    for key, value in (("html", html), ("plain", plain)):
        if value:
            kwargs[key] = value

    return await send_message(message.app.client, message.room.room_id, **kwargs)
|
|
|
|
|
|
async def react(message, emoji: str) -> nio.RoomSendResponse:
    """Attach an emoji reaction (an `m.annotation` relation) to a message.

    The reaction should consist of a single emoji character; compound
    characters are ok. A one-character emoji without a trailing variation
    selector gets U+FE0F appended and is NFC-normalized. Longer input
    without the selector is sent unchanged after logging a warning.
    """
    # For details on Matrix reactions see MSC2677:
    # - https://github.com/uhoreg/matrix-doc/blob/aggregations-reactions/proposals/2677-reactions.md
    # - and https://github.com/matrix-org/matrix-appservice-slack/pull/522/commits/88e7076a595509b196f53369102a469cace6cc19
    # Variation selector 16, see
    # https://en.wikipedia.org/wiki/Variation_Selectors_%28Unicode_block%29
    variation_selector = "\ufe0f"

    if not emoji.endswith(variation_selector):
        if len(emoji) == 1:
            emoji = unicodedata.normalize("NFC", emoji + variation_selector)
        else:
            log.warning("Reactions should only use a single emoji, got: %s", emoji)

    relation = {
        "rel_type": "m.annotation",
        "event_id": message.event.event_id,
        "key": emoji,
    }
    return await message.app.client.room_send(
        room_id=message.room.room_id,
        message_type="m.reaction",
        content={"m.relates_to": relation},
        ignore_unverified_devices=True,
    )
|
|
|
|
|
|
async def redact(message, reason: Optional[str] = None) -> nio.RoomRedactResponse:
    """Redact the event behind `message`, optionally stating a reason.

    Works for reactions as well as text messages, i.e. it can be used to
    take back a reaction.

    Raises RuntimeError if the client answers with a RoomRedactError.
    """
    event_id = message.event.event_id
    response = await message.app.client.room_redact(
        room_id=message.room.room_id,
        event_id=event_id,
        reason=reason,
    )
    if not isinstance(response, nio.RoomRedactError):
        return response
    raise RuntimeError(f"Could not redact: {event_id}: {response.message}")
|
|
|
|
|
|
async def send_message(
    client: nio.AsyncClient,
    room_id: str,
    plain: str = "",
    *,
    html: Optional[str] = None,
    as_notice: bool = True,
    reply_to_event_id: Optional[str] = None,
) -> Optional[nio.RoomSendResponse]:
    """Send text to a matrix room.

    The message can be given as either plain text or HTML; at least one
    of the two is required. If the plain text variant is not explicitly
    given, it is generated from the HTML variant by stripping the markup.

    You should always send the message as notice. Per convention
    bots don't react to notices, so sending only notices will avoid
    infinite loops between multiple bots in a channel.

    If reply_to_event_id is given, the message is marked as a reply to
    that event.

    Returns whatever the client's room_send returns, or None if sending
    failed with nio.SendRetryError (the failure is logged, not raised).
    """
    assert plain or html

    content = {
        "msgtype": "m.notice" if as_notice else "m.text",
        "body": plain,
    }

    if html:
        content["format"] = "org.matrix.custom.html"
        content["formatted_body"] = html
        if not plain:
            # The body is always sent, so derive it from the HTML.
            content["body"] = strip_tags(html)

    if reply_to_event_id:
        content["m.relates_to"] = {"m.in_reply_to": {"event_id": reply_to_event_id}}

    try:
        return await client.room_send(
            room_id,
            "m.room.message",
            content,
            ignore_unverified_devices=True,
        )
    except nio.SendRetryError:
        # Best effort: record the failure (with traceback) and tell the
        # caller explicitly that nothing was sent.
        log.exception("Unable to send message to room: %s", room_id)
        return None
|
|
|
|
|
|
async def send_image(
    client: nio.AsyncClient,
    room_id: str,
    url: str,
    description: str,
    *,
    width: Optional[int] = None,
    height: Optional[int] = None,
    size: Optional[int] = None,
    mimetype: Optional[str] = None,
    thumbnail_url: Optional[str] = None,
    thumbnail_width: Optional[int] = None,
    thumbnail_height: Optional[int] = None,
    thumbnail_size: Optional[int] = None,
    thumbnail_mimetype: Optional[str] = None,
) -> nio.RoomSendResponse:
    """Send an `m.image` message to a matrix room.

    `url` and `description` become the event's `url` and `body`. Every
    optional argument that is not None is added to the event's `info`
    object; the thumbnail_width/height/size/mimetype arguments end up in
    the nested `info.thumbnail_info` object. `info` and `thumbnail_info`
    are left out entirely when they would be empty.

    Returns whatever the client's room_send returns.
    """
    # https://matrix.org/docs/spec/client_server/r0.6.1#m-image

    # Image metadata, keyed by the field names used in the event content.
    info = {
        key: value
        for key, value in (
            ("w", width),
            ("h", height),
            ("size", size),
            ("mimetype", mimetype),
            ("thumbnail_url", thumbnail_url),
        )
        if value is not None
    }

    # Thumbnail metadata lives in its own nested object.
    thumbnail_info = {
        key: value
        for key, value in (
            ("w", thumbnail_width),
            ("h", thumbnail_height),
            ("size", thumbnail_size),
            ("mimetype", thumbnail_mimetype),
        )
        if value is not None
    }
    if thumbnail_info:
        info["thumbnail_info"] = thumbnail_info

    content = {
        "body": description,
        "msgtype": "m.image",
        "url": url,
    }
    if info:
        content["info"] = info

    return await client.room_send(
        room_id,
        "m.room.message",
        content,
        ignore_unverified_devices=True,
    )
|
|
|
|
|
|
@contextmanager
def localized(lc: str, category=locale.LC_ALL):
    """Temporarily switch the process locale for `category` to `lc`.

    The locale setting that was active on entry is restored on exit, also
    when the managed block raises.

    Raises locale.Error if `lc` cannot be set; nothing has been changed
    in that case.
    """
    # Calling setlocale without a locale only queries the current setting.
    # The returned string can be handed back to setlocale later on, also
    # for LC_ALL.
    previous = locale.setlocale(category)
    locale.setlocale(category, lc)
    try:
        yield
    finally:
        # locale.resetlocale() is not used here: it was removed in
        # Python 3.13 and it switched to the environment's default locale
        # instead of restoring the previously active one.
        locale.setlocale(category, previous)
|
|
|
|
|
|
def localizedtz(dt: datetime, fmt: str, lc: str, tzname: str) -> str:
    """Format `dt` with `fmt` in the time zone `tzname`, using locale `lc`.

    `tzname` must be a key of the module's `tzdb` mapping (KeyError
    otherwise). Only the LC_TIME locale category is switched while
    formatting.
    """
    zone = tzdb[tzname]
    with localized(lc, locale.LC_TIME):
        shifted = dt.astimezone(zone)
        return shifted.strftime(fmt)
|
|
|
|
|
|
class TextonlyParser(HTMLParser):
    """HTML parser that keeps only the text content and drops all markup."""

    def __init__(self):
        super().__init__()
        self.__text = StringIO()

    def handle_data(self, d):
        # Called by HTMLParser for every piece of text between tags.
        self.__text.write(d)

    @property
    def text(self):
        """All text collected so far, concatenated in document order."""
        return self.__text.getvalue()


def strip_tags(html):
    """Return the text content of the given HTML string, without any tags."""
    parser = TextonlyParser()
    parser.feed(html)
    # feed() alone may hold back trailing text: if the input ends with an
    # unterminated "&..." (as in "AT&T") the parser waits for more data in
    # case a character reference was cut in half. close() tells it the
    # input is complete, so the remaining text is delivered as well.
    parser.close()
    return parser.text
|
|
|
|
|
|
def clamp(lower, x, upper):
    """Limit `x` to the range from `lower` to `upper`.

    The upper bound is applied first, then the lower one, so `lower`
    wins if the bounds are given in the wrong order.
    """
    capped = min(x, upper)
    return max(lower, capped)
|
|
|
|
|
|
def pretty_duration(seconds: int) -> str:
    """Render a number of seconds as a compact "h m s" string.

    Hours are shown only when non-zero. Minutes are always shown,
    zero-padded to two digits. Seconds are left out only when they are
    zero and hours are shown.

    Examples:
        4993 -> "1h 23m 13s"
        4980 -> "1h 23m"
        1393 -> "23m 13s"
        1380 -> "23m 00s"
    """
    hours, below_hour = divmod(seconds, 3600)
    minutes, secs = divmod(below_hour, 60)

    pieces = []
    if hours:
        pieces.append(f"{hours}h")
    pieces.append(f"{minutes:02}m")
    if secs or not hours:
        pieces.append(f"{secs:02}s")
    return " ".join(pieces)
|
|
|
|
|
|
def capped_text(text: str, max_len: int, mark=" […]") -> str:
    """Shorten `text` to at most `max_len` characters at a word boundary.

    Text that already fits is returned unchanged. Otherwise as many
    leading space-separated words as fit are kept and `mark` is appended
    to show that something was cut off. The result (including `mark`)
    is no longer than `max_len` as long as `max_len >= len(mark)`. If not
    even the first word fits, only `mark` is returned.
    """
    if len(text) <= max_len:
        return text

    # Room left for actual words once the mark is accounted for.
    budget = max_len - len(mark)

    kept = []
    used = 0
    for word in text.split(" "):
        # Every word but the first costs one extra character for the
        # separating space. The first word must not be prefixed with a
        # space, otherwise the result would start with a stray blank.
        cost = len(word) + (1 if kept else 0)
        if used + cost > budget:
            break
        kept.append(word)
        used += cost

    return " ".join(kept) + mark
|
|
|
|
|
|
class ElementParser(HTMLParser):
    """Extract the text of the first HTML element accepted by a selector.

    The selector is called for every start tag with the tag name and the
    attributes exactly as HTMLParser hands them to handle_starttag.
    NOTE(review): the annotation says Mapping, while HTMLParser passes a
    list of (name, value) pairs; confirm what selectors expect.

    Text is collected from the accepted start tag up to the next end tag
    of the same name; after that the parser is done and ignores the rest.
    """

    def __init__(self, selector: Callable[[str, Mapping[str, str]], bool]):
        super().__init__()
        self.selector = selector
        self.__open_tag = None  # name of the element currently collected
        self.__finished = False  # set once that element has been closed
        self.__pieces = []  # text chunks seen inside the element

    def handle_starttag(self, tag, attrs):
        if not self.selector(tag, attrs):
            return
        self.__open_tag = tag

    def handle_endtag(self, tag):
        if tag != self.__open_tag:
            return
        self.__finished = True
        self.__open_tag = None

    def handle_data(self, data):
        if self.__finished or not self.__open_tag:
            return
        self.__pieces.append(data)

    @property
    def value(self) -> Optional[str]:
        """Text of the matched element, or None until it has been closed."""
        if not self.__finished:
            return None
        return "".join(self.__pieces)

    def load_chunks(self, content: Iterable[str]) -> None:
        """Feed chunks to the parser, stopping once the element is complete."""
        for chunk in content:
            self.feed(chunk)
            if self.__finished:
                return
|
|
|
|
|
|
def escape_all(dc, escape: Callable[[str], str] = html_escape) -> None:
    """Patch a dataclass instance in place so that all its strings are escaped.

    A field is rewritten when its annotation is
    - `str`: the value itself is escaped,
    - a list parameterized with `str`: every item is escaped,
    - a dict with `str` values: every value is escaped, keys are kept.

    Fields with any other annotation are left untouched. The annotation
    is compared by identity, so this includes string (postponed)
    annotations, `Optional[str]`, and unparameterized list or dict
    annotations.
    """
    for f in fields(dc):
        origin = get_origin(f.type)
        # Empty for unparameterized generics such as typing.List, so the
        # length has to be checked before looking at a type argument.
        args = get_args(f.type)

        if f.type is str:
            setattr(dc, f.name, escape(getattr(dc, f.name)))
        elif origin is list and args and args[0] is str:
            setattr(dc, f.name, [escape(x) for x in getattr(dc, f.name)])
        elif origin is dict and len(args) == 2 and args[1] is str:
            setattr(dc, f.name, {k: escape(v) for k, v in getattr(dc, f.name).items()})
|