# coding: utf-8
'''data structures and storage for feed2exec'''
# Copyright (C) 2019 Antoine Beaupré <anarcat@debian.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division, absolute_import
from __future__ import print_function

try:
    import configparser
except ImportError:  # pragma: nocover
    # py2: should never happen as we depend on the newer one in setup.py
    import ConfigParser as configparser
from collections import OrderedDict, namedtuple
from contextlib import contextmanager
from datetime import datetime, timezone
import logging
import os.path
import sqlite3
from threading import Lock
try:
    import urllib.parse as urlparse
except ImportError:  # pragma: nocover
    # py2
    import urlparse
import warnings

import feedparser
import xdg.BaseDirectory as xdg_base_dirs

import feed2exec
import feed2exec.utils as utils
class Feed(feedparser.FeedParserDict):
    """basic data structure representing a RSS or Atom feed.

    it derives from the base :class:`feedparser.FeedParserDict` but
    forces the element to have a ``name``, which is the unique name
    for that feed in the :class:`feed2exec.controller.FeedManager`. We also
    add convenience functions to parse (in parallel) and normalize
    feed items.

    For all intents and purposes, this can be considered like a dict()
    unless otherwise noted.
    """

    # configuration keys: metadata found in the parsed feed is never
    # allowed to override those (see parse())
    locked_keys = ('output', 'args', 'filter', 'filter_args',
                   'folder', 'mailbox', 'url', 'name', 'pause', 'catchup')

    def __init__(self, name, *args, **kwargs):
        """build the feed like a regular dict, then force its ``name`` key

        :param str name: the name of the feed, stored in ``self['name']``
        """
        super().__init__(*args, **kwargs)
        self['name'] = name

    def normalize(self, item=None):
        """normalize feeds a little more than what feedparser provides.

        we do the following operations, all of which modify ``item``
        in place:

        1. add more defaults to item dates (`issue #113
           <https://github.com/kurtmckee/feedparser/issues/113>`_)

        2. missing GUID in some feeds (`issue #112
           <https://github.com/kurtmckee/feedparser/issues/112>`_)

        3. link normalization fails on some feeds, particularly GitHub,
           where feeds are /foo instead of https://github.com/foo.
           unreported for now.

        :param dict item: the feed item to normalize. the ``None``
                          default is not usable: the item is
                          dereferenced unconditionally.
        """
        # 1. add more defaults (issue #113)
        def pick_first_date():
            """find a valid date in item or feed, None if there is none"""
            fields = ('updated_parsed', 'published_parsed', 'created_parsed')
            # first check the item itself, then fallback on the feed
            for scope in (item, self):
                # all the fields to inspect
                for field in fields:
                    if scope.get(field, False):
                        logging.debug('picked field %s for item %s: %s',
                                      field, item.get('id'), scope.get(field))
                        return scope.get(field)

        # ignore deprecation warnings from feedparser:
        # https://github.com/kurtmckee/feedparser/issues/151
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            item['updated_parsed'] = pick_first_date()
            if not item.get('updated_parsed'):
                logging.info('no parseable date found in feed item %s from feed %s, using current time instead',
                             item.get('id'), self.get('url'))
                # use an aware datetime: calling timestamp() on the
                # naive result of utcnow() interprets it as *local*
                # time and skews the epoch by the local UTC offset
                item['updated_parsed'] = datetime.now(timezone.utc).timestamp()

        # 2. add UID if missing (issue #112)
        if not item.get('id'):
            item['id'] = item.get('title')

        # 3. not completely absolute links
        scheme, netloc, *rest = urlparse.urlsplit(item.get('link', ''))
        if not scheme:
            # take missing scheme/host from feed URL
            scheme, netloc, *_ = urlparse.urlsplit(self.get('url', ''))
            item['link'] = urlparse.urlunsplit((scheme, netloc, *rest))

    def parse(self, body):
        """parse the body of the feed

        this parses the given body using :mod:`feedparser` and returns
        the parsed data. metadata found in the feed is also copied
        into this object, except for keys already set or listed in
        :attr:`locked_keys`.

        :todo: this could be moved to a plugin, but then we'd need to take
               out the cache checking logic, which would remove most of
               the code here...

        :param bytes body: the body of the feed, as returned by :func:fetch

        :return dict: the parsed data, or None if feedparser raised
                      an exception
        """
        logging.info('parsing feed %s (%d bytes)', self['url'], len(body))
        try:
            data = feedparser.parse(body)
        except Exception as e:
            logging.warning('feedparser failed: either a bug or a malformed feed: %s (feed skipped)', e)
            return None
        # add metadata from the feed without overriding user config
        for (key, val) in data['feed'].items():
            if key not in self and key not in Feed.locked_keys:
                self[key] = val
        # import json
        # logging.debug('parsed structure %s',
        #               json.dumps(data, indent=2, sort_keys=True,
        #                          default=str))
        # massage result for multiprocessing module: replace the
        # exception object by its string representation
        if data['bozo']:
            data['bozo_exception'] = str(data['bozo_exception'])
        return data
