# hotdog/feeder/models.py
#
# 110 lines
# 2.9 KiB
# Python
# Raw Normal View History
#
# 2020-11-01 16:31:37 +01:00
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from hashlib import md5
from typing import *
import feedparser
# Agent string handed to feedparser.parse() via its `agent` argument.
USER_AGENT = "curl/7.64.1"
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Aliases that make signatures say which kind of string ID they carry.
FeedId = str
PostId = str
@dataclass
class Feed:
    """A feed URL plus the state gathered by the most recent :meth:`load`.

    ``etag`` and ``modified`` are stored after a load and passed back to
    ``feedparser.parse`` on the next one.  ``active`` is cleared when the
    server answers 410.  ``next_url`` holds the feed's ``rel="next"`` link,
    if it has one.
    """

    id: FeedId
    url: str
    title: Optional[str] = None
    posts: List["Post"] = field(default_factory=list)
    etag: Optional[str] = None
    modified: Optional[str] = None
    active: bool = True
    next_url: Optional[str] = None

    @property
    def post_ids(self) -> Set[PostId]:
        """Return the IDs of all posts currently held in ``posts``."""
        return {p.id for p in self.posts}

    def load(self) -> None:
        """Load all posts from the current feed URL.

        Outcome by HTTP status:

        * no status at all -- log an error and leave the feed untouched;
        * 301 -- adopt the new URL reported by the parser, then continue;
        * 410 -- mark the feed inactive and stop;
        * anything else -- continue.

        When loading continues, ``etag``, ``modified`` and ``title`` are
        updated if present in the result, ``posts`` is replaced by the
        parsed entries sorted newest first, and ``next_url`` is set from the
        feed's ``rel="next"`` link (or reset to ``None``).
        """
        log.debug(f"Loading {self.url} ...")
        r = feedparser.parse(
            self.url, agent=USER_AGENT, etag=self.etag, modified=self.modified
        )
        status = r.get("status")
        # Use .get() for headers: a failed fetch may not provide them, and
        # attribute access would raise before the status check below runs.
        log.debug(f"Loaded {self.url}: {status} {r.get('headers')}")

        if status is None:
            log.error(f"Feed could not be loaded: {self.id}: {self.url}")
            return
        elif status == 301:
            log.warning(f"Feed URL changed: {self.id}: {r.href}")
            self.url = r.href
        elif status == 410:
            log.error(f"Feed is gone: {self.id}")
            self.active = False
            return

        # NOTE(review): a 304 (not modified) response presumably carries no
        # entries, so `posts` would be replaced by an empty list below --
        # confirm that callers expect this.
        if "etag" in r:
            self.etag = r.etag
        if "modified" in r:
            self.modified = r.modified
        if "title" in r.feed:
            self.title = r.feed.title

        posts = [Post.from_entry(e) for e in r.entries]
        for post in posts:
            if post.date is None:
                # Entries without their own date inherit the feed-level one.
                post.date = pubdate(r.feed)

        # The feed-level date can be missing too; comparing None against a
        # datetime raises TypeError, so undated posts get the smallest
        # possible key and end up last in the newest-first order.
        undated = datetime.min.replace(tzinfo=timezone.utc)
        posts.sort(
            key=lambda p: p.date if p.date is not None else undated,
            reverse=True,
        )
        self.posts = posts

        for link in r.feed.get("links", []):
            if link.get("rel") == "next":
                self.next_url = link.get("href")
                break
        else:
            # No rel="next" link in this result.
            self.next_url = None

    def load_next(self) -> Optional["Feed"]:
        """Load the page referenced by ``next_url`` as a new ``Feed``.

        Returns ``None`` when ``next_url`` is unset or empty.  Otherwise
        returns a fresh ``Feed`` with this feed's ID and the next URL, after
        calling :meth:`load` on it.
        """
        if not self.next_url:
            return None
        feed = Feed(self.id, self.next_url)
        feed.load()
        return feed
@dataclass
class Post:
    """A single feed entry.

    Only ``id`` is required; every other field defaults to ``None``.
    """

    id: PostId
    content: Optional[str] = None
    date: Optional[datetime] = None
    link: Optional[str] = None
    title: Optional[str] = None

    @classmethod
    def from_entry(cls, entry) -> "Post":
        """Build a ``Post`` from a parsed feed entry.

        ``entry`` is read only through ``entry.get(...)``.  The post ID is
        the entry's ``id`` if truthy, otherwise its ``link``, otherwise the
        MD5 hex digest of ``"<title>|<summary>"``.  Missing ``summary`` and
        ``title`` become empty strings; a missing ``link`` stays ``None``.
        """
        content = entry.get("summary", "")
        title = entry.get("title", "")
        link = entry.get("link")
        # MD5 is used only to derive a stable identifier, not for security.
        post_id = (
            entry.get("id")
            or link
            or md5(f"{title}|{content}".encode()).hexdigest()
        )
        return cls(
            id=post_id,
            date=pubdate(entry),
            content=content,
            title=title,
            link=link,
        )
def pubdate(entry) -> Optional[datetime]:
    """Return the entry's date as a timezone-aware datetime, or ``None``.

    Reads ``published_parsed`` and falls back to ``updated_parsed`` when the
    former is missing or falsy.  The value is expected to be a time tuple
    whose first six items are year, month, day, hour, minute and second; the
    result is tagged as UTC.

    NOTE(review): assumes the parser already normalised these tuples to
    UTC -- confirm against the feedparser documentation.
    """
    parsed = entry.get("published_parsed") or entry.get("updated_parsed")
    if parsed is None:
        return None
    # Only the first six fields map onto datetime's positional arguments;
    # the remaining time-tuple fields (weekday, yearday, DST) are dropped.
    return datetime(*parsed[:6], tzinfo=timezone.utc)