hotdog/hotdog/functions.py
ducklet efc6ecbb45 urlinfo: allow sub-modules and add module for IMDb movies
The urlinfo plugin is now set up to look up URL information for any URL
occurring in text, not only when triggered explicitly as a command.
The youtube plugin should probably be integrated into this setup,
replacing the bot plugin with a urlinfo extension.
2020-11-07 20:36:31 +01:00

322 lines
9.1 KiB
Python

import locale
import logging
import unicodedata
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass, fields
from datetime import datetime, timedelta, timezone
from html import escape as html_escape
from html.parser import HTMLParser
from io import StringIO
from typing import *
import nio
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Lookup table of the time zones this module knows about, keyed by zone name.
# NOTE(review): the zone is a fixed UTC+02:00 offset, so it does not follow
# daylight-saving changes -- confirm whether that is intended.
tzdb = {
    zone_name: timezone(timedelta(hours=utc_offset_hours), zone_name)
    for zone_name, utc_offset_hours in [("Europe/Berlin", 2)]
}
def html_nametag(uid, name):
    """Return an HTML link to a user's matrix.to page, labelled with their name.

    Both the user id and the display name are HTML-escaped before being
    placed into the markup.
    """
    target = html_escape(uid)
    label = html_escape(name)
    return f'<a href="https://matrix.to/#/{target}">{label}</a>'
async def reply(
    message,
    plain: Optional[str] = None,
    *,
    html: Optional[str] = None,
    with_name: bool = False,
    in_thread: bool = False,
) -> nio.RoomSendResponse:
    """Answer a message in the room it came from, as plain text and/or HTML.

    At least one of ``plain`` and ``html`` must be given.

    With ``with_name`` the reply is prefixed with the name of the original
    message's sender (rendered as a name tag link in the HTML variant).

    With ``in_thread`` the reply references the original event, so clients
    can connect the two messages unambiguously.

    Returns whatever ``send_message`` returns.
    """
    assert plain or html

    if with_name:
        if plain:
            plain = f"{message.sender_name}: {plain}"
        if html:
            tag = html_nametag(message.event.sender, message.sender_name)
            html = f"{tag}: {html}"

    # Only forward the arguments that actually carry a value, so that
    # send_message's own defaults apply for everything else.
    send_kwargs = {"reply_to_event_id": message.event.event_id} if in_thread else {}
    if html:
        send_kwargs["html"] = html
    if plain:
        send_kwargs["plain"] = plain

    return await send_message(
        message.app.client, message.room.room_id, **send_kwargs
    )
async def react(message, emoji: str) -> nio.RoomSendResponse:
    """Attach an Emoji reaction to a message.

    Only a single Emoji character should be used for the reaction; compound
    characters are ok.
    """
    # For details on Matrix reactions see MSC2677:
    # - https://github.com/uhoreg/matrix-doc/blob/aggregations-reactions/proposals/2677-reactions.md
    # - and https://github.com/matrix-org/matrix-appservice-slack/pull/522/commits/88e7076a595509b196f53369102a469cace6cc19
    vs16 = "\ufe0f"  # see https://en.wikipedia.org/wiki/Variation_Selectors_%28Unicode_block%29
    if not emoji.endswith(vs16):
        if len(emoji) == 1:
            # A lone code point gets the variation selector appended.
            emoji = unicodedata.normalize("NFC", emoji + vs16)
        else:
            # Longer strings are sent unchanged; only a warning is logged.
            log.warning("Reactions should only use a single emoji, got: %s", emoji)

    annotation = {
        "rel_type": "m.annotation",
        "event_id": message.event.event_id,
        "key": emoji,
    }
    return await message.app.client.room_send(
        room_id=message.room.room_id,
        message_type="m.reaction",
        content={"m.relates_to": annotation},
        ignore_unverified_devices=True,
    )
async def redact(message, reason: Optional[str] = None) -> nio.RoomRedactResponse:
    """Redact the given message, optionally stating a reason.

    This allows not only to redact text messages, but also reactions, i.e.
    take back a reaction.

    Raises RuntimeError if the client answers with a RoomRedactError.
    """
    response = await message.app.client.room_redact(
        room_id=message.room.room_id,
        event_id=message.event.event_id,
        reason=reason,
    )
    if not isinstance(response, nio.RoomRedactError):
        return response
    raise RuntimeError(
        f"Could not redact: {message.event.event_id}: {response.message}"
    )
