109 lines
2.9 KiB
Python
109 lines
2.9 KiB
Python
import asyncio
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from hashlib import md5
|
|
from typing import *
|
|
|
|
import feedparser
|
|
|
|
# Passed as the `agent` argument whenever a feed is fetched with feedparser.
USER_AGENT = "curl/7.64.1"

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Feed and post identifiers are plain strings; the aliases only make the
# annotations on Feed.id and Post.id more descriptive.
FeedId = str
PostId = str
|
|
|
|
|
|
@dataclass
class Feed:
    """A feed identified by ``id``, fetched from ``url`` with feedparser.

    ``posts`` always reflects the most recent successful ``load()``; the
    remaining fields are bookkeeping that ``load()`` keeps up to date.
    """

    id: FeedId
    url: str
    title: Optional[str] = None
    posts: List["Post"] = field(default_factory=list)
    # Values taken from the last parsed result and passed back to feedparser
    # on the next load().
    etag: Optional[str] = None
    modified: Optional[str] = None
    # Set to False by load() when the server answers with status 410.
    active: bool = True
    # href of the feed-level link whose rel is "next", or None if the last
    # load found no such link.
    next_url: Optional[str] = None

    @property
    def post_ids(self) -> Set[PostId]:
        """Return the ids of all currently loaded posts."""
        return {p.id for p in self.posts}

    def load(self) -> None:
        """Load all posts from the current feed URL.

        Fetches ``self.url`` and replaces ``self.posts`` with the parsed
        entries, newest first. ``etag``, ``modified`` and ``title`` are
        refreshed when the parsed result provides them, and ``next_url`` is
        set from the feed's ``rel="next"`` link (or reset to None).

        Status handling:

        * no status at all: an error is logged and nothing is updated;
        * 301: ``self.url`` is replaced by the result's ``href`` and loading
          continues;
        * 410: the feed is marked inactive and nothing else is updated.

        Posts without a date of their own inherit the feed-level date. Posts
        that still have no date are placed after all dated posts, keeping the
        order in which the feed listed them.
        """
        log.debug("Loading %s ...", self.url)
        r = feedparser.parse(
            self.url, agent=USER_AGENT, etag=self.etag, modified=self.modified
        )
        status = r.get("status")
        log.debug("Loaded %s: %s %s", self.url, status, r.headers)
        if status is None:
            log.error("Feed could not be loaded: %s: %s", self.id, self.url)
            return
        if status == 301:
            log.warning("Feed URL changed: %s: %s", self.id, r.href)
            self.url = r.href
        elif status == 410:
            log.error("Feed is gone: %s", self.id)
            self.active = False
            return

        # NOTE(review): if a conditional request is answered with 304 and the
        # result has no entries, the code below replaces self.posts with an
        # empty list and clears next_url -- confirm callers expect that.
        if "etag" in r:
            self.etag = r.etag
        if "modified" in r:
            self.modified = r.modified
        if "title" in r.feed:
            self.title = r.feed.title

        posts = [Post.from_entry(e) for e in r.entries]
        for post in posts:
            if post.date is None:
                post.date = pubdate(r.feed)

        # A post can still be undated when neither the entry nor the feed
        # carries a date. Comparing None with a datetime (or with None) raises
        # TypeError, so undated posts get the smallest aware datetime as their
        # sort key: they end up last, and the stable sort keeps feed order.
        oldest = datetime.min.replace(tzinfo=timezone.utc)
        posts.sort(key=lambda p: p.date or oldest, reverse=True)
        self.posts = posts

        # First feed-level link with rel="next" wins; None when there is none.
        self.next_url = next(
            (
                link.get("href")
                for link in r.feed.get("links", [])
                if link.get("rel") == "next"
            ),
            None,
        )

    def load_next(self) -> Optional["Feed"]:
        """Load the feed that ``next_url`` points to.

        Returns a new ``Feed`` with the same id and ``next_url`` as its URL,
        on which ``load()`` has already been called, or None when this feed
        has no ``next_url``.
        """
        if not self.next_url:
            return None
        feed = Feed(self.id, self.next_url)
        feed.load()
        return feed
|
|
|
|
|
|
@dataclass
class Post:
    """A single post taken from a parsed feed entry."""

    id: PostId
    content: Optional[str] = None
    date: Optional[datetime] = None
    link: Optional[str] = None
    title: Optional[str] = None

    @classmethod
    def from_entry(cls, entry):
        """Build a Post from a parsed feed entry.

        The post id is the entry's ``id`` if it has a truthy one, otherwise
        its ``link``, otherwise the MD5 hex digest of ``"<title>|<summary>"``.
        Missing ``summary`` and ``title`` values default to the empty string.
        """
        summary = entry.get("summary", "")
        headline = entry.get("title", "")
        url = entry.get("link")

        ident = entry.get("id") or url
        if not ident:
            # Last resort: derive an id from the entry's text.
            ident = md5(f"{headline}|{summary}".encode()).hexdigest()

        return cls(
            id=ident,
            content=summary,
            date=pubdate(entry),
            link=url,
            title=headline,
        )
|
|
|
|
|
|
def pubdate(entry) -> Optional[datetime]:
    """Return the entry's date as a UTC-stamped datetime, or None.

    ``published_parsed`` is used when it is truthy, otherwise
    ``updated_parsed``. The first six fields of the chosen value are passed to
    ``datetime`` and the result is tagged with ``timezone.utc``.
    """
    stamp = entry.get("published_parsed")
    if not stamp:
        stamp = entry.get("updated_parsed")
    if stamp is not None:
        fields = stamp[:6]
        return datetime(*fields, tzinfo=timezone.utc)
    return None
|