urlinfo: split of generic handler into a separate module

The new generic handler also supports some generic JSON-LD parsing.
This commit is contained in:
ducklet 2020-11-10 21:41:43 +01:00
parent 27ecdfad74
commit a8785ef961
3 changed files with 205 additions and 74 deletions

View file

@ -1,15 +1,15 @@
import codecs
import re
from dataclasses import dataclass
from html import escape
from time import time as now
from typing import *
import requests
from ..functions import ElementParser, reply
from ..functions import react
from ..html import find
from ..models import Message
from .urlinfo_ import imdb # XXX make this dynamic? (like we load plugins)
from .urlinfo_ import generic, imdb # XXX make this dynamic? (like we load plugins)
HELP = """Return information about an online HTTP resource.
!u[rl] <url>
@ -59,12 +59,6 @@ def stream_decode_response_unicode(content: Iterable[bytes]) -> Iterable[str]:
yield rv
def title(content: Iterable[str]) -> Optional[str]:
    """Extract the text of the first <title> element from streamed HTML chunks."""
    parser = ElementParser(lambda tag, attrs: tag == "title")
    parser.load_chunks(content)
    return parser.value
def capped(content: Iterable[AnyStr], read_max: int) -> Iterable[AnyStr]:
read = 0
for chunk in content:
@ -163,84 +157,41 @@ def cachetoken(quant_m=15):
return int(now() / 60 / quant_m)
def pretty_size(size: int) -> str:
qs = "", "K", "M", "G", "T", "P"
for q in qs:
if size < 1024 or q == qs[-1]:
break
size /= 1000
if not q:
return f"{size} B"
return f"{size:_.02f} {q}B"
async def generic_handler(message: Message, url: str, info: Info):
details = []
if info.content_type:
details.append(f"<i>Media type</i>: {escape(info.content_type)}")
if info.size:
details.append(f"<i>Size</i>: {pretty_size(info.size)}")
details.append(f"<i>Status</i>: {info.code}")
if info.reason:
details[-1] += f" ({escape(info.reason)})"
if info.final_url != url:
details.append(
f"""<i>Redirected to</i>: <a href="{escape(info.final_url)}">{escape(info.final_url)}</a>"""
)
if info.filename and info.filename != url.rsplit("/", 2)[-1]:
details.append(f"<i>Filename</i>: {escape(info.filename)}")
details.append(f"<i>TTFB</i>: {info.elapsed_ms:_} ms")
text = (
f"<b>{escape(info.extracted['title'])}</b> — "
if info.extracted["title"]
else ""
)
text += "; ".join(details)
await reply(message, html=text, in_thread=True)
async def generic_extractor(info: Info):
content_type = info._resp.headers.get("Content-Type", "")
is_html = content_type.startswith("text/html") or url.lower().endswith(
(".html", ".htm")
)
info.extracted = {"title": title(info._chunks_str) if is_html else None}
def full_url(ref: str) -> str:
return f"http://{ref}" if ref.startswith("www") else ref
class GenericHandler:
extractor = generic_extractor
handle = generic_handler
async def handle(message: Message):
if message.command and message.command not in {"u", "url"}:
return
limit = 3
urls = [full_url(w) for w in message.words if is_url(w)][:limit]
handlers = (imdb,)
urls = {full_url(w) for w in message.words if is_url(w)}
if message.html:
urls |= {n["href"] for n in find(message.html, "a") if is_url(n["href"])}
if not urls:
return
handlers = (imdb,)
for url in urls:
for url in list(urls)[:limit]:
for handler in handlers:
if handler.can_handle(url):
break
else:
# We only want the generic handler if we were called explicitly
handler = GenericHandler if message.command else None
handler = generic if message.command or len(message.words) == 1 else None
if handler is None:
continue
await react(message, "⚡️")
try:
info = await load_info(url, handler.extractor, cachetoken())
if not info:
continue
await handler.handle(message, url, info)
except:
await react(message, "🐛")
raise

View file

@ -0,0 +1,180 @@
import json
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import *
from ...functions import (
ElementParser,
capped_text,
escape_all,
localizedtz,
reply,
strip_tags,
)
from ...models import Message
log = logging.getLogger(__name__)
def generic_details(info, url: str):
    """Build a one-line HTML summary of a fetched resource: media type,
    size, status, redirect target, filename and TTFB, prefixed with the
    extracted page title when one is available.
    """
    # Snapshot raw values BEFORE escape_all mutates info's string fields
    # (presumably HTML-escaping them in place — confirm against functions.py):
    # comparing escaped fields against the raw `url` produced spurious
    # "Redirected to" / "Filename" entries for URLs containing "&" etc.
    raw_final_url = info.final_url
    raw_filename = info.filename
    escape_all(info)
    details = []
    if info.content_type:
        details.append(f"<i>Media type</i>: {info.content_type}")
    if info.size:
        details.append(f"<i>Size</i>: {pretty_size(info.size)}")
    details.append(f"<i>Status</i>: {info.code}")
    if info.reason:
        details[-1] += f" ({info.reason})"
    if raw_final_url != url:
        details.append(
            f"""<i>Redirected to</i>: <a href="{info.final_url}">{info.final_url}</a>"""
        )
    # Only mention the filename when it differs from the URL's last segment.
    if raw_filename and raw_filename != url.rsplit("/", 2)[-1]:
        details.append(f"<i>Filename</i>: {info.filename}")
    details.append(f"<i>TTFB</i>: {info.elapsed_ms:_} ms")
    text = ""
    if info.extracted.title:
        text += f"<b>{info.extracted.title}</b> — "
    text += "; ".join(details)
    return text
def ld_details(ex, tz, lc):
    """Render a LinkedData extract as an ``(html, plain)`` pair.

    ex: LinkedData instance (title/published/creators/genres/description).
    tz, lc: room timezone and locale, forwarded to localizedtz().
    """
    details = []
    if ex.creators:
        details.append(f"🖋 {' '.join(ex.creators[:2])}")
    if ex.genres:
        details.append(f"🏷 {' '.join(ex.genres[:3])}")
    lines = []
    if ex.title:
        lines.append(f"<b>{ex.title}</b>")
    if ex.published:
        published = f"<b>{localizedtz(ex.published, '%x %X', tzname=tz, lc=lc)}</b>"
        # Attach the date to the title line when there is one; the previous
        # unconditional `lines[-1] +=` raised IndexError for untitled items.
        if lines:
            lines[-1] += f" ({published})"
        else:
            lines.append(published)
    if details:
        lines.append(", ".join(details))
    if ex.description:
        lines.append(f"<i>{capped_text(ex.description, 500)}</i>")
    html = "<br>".join(lines)
    # Newline-join for the plain-text fallback; "".join mashed the title,
    # details and description into one run-on string.
    plain = strip_tags("\n".join(lines))
    return html, plain
async def handle(message: Message, url, info):
    """Reply with a summary of *url*: Linked Data details when the page
    carried a JSON-LD block, a generic header summary otherwise."""
    l6n = message.app.config.l6n[message.room.room_id]
    ld = info.extracted.ld
    if ld:
        html, plain = ld_details(ld, tz=l6n["timezone"], lc=l6n["locale"])
    else:
        plain = None
        html = generic_details(info, url)
    await reply(message, plain, html=html, in_thread=True)
@dataclass
class Extracted:
    # Structured data gleaned from the response body by extractor().
    ld: Optional["LinkedData"] = None  # first parsed JSON-LD object, if any
    title: Optional[str] = None  # text of the HTML <title> element, if any
async def extractor(info):
    """Populate ``info.extracted`` from the HTTP response.

    For HTML documents (by Content-Type, or by a .html/.htm URL suffix),
    pull the <title> text and the first JSON-LD object; otherwise leave an
    empty Extracted.
    """
    content_type = info._resp.headers.get("Content-Type", "")
    is_html = content_type.startswith("text/html") or info.final_url.lower().endswith(
        (".html", ".htm")
    )
    info.extracted = Extracted()  # .ld/.title default to None
    if is_html:
        parsed = parse_html(info._chunks_str)
        info.extracted.title = parsed["title"]
        if parsed["ldjson"]:
            try:
                # Default of None guards against an exhausted generator
                # (e.g. unknown @context): a bare next() raised StopIteration.
                info.extracted.ld = next(parse_ldjson(parsed["ldjson"]), None)
            except ValueError:  # includes json.JSONDecodeError
                # Malformed ld+json is common in the wild: treat it as
                # "no linked data" instead of failing the whole handler.
                log.debug("Could not parse ld+json block.", exc_info=True)
def parse_html(content: Iterable[str]) -> Mapping[str, Optional[str]]:
    """Single-pass scan of streamed HTML for the <title> text and the first
    <script type="application/ld+json"> payload.

    Stops consuming chunks as soon as both parsers have matched.
    """

    def is_ldjson(tag, attrs):
        return tag == "script" and dict(attrs).get("type") == "application/ld+json"

    parsers = {
        "ldjson": ElementParser(is_ldjson),
        "title": ElementParser(lambda tag, attrs: tag == "title"),
    }
    active = list(parsers.values())
    for chunk in content:
        for parser in active:
            if not parser.done:
                parser.feed(chunk)
        if all(parser.done for parser in active):
            break
    return {name: parser.value for name, parser in parsers.items()}
def pretty_size(size: int) -> str:
    """Format a byte count with SI prefixes, e.g. 2_500_000 -> "2.50 MB".

    Values below 1000 are reported verbatim in bytes; larger values are
    divided by 1000 per step, up to the "P" prefix.
    """
    prefixes = "", "K", "M", "G", "T", "P"
    for prefix in prefixes:
        # The original mixed a 1024 threshold with /1000 steps, so values in
        # [1000, 1024) per scale rendered inconsistently; use 1000 for both.
        if size < 1000 or prefix == prefixes[-1]:
            break
        size /= 1000
    if not prefix:
        return f"{size} B"
    return f"{size:_.02f} {prefix}B"
def uniq(col: Collection[Hashable]) -> Collection[Hashable]:
    """Drop duplicates from *col*, preserving first-seen order (dict keys
    keep insertion order) and the original container type."""
    return type(col)(dict.fromkeys(col))
def aslist(o: Any):
    """Coerce *o* to a list: None -> [], a list is returned as-is, anything
    else (including tuples) is wrapped in a single-element list."""
    if o is None:
        return []
    if type(o) is list:  # deliberately exact: list subclasses get wrapped too
        return o
    return [o]
@dataclass
class LinkedData:
    """Subset of a schema.org JSON-LD object that urlinfo renders."""

    title: Optional[str]
    image: Optional[str]
    genres: List[str]
    description: Optional[str]
    published: Optional[datetime]
    creators: List[str]

    @classmethod
    def from_json(cls, o: Mapping[str, Any]):
        """Build a LinkedData from one decoded JSON-LD object.

        Tolerates common real-world variations: creator-ish fields may hold
        a single value or a list, and each entry may be a plain name string
        or a typed object (the original assumed dicts with "@type"/"name",
        raising KeyError/TypeError otherwise).
        """
        # https://schema.org/Movie
        # https://schema.org/NewsArticle
        creators = []
        for k in "director", "creator", "author", "producer", "contributor":
            for p in aslist(o.get(k)):
                if isinstance(p, str):
                    # Some publishers emit bare name strings.
                    creators.append(p)
                elif isinstance(p, dict) and p.get("@type") == "Person":
                    name = p.get("name")
                    if name:
                        creators.append(name)
        published = None
        if "datePublished" in o:
            try:
                # fromisoformat (pre-3.11) rejects a trailing "Z"; normalize.
                published = datetime.fromisoformat(
                    o["datePublished"].replace("Z", "+00:00")
                )
            except ValueError:
                pass  # unparsable date: better to drop it than the whole card
        return cls(
            title=o.get("headline") or o.get("name"),
            published=published,
            image=o.get("image"),
            description=o.get("description"),
            genres=uniq(aslist(o.get("genre"))),
            creators=uniq(creators),
        )
def parse_ldjson(ldjson: str) -> Iterable[LinkedData]:
    """Yield a LinkedData for each schema.org object in an ld+json payload.

    The payload may be a single object or an array of objects.  Raises
    json.JSONDecodeError (a ValueError subclass) on malformed JSON.
    """
    ld: Union[dict, list] = json.loads(ldjson)
    for o in aslist(ld):
        if not isinstance(o, dict):
            # Arrays occasionally contain stray scalars; skip them.
            continue
        ctx = o.get("@context")
        # Publishers use both http:// and https://, with or without a
        # trailing slash; the original only matched "https://schema.org".
        if isinstance(ctx, str) and ctx.rstrip("/") in (
            "https://schema.org",
            "http://schema.org",
        ):
            yield LinkedData.from_json(o)
        else:
            log.debug("Unknown context in Linked Data.")

View file

@ -284,7 +284,7 @@ class ElementParser(HTMLParser):
super().__init__()
self.selector = selector
self.__active_tag = None
self.__done = False
self.done = False
self.__value = ""
def handle_starttag(self, tag, attrs):
@ -293,21 +293,21 @@ class ElementParser(HTMLParser):
def handle_endtag(self, tag):
if tag == self.__active_tag:
self.__done = True
self.done = True
self.__active_tag = None
def handle_data(self, data):
if self.__active_tag and not self.__done:
if self.__active_tag and not self.done:
self.__value += data
@property
def value(self) -> Optional[str]:
return self.__value if self.__done else None
return self.__value if self.done else None
def load_chunks(self, content: Iterable[str]) -> None:
for chunk in content:
self.feed(chunk)
if self.__done:
if self.done:
break