feeds: enable updating feeds at different intervals
This commit is contained in:
parent
a96704e1fa
commit
76e7969209
5 changed files with 76 additions and 40 deletions
|
|
@ -12,7 +12,6 @@ class Feeder:
|
||||||
def __init__(self, store: Store, feeds: Iterable[Feed] = None):
|
def __init__(self, store: Store, feeds: Iterable[Feed] = None):
|
||||||
self.feeds: Dict[str, Feed] = {}
|
self.feeds: Dict[str, Feed] = {}
|
||||||
self.store: Store = store
|
self.store: Store = store
|
||||||
self.news: Mapping[FeedId, Set[PostId]] = {}
|
|
||||||
|
|
||||||
if feeds:
|
if feeds:
|
||||||
self.add_feeds(feeds)
|
self.add_feeds(feeds)
|
||||||
|
|
@ -22,17 +21,26 @@ class Feeder:
|
||||||
self.store.sync_feeds(self.feeds)
|
self.store.sync_feeds(self.feeds)
|
||||||
log.debug("Active feeds: %s", ", ".join(self.feeds.keys()))
|
log.debug("Active feeds: %s", ", ".join(self.feeds.keys()))
|
||||||
|
|
||||||
async def update_all(self, feed_ids=None) -> Mapping[FeedId, Set[PostId]]:
    """Update all feeds, or only the feeds named in *feed_ids*.

    Each selected feed is refreshed concurrently through ``self._update``
    and the selected feeds are then handed to ``self.store.sync_feeds`` so
    that any new posts are persisted in storage.

    Args:
        feed_ids: Optional iterable of feed ids to restrict the update to.
            ``None`` (the default) selects every feed in ``self.feeds``.
            An explicitly empty iterable selects no feed at all.

    Returns:
        A dict mapping each selected feed id to the value returned by
        ``self._update`` for that feed.

    Raises:
        KeyError: If *feed_ids* contains an id that is not in ``self.feeds``.
    """
    # Compare against None instead of relying on truthiness: an empty
    # selection must not silently fall back to "update everything".
    if feed_ids is None:
        feeds = self.feeds
    else:
        feeds = {feed_id: self.feeds[feed_id] for feed_id in feed_ids}

    # gather() returns results in the order the awaitables were passed in,
    # so zipping with the same iteration order of `feeds` pairs them up.
    results = await asyncio.gather(*(self._update(feed_id) for feed_id in feeds))
    new_post_ids = dict(zip(feeds, results))

    # NOTE(review): only the selected subset is passed to sync_feeds here,
    # while add_feeds passes self.feeds; confirm sync_feeds accepts a subset.
    self.store.sync_feeds(feeds)
    return new_post_ids
