Source code for feed2exec.controller

#!/usr/bin/python3
# coding: utf-8

'''fast feed parser that offloads tasks to plugins and commands'''
# Copyright (C) 2016-2019 Antoine Beaupré <anarcat@debian.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import division, absolute_import
from __future__ import print_function


from datetime import datetime
try:
    from lxml import etree
except ImportError:  # pragma: nocover
    # stdlib
    import xml.etree.ElementTree as etree   # type: ignore
import logging
import multiprocessing
import os
import os.path

import feed2exec
import feed2exec.plugins as plugins
import feed2exec.utils as utils
from feed2exec.model import Feed, FeedConfStorage, FeedContentCacheStorage, FeedItemCacheStorage
import feedparser
from pkg_resources import parse_version
import requests
import requests_file
try:
    import cachecontrol
except ImportError:
    cachecontrol = None

try:
    import dateparser
    dateparser_enabled = True
except ImportError:
    dateparser_enabled = False


class FeedManager(object):
    """a feed manager fetches and stores feeds.

    this is a "controller" in a "model-view-controller" pattern. it
    derives the "model" (:class:`feed2exec.model.FeedConfStorage`) for
    simplicity's sake, and there is no real "view" (except maybe
    `__main__`).

    on initialization, a new :class:`requests.Session` object is
    created to be used across all requests. it is passed to plugins
    during dispatch as a `session` parameter so it can be reused.
    """
    def __init__(self, conf_path, db_path, pattern=None, session=None):
        self.conf_path = conf_path
        self.db_path = db_path
        self.conf_storage = FeedConfStorage(self.conf_path, pattern=pattern)
        if dateparser_enabled:
            if parse_version(dateparser.__version__) >= parse_version('0.7.4'):
                def dateparser_tuple_parser(string):
                    return dateparser.parse(string).utctimetuple()
            else:
                # workaround bug https://github.com/scrapinghub/dateparser/issues/548
                def dateparser_tuple_parser(string):
                    if string.endswith('-0000'):
                        # replace the last '-0000' with '+0000' by reversing the string twice
                        string = string[::-1].replace('-0000'[::-1], '+0000'[::-1], 1)[::-1]
                    return dateparser.parse(string).utctimetuple()
            feedparser.registerDateHandler(dateparser_tuple_parser)
        self._session = session or requests.Session()
        self.sessionConfig()

    def __repr__(self):
        return 'FeedManager(%s, %s, %s)' % (self.conf_path, self.db_path, self.pattern)

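    # A minimal usage sketch (not part of the original module; paths and
    # pattern are hypothetical): the manager only needs the configuration
    # and cache database paths, plus an optional feed name pattern and an
    # optional pre-built requests session.
    #
    #   manager = FeedManager('~/.config/feed2exec.ini',
    #                         '~/.cache/feed2exec.db',
    #                         pattern='planet')
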
    def sessionConfig(self):
        """our custom session configuration

        we change the user agent and set the file:// handler. extra
        configuration may be performed in the future and will override
        your changes.

        this can be used to configure sessions used externally, for
        example by plugins.
        """
        self._session.headers.update({'User-Agent': '%s/%s'
                                      % (feed2exec.__prog__,
                                         feed2exec.__version__)})
        self._session.mount('file://', requests_file.FileAdapter())
        if self.db_path is not None and cachecontrol is not None:
            cache_adapter = cachecontrol.CacheControlAdapter(cache=FeedContentCacheStorage(self.db_path))
            # assume we mount over http and https all at once so check
            # only the latter
            adapter = self._session.adapters.get('https://', None)
            if hasattr(adapter, 'old_adapters'):
                # looks like a betamax session was set up, hook ourselves behind it
                #
                # XXX: this doesn't actually work, as betamax will
                # never pass the query to the cache. this is
                # backwards, but there's no other way. see
                # https://github.com/ionrock/cachecontrol/issues/212
                logging.debug('appending cache adapter (%r) to existing betamax adapter (%r)',
                              cache_adapter, adapter)
                adapter.old_adapters['http://'] = cache_adapter
                adapter.old_adapters['https://'] = cache_adapter
            else:
                logging.debug('mounting cache adapter (%r)', cache_adapter)
                # override existing adapters to use the cache adapter instead
                self._session.mount('http://', cache_adapter)
                self._session.mount('https://', cache_adapter)

    @property
    def session(self):
        """the session property"""
        return self._session

    @session.setter
    def session(self, value):
        """set the session to the given value

        will configure the session appropriately with sessionConfig

        we could also use a @classproperty here, see `this discussion
        <https://stackoverflow.com/a/7864317/1174784>`_
        """
        self._session = value
        self.sessionConfig()

    @property
    def pattern(self):
        return self.conf_storage.pattern

    @pattern.setter
    def pattern(self, val):
        self.conf_storage.pattern = val

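    # Sketch: swapping in another session (for example a recorded one in
    # tests) goes through the property setter, so sessionConfig() is
    # re-applied to the new object.
    #
    #   manager.session = requests.Session()
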
    def fetch(self, parallel=False, force=False, catchup=False):
        """main entry point for the feed fetch routines.

        this iterates through all feeds configured in the linked
        :class:`feed2exec.model.FeedConfStorage` that match the given
        ``pattern``, fetches the feeds and dispatches the parsing,
        which in turn dispatches the plugins.

        :param bool parallel: parse feeds in parallel, using
                              :mod:`multiprocessing`

        :param bool force: force plugin execution even if entry was
                           already seen. passed to
                           :class:`feed2exec.feeds.parse` as is

        :param bool catchup: set the `catchup` flag on the feed, so
                             that output plugins can avoid making any
                             permanent changes.
        """
        logging.debug('looking for feeds %s in %s', self.pattern, self.conf_storage)
        if parallel:
            lock = multiprocessing.Lock()
            processes = None
            if isinstance(parallel, int):
                processes = parallel

            def init_global_lock(lock):
                """set up a global lock across pool threads

                this is necessary because Lock objects are not
                serializable so we can't pass them as arguments. An
                alternative pattern is to have a `Manager` process and
                use IPC for locking.

                cargo-culted from this `stackoverflow answer
                <https://stackoverflow.com/a/25558333/1174784>`_
                """
                global LOCK
                LOCK = lock

            pool = multiprocessing.Pool(processes=processes,
                                        initializer=init_global_lock,
                                        initargs=(lock,))
        data_results = []
        i = -1
        for i, feed in enumerate(self.conf_storage):
            logging.debug('found feed in DB: %s', dict(feed))
            # XXX: this is dirty. iterator/getters/??? should return
            # the right thing? or will that break an eventual editor?
            # maybe autocommit is a bad idea in the first place..
            feed = Feed(feed['name'], feed)
            body = self.fetch_one(feed)
            if body is None:
                continue
            if catchup:
                feed['catchup'] = catchup
            if parallel:
                # if this fails silently, use plain apply() to see errors
                data_results.append((feed, pool.apply_async(feed.parse, (body,))))
            else:
                global LOCK
                LOCK = None
                data = feed.parse(body)
                if data:
                    self.dispatch(feed, data, None, force)
        if parallel:
            for feed, result in data_results:
                data = result.get()
                if data:
                    self.dispatch(feed, data, lock, force)
            pool.close()
            pool.join()
        logging.info('%d feeds processed', i+1)

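    # Hypothetical call sketch: as the isinstance() check above shows,
    # ``parallel`` doubles as a process count when an integer is passed.
    #
    #   manager.fetch(parallel=4, catchup=True)
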
    def fetch_one(self, feed):
        """fetch the feed content and return the body, in binary

        This will call :func:`logging.warning` for exceptions
        :class:`requests.exceptions.Timeout` and
        :class:`requests.exceptions.ConnectionError` as they are
        transient errors and the user may want to ignore those.

        Other exceptions raised from :mod:`requests.exceptions` (like
        TooManyRedirects or HTTPError but basically any other
        exception) may be a configuration error or a more permanent
        failure so will be signaled with :func:`logging.error`.

        this will return the body on success, or None on failure and
        for cached entries
        """
        if feed.get('pause'):
            logging.info('feed %s is paused, skipping', feed['name'])
            return None
        logging.info('fetching feed %s', feed['url'])
        try:
            resp = self.session.get(feed['url'])
            if getattr(resp, 'from_cache', False):
                return None
            body = resp.content
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError) as e:
            # XXX: we should count those and warn after a few
            # occurrences
            logging.warning('timeout while fetching feed %s at %s: %s',
                            feed['name'], feed['url'], e)
            return None
        except requests.exceptions.RequestException as e:
            logging.error('exception while fetching feed %s at %s: %s',
                          feed['name'], feed['url'], e)
            return None
        return body

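    # Sketch (hypothetical feed values): this method only reads the
    # 'pause', 'name' and 'url' keys, so a plain mapping is enough here.
    #
    #   body = manager.fetch_one({'name': 'example',
    #                             'url': 'https://example.com/feed.xml'})
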
    def dispatch(self, feed, data, lock=None, force=False):
        '''process parsed entries and execute plugins

        This handles locking, caching, and filter and output plugins.

        This calls the plugins configured in the ``feed`` (using
        :func:`feed2exec.plugins.output` and
        :func:`feed2exec.plugins.filter`). It also updates the cache
        with the found items if the ``output`` plugin succeeds
        (returns True) and if the ``filter`` plugin doesn't set the
        ``skip`` element in the feed item.

        :param object lock: a :class:`multiprocessing.Lock` object
                            previously initialized. if None, the
                            global `LOCK` variable will be used: this
                            is used in the test suite to avoid having
                            to pass locks all the way through the API.
                            this lock is in turn passed to plugin
                            calls.

        :param bool force: force plugin execution even if entry was
                           already seen. passed to
                           :class:`feed2exec.feeds.parse` as is
        '''
        logging.debug('dispatching plugins for items parsed from %s', feed['name'])
        cache = FeedItemCacheStorage(self.db_path, feed=feed['name'])
        for item in data['entries']:
            feed.normalize(item=item)
            plugins.filter(feed=feed, item=item, session=self.session, lock=lock)
            if item.get('skip'):
                logging.info('item %s of feed %s filtered out',
                             item.get('title'), feed.get('name'))
                continue
            guid = item['id']
            if not force and guid in cache:
                logging.debug('item %s already seen', guid)
            else:
                logging.debug('new item %s <%s>', guid, item['link'])
                if plugins.output(feed, item, session=self.session, lock=lock) is not False and not force:  # noqa
                    if lock:
                        lock.acquire()
                    cache.add(guid)
                    if lock:
                        lock.release()
        return data

    def opml_import(self, opmlfile):
        """import feeds from an OPML file stream into the feed storage"""
        folders = []
        for (event, node) in etree.iterparse(opmlfile, ['start', 'end']):
            if node.tag != 'outline':
                continue
            logging.debug('found OPML entry: %s', node.attrib)
            if event == 'start' and node.attrib.get('xmlUrl'):
                folder = os.path.join(*folders) if folders else None
                title = node.attrib.get('title',
                                        utils.slug(node.attrib['xmlUrl']))
                logging.info('importing element %s <%s> in folder %s',
                             title, node.attrib['xmlUrl'], folder)
                if title in self.conf_storage:
                    if folder:
                        title = folder + '/' + title
                        logging.info('feed %s exists, using folder name: %s',
                                     node.attrib['title'], title)
                if title in self.conf_storage:
                    logging.error('feed %s already exists, skipped',
                                  node.attrib['title'])
                else:
                    self.conf_storage.add(title, node.attrib['xmlUrl'], folder=folder)
            elif node.attrib.get('type') == 'folder':
                if event == 'start':
                    logging.debug('found folder %s', node.attrib.get('text'))
                    folders.append(node.attrib.get('text'))
                else:
                    folders.pop()

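    # Sketch (hypothetical file name): iterparse() consumes a file
    # stream, so the OPML file can simply be opened in binary mode.
    #
    #   with open('subscriptions.opml', 'rb') as opmlfile:
    #       manager.opml_import(opmlfile)
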
    def opml_export(self, path):
        xml_tmpl = u'''<opml version="1.0">
  <head>
    <title>{title}</title>
    <dateModified>{date}</dateModified>
  </head>
  <body>
{body}</body>
</opml>'''
        outline_tmpl = u'<outline title="{name}" type="rss" xmlUrl="{url}" />'
        body = u''
        for feed in self.conf_storage:
            if feed:
                body += outline_tmpl.format(**feed) + "\n"
        output = xml_tmpl.format(title=u'feed2exec RSS feeds',
                                 date=datetime.now(),
                                 body=body)
        path.write(output.encode('utf-8'))
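
# Sketch (hypothetical file name): opml_export() encodes the document to
# UTF-8 before writing, so the target should be opened in binary mode.
#
#   with open('feeds.opml', 'wb') as path:
#       manager.opml_export(path)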