66 lines
2 KiB
Python
66 lines
2 KiB
Python
import asyncio
|
|
import logging
|
|
from typing import *
|
|
|
|
from .models import Feed, FeedId, Post, PostId
|
|
from .store import Store
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
|
|
|
|
|
|
class Feeder:
    """Keep a collection of feeds up to date and mirror it into a store.

    Feeds are held in memory, keyed by their id. Whenever feeds are added or
    updated, the affected mapping is handed to ``Store.sync_feeds``.
    """

    def __init__(self, store: Store, feeds: Optional[Iterable[Feed]] = None):
        """Create a feeder backed by *store*.

        Args:
            store: Backend that feeds are synced to and posts are read from.
            feeds: Optional initial feeds. When given (and truthy) they are
                registered through ``add_feeds``, which also syncs the store.
        """
        self.feeds: Dict[FeedId, Feed] = {}
        self.store: Store = store

        if feeds:
            self.add_feeds(feeds)

    def add_feeds(self, feeds: Iterable[Feed]) -> None:
        """Register *feeds* by id and sync the full feed mapping to the store.

        A feed whose id is already registered replaces the existing entry.
        """
        self.feeds.update({feed.id: feed for feed in feeds})
        self.store.sync_feeds(self.feeds)
        log.debug("Active feeds: %s", ", ".join(self.feeds))

    async def update_all(
        self, feed_ids: Optional[Iterable[FeedId]] = None
    ) -> Mapping[FeedId, Set[PostId]]:
        """Update all feeds, or only those named in *feed_ids*.

        After all selected feeds were updated, they are synced to the store.

        Args:
            feed_ids: Ids of the feeds to update. When omitted or empty, every
                registered feed is updated.

        Returns:
            A mapping from feed id to the ids of the posts that were not
            present on that feed before this update.

        Raises:
            KeyError: If an id in *feed_ids* is not a registered feed.
        """
        if feed_ids:
            feeds = {feed_id: self.feeds[feed_id] for feed_id in feed_ids}
        else:
            feeds = self.feeds

        # ``gather`` returns results in argument order, which is the iteration
        # order of ``feeds``, so zipping the two pairs ids with their results.
        results = await asyncio.gather(
            *(self._update(feed_id) for feed_id in feeds)
        )
        new_post_ids = dict(zip(feeds, results))

        self.store.sync_feeds(feeds)
        return new_post_ids

    async def _update(self, feed_id: FeedId) -> Set[PostId]:
        """Update a single feed.

        Does not persist any changes.

        Returns:
            The post ids present after loading that were not present before.

        Raises:
            KeyError: If *feed_id* is not a registered feed.
        """
        feed = self.feeds[feed_id]
        # Snapshot the ids before loading: if ``load`` extends the same set
        # object in place, diffing against a live reference would always
        # produce an empty result.
        known_ids = set(feed.post_ids)
        # NOTE(review): ``load`` is called synchronously, so updates scheduled
        # together via ``asyncio.gather`` still run one after another.
        feed.load()
        return feed.post_ids - known_ids

    def posts(self, feed_id: FeedId, post_ids: Sequence[PostId]) -> Sequence[Post]:
        """Return the posts of *feed_id* with the given ids, read from the store."""
        return self.store.posts(feed_id, post_ids)
|
|
|
|
|
|
async def all_posts(feed_url: str, throttle: float = 10) -> AsyncIterable[Post]:
    """Yield all posts from the given feed URL and all following pages.

    A feed can be split into multiple pages.
    The Feed's normal load function ignores them. This function follows
    them and returns all Posts from all pages.

    Args:
        feed_url: URL of the first page; also used as the id of the seed feed.
        throttle: Seconds to sleep after each page's posts were yielded,
            before the next page is requested.
    """
    # Seed feed that only carries ``next_url``.
    # NOTE(review): presumably the first ``load_next`` call then fetches
    # ``feed_url`` itself -- confirm against ``Feed.load_next``.
    page = Feed(id=feed_url, url="", next_url=feed_url)

    # ``load_next`` is expected to return a falsy value once there is no
    # further page, which ends the loop.
    while page := page.load_next():
        # Lazy %-style arguments: the page is only formatted when debug
        # logging is actually enabled.
        log.debug("New feed page: %s", page)
        for post in page.posts:
            yield post
        log.debug("Waiting for %s seconds ...", throttle)
        await asyncio.sleep(throttle)
|