# Source code for scrapy.downloadermiddlewares.cookies

from __future__ import annotations

import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any

from tldextract import TLDExtract

from scrapy.exceptions import NotConfigured
from scrapy.http import Response
from scrapy.http.cookies import CookieJar
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.python import to_unicode

if TYPE_CHECKING:
    from collections.abc import Iterable, Sequence
    from http.cookiejar import Cookie

    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy import Request, Spider
    from scrapy.crawler import Crawler
    from scrapy.http.request import VerboseCookie


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Shared domain splitter used by _is_public_domain().
# NOTE(review): ``include_psl_private_domains=True`` presumably makes private
# Public Suffix List entries count as suffixes too -- confirm against the
# tldextract documentation.
_split_domain = TLDExtract(include_psl_private_domains=True)
# Sentinel passed as the ``dict.get`` default when reading optional cookie
# flags in CookiesMiddleware._format_cookie().
_UNSET = object()


def _is_public_domain(domain: str) -> bool:
    """Return ``True`` when splitting *domain* yields an empty domain part.

    The name is split with the module-level ``_split_domain`` callable; the
    result is "public" when its ``domain`` component is falsy (presumably
    meaning the name consists of a public suffix only).
    """
    return not _split_domain(domain).domain


class CookiesMiddleware:
    """This middleware enables working with sites that need cookies.

    One :class:`CookieJar` is kept per value of the ``cookiejar`` request meta
    key (``None`` when the key is absent). Requests whose meta contains a
    truthy ``dont_merge_cookies`` are passed through untouched, both on the
    way out and when their response comes back.
    """

    def __init__(self, debug: bool = False):
        """Create the middleware.

        :param debug: when true, sent ``Cookie`` and received ``Set-Cookie``
            headers are logged at debug level.
        """
        # Jars are created lazily, one per ``cookiejar`` meta value.
        self.jars: defaultdict[Any, CookieJar] = defaultdict(CookieJar)
        self.debug: bool = debug

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        """Build the middleware from the crawler settings.

        :raises NotConfigured: if the ``COOKIES_ENABLED`` setting is false.
        """
        if not crawler.settings.getbool("COOKIES_ENABLED"):
            raise NotConfigured
        return cls(crawler.settings.getbool("COOKIES_DEBUG"))

    def _process_cookies(
        self, cookies: Iterable[Cookie], *, jar: CookieJar, request: Request
    ) -> None:
        """Store *cookies* in *jar*, filtering cookies set on public domains.

        A cookie whose domain (leading dot removed) is reported as public by
        ``_is_public_domain`` is skipped unless that domain equals the request
        host name; when it is equal, the cookie domain is replaced by the
        request host name. Every remaining cookie is handed to
        ``jar.set_cookie_if_ok``.
        """
        # The host name does not depend on the cookie, so look it up once.
        hostname = urlparse_cached(request).hostname
        for cookie in cookies:
            cookie_domain = cookie.domain
            if cookie_domain.startswith("."):
                cookie_domain = cookie_domain[1:]

            # Checked inside the loop on purpose: a request without a host
            # name is only rejected when there is a cookie to process.
            assert hostname is not None
            request_domain = hostname.lower()

            if cookie_domain and _is_public_domain(cookie_domain):
                if cookie_domain != request_domain:
                    continue
                cookie.domain = request_domain

            jar.set_cookie_if_ok(cookie, request)

    def process_request(
        self, request: Request, spider: Spider
    ) -> Request | Response | None:
        """Merge ``request.cookies`` into the jar and set the ``Cookie`` header.

        Always returns ``None``. Nothing is done when the request meta has a
        truthy ``dont_merge_cookies``.
        """
        if request.meta.get("dont_merge_cookies", False):
            return None

        cookiejarkey = request.meta.get("cookiejar")
        jar = self.jars[cookiejarkey]
        cookies = self._get_request_cookies(jar, request)
        self._process_cookies(cookies, jar=jar, request=request)

        # set Cookie header: any pre-existing value is discarded and the
        # header is rebuilt from the jar
        request.headers.pop("Cookie", None)
        jar.add_cookie_header(request)
        self._debug_cookie(request, spider)
        return None

    def process_response(
        self, request: Request, response: Response, spider: Spider
    ) -> Request | Response:
        """Store the cookies carried by *response* and return it unchanged.

        Nothing is stored when the request meta has a truthy
        ``dont_merge_cookies``.
        """
        if request.meta.get("dont_merge_cookies", False):
            return response

        # extract cookies from Set-Cookie and drop invalid/expired cookies
        cookiejarkey = request.meta.get("cookiejar")
        jar = self.jars[cookiejarkey]
        cookies = jar.make_cookies(response, request)
        self._process_cookies(cookies, jar=jar, request=request)

        self._debug_set_cookie(response, spider)

        return response

    def _debug_cookie(self, request: Request, spider: Spider) -> None:
        """Log the ``Cookie`` headers of *request* when debugging is enabled."""
        if not self.debug:
            return
        cl = [
            to_unicode(c, errors="replace")
            for c in request.headers.getlist("Cookie")
        ]
        if cl:
            cookies = "\n".join(f"Cookie: {c}\n" for c in cl)
            msg = f"Sending cookies to: {request}\n{cookies}"
            logger.debug(msg, extra={"spider": spider})

    def _debug_set_cookie(self, response: Response, spider: Spider) -> None:
        """Log the ``Set-Cookie`` headers of *response* when debugging is enabled."""
        if not self.debug:
            return
        cl = [
            to_unicode(c, errors="replace")
            for c in response.headers.getlist("Set-Cookie")
        ]
        if cl:
            cookies = "\n".join(f"Set-Cookie: {c}\n" for c in cl)
            msg = f"Received cookies from: {response}\n{cookies}"
            logger.debug(msg, extra={"spider": spider})

    def _format_cookie(self, cookie: VerboseCookie, request: Request) -> str | None:
        """
        Given a dict consisting of cookie components, return its string representation.
        Decode from bytes if necessary.

        Returns ``None`` (after logging a warning) when ``name`` or ``value``
        is missing or ``None``. ``path`` and ``domain`` are optional; the
        ``secure`` flag is appended only when present and truthy.
        """
        decoded = {}
        flags = set()
        for key in ("name", "value", "path", "domain"):
            if cookie.get(key) is None:
                if key in ("name", "value"):
                    msg = f"Invalid cookie found in request {request}: {cookie} ('{key}' is missing)"
                    logger.warning(msg)
                    return None
                continue
            # https://github.com/python/mypy/issues/7178, https://github.com/python/mypy/issues/9168
            if isinstance(cookie[key], (bool, float, int, str)):  # type: ignore[literal-required]
                decoded[key] = str(cookie[key])  # type: ignore[literal-required]
            else:
                try:
                    decoded[key] = cookie[key].decode("utf8")  # type: ignore[literal-required]
                except UnicodeDecodeError:
                    logger.warning(
                        "Non UTF-8 encoded cookie found in request %s: %s",
                        request,
                        cookie,
                    )
                    # Fall back to latin1 rather than dropping the cookie.
                    decoded[key] = cookie[key].decode("latin1", errors="replace")  # type: ignore[literal-required]
        for flag in ("secure",):
            value = cookie.get(flag, _UNSET)
            if value is _UNSET or not value:
                continue
            flags.add(flag)

        # "name=value" comes first, then the remaining attributes in the
        # order they were decoded (path, domain), then the flags (secure).
        parts = [f"{decoded.pop('name')}={decoded.pop('value')}"]
        parts.extend(f"{key.capitalize()}={value}" for key, value in decoded.items())
        parts.extend(flag.capitalize() for flag in flags)
        return "; ".join(parts)

    def _get_request_cookies(
        self, jar: CookieJar, request: Request
    ) -> Sequence[Cookie]:
        """
        Extract cookies from the Request.cookies attribute

        ``request.cookies`` may be a ``{name: value}`` dict or an iterable of
        cookie dicts. The cookies are rendered as ``Set-Cookie`` headers of a
        throwaway response so that *jar* can parse them into cookie objects.
        Returns an empty list when the request carries no cookies.
        """
        if not request.cookies:
            return []
        cookies: Iterable[VerboseCookie]
        if isinstance(request.cookies, dict):
            cookies = tuple({"name": k, "value": v} for k, v in request.cookies.items())
        else:
            cookies = request.cookies
        # Cookies without an explicit "secure" key default to secure exactly
        # when the request itself uses https. The scheme is the same for all
        # cookies, so it is evaluated once.
        # NOTE: setdefault() writes into the caller's dicts when
        # request.cookies is not a plain {name: value} dict.
        is_https = urlparse_cached(request).scheme == "https"
        for cookie in cookies:
            cookie.setdefault("secure", is_https)
        # _format_cookie() returns None for invalid cookies; filter drops them.
        formatted = filter(None, (self._format_cookie(c, request) for c in cookies))
        response = Response(request.url, headers={"Set-Cookie": formatted})
        return jar.make_cookies(response, request)