Source code for feed2exec.controller

#!/usr/bin/python3
# coding: utf-8

'''fast feed parser that offloads tasks to plugins and commands'''
# Copyright (C) 2016-2019 Antoine Beaupré <anarcat@debian.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import division, absolute_import
from __future__ import print_function


from datetime import datetime
try:
    from lxml import etree
except ImportError:  # pragma: nocover
    import xml.etree.ElementTree as etree
import logging
import multiprocessing
import os
import os.path

import feed2exec
import feed2exec.plugins as plugins
import feed2exec.utils as utils
from feed2exec.model import Feed, FeedConfStorage, FeedContentCacheStorage, FeedItemCacheStorage
import feedparser
import requests
import requests_file
try:
    import cachecontrol
except ImportError:
    cachecontrol = None

try:
    import dateparser
except ImportError:
    dateparser = False


class FeedManager(object):
    """a feed manager fetches and stores feeds.

    this is a "controller" in a "model-view-controller" pattern. it
    derives the "model" (:class:`feed2exec.model.FeedConfStorage`) for
    simplicity's sake, and there is no real "view" (except maybe
    `__main__`).

    on initialization, a new :class:`requests.Session` object is
    created to be used across all requests. it is passed to plugins
    during dispatch as a `session` parameter so it can be reused.
    """
    def __init__(self, conf_path, db_path, pattern=None, session=None):
        self.conf_path = conf_path
        self.db_path = db_path
        self.conf_storage = FeedConfStorage(self.conf_path, pattern=pattern)
        if dateparser:
            def dateparser_tuple_parser(string):
                if string.endswith('-0000'):
                    # workaround for https://github.com/scrapinghub/dateparser/issues/548
                    # replace the last '-0000' with '+0000' by reversing the string twice
                    string = string[::-1].replace('-0000'[::-1], '+0000'[::-1], 1)[::-1]
                return dateparser.parse(string).utctimetuple()
            feedparser.registerDateHandler(dateparser_tuple_parser)
        self._session = session or requests.Session()
        self.sessionConfig()

    def __repr__(self):
        return 'FeedManager(%s, %s, %s)' % (self.conf_path, self.db_path, self.pattern)
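    # A minimal usage sketch; the paths are hypothetical, and the storage
    # classes create their files on demand:
    #
    #     from feed2exec.controller import FeedManager
    #     manager = FeedManager('feed2exec.ini', 'feed2exec.db')
    #     manager.fetch()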
    def sessionConfig(self):
        """our custom session configuration

        we change the user agent and set the file:// handler. extra
        configuration may be performed in the future and will override
        your changes.

        this can be used to configure sessions used externally, for
        example by plugins.
        """
        self._session.headers.update({'User-Agent': '%s/%s'
                                      % (feed2exec.__prog__, feed2exec.__version__)})
        self._session.mount('file://', requests_file.FileAdapter())
        if self.db_path is not None and cachecontrol is not None:
            cache_adapter = cachecontrol.CacheControlAdapter(cache=FeedContentCacheStorage(self.db_path))
            # assume we mount over http and https all at once so check
            # only the latter
            adapter = self._session.adapters.get('https://', None)
            if hasattr(adapter, 'old_adapters'):
                # looks like a betamax session was setup, hook ourselves behind it
                #
                # XXX: this doesn't actually work, as betamax will
                # never pass the query to the cache. this is
                # backwards, but there's no other way. see
                # https://github.com/ionrock/cachecontrol/issues/212
                logging.debug('appending cache adapter (%r) to existing betamax adapter (%r)',
                              cache_adapter, adapter)
                adapter.old_adapters['http://'] = cache_adapter
                adapter.old_adapters['https://'] = cache_adapter
            else:
                logging.debug('mounting cache adapter (%r)', cache_adapter)
                # override existing adapters to use the cache adapter instead
                self._session.mount('http://', cache_adapter)
                self._session.mount('https://', cache_adapter)
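    # Sketch of what the configured session supports: thanks to the
    # file:// adapter mounted above, local files can be fetched like any
    # URL (the path here is hypothetical):
    #
    #     resp = manager.session.get('file:///tmp/feed.xml')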
    @property
    def session(self):
        """the session property"""
        return self._session

    @session.setter
    def session(self, value):
        """set the session to the given value

        will configure the session appropriately with sessionConfig

        we could also use a @classproperty here, see `this discussion
        <https://stackoverflow.com/a/7864317/1174784>`_
        """
        self._session = value
        self.sessionConfig()

    @property
    def pattern(self):
        return self.conf_storage.pattern

    @pattern.setter
    def pattern(self, val):
        self.conf_storage.pattern = val
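    # Sketch: assigning a new session re-applies sessionConfig(), so a
    # custom transport (for example a recording session in tests) keeps
    # the standard user agent and adapters:
    #
    #     import requests
    #     manager.session = requests.Session()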
    def fetch(self, parallel=False, force=False, catchup=False):
        """main entry point for the feed fetch routines.

        this iterates through all feeds configured in the linked
        :class:`feed2exec.model.FeedConfStorage` that match the given
        ``pattern``, fetches the feeds and dispatches the parsing,
        which in turn dispatches the plugins.

        :param bool parallel: parse feeds in parallel, using
                              :mod:`multiprocessing`

        :param bool force: force plugin execution even if entry was
                           already seen. passed to
                           :class:`feed2exec.feeds.parse` as is

        :param bool catchup: set the `catchup` flag on the feed, so
                             that output plugins can avoid doing any
                             permanent changes.
        """
        logging.debug('looking for feeds %s in %s',
                      self.pattern, self.conf_storage)
        if parallel:
            lock = multiprocessing.Lock()
            processes = None
            # an int caps the pool size; a plain bool keeps the default
            # (bool must be excluded explicitly since it subclasses int)
            if isinstance(parallel, int) and not isinstance(parallel, bool):
                processes = parallel

            def init_global_lock(lock):
                """setup a global lock across pool threads

                this is necessary because Lock objects are not
                serializable so we can't pass them as arguments. An
                alternative pattern is to have a `Manager` process and
                use IPC for locking.

                cargo-culted from this `stackoverflow answer
                <https://stackoverflow.com/a/25558333/1174784>`_
                """
                global LOCK
                LOCK = lock

            pool = multiprocessing.Pool(processes=processes,
                                        initializer=init_global_lock,
                                        initargs=(lock,))
        data_results = []
        i = -1
        for i, feed in enumerate(self.conf_storage):
            logging.debug('found feed in DB: %s', dict(feed))
            # XXX: this is dirty. iterator/getters/??? should return
            # the right thing? or will that break an eventual editor?
            # maybe autocommit is a bad idea in the first place..
            feed = Feed(feed['name'], feed)
            body = self.fetch_one(feed)
            if body is None:
                continue
            if catchup:
                feed['catchup'] = catchup
            if parallel:
                # if this fails silently, use plain apply() to see errors
                data_results.append((feed, pool.apply_async(feed.parse,
                                                            (body,))))
            else:
                global LOCK
                LOCK = None
                data = feed.parse(body)
                if data:
                    self.dispatch(feed, data, None, force)
        if parallel:
            for feed, result in data_results:
                data = result.get()
                if data:
                    self.dispatch(feed, data, lock, force)
            pool.close()
            pool.join()
        logging.info('%d feeds processed', i+1)
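    # Sketch of the common call: an int value for `parallel` caps the
    # worker pool size, and `catchup` keeps output plugins from making
    # permanent changes:
    #
    #     manager.fetch(parallel=4, catchup=True)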
    def fetch_one(self, feed):
        """fetch the feed content and return the body, in binary

        This will call :func:`logging.warning` for exceptions
        :class:`requests.exceptions.Timeout` and
        :class:`requests.exceptions.ConnectionError` as they are
        transient errors and the user may want to ignore those.

        Other exceptions raised from :mod:`requests.exceptions` (like
        TooManyRedirects or HTTPError but basically any other
        exception) may be a configuration error or a more permanent
        failure so will be signaled with :func:`logging.error`.

        this will return the body on success, or None on failure and
        for cached entries
        """
        if feed.get('pause'):
            logging.info('feed %s is paused, skipping', feed['name'])
            return None
        logging.info('fetching feed %s', feed['url'])
        try:
            resp = self.session.get(feed['url'])
            if getattr(resp, 'from_cache', False):
                return None
            body = resp.content
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError) as e:
            # XXX: we should count those and warn after a few
            # occurrences
            logging.warning('timeout while fetching feed %s at %s: %s',
                            feed['name'], feed['url'], e)
            return None
        except requests.exceptions.RequestException as e:
            logging.error('exception while fetching feed %s at %s: %s',
                          feed['name'], feed['url'], e)
            return None
        return body
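    # Sketch, mirroring how fetch() builds its argument: a Feed wraps a
    # name and its config mapping (the URL here is hypothetical):
    #
    #     feed = Feed('example', {'name': 'example',
    #                             'url': 'https://example.com/feed.xml'})
    #     body = manager.fetch_one(feed)  # bytes, or None on failure/cache hit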
    def dispatch(self, feed, data, lock=None, force=False):
        '''process parsed entries and execute plugins

        This handles locking, caching, and filter and output
        plugins. This calls the plugins configured in the ``feed``
        (using :func:`feed2exec.plugins.output` and
        :func:`feed2exec.plugins.filter`). It also updates the cache
        with the found items if the ``output`` plugin succeeds
        (returns True) and if the ``filter`` plugin doesn't set the
        ``skip`` element in the feed item.

        :param object lock: a :class:`multiprocessing.Lock` object
                            previously initialized. if None, the
                            global `LOCK` variable will be used: this
                            is used in the test suite to avoid having
                            to pass locks all the way through the
                            API. this lock is in turn passed to plugin
                            calls.

        :param bool force: force plugin execution even if entry was
                           already seen. passed to
                           :class:`feed2exec.feeds.parse` as is
        '''
        logging.debug('dispatching plugins for items parsed from %s',
                      feed['name'])
        cache = FeedItemCacheStorage(self.db_path, feed=feed['name'])
        for item in data['entries']:
            feed.normalize(item=item)
            plugins.filter(feed=feed, item=item, session=self.session,
                           lock=lock)
            if item.get('skip'):
                logging.info('item %s of feed %s filtered out',
                             item.get('title'), feed.get('name'))
                continue
            guid = item['id']
            if not force and guid in cache:
                logging.debug('item %s already seen', guid)
            else:
                logging.debug('new item %s <%s>', guid, item['link'])
                if plugins.output(feed, item, session=self.session, lock=lock) is not False and not force:  # noqa
                    if lock:
                        lock.acquire()
                    cache.add(guid)
                    if lock:
                        lock.release()
        return data
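    # Sketch, mirroring the serial path of fetch(): dispatch() consumes
    # the structure returned by Feed.parse():
    #
    #     data = feed.parse(body)
    #     if data:
    #         manager.dispatch(feed, data)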
    def opml_import(self, opmlfile):
        """import a file stream as an OPML feed in the feed storage"""
        folders = []
        for (event, node) in etree.iterparse(opmlfile, ['start', 'end']):
            if node.tag != 'outline':
                continue
            logging.debug('found OPML entry: %s', node.attrib)
            if event == 'start' and node.attrib.get('xmlUrl'):
                folder = os.path.join(*folders) if folders else None
                title = node.attrib.get('title',
                                        utils.slug(node.attrib['xmlUrl']))
                logging.info('importing element %s <%s> in folder %s',
                             title, node.attrib['xmlUrl'], folder)
                if title in self.conf_storage:
                    if folder:
                        title = folder + '/' + title
                        logging.info('feed %s exists, using folder name: %s',
                                     node.attrib['title'], title)
                if title in self.conf_storage:
                    logging.error('feed %s already exists, skipped',
                                  node.attrib['title'])
                else:
                    self.conf_storage.add(title, node.attrib['xmlUrl'],
                                          folder=folder)
            elif node.attrib.get('type') == 'folder':
                if event == 'start':
                    logging.debug('found folder %s',
                                  node.attrib.get('text'))
                    folders.append(node.attrib.get('text'))
                else:
                    folders.pop()
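    # Sketch: iterparse reads from a stream, so the OPML file is opened
    # in binary mode (the filename is hypothetical):
    #
    #     with open('subscriptions.opml', 'rb') as opmlfile:
    #         manager.opml_import(opmlfile)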
    def opml_export(self, path):
        xml_tmpl = u'''<opml version="1.0">
  <head>
    <title>{title}</title>
    <dateModified>{date}</dateModified>
  </head>
  <body>
{body}</body>
</opml>'''
        outline_tmpl = u'<outline title="{name}" type="rss" xmlUrl="{url}" />'
        body = u''
        for feed in self.conf_storage:
            if feed:
                body += outline_tmpl.format(**feed) + "\n"
        output = xml_tmpl.format(title=u'feed2exec RSS feeds',
                                 date=datetime.now(),
                                 body=body)
        path.write(output.encode('utf-8'))
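# Sketch: opml_export() encodes to UTF-8 itself and calls path.write()
# with bytes, so the target must be opened in binary mode (the filename
# is hypothetical):
#
#     with open('feeds.opml', 'wb') as path:
#         manager.opml_export(path)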