async def send_message(
    client: nio.AsyncClient,
    room_id: str,
    plain: str = "",
    *,
    html: Optional[str] = None,
    as_notice: bool = True,
    reply_to_event_id: Optional[str] = None,
) -> nio.RoomSendResponse:
    """Send a text message to a Matrix room.

    The text may be passed as plain text, as HTML, or both. When only HTML
    is given, the plain text body is derived from it by stripping the tags.

    You should always send the message as notice. Per convention
    bots don't react to notices, so sending only notices will avoid
    infinite loops between multiple bots in a channel.

    If ``reply_to_event_id`` is set, the message is marked as a reply to
    that event.

    Note: when the client raises ``nio.SendRetryError`` the error is logged
    and ``None`` is returned instead of a response.
    """
    assert plain or html

    # Fall back to a tag-stripped rendering of the HTML if no plain text
    # was supplied.
    body = strip_tags(html) if html and not plain else plain
    content = {
        "msgtype": "m.notice" if as_notice else "m.text",
        "body": body,
    }
    if html:
        content.update(format="org.matrix.custom.html", formatted_body=html)
    if reply_to_event_id:
        in_reply_to = {"event_id": reply_to_event_id}
        content["m.relates_to"] = {"m.in_reply_to": in_reply_to}

    try:
        return await client.room_send(
            room_id,
            "m.room.message",
            content,
            ignore_unverified_devices=True,
        )
    except nio.SendRetryError:
        log.exception(f"Unable to send message to room: {room_id}")
        return None
async def send_image(
    client: nio.AsyncClient,
    room_id: str,
    url: str,
    description: str,
    *,
    width: Optional[int] = None,
    height: Optional[int] = None,
    size: Optional[int] = None,
    mimetype: Optional[str] = None,
    thumbnail_url: Optional[str] = None,
    thumbnail_width: Optional[int] = None,
    thumbnail_height: Optional[int] = None,
    thumbnail_size: Optional[int] = None,
    thumbnail_mimetype: Optional[str] = None,
) -> nio.RoomSendResponse:
    """Send an ``m.image`` message to a Matrix room.

    ``url`` and ``description`` become the event's ``url`` and ``body``.
    Every optional keyword that is not None is copied into the event's
    ``info`` object (or into ``info.thumbnail_info`` for the thumbnail
    dimensions, size and mimetype). Keywords left at None are omitted.
    """
    # https://matrix.org/docs/spec/client_server/r0.6.1#m-image
    image_info = (
        ("w", width),
        ("h", height),
        ("size", size),
        ("mimetype", mimetype),
        ("thumbnail_url", thumbnail_url),
    )
    thumbnail_info = (
        ("w", thumbnail_width),
        ("h", thumbnail_height),
        ("size", thumbnail_size),
        ("mimetype", thumbnail_mimetype),
    )

    # The defaultdict creates the nested "info" dict on first access, so
    # the key is absent from the event when there is nothing to put in it.
    content = defaultdict(
        dict,
        {
            "body": description,
            "msgtype": "m.image",
            "url": url,
        },
    )
    for key, value in image_info:
        if value is not None:
            content["info"][key] = value

    thumbinfo = defaultdict(
        dict,
        {key: value for key, value in thumbnail_info if value is not None},
    )
    if thumbinfo:
        content["info"]["thumbnail_info"] = thumbinfo

    return await client.room_send(
        room_id,
        "m.room.message",
        content,
        ignore_unverified_devices=True,
    )
@contextmanager
def localized(lc: str, category=locale.LC_ALL):
    """Temporarily switch the process locale for the duration of a block.

    Args:
        lc: Name of the locale to activate, as accepted by
            ``locale.setlocale``.
        category: The locale category to change; defaults to ``LC_ALL``.

    Raises:
        locale.Error: If ``lc`` is not a locale the system recognizes. In
            that case the current locale is left as it was.

    On exit (normal or via exception) the locale that was active before
    entering the block is restored. Note that ``locale.setlocale`` changes
    process-wide state.
    """
    # Calling setlocale without a locale argument only queries the current
    # setting. Remember it so exactly that setting can be put back,
    # instead of relying on locale.resetlocale(), which was removed in
    # Python 3.13 and resets to the environment's default rather than to
    # the previously active locale.
    previous = locale.setlocale(category)
    locale.setlocale(category, lc)
    try:
        yield
    finally:
        locale.setlocale(category, previous)
def localizedtz(dt: datetime, fmt: str, lc: str, tzname: str):
    """Format a datetime in a named time zone using a given locale.

    ``tzname`` must be a key of the module's ``tzdb`` table (a KeyError is
    raised otherwise). The datetime is converted to that zone and rendered
    with ``strftime(fmt)`` while ``LC_TIME`` is set to ``lc``.
    """
    zone = tzdb[tzname]
    with localized(lc, locale.LC_TIME):
        in_zone = dt.astimezone(zone)
        return in_zone.strftime(fmt)
class TextonlyParser(HTMLParser):
    """HTML parser that collects only the text content of a document."""

    def __init__(self):
        super().__init__()
        self.__text = StringIO()

    def handle_data(self, d):
        # Called by HTMLParser for every run of text between tags.
        self.__text.write(d)

    @property
    def text(self):
        """All text data seen so far, concatenated in document order."""
        return self.__text.getvalue()


def strip_tags(html):
    """Return the text content of an HTML string with all markup removed.

    Character references are decoded by the parser, so e.g. ``&amp;``
    comes back as ``&``.
    """
    s = TextonlyParser()
    s.feed(html)
    # feed() may hold back trailing text that could still turn out to be
    # the start of a character reference (e.g. the "&T" in "AT&T").
    # close() tells the parser the input is complete so that this
    # buffered remainder is emitted instead of being silently dropped.
    s.close()
    return s.text