|
||||||
|
|
||||||
async def update(self, feed_id) -> Set[PostId]:
|
async def _update(self, feed_id) -> Set[PostId]:
|
||||||
|
"""Update a single feed.
|
||||||
|
|
||||||
|
Does not persist any changes.
|
||||||
|
"""
|
||||||
feed = self.feeds[feed_id]
|
feed = self.feeds[feed_id]
|
||||||
post_ids = feed.post_ids
|
post_ids = feed.post_ids
|
||||||
feed.load()
|
feed.load()
|
||||||
|
|
|
||||||
|
|
@ -116,6 +116,10 @@ class Bot:
|
||||||
else:
|
else:
|
||||||
self.plugins[name] = mod
|
self.plugins[name] = mod
|
||||||
|
|
||||||
|
log.debug(f"Active plugins: {', '.join(sorted(p for p in self.plugins))}")
|
||||||
|
for t in self.timers:
|
||||||
|
log.debug(f"Active timer: {t}")
|
||||||
|
|
||||||
async def _on_unknown(self, room: MatrixRoom, event: UnknownEvent):
|
async def _on_unknown(self, room: MatrixRoom, event: UnknownEvent):
|
||||||
# See if we can transform an Unknown event into something we DO know.
|
# See if we can transform an Unknown event into something we DO know.
|
||||||
if event.type == "m.reaction":
|
if event.type == "m.reaction":
|
||||||
|
|
@ -211,6 +215,7 @@ class Bot:
|
||||||
for job in self.timers:
|
for job in self.timers:
|
||||||
if job.next is not None and job.next <= now():
|
if job.next is not None and job.next <= now():
|
||||||
job.next = None
|
job.next = None
|
||||||
|
log.debug(f"Job is ready: {job}")
|
||||||
try:
|
try:
|
||||||
coro = job.func(job)
|
coro = job.func(job)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,15 @@ from typing import *
|
||||||
import feeder
|
import feeder
|
||||||
import postillon
|
import postillon
|
||||||
|
|
||||||
from ..functions import capped_text, clamp, localizedtz, reply, send_message, strip_tags
|
from ..functions import (
|
||||||
|
capped_text,
|
||||||
|
clamp,
|
||||||
|
localizedtz,
|
||||||
|
parse_period,
|
||||||
|
reply,
|
||||||
|
send_message,
|
||||||
|
strip_tags,
|
||||||
|
)
|
||||||
from ..models import Job, Message
|
from ..models import Job, Message
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
@ -44,13 +52,26 @@ def init(bot):
|
||||||
bot.shared["poststore"].connect()
|
bot.shared["poststore"].connect()
|
||||||
|
|
||||||
one_minute = 60
|
one_minute = 60
|
||||||
one_hour = 3600
|
one_hour = 60 * one_minute
|
||||||
bot.add_timer(
|
ten_percent = 10 / 100
|
||||||
title="update feeds",
|
|
||||||
every=one_hour,
|
# Create timers to update each feed.
|
||||||
callback=update_feeds,
|
# XXX we could reduce the timers by grouping feeds with the same update interval
|
||||||
jitter=10 * one_minute,
|
feedconf = bot.config.get("feeder.feeds")
|
||||||
)
|
for feed in bot.shared["feeder"].feeds.values():
|
||||||
|
if feed.active:
|
||||||
|
every_period = feedconf.get(feed.id, {}).get("update_every")
|
||||||
|
every_s = parse_period(every_period) if every_period else one_hour
|
||||||
|
|
||||||
|
async def update_feed_cb(job: Job, feed_id=feed.id):
    """Timer callback that updates the single feed this timer was created for.

    ``feed_id`` is bound as a default argument on purpose: defaults are
    evaluated when the function is defined, i.e. once per loop iteration.
    Reading ``feed.id`` inside the body instead would look the loop
    variable up only when the timer fires, by which time the loop has
    finished and every callback would refer to the last feed iterated.

    Args:
        job: The timer job being run; passed through to ``update_feeds``.
        feed_id: Id of the feed to update. Not meant to be supplied by the
            caller; it carries the per-iteration feed id.
    """
    await update_feeds([feed_id], job)