class FeedConfStorage(configparser.RawConfigParser):
    """Feed configuration stored in a config file.

    This derives from :class:`configparser.RawConfigParser` and uses
    the ``.ini`` file set in the ``path`` member to read and write
    settings.

    Changes are committed immediately, and no locking is performed so
    loading here should be safe but not editing.

    The particular thing about this configuration is that there is an
    iterator that will yield entries matching the ``pattern``
    substring provided in the constructor.
    """

    def __init__(self, path, pattern=None):
        """load the configuration file

        :param str path: location of the ``.ini`` file, ``~`` is
                         expanded. if None, :func:`guess_path` is used.
        :param str pattern: only yield, on iteration, the feeds whose
                            name contains this substring. None (the
                            default) yields all feeds.
        """
        if path is None:
            path = self.guess_path()
        self.path = os.path.expanduser(path)
        self.pattern = pattern
        super().__init__(dict_type=OrderedDict)
        self.read(self.path)

    def __repr__(self):
        return 'FeedConfStorage(%s, %s)' % (self.path, self.pattern)

    @classmethod
    def guess_path(cls):
        """find the configuration file path

        this is whatever xdg's ``load_first_config`` finds for
        ``<prog>.ini`` or, failing that, ``<prog>.ini`` in the xdg
        config home directory.
        """
        return xdg_base_dirs.load_first_config(feed2exec.__prog__ + '.ini') or \
            os.path.join(xdg_base_dirs.xdg_config_home, feed2exec.__prog__ + '.ini')

    def add(self, name, url, output=None, args=None,
            filter=None, filter_args=None,
            folder=None, mailbox=None):
        """add the designated feed to the configuration

        only the settings that are not None are written in the new
        section.

        this is not thread-safe.

        :raises AttributeError: if a feed with that name already exists
        """
        if self.has_section(name):
            raise AttributeError('key %s already exists' % name)
        d = OrderedDict()
        # when a new element is added here, it must be added to the
        # Feed.locked_keys config to keep parsed feed elements from
        # overriding potentially secure-sensitive settings
        d['url'] = url
        if output is not None:
            d['output'] = output
        if args is not None:
            d['args'] = args
        if filter is not None:
            d['filter'] = filter
        if filter_args is not None:
            d['filter_args'] = filter_args
        if folder is not None:
            d['folder'] = folder
        if mailbox is not None:
            d['mailbox'] = mailbox
        self[name] = d
        self.commit()

    def set(self, section, option, value=None):
        """override parent to make sure we immediately write changes

        not thread-safe
        """
        super().set(section, option, value)
        self.commit()

    def remove_option(self, section, option):
        """override parent to make sure we immediately write changes

        not thread-safe

        :return bool: the result of the parent implementation, which
                      tells whether the option existed
        """
        existed = super().remove_option(section, option)
        self.commit()
        return existed

    def remove(self, name):
        """convenient alias for
        :func:`configparser.RawConfigParser.remove_section`

        not thread-safe

        :return bool: the result of ``remove_section``, which tells
                      whether the section existed
        """
        existed = self.remove_section(name)
        self.commit()
        return existed

    def commit(self):
        """write the feed configuration

        the parent directory is created as needed.

        see :func:`configparser.RawConfigParser.write`"""
        logging.info('saving feed configuration in %s', self.path)
        utils.make_dirs_helper(os.path.dirname(self.path))
        with open(self.path, 'w') as configfile:
            self.write(configfile)

    def __iter__(self):
        """override iterator to allow for pattern matching

        this yields :class:`Feed` objects built from the matching
        sections, not section names like the parent class does.
        """
        for name in self.sections():
            if self.pattern is None or self.pattern in name:
                yield Feed(name=name, **self[name])
