import sqlite3
from datetime import datetime, timezone
from typing import *

from . import Post


class Store:
    """SQLite-backed storage for ``Post`` objects.

    The store keeps a single ``sqlite3`` connection opened in autocommit
    mode. Call :meth:`connect` before any other method; the query and
    insert methods use ``self.connection`` directly.
    """

    def __init__(self, dbpath: Optional[str] = None):
        """Remember the database path; no connection is opened yet."""
        self.dbpath = dbpath
        self.connection: Optional[sqlite3.Connection] = None

    def connect(self, path: Optional[str] = None) -> sqlite3.Connection:
        """Open the database (creating the schema if needed) and return it.

        If *path* is given it replaces the stored ``dbpath``. When a
        connection is already open it is returned as-is, even if *path*
        differs from the path it was opened with.
        """
        if path:
            self.dbpath = path
        if self.connection is None:
            # isolation_level=None puts sqlite3 in autocommit mode, so the
            # writes in add() are persisted without an explicit commit.
            self.connection = sqlite3.connect(self.dbpath, isolation_level=None)
            self.init()
        return self.connection

    def disconnect(self):
        """Close the connection, if any, so a later connect() reopens it."""
        conn = self.connection
        if conn is not None:
            conn.close()
            # Drop the reference; otherwise connect() would hand back the
            # closed connection instead of opening a fresh one.
            self.connection = None

    def init(self):
        """Create the ``post`` table and its date index if they are missing."""
        conn = self.connection
        conn.execute(
            """
            create table if not exists post (
                id integer primary key,
                content text unique not null,
                source text, -- link to the source of this post
                date integer not null
            )
            """
        )
        conn.execute(
            """
            create index if not exists post_date on post(date)
            """
        )

    def add(self, posts: Iterable[Post]):
        """Insert *posts*, silently skipping rows that hit a conflict.

        Each post is stored as ``(content, link, date)`` where the date is
        converted to an integer Unix timestamp.

        NOTE(review): a post without a date is bound as NULL, but the schema
        declares ``date`` NOT NULL. It looks like SQLite would reject such a
        row rather than skip it; confirm against the callers.
        """
        sql = """
            insert into post(content, source, date) values (?, ?, ?)
            on conflict do nothing
        """
        rows = (
            (p.content, p.link, int(p.date.timestamp()) if p.date else None)
            for p in posts
        )
        self.connection.executemany(sql, rows)

    def _select(self, condition: str = "", params: Sequence[Any] = ()) -> Iterable[Post]:
        """Yield posts matching the SQL fragment *condition*.

        *condition* is appended verbatim to the ``select`` statement, so it
        must come from trusted code; user-supplied values belong in *params*,
        which are bound to the ``?`` placeholders in *condition*.
        """
        sql = f"select id, content, date, source from post {condition}"
        for post_id, content, date, source in self.connection.execute(sql, params):
            if date is not None:
                # Dates are stored as integer Unix timestamps; read them
                # back as timezone-aware UTC datetimes.
                date = datetime.fromtimestamp(date, tz=timezone.utc)
            yield Post(post_id, content, date, link=source)

    def random_post(self) -> Optional[Post]:
        """Return one randomly chosen post, or ``None`` if the table is empty."""
        cond = "where id in (select id from post order by random() limit 1)"
        return next(iter(self._select(cond)), None)

    def search(self, term: str, skip: int = 0) -> Iterable[Post]:
        """Yield posts whose content matches *term*, newest first.

        *term* is used directly as the SQL ``LIKE`` pattern, so the caller
        supplies any ``%`` / ``_`` wildcards. The first *skip* matches are
        skipped; ``limit -1`` means no upper bound on the result count.
        """
        cond = "where content like ? order by date desc limit -1 offset ?"
        yield from self._select(cond, (term, skip))