#!/usr/bin/python3
# coding: utf-8

'''fast feed parser that offloads tasks to plugins and commands'''
# Copyright (C) 2016 Antoine Beaupré <anarcat@debian.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import division, absolute_import
from __future__ import print_function


try:
    import configparser
except ImportError:  # pragma: nocover
    # py2: should never happen as we depend on the newer one in setup.py
    import ConfigParser as configparser
from collections import OrderedDict, namedtuple
import logging
import multiprocessing
import os
import os.path
try:
    import urllib.parse as urlparse
except ImportError:  # pragma: nocover
    # py2
    import urlparse


import feed2exec
import feed2exec.plugins as plugins
import feed2exec.utils as utils
import feedparser
import requests
import sqlite3


def default_config_dir():
    home_config = os.environ.get('XDG_CONFIG_HOME',
                                 os.path.join(os.environ.get('HOME'),
                                              '.config'))
    return os.path.join(home_config, feed2exec.__prog__)
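
# Illustrative sketch (not part of the original module): default_config_dir()
# simply joins XDG_CONFIG_HOME (or ~/.config) with the program name. For
# example, assuming HOME=/home/alice, no XDG_CONFIG_HOME set, and
# feed2exec.__prog__ == 'feed2exec':
#
#     >>> default_config_dir()  # hypothetical environment
#     '/home/alice/.config/feed2exec'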


def fetch(url):
    """fetch the given URL

    exceptions should be handled by the caller

    :todo: this should be moved to a plugin so it can be overridden,
           but so far I haven't found a use case for this.

    :param str url: the URL to fetch
    :return str: the body of the URL
    """
    body = ''
    if url.startswith('file://'):
        filename = url[len('file://'):]
        logging.info('opening local file %s', filename)
        with open(filename, 'rb') as f:
            body = f.read().decode('utf-8')
    else:
        logging.info('fetching URL %s', url)
        body = requests.get(url).text
    return body
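
# Usage sketch (illustrative; the path below is hypothetical): fetch() can
# read a local file through the file:// scheme, which is handy for tests
# that should not hit the network:
#
#     body = fetch('file:///tmp/example-feed.xml')
#     data = feedparser.parse(body)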


def normalize_entry(feed=None, entry=None):
    """normalize feeds a little more than what feedparser provides.

    we do the following operations:

    1. add more defaults to entry dates (`issue #113
       <https://github.com/kurtmckee/feedparser/issues/113>`_):

       * created_parsed of the item
       * updated_parsed of the feed

    2. missing GUID in some feeds (`issue #112
       <https://github.com/kurtmckee/feedparser/issues/112>`_)

    3. link normalization fails on some feeds, particularly GitHub,
       where feeds are /foo instead of https://github.com/foo.
       unreported for now.
    """
    # 1. add more defaults (issue #113)
    entry['updated_parsed'] = entry.get('updated_parsed', entry.get('created_parsed', feed.get('updated_parsed', False)))  # noqa
    assert entry.get('updated_parsed') is not None

    # 2. add UID if missing (issue #112)
    if not entry.get('id'):
        entry['id'] = entry.get('title')

    # 3. not completely absolute links
    scheme, netloc, *rest = urlparse.urlsplit(entry.get('link'))
    if not scheme:
        # take missing scheme/host from feed URL
        scheme, netloc, *_ = urlparse.urlsplit(feed.get('url', ''))
        entry['link'] = urlparse.urlunsplit((scheme, netloc, *rest))
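
# Illustrative sketch of step 3 above, with hypothetical values: an entry
# whose link lacks a scheme and host gets both copied from the feed URL:
#
#     feed = {'url': 'https://github.com/foo/bar/releases.atom'}
#     entry = {'title': 'v1.0', 'link': '/foo/bar/releases/tag/v1.0',
#              'updated_parsed': (2017, 1, 1, 0, 0, 0, 6, 1, 0)}
#     normalize_entry(feed=feed, entry=entry)
#     # entry['link'] would now be
#     # 'https://github.com/foo/bar/releases/tag/v1.0'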


def parse(body, feed, lock=None, force=False):
    """parse the body of the feed

    this calls the filter and output plugins and updates the cache
    with the found items.

    :todo: this could be moved to a plugin, but then we'd need to take
           out the cache checking logic, which would remove most of the
           code here...

    :param str body: the body of the feed, as returned by :func:`fetch`
    :param dict feed: a feed object passed to plugins and used for debugging
    :return dict: the parsed data
    """
    global LOCK
    if lock is None:
        lock = LOCK
    logging.info('parsing feed %s (%d bytes)', feed['url'], len(body))
    data = feedparser.parse(body)
    # logging.debug('parsed structure %s',
    #               json.dumps(data, indent=2, sort_keys=True,
    #                          default=safe_serial))
    cache = FeedCacheStorage(feed=feed['name'])
    for entry in data['entries']:
        params = feed.copy()
        params.update(data['feed'])
        normalize_entry(feed=params, entry=entry)
        plugins.filter(feed, entry, lock=lock)
        guid = entry['id']
        if guid in cache and not force:
            logging.debug('entry %s already seen', guid)
        else:
            logging.debug('new entry %s <%s>', guid, entry['link'])
            if plugins.output(feed, entry, lock=lock) is not None and not force:  # noqa
                cache.add(guid)
    return data
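
# Minimal end-to-end sketch (illustrative; feed name and URL are made up):
# parse() expects the feed dict to carry at least 'name' and 'url', since
# 'name' selects the cache bucket and 'url' is used for logging and link
# normalization:
#
#     feed = {'name': 'example', 'url': 'https://example.com/feed.xml'}
#     data = parse(fetch(feed['url']), feed)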


def _init_lock(l):
    """setup a global lock across pool threads

    this is necessary because Lock objects are not serializable so we
    can't pass them as arguments. An alternative pattern is to have a
    `Manager` process and use IPC for locking.

    cargo-culted from this `stackoverflow answer
    <https://stackoverflow.com/a/25558333/1174784>`_
    """
    global LOCK
    LOCK = l


def fetch_feeds(pattern=None, parallel=False, force=False, catchup=False):
    logging.debug('looking for feeds %s', pattern)
    st = FeedStorage(pattern=pattern)
    if parallel:
        l = multiprocessing.Lock()
        processes = None
        if isinstance(parallel, int):
            processes = parallel
        pool = multiprocessing.Pool(processes=processes,
                                    initializer=_init_lock, initargs=(l,))
    results = []
    for i, feed in enumerate(st):
        logging.debug('found feed in DB: %s', dict(feed))
        if feed.get('pause'):
            logging.info('feed %s is paused, skipping', feed['name'])
            continue
        body = fetch(feed['url'])
        if catchup or feed.get('catchup'):
            logging.info('catching up on feed %s (output plugin disabled)',
                         feed['name'])
            feed['output'] = None
        if parallel:
            # if this fails silently, use plain apply() to see errors
            results.append(pool.apply_async(parse,
                                            (body, dict(feed), None, force)))
        else:
            global LOCK
            LOCK = None
            parse(body=body, feed=dict(feed), force=force)
    if parallel:
        for result in results:
            result.get()
        pool.close()
        pool.join()
    logging.info('%d feeds processed', i+1)


class SqliteStorage(object):
    sql = None
    record = None
    conn = None
    path = None
    cache = {}

    def __init__(self):
        assert self.path
        utils.make_dirs_helper(os.path.dirname(self.path))
        if self.path not in SqliteStorage.cache:
            logging.info('connecting to database at %s', self.path)
            conn = sqlite3.connect(self.path)
            try:
                conn.set_trace_callback(logging.debug)
            except AttributeError:  # pragma: nocover
                logging.debug('no logging support in sqlite')
            SqliteStorage.cache[self.path] = conn
        self.conn = SqliteStorage.cache[self.path]
        if self.sql:
            self.conn.execute(self.sql)
            self.conn.commit()


class ConfFeedStorage(configparser.RawConfigParser):
    path = os.path.join(default_config_dir(), 'feed2exec.ini')

    def __init__(self, pattern=None):
        self.pattern = pattern
        super(ConfFeedStorage, self).__init__(dict_type=OrderedDict)
        self.read(self.path)

    def add(self, name, url, output=None, args=None,
            filter=None, folder=None, mailbox=None):
        if self.has_section(name):
            raise AttributeError('key %s already exists' % name)
        d = OrderedDict()
        d['url'] = url
        if output is not None:
            d['output'] = output
        if args is not None:
            d['args'] = args
        if filter is not None:
            d['filter'] = filter
        if folder is not None:
            d['folder'] = folder
        if mailbox is not None:
            d['mailbox'] = mailbox
        self[name] = d
        self.commit()

    def set(self, section, option, value=None):
        super(ConfFeedStorage, self).set(section, option, value)
        self.commit()

    def remove_option(self, section, option):
        super(ConfFeedStorage, self).remove_option(section, option)
        self.commit()

    def remove(self, name):
        self.remove_section(name)
        self.commit()

    def commit(self):
        logging.info('saving feed configuration in %s', self.path)
        utils.make_dirs_helper(os.path.dirname(self.path))
        with open(self.path, 'w') as configfile:
            self.write(configfile)

    def __iter__(self):
        for name in self.sections():
            if self.pattern is None or self.pattern in name:
                d = dict(self[name])
                d.update({'name': name})
                yield d


FeedStorage = ConfFeedStorage


class FeedCacheStorage(SqliteStorage):
    sql = '''CREATE TABLE IF NOT EXISTS feedcache (name text, guid text,
             PRIMARY KEY (name, guid))'''
    record = namedtuple('record', 'name guid')

    def __init__(self, feed=None, guid=None):
        self.feed = feed
        if guid is None:
            self.guid = '%'
        else:
            self.guid = '%' + guid + '%'
        super(FeedCacheStorage, self).__init__()

    def add(self, guid):
        assert self.feed
        self.conn.execute("INSERT INTO feedcache VALUES (?, ?)",
                          (self.feed, guid))
        self.conn.commit()

    def remove(self, guid):
        assert self.feed
        self.conn.execute("DELETE FROM feedcache WHERE guid = ?", (guid,))
        self.conn.commit()

    def __contains__(self, guid):
        if self.feed is None:
            pattern = '%'
        else:
            pattern = self.feed
        cur = self.conn.execute("""SELECT * FROM feedcache
                                   WHERE name LIKE ? AND guid=?""",
                                (pattern, guid))
        return cur.fetchone() is not None

    def __iter__(self):
        if self.feed is None:
            pattern = '%'
        else:
            pattern = self.feed
        cur = self.conn.cursor()
        cur.row_factory = sqlite3.Row
        return cur.execute("""SELECT * from feedcache
                              WHERE name LIKE ? AND guid LIKE ?""",
                           (pattern, self.guid))
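
# Cache usage sketch (illustrative only): FeedCacheStorage keys entries by
# (feed name, guid), so membership tests and additions look like this,
# assuming SqliteStorage.path has been pointed at a writable database file:
#
#     cache = FeedCacheStorage(feed='example')
#     if 'some-guid' not in cache:
#         cache.add('some-guid')
#     for row in cache:
#         print(row['name'], row['guid'])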