class SqliteStorage(object):
    """simple key/value storage backed by a table in a SQLite database

    subclasses set ``table_name`` (and optionally ``key_name``,
    ``value_name`` and ``sql``) to pick the table they operate on.

    connections are cached per database path and shared between all
    instances; a lock per path serializes their use.
    """

    # SQL statement executed by the constructor, if set
    sql = None
    # not used by this class, left for subclasses to define
    record = None
    # database path -> shared sqlite3 connection
    cache = {}
    # database path -> Lock held while the shared connection is in use
    locks = {}
    table_name = None
    key_name = 'key'
    value_name = 'value'

    def __init__(self, path):
        """open the database, creating its directory and running ``sql``

        :param str path: location of the database, ``~`` is expanded
        """
        self.path = os.path.expanduser(path)
        assert self.path
        utils.make_dirs_helper(os.path.dirname(self.path))
        if self.sql:
            with self.connection() as con:
                con.execute(self.sql)

    @contextmanager
    def connection(self, commit=True):
        """context manager yielding the shared connection, under lock

        if the body raises, pending changes are rolled back so they
        are not left on the shared connection to be committed by an
        unrelated, later user.

        :param bool commit: commit the transaction when the body
                            completes without error
        """
        # setdefault registers the lock in one step, where checking
        # and then assigning could let two threads end up with
        # different locks for the same path
        lock = SqliteStorage.locks.setdefault(self.path, Lock())
        with lock:
            con = self.connect_cache(self.path)
            try:
                yield con
            except Exception:
                con.rollback()
                raise
            else:
                if commit:
                    con.commit()

    @classmethod
    def connect_cache(cls, path):
        """return the connection for ``path``, opening it on first use"""
        if path not in cls.cache:
            logging.info('connecting to database at %s', path)
            conn = sqlite3.connect(path)
            try:
                conn.set_trace_callback(logging.debug)
            except AttributeError:  # pragma: nocover
                logging.debug('no logging support in sqlite')
            cls.cache[path] = conn
        return cls.cache[path]

    @classmethod
    def guess_path(cls):
        """find the database path

        this is ``feed2exec.db`` in the xdg data home directory if it
        exists there, otherwise the same file in the xdg cache home
        directory, with a deprecation warning.
        """
        cache_home_db = os.path.join(xdg_base_dirs.xdg_cache_home, 'feed2exec.db')
        data_home_db = os.path.join(xdg_base_dirs.xdg_data_home, 'feed2exec.db')
        if os.path.exists(data_home_db):
            return data_home_db
        else:
            # we use warnings here because this function is likely to
            # be called before the logging module is initialized,
            # which typically creates a StreamHandler that's not
            # configured the way we like. A better way to work around
            # this problem would be to use a LoggerAction in argparse
            # but, alas, we are using click, so this insanity
            # continues.
            warnings.warn(
                "falling back on deprecated cache directory, move %s to %s to remove this warning" % (
                    cache_home_db,
                    data_home_db,
                ),
                DeprecationWarning,
            )
            return cache_home_db

    def get(self, key):
        """return the value stored for ``key``, None if there is none"""
        with self.connection(commit=False) as con:
            val = con.execute("""SELECT `%s` FROM `%s` WHERE `%s`=?"""
                              % (self.value_name, self.table_name, self.key_name), (key, )).fetchone()
            return val[0] if val else None

    def set(self, key, value):
        """store ``value`` for ``key``, replacing any existing row"""
        with self.connection() as con:
            con.execute("INSERT OR REPLACE INTO `%s` (`%s`, `%s`) VALUES (?, ?)"
                        % (self.table_name, self.key_name, self.value_name),
                        (key, value))

    def delete(self, key):
        """delete the rows matching ``key``"""
        with self.connection() as con:
            con.execute("DELETE FROM `%s` WHERE `%s` = ?"
                        % (self.table_name, self.key_name), (key,))

    def __contains__(self, key):
        """tell whether a (non-NULL) value is stored for ``key``"""
        return self.get(key) is not None

    def __iter__(self):
        """iterate over all rows of the table, as :class:`sqlite3.Row`

        the cursor is returned as is: rows are fetched by the caller,
        after the lock has been released.
        """
        with self.connection(commit=False) as con:
            cur = con.cursor()
            cur.row_factory = sqlite3.Row
            return cur.execute("SELECT * from `%s`" % self.table_name)