|
||||||
|
|
||||||
|
bot.add_timer(
|
||||||
|
title=f"update feed: {feed.id}",
|
||||||
|
every=every_s,
|
||||||
|
callback=update_feed_cb,
|
||||||
|
jitter=ten_percent * every_s,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def handle(message: Message):
|
async def handle(message: Message):
|
||||||
|
|
@ -82,13 +103,13 @@ def handle_postillon(bot, posts):
|
||||||
poststore.add(postillon.split_post(post))
|
poststore.add(postillon.split_post(post))
|
||||||
|
|
||||||
|
|
||||||
async def update_feeds(job: Job):
|
async def update_feeds(feed_ids, job: Job):
|
||||||
max_posts = 2
|
max_posts = 2
|
||||||
bot = job.app
|
bot = job.app
|
||||||
feeder = bot.shared["feeder"]
|
feeder = bot.shared["feeder"]
|
||||||
feeds = bot.config.get("feeder.feeds")
|
feedconfs = bot.config.get("feeder.feeds")
|
||||||
rooms = {fid: f.get("rooms", []) for fid, f in feeds.items()}
|
rooms = {fid: f.get("rooms", []) for fid, f in feedconfs.items()}
|
||||||
news = await feeder.update_all()
|
news = await feeder.update_all(feed_ids)
|
||||||
sends = []
|
sends = []
|
||||||
mores = []
|
mores = []
|
||||||
for feed_id, post_ids in news.items():
|
for feed_id, post_ids in news.items():
|
||||||
|
|
@ -105,7 +126,7 @@ async def update_feeds(job: Job):
|
||||||
post,
|
post,
|
||||||
tzname=roomconf["timezone"],
|
tzname=roomconf["timezone"],
|
||||||
lc=roomconf["locale"],
|
lc=roomconf["locale"],
|
||||||
max_content_len=feeds[feed_id].get("max_content_len", 300),
|
max_content_len=feedconfs[feed_id].get("max_content_len", 300),
|
||||||
)
|
)
|
||||||
text = f"{prefix} {text}"
|
text = f"{prefix} {text}"
|
||||||
sends.append(send_message(bot.client, room_id, html=text))
|
sends.append(send_message(bot.client, room_id, html=text))
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ from ...functions import (
|
||||||
ElementParser,
|
ElementParser,
|
||||||
capped_text,
|
capped_text,
|
||||||
escape_all,
|
escape_all,
|
||||||
|
parse_period,
|
||||||
pretty_duration,
|
pretty_duration,
|
||||||
reply,
|
reply,
|
||||||
send_image,
|
send_image,
|
||||||
|
|
@ -44,26 +45,6 @@ def thumbnail(url, width=182, height=268):
|
||||||
return parts._replace(path=str(path)).geturl()
|
return parts._replace(path=str(path)).geturl()
|
||||||
|
|
||||||
|
|
||||||
period_re = re.compile(
|
|
||||||
r"P((?P<year>\d+)Y)?((?P<month>\d+)M)?((?P<day>\d+)D)?T((?P<hour>\d+)H)?((?P<minute>\d+)M)?((?P<second>\d+)S)?"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def parse_period(s: str) -> int:
|
|
||||||
# see https://en.wikipedia.org/wiki/ISO_8601#Durations
|
|
||||||
seconds = {
|
|
||||||
"year": 365 * 86400,
|
|
||||||
"month": 30 * 86400,
|
|
||||||
"day": 86400,
|
|
||||||
"hour": 3600,
|
|
||||||
"minute": 60,
|
|
||||||
"second": 1,
|
|
||||||
}
|
|
||||||
if not (match := period_re.fullmatch(s)):
|
|
||||||
return 0
|
|
||||||
return sum(seconds[k] * int(v) for k, v in match.groupdict().items() if v)
|
|
||||||
|
|
||||||
|
|
||||||
_import_image_cache = (
|
_import_image_cache = (
|
||||||
{}
|
{}
|
||||||
) # XXX ideally we'd cache these forever (in some permanent storage)
|
) # XXX ideally we'd cache these forever (in some permanent storage)
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
import locale
|
import locale
|
||||||
import logging
|
import logging
|
||||||
|
import re
|
||||||
import unicodedata
|
import unicodedata
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
@ -320,3 +321,23 @@ def escape_all(dc, escape: Callable[[str], str] = html_escape) -> None:
|
||||||
setattr(dc, f.name, [escape(x) for x in getattr(dc, f.name)])
|
setattr(dc, f.name, [escape(x) for x in getattr(dc, f.name)])
|
||||||
elif get_origin(f.type) is dict and get_args(f.type)[1] is str:
|
elif get_origin(f.type) is dict and get_args(f.type)[1] is str:
|
||||||
setattr(dc, f.name, {k: escape(v) for k, v in getattr(dc, f.name).items()})
|
setattr(dc, f.name, {k: escape(v) for k, v in getattr(dc, f.name).items()})
|
||||||
|
|
||||||
|
|
||||||
|
# Matches ISO 8601 durations of the form PnYnMnDTnHnMnS with integer
# components.  The whole time section (the "T" and what follows it) is
# optional so that date-only durations such as "P1D" are accepted too.
period_re = re.compile(
    r"P((?P<year>\d+)Y)?((?P<month>\d+)M)?((?P<day>\d+)D)?"
    r"(T((?P<hour>\d+)H)?((?P<minute>\d+)M)?((?P<second>\d+)S)?)?"
)

# Length of each duration component in seconds.  Years and months are
# approximated as 365 and 30 days respectively.
_PERIOD_UNIT_SECONDS = {
    "year": 365 * 86400,
    "month": 30 * 86400,
    "day": 86400,
    "hour": 3600,
    "minute": 60,
    "second": 1,
}


def parse_period(s: str) -> int:
    """Return the length in seconds of the ISO 8601 duration *s*.

    Only integer components are understood (no weeks, no fractions).
    An "M" before the "T" separator counts as months, after it as minutes.

    Args:
        s: Duration string such as ``"PT1H30M"`` or ``"P1D"``.

    Returns:
        The total number of seconds, or ``0`` if *s* does not match the
        supported format.
    """
    # see https://en.wikipedia.org/wiki/ISO_8601#Durations
    if not (match := period_re.fullmatch(s)):
        return 0
    # Components absent from the string come back as None and are skipped.
    return sum(
        _PERIOD_UNIT_SECONDS[unit] * int(value)
        for unit, value in match.groupdict().items()
        if value
    )
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue