Source code for scrapy.extensions.closespider

"""CloseSpider is an extension that forces spiders to be closed after certain
conditions are met.

See documentation in docs/topics/extensions.rst
"""

from __future__ import annotations

import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any

from scrapy import Request, Spider, signals
from scrapy.exceptions import NotConfigured

if TYPE_CHECKING:
    from twisted.python.failure import Failure

    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler
    from scrapy.http import Response


logger = logging.getLogger(__name__)


[docs]class CloseSpider: def __init__(self, crawler: Crawler): self.crawler: Crawler = crawler self.close_on: dict[str, Any] = { "timeout": crawler.settings.getfloat("CLOSESPIDER_TIMEOUT"), "itemcount": crawler.settings.getint("CLOSESPIDER_ITEMCOUNT"), "pagecount": crawler.settings.getint("CLOSESPIDER_PAGECOUNT"), "errorcount": crawler.settings.getint("CLOSESPIDER_ERRORCOUNT"), "timeout_no_item": crawler.settings.getint("CLOSESPIDER_TIMEOUT_NO_ITEM"), "pagecount_no_item": crawler.settings.getint( "CLOSESPIDER_PAGECOUNT_NO_ITEM" ), } if not any(self.close_on.values()): raise NotConfigured self.counter: defaultdict[str, int] = defaultdict(int) if self.close_on.get("errorcount"): crawler.signals.connect(self.error_count, signal=signals.spider_error) if self.close_on.get("pagecount") or self.close_on.get("pagecount_no_item"): crawler.signals.connect(self.page_count, signal=signals.response_received) if self.close_on.get("timeout"): crawler.signals.connect(self.spider_opened, signal=signals.spider_opened) if self.close_on.get("itemcount") or self.close_on.get("pagecount_no_item"): crawler.signals.connect(self.item_scraped, signal=signals.item_scraped) if self.close_on.get("timeout_no_item"): self.timeout_no_item: int = self.close_on["timeout_no_item"] self.items_in_period: int = 0 crawler.signals.connect( self.spider_opened_no_item, signal=signals.spider_opened ) crawler.signals.connect( self.item_scraped_no_item, signal=signals.item_scraped ) crawler.signals.connect(self.spider_closed, signal=signals.spider_closed) @classmethod def from_crawler(cls, crawler: Crawler) -> Self: return cls(crawler) def error_count(self, failure: Failure, response: Response, spider: Spider) -> None: self.counter["errorcount"] += 1 if self.counter["errorcount"] == self.close_on["errorcount"]: assert self.crawler.engine self.crawler.engine.close_spider(spider, "closespider_errorcount") def page_count(self, response: Response, request: Request, spider: Spider) -> None: self.counter["pagecount"] += 1 self.counter["pagecount_since_last_item"] += 1 if self.counter["pagecount"] == self.close_on["pagecount"]: assert self.crawler.engine self.crawler.engine.close_spider(spider, "closespider_pagecount") return if self.close_on["pagecount_no_item"] and ( self.counter["pagecount_since_last_item"] >= self.close_on["pagecount_no_item"] ): assert self.crawler.engine self.crawler.engine.close_spider(spider, "closespider_pagecount_no_item") def spider_opened(self, spider: Spider) -> None: from twisted.internet import reactor assert self.crawler.engine self.task = reactor.callLater( self.close_on["timeout"], self.crawler.engine.close_spider, spider, reason="closespider_timeout", ) def item_scraped(self, item: Any, spider: Spider) -> None: self.counter["itemcount"] += 1 self.counter["pagecount_since_last_item"] = 0 if self.counter["itemcount"] == self.close_on["itemcount"]: assert self.crawler.engine self.crawler.engine.close_spider(spider, "closespider_itemcount") def spider_closed(self, spider: Spider) -> None: task = getattr(self, "task", None) if task and task.active(): task.cancel() task_no_item = getattr(self, "task_no_item", None) if task_no_item and task_no_item.running: task_no_item.stop() def spider_opened_no_item(self, spider: Spider) -> None: from twisted.internet import task self.task_no_item = task.LoopingCall(self._count_items_produced, spider) self.task_no_item.start(self.timeout_no_item, now=False) logger.info( f"Spider will stop when no items are produced after " f"{self.timeout_no_item} seconds." ) def item_scraped_no_item(self, item: Any, spider: Spider) -> None: self.items_in_period += 1 def _count_items_produced(self, spider: Spider) -> None: if self.items_in_period >= 1: self.items_in_period = 0 else: logger.info( f"Closing spider since no items were produced in the last " f"{self.timeout_no_item} seconds." ) assert self.crawler.engine self.crawler.engine.close_spider(spider, "closespider_timeout_no_item")