class FeedItemCacheStorage(SqliteStorage):
    """cache of the feed items (GUIDs) recorded for each feed

    rows are ``(name, guid)`` pairs in the ``feedcache`` table, where
    ``name`` is the name of the feed. an instance is optionally bound
    to one ``feed``, in which case lookups, iteration and removal are
    restricted to it.
    """

    sql = '''CREATE TABLE IF NOT EXISTS
             feedcache (name text, guid text,
             PRIMARY KEY (name, guid))'''
    record = namedtuple('record', 'name guid')
    table_name = 'feedcache'
    key_name = 'guid'
    value_name = 'name'

    def __init__(self, path, feed=None, guid=None):
        """open the cache

        :param str path: location of the database
        :param str feed: name of the feed to operate on, None for all.
                         note that it is matched with the SQL ``LIKE``
                         operator.
        :param str guid: substring of the GUIDs to yield on iteration,
                         None for all
        """
        self.feed = feed
        if guid is None:
            self.guid = '%'
        else:
            self.guid = '%' + guid + '%'
        super().__init__(path)

    def __repr__(self):
        return 'FeedItemCacheStorage("%s", "%s", "%s")' % (self.path, self.feed, self.guid)

    def _feed_pattern(self):
        """LIKE pattern for the feed name: any feed if none is set"""
        return '%' if self.feed is None else self.feed

    def add(self, guid):
        """record ``guid`` for the feed, which must be set"""
        assert self.feed
        self.set(guid, self.feed)

    def remove(self, guid):
        """remove ``guid`` from the cache

        only the entry of the specified feed is removed, so that the
        same GUID recorded for another feed is kept. without a feed,
        the GUID is removed for all feeds.
        """
        if self.feed is None:
            self.delete(guid)
            return
        with self.connection() as con:
            con.execute("""DELETE FROM feedcache WHERE name LIKE ? AND guid=?""",
                        (self._feed_pattern(), guid))

    def __contains__(self, guid):
        '''override base class to look only in the specified feed'''
        with self.connection(commit=False) as con:
            cur = con.execute("""SELECT * FROM feedcache WHERE name LIKE ? AND guid=?""",
                              (self._feed_pattern(), guid))
            return cur.fetchone() is not None

    def __iter__(self):
        '''override base class to look only in the specified feed

        only the GUIDs matching the ``guid`` pattern given to the
        constructor are returned.
        '''
        with self.connection(commit=False) as con:
            cur = con.cursor()
            cur.row_factory = sqlite3.Row
            return cur.execute("""SELECT * from feedcache WHERE name LIKE ? AND guid LIKE ?""",
                               (self._feed_pattern(), self.guid))
class FeedContentCacheStorage(SqliteStorage):
    """key/value storage in the ``content`` table

    this only configures the table: the behavior visible here comes
    from :class:`SqliteStorage`, with its default ``key`` and
    ``value`` column names.
    """

    table_name = 'content'
    # statement creating the table, spelled line by line
    sql = ('CREATE TABLE IF NOT EXISTS\n'
           'content (key, value,\n'
           'PRIMARY KEY (key))')