import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from hashlib import md5
from typing import *

import feedparser

USER_AGENT = "curl/7.64.1"

log = logging.getLogger(__name__)

FeedId = str
PostId = str

# Sort-key stand-in for posts that have no date at all, so that they order
# as older than every dated post instead of making the sort raise TypeError.
_UNDATED = datetime.min.replace(tzinfo=timezone.utc)


@dataclass
class Feed:
    """A feed URL together with the posts obtained by the latest load()."""

    id: FeedId
    url: str
    title: Optional[str] = None
    posts: List["Post"] = field(default_factory=list)
    # Passed to feedparser.parse() on every load and refreshed from its
    # result whenever the result carries them.
    etag: Optional[str] = None
    modified: Optional[str] = None
    # Cleared by load() when the server answers 410.
    active: bool = True
    # href of the feed-level link with rel="next", or None if there is none.
    next_url: Optional[str] = None

    @property
    def post_ids(self) -> Set[PostId]:
        """Return the set of ids of the currently loaded posts."""
        return {p.id for p in self.posts}

    def load(self) -> None:
        """Load all posts from the current feed URL.

        Fetches ``self.url`` with feedparser and updates this object in place:

        * no status in the result: an error is logged, nothing is changed;
        * status 301: ``url`` is replaced by the result's ``href`` and
          loading continues;
        * status 410: ``active`` is set to False, nothing else is changed;
        * otherwise ``etag``, ``modified`` and ``title`` are updated when
          present, ``posts`` is replaced by the parsed entries sorted newest
          first, and ``next_url`` is set from the feed's rel="next" link
          (None when there is no such link).

        Entries without a date inherit the feed-level date; posts that still
        have no date are sorted after all dated posts.
        """
        log.debug(f"Loading {self.url} ...")
        r = feedparser.parse(
            self.url, agent=USER_AGENT, etag=self.etag, modified=self.modified
        )
        status = r.get("status")
        # Read headers with .get(): attribute access raises when the key is
        # absent, which would hide the "could not be loaded" error below.
        # NOTE(review): whether a failed fetch omits headers presumably
        # depends on the feedparser version - confirm.
        log.debug(f"Loaded {self.url}: {status} {r.get('headers')}")

        if status is None:
            log.error(f"Feed could not be loaded: {self.id}: {self.url}")
            return
        if status == 301:
            log.warning(f"Feed URL changed: {self.id}: {r.href}")
            self.url = r.href
        elif status == 410:
            log.error(f"Feed is gone: {self.id}")
            self.active = False
            return

        # NOTE(review): etag/modified are sent with the request, so a
        # "not modified" answer presumably arrives here with no entries and
        # replaces self.posts with an empty list - confirm callers expect it.
        if "etag" in r:
            self.etag = r.etag
        if "modified" in r:
            self.modified = r.modified
        if "title" in r.feed:
            self.title = r.feed.title

        posts = [Post.from_entry(e) for e in r.entries]
        if any(post.date is None for post in posts):
            # Undated entries fall back to the feed-level date (may be None).
            feed_date = pubdate(r.feed)
            for post in posts:
                if post.date is None:
                    post.date = feed_date
        # Newest first; "or _UNDATED" keeps None dates comparable.
        posts.sort(key=lambda post: post.date or _UNDATED, reverse=True)
        self.posts = posts

        # First rel="next" link wins; None when the feed has no such link.
        self.next_url = next(
            (
                link.get("href")
                for link in r.feed.get("links", [])
                if link.get("rel") == "next"
            ),
            None,
        )

    def load_next(self) -> Optional["Feed"]:
        """Load the page referenced by ``next_url`` as a new Feed.

        Returns None when ``next_url`` is unset or empty; otherwise a new
        Feed with the same id and ``next_url`` as its URL, on which load()
        has already been called.
        """
        if not self.next_url:
            return None
        feed = Feed(self.id, self.next_url)
        feed.load()
        return feed


@dataclass
class Post:
    """A single feed entry."""

    id: PostId
    content: Optional[str] = None
    date: Optional[datetime] = None
    link: Optional[str] = None
    title: Optional[str] = None

    @classmethod
    def from_entry(cls, entry) -> "Post":
        """Build a Post from a parsed entry (anything with a dict-like get()).

        The id is the entry's ``id`` if truthy, else its ``link``, else the
        MD5 hex digest of ``"<title>|<summary>"``. Missing ``summary`` and
        ``title`` become empty strings; the date comes from pubdate().
        """
        content = entry.get("summary", "")
        title = entry.get("title", "")
        link = entry.get("link")
        # The hash is only computed when the entry has neither id nor link.
        post_id = (
            entry.get("id")
            or link
            or md5(f"{title}|{content}".encode()).hexdigest()
        )
        return cls(
            id=post_id,
            date=pubdate(entry),
            content=content,
            title=title,
            link=link,
        )


def pubdate(entry) -> Optional[datetime]:
    """Return the entry's publication time as a timezone-aware datetime.

    Uses ``published_parsed`` if truthy, otherwise ``updated_parsed``; the
    first six fields of that time tuple are interpreted as UTC. Returns None
    when neither key yields a value.
    """
    date = entry.get("published_parsed") or entry.get("updated_parsed")
    if date is None:
        return None
    return datetime(*date[:6], tzinfo=timezone.utc)