#!/usr/bin/python3
# coding: utf-8

'''fast feed parser that offloads tasks to plugins and commands'''
# Copyright (C) 2016 Antoine Beaupré <anarcat@debian.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import division, absolute_import
from __future__ import print_function


try:
    import configparser
except ImportError:
    # py2: should never happen as we depend on the newer one in setup.py
    import ConfigParser as configparser
import datetime
import time
from collections import OrderedDict, namedtuple
import logging
import multiprocessing
import os
import os.path


import feed2exec
import feed2exec.plugins as plugins
import feed2exec.utils as utils
import feedparser
import requests
import sqlite3


def default_config_dir():
    home_config = os.environ.get('XDG_CONFIG_HOME',
                                 os.path.join(os.environ.get('HOME'),
                                              '.config'))
    return os.path.join(home_config, feed2exec.__prog__)
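
# For example, with XDG_CONFIG_HOME=/tmp/xdg this resolves to
# /tmp/xdg/feed2exec (a sketch with hypothetical paths, assuming
# feed2exec.__prog__ is 'feed2exec'):
#
#     os.environ['XDG_CONFIG_HOME'] = '/tmp/xdg'
#     default_config_dir()  # == '/tmp/xdg/feed2exec'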


def fetch(url):
    """fetch the given URL

    exceptions should be handled by the caller

    :todo: this should be moved to a plugin so it can be overridden,
           but so far I haven't found a use case for this.

    :param str url: the URL to fetch

    :return str: the body of the URL
    """
    body = ''
    if url.startswith('file://'):
        filename = url[len('file://'):]
        logging.info('opening local file %s', filename)
        with open(filename, 'rb') as f:
            body = f.read().decode('utf-8')
    else:
        logging.info('fetching URL %s', url)
        body = requests.get(url).text
    return body
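

# Usage sketch (not part of the module): fetch() handles local files
# and remote URLs the same way; both locations below are hypothetical.
#
#     body = fetch('file:///tmp/feed.xml')
#     body = fetch('https://example.com/feed.xml')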


def parse(body, feed, lock=None, force=False):
    """parse the body of the feed

    this calls the filter and output plugins and updates the cache
    with the found items.

    :todo: this could be moved to a plugin, but then we'd need to take
           out the cache checking logic, which would remove most of
           the code here...

    :param str body: the body of the feed, as returned by :func:`fetch`

    :param dict feed: a feed object used to pass to plugins and
                      debugging

    :return dict: the parsed data
    """
    global LOCK
    if lock is None:
        lock = LOCK
    logging.info('parsing feed %s (%d bytes)', feed['url'], len(body))
    data = feedparser.parse(body)
    # logging.debug('parsed structure %s',
    #               json.dumps(data, indent=2, sort_keys=True,
    #                          default=safe_serial))
    cache = FeedCacheStorage(feed=feed['name'])
    for entry in data['entries']:
        plugins.filter(feed, entry, lock=lock)
        # add more defaults to entry dates:
        # 1. created_parsed of the item
        # 2. updated_parsed of the feed
        # see https://github.com/kurtmckee/feedparser/issues/113
        entry['updated_parsed'] = entry.get('updated_parsed', entry.get('created_parsed', data['feed'].get('updated_parsed', False)))  # noqa
        assert entry.get('updated_parsed') is not None
        # workaround feedparser bug:
        # https://github.com/kurtmckee/feedparser/issues/112
        guid = entry.get('id', entry.get('title'))
        if guid in cache and not force:
            logging.debug('entry %s already seen', guid)
        else:
            logging.debug('new entry %s <%s>', guid, entry['link'])
            if plugins.output(feed, entry, lock=lock) is not None and not force:  # noqa
                cache.add(guid)
    return data
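

# Usage sketch (not part of the module): parse() reads the LOCK global
# when no lock is passed, so set it first, as fetch_feeds() does below;
# the feed dict here is hypothetical.
#
#     LOCK = None
#     feed = {'name': 'example', 'url': 'https://example.com/feed.xml'}
#     data = parse(fetch(feed['url']), feed)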


def _init_lock(l):
    """setup a global lock across pool threads

    this is necessary because Lock objects are not serializable so we
    can't pass them as arguments. An alternative pattern is to have a
    `Manager` process and use IPC for locking.

    cargo-culted from this `stackoverflow answer
    <https://stackoverflow.com/a/25558333/1174784>`_
    """
    global LOCK
    LOCK = l


def fetch_feeds(pattern=None, parallel=False, force=False, catchup=False):
    logging.debug('looking for feeds %s', pattern)
    st = FeedStorage(pattern=pattern)
    if parallel:
        l = multiprocessing.Lock()
        processes = None
        if isinstance(parallel, int):
            processes = parallel
        pool = multiprocessing.Pool(processes=processes,
                                    initializer=_init_lock,
                                    initargs=(l,))
    results = []
    for feed in st:
        logging.debug('found feed in DB: %s', dict(feed))
        if feed.get('pause'):
            logging.info('feed %s is paused, skipping', feed['name'])
            continue
        body = fetch(feed['url'])
        if catchup or feed.get('catchup'):
            logging.info('catching up on feed %s (output plugin disabled)',
                         feed['name'])
            feed['output'] = None
        if parallel:
            # if this fails silently, use plain apply() to see errors
            results.append(pool.apply_async(parse,
                                            (body, dict(feed), None, force)))
        else:
            global LOCK
            LOCK = None
            parse(body=body, feed=dict(feed), force=force)
    if parallel:
        for result in results:
            result.get()
        pool.close()
        pool.join()
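

# The initializer pattern above, in miniature (a standalone sketch of
# the same recipe, not part of the module):
#
#     lock = multiprocessing.Lock()
#     pool = multiprocessing.Pool(initializer=_init_lock, initargs=(lock,))
#     # every worker process now sees an initialized global LOCK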


def safe_serial(obj):
    """JSON serializer for objects not serializable by default json code"""
    if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
        return obj.isoformat()
    elif isinstance(obj, time.struct_time):
        # pass obj explicitly, otherwise strftime formats the current time
        return time.strftime('%c', obj)
    else:
        return str(obj)
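

# Usage sketch (not part of the module; json is imported only for the
# example):
#
#     import json
#     json.dumps({'now': datetime.datetime.now(),
#                 'parsed': time.localtime()}, default=safe_serial)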


class SqliteStorage(object):
    sql = None
    record = None
    conn = None
    path = None
    cache = {}

    def __init__(self):
        if self.path is None:
            logging.warning("storing feeds only in memory")
            self.path = ":memory:"
        else:
            utils.make_dirs_helper(os.path.dirname(self.path))
        if self.path not in SqliteStorage.cache:
            logging.info('connecting to database at %s', self.path)
            conn = sqlite3.connect(self.path)
            try:
                conn.set_trace_callback(logging.debug)
            except AttributeError:
                logging.debug('no logging support in sqlite')
            SqliteStorage.cache[self.path] = conn
        self.conn = SqliteStorage.cache[self.path]
        if self.sql:
            self.conn.execute(self.sql)
            self.conn.commit()


class ConfFeedStorage(configparser.RawConfigParser):
    path = os.path.join(default_config_dir(), 'feed2exec.ini')

    def __init__(self, pattern=None):
        self.pattern = pattern
        super(ConfFeedStorage, self).__init__(dict_type=OrderedDict)
        self.read(self.path)

    def add(self, name, url, output=None, args=None,
            filter=None, folder=None, mailbox=None):
        if self.has_section(name):
            raise AttributeError('key %s already exists' % name)
        d = OrderedDict()
        d['url'] = url
        if output is not None:
            d['output'] = output
        if args is not None:
            d['args'] = args
        if filter is not None:
            d['filter'] = filter
        if folder is not None:
            d['folder'] = folder
        if mailbox is not None:
            d['mailbox'] = mailbox
        self[name] = d
        self.commit()

    def remove(self, name):
        self.remove_section(name)
        self.commit()

    def commit(self):
        logging.info('saving feed configuration in %s', self.path)
        utils.make_dirs_helper(os.path.dirname(self.path))
        with open(self.path, 'w') as configfile:
            self.write(configfile)

    def __iter__(self):
        for name in self.sections():
            if self.pattern is None or self.pattern in name:
                d = dict(self[name])
                d.update({'name': name})
                yield d


FeedStorage = ConfFeedStorage


class FeedCacheStorage(SqliteStorage):
    sql = '''CREATE TABLE IF NOT EXISTS feedcache (name text, guid text,
             PRIMARY KEY (name, guid))'''
    record = namedtuple('record', 'name guid')

    def __init__(self, feed=None, guid=None):
        self.feed = feed
        if guid is None:
            self.guid = '%'
        else:
            self.guid = '%' + guid + '%'
        super(FeedCacheStorage, self).__init__()

    def add(self, guid):
        assert self.feed
        self.conn.execute("INSERT INTO feedcache VALUES (?, ?)",
                          (self.feed, guid))
        self.conn.commit()

    def remove(self, guid):
        assert self.feed
        self.conn.execute("DELETE FROM feedcache WHERE guid = ?", (guid,))
        self.conn.commit()

    def __contains__(self, guid):
        if self.feed is None:
            pattern = '%'
        else:
            pattern = self.feed
        cur = self.conn.execute("""SELECT * FROM feedcache
                                   WHERE name LIKE ? AND guid=?""",
                                (pattern, guid))
        return cur.fetchone() is not None

    def __iter__(self):
        if self.feed is None:
            pattern = '%'
        else:
            pattern = self.feed
        cur = self.conn.cursor()
        cur.row_factory = sqlite3.Row
        return cur.execute("""SELECT * from feedcache
                              WHERE name LIKE ? AND guid LIKE ?""",
                           (pattern, self.guid))
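

# Usage sketch (not part of the module): with SqliteStorage.path left at
# None, the cache lives in an in-memory database, so this runs anywhere.
#
#     cache = FeedCacheStorage(feed='example')
#     cache.add('some-guid')
#     assert 'some-guid' in cache
#     assert 'other-guid' not in cache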