Create a base class RedisPipeline: any pipeline that inherits from it gets a Redis connection, available as self.redis_server.
Dependency: scrapy_redis.
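scrapy_redis builds the connection from the project settings. A minimal sketch of the relevant settings, assuming a local Redis instance (the URL below is a placeholder for your own):
# settings.py
REDIS_URL = "redis://localhost:6379/0"
# REDIS_HOST / REDIS_PORT / REDIS_PARAMS are read as alternatives by
# scrapy_redis.connection.from_settings.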
# defaults.py
REDIS_ENCODING = "utf-8"
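# %(spidername)s is interpolated with spider.name at runtime.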
REDIS_FAILED_URLS_KEY = "%(spidername)s:failed_urls"
REDIS_URLS_AS_SET = True

# pipelines.py
import logging
from itemadapter import ItemAdapter
from scrapy_redis import connection
from . import defaults
class BasePipeline:
    def __init__(self, *args, **kwargs):
        pass
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        obj = cls(*args, **kwargs)
        return obj
    def open_spider(self, spider):
        pass
    def close_spider(self, spider):
        pass
    def process_item(self, item, spider):
        # Pass the item through so downstream pipelines still receive it.
        return item
class RedisMixin:
    """Mixin class to implement reading urls from a redis queue.
    Attributes
    ----------
    redis_key : str (default: REDIS_FAILED_URLS_KEY)
        Redis key under which failed URLs are stored.
    redis_encoding : str (default: REDIS_ENCODING)
        Encoding to use when decoding messages from redis queue.
    Settings
    --------
    REDIS_FAILED_URLS_KEY : str (default: "<spider.name>:failed_urls")
        Default Redis key under which failed URLs are stored.
    REDIS_URLS_AS_SET : bool (default: True)
        Use SET operations (SADD) to add URLs to the redis queue. If False,
        the URLs are added using the RPUSH command.
    REDIS_ENCODING : str (default: "utf-8")
        Default encoding to use when decoding messages from redis queue.
    """
    redis_key = None
    redis_encoding = None
    redis_urls_as_set = None
    # Redis client placeholder.
    redis_server = None
    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.
        This should be called after the spider has set its crawler object.
        """
        if self.redis_server is not None:
            return
        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, "crawler", None)
        if crawler is None:
            raise ValueError("crawler is required")
        settings = crawler.settings
        if self.redis_key is None:
            self.redis_key = settings.get(
                "REDIS_FAILED_URLS_KEY", defaults.REDIS_FAILED_URLS_KEY
            )
        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")
        if self.redis_encoding is None:
            self.redis_encoding = settings.get(
                "REDIS_ENCODING", defaults.REDIS_ENCODING
            )
        self.redis_urls_as_set = settings.getbool(
            "REDIS_URLS_AS_SET", defaults.REDIS_URLS_AS_SET
        )
        self.redis_server = connection.from_settings(settings)
    def save_failed_urls(self, urls, spider):
        if not isinstance(urls, list):
            urls = [urls]
        use_set = self.redis_urls_as_set
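        # sadd deduplicates URLs in a set; rpush appends every URL to a list.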
        add_to_redis = self.redis_server.sadd if use_set else self.redis_server.rpush
        key = self.redis_key % {"spidername": spider.name}
        # XXX: Do we need to use a timeout here?
        r = add_to_redis(key, *urls)
        return r
class RedisPipeline(RedisMixin, BasePipeline):
    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        obj = super().from_crawler(crawler, *args, **kwargs)
        obj.setup_redis(crawler)
        return obj
    def open_spider(self, spider):
        # super() must be called so the rest of the inheritance chain runs.
        super().open_spider(spider)
        # Resolve %(spidername)s once, so every later use sees the final key.
        self.redis_key = self.redis_key % {"spidername": spider.name}
        spider.logger.info(
            "Saving failed URLs into redis key '%(redis_key)s' "
            "(encoding: %(redis_encoding)s)",
            self.__dict__,
        )
    def close_spider(self, spider):
        super().close_spider(spider)
        self.redis_server.close()
    def process_item(self, item, spider):
        try:
            item = self._process_item(item, spider)
        except Exception:
            adapter = ItemAdapter(item)
            url = adapter.get("detail_url")
            if url:
                spider.logger.info(
                    f"Process failed in {type(self).__name__}, saving to redis: {url}"
                )
                # TODO: save the exception into a redis key as well?
                self.save_failed_urls(url, spider)
            raise  # important: re-raise so Scrapy still sees the failure
        return item
    def _process_item(self, item, spider):
        raise NotImplementedError

Usage: subclass RedisPipeline and override _process_item (not process_item),
so the failed-URL handling above stays in place.
class MyPipeline(RedisPipeline):
    def _process_item(self, item, spider):
        self.redis_server.ping()  # the shared redis connection is ready to use
        return item
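To enable the pipeline, register it in the project settings. A minimal sketch,
where myproject and the priority 300 are placeholders for your own project:
# settings.py
ITEM_PIPELINES = {
    "myproject.pipelines.MyPipeline": 300,
}
After a run, the saved URLs can be inspected with any redis client, e.g.
SMEMBERS myspider:failed_urls (or LRANGE myspider:failed_urls 0 -1 when
REDIS_URLS_AS_SET is False).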