def clamp(lower, x, upper):
    """Limit ``x`` to the range from ``lower`` to ``upper``.

    The upper bound is applied first and the lower bound second, so if
    ``lower`` is greater than ``upper`` the result is ``lower``.
    """
    capped = upper if upper < x else x
    return capped if capped > lower else lower
def pretty_duration(seconds: int) -> str:
    """Render a number of seconds as a compact "1h 23m 13s" style string.

    Minutes are always shown (zero-padded to two digits). Hours are shown
    only when non-zero. Seconds are shown when they are non-zero or when
    there are no hours.
    """
    hours = seconds // 3600
    minutes = (seconds - 3600 * hours) // 60
    secs = seconds % 60

    # full: 1h 23m 13s
    # 0 seconds: 1h 23m
    # 0 hours: 23m 13s
    # 0 hours 0 seconds: 23m 00s
    chunks = [f"{hours}h"] if hours else []
    chunks.append(f"{minutes:02}m")
    if secs or not hours:
        chunks.append(f"{secs:02}s")
    return " ".join(chunks)
def capped_text(text: str, max_len: int, mark=" […]") -> str:
    """Shorten text to at most ``max_len`` characters, cutting at spaces.

    Text that already fits is returned unchanged. Otherwise as many
    leading space-separated words as fit are kept and ``mark`` is appended
    to signal the omission; the mark counts towards ``max_len``.

    If not even the first word fits next to the mark, the result is the
    mark alone (which may itself exceed ``max_len`` if ``max_len`` is
    smaller than the mark).
    """
    if len(text) <= max_len:
        return text

    budget = max_len - len(mark)
    kept = []
    used = 0
    for word in text.split(" "):
        # Every word but the first costs one extra character for the
        # separating space. The first word must not be preceded by one,
        # otherwise the result starts with a stray blank.
        cost = len(word) + (1 if kept else 0)
        if used + cost > budget:
            break
        kept.append(word)
        used += cost
    return " ".join(kept) + mark
class ElementParser(HTMLParser):
    """Extract the text of the first HTML element accepted by a selector.

    The selector is called with the tag name and the attributes of every
    start tag, exactly as HTMLParser passes them to ``handle_starttag``.
    Once it returns true, text data is collected until an end tag with the
    same name is seen.
    """

    def __init__(self, selector: Callable[[str, Mapping[str, str]], bool]):
        super().__init__()
        self.selector = selector
        self.__open_tag = None  # name of the element currently being read
        self.__finished = False  # set once the element's end tag was seen
        self.__pieces = []  # text fragments collected from the element

    def handle_starttag(self, tag, attrs):
        if not self.selector(tag, attrs):
            return
        self.__open_tag = tag

    def handle_endtag(self, tag):
        if tag != self.__open_tag:
            return
        self.__finished = True
        self.__open_tag = None

    def handle_data(self, data):
        # Ignore text outside the selected element and anything that
        # follows once the element has been closed.
        if self.__finished or not self.__open_tag:
            return
        self.__pieces.append(data)

    @property
    def value(self) -> Optional[str]:
        """The element's text, or None if no complete element was found."""
        if not self.__finished:
            return None
        return "".join(self.__pieces)

    def load_chunks(self, content: Iterable[str]) -> None:
        """Feed chunks of HTML until the element is complete.

        Consumption of ``content`` stops after the chunk in which the
        element's end tag was found.
        """
        for chunk in content:
            self.feed(chunk)
            if self.__finished:
                return
def escape_all(dc: dataclass, escape: Callable[[str], str] = html_escape) -> None:
    """Escape all string values of a dataclass instance in place.

    A field is processed according to its declared type:

    - ``str``: the value is replaced by ``escape(value)``.
    - ``List[str]`` / ``list[str]``: replaced by a new list of escaped items.
    - ``Dict[..., str]`` / ``dict[..., str]``: replaced by a new dict with
      the same keys and escaped values (keys are left untouched).

    Fields of any other type -- including unparameterized ``List`` and
    ``Dict`` -- are left as they are.

    Args:
        dc: The dataclass instance to modify.
        escape: Function applied to every string; defaults to HTML escaping.
    """
    for f in fields(dc):
        origin = get_origin(f.type)
        # A bare typing.List / typing.Dict has an origin but no type
        # arguments, so the argument tuple must be length-checked before
        # it is indexed.
        type_args = get_args(f.type)
        if f.type is str:
            setattr(dc, f.name, escape(getattr(dc, f.name)))
        elif origin is list and len(type_args) >= 1 and type_args[0] is str:
            setattr(dc, f.name, [escape(x) for x in getattr(dc, f.name)])
        elif origin is dict and len(type_args) >= 2 and type_args[1] is str:
            setattr(
                dc, f.name, {k: escape(v) for k, v in getattr(dc, f.name).items()}
            )