"""
This module implements the TextResponse class which adds encoding handling and
discovery (through HTTP headers) to the base Response class.
See documentation in docs/topics/request-response.rst
"""
from __future__ import annotations
import json
from contextlib import suppress
from typing import TYPE_CHECKING, Any, AnyStr, cast
from urllib.parse import urljoin
import parsel
from w3lib.encoding import (
html_body_declared_encoding,
html_to_unicode,
http_content_type_encoding,
read_bom,
resolve_encoding,
)
from w3lib.html import strip_html5_whitespace
from scrapy.http.response import Response
from scrapy.utils.python import memoizemethod_noargs, to_unicode
from scrapy.utils.response import get_base_url
if TYPE_CHECKING:
from collections.abc import Callable, Iterable, Mapping
from twisted.python.failure import Failure
from scrapy.http.request import CallbackT, CookiesT, Request
from scrapy.link import Link
from scrapy.selector import Selector, SelectorList
_NONE = object()
class TextResponse(Response):
_DEFAULT_ENCODING = "ascii"
_cached_decoded_json = _NONE
attributes: tuple[str, ...] = Response.attributes + ("encoding",)
def __init__(self, *args: Any, **kwargs: Any):
self._encoding: str | None = kwargs.pop("encoding", None)
self._cached_benc: str | None = None
self._cached_ubody: str | None = None
self._cached_selector: Selector | None = None
super().__init__(*args, **kwargs)
def _set_body(self, body: str | bytes | None) -> None:
self._body: bytes = b"" # used by encoding detection
if isinstance(body, str):
if self._encoding is None:
raise TypeError(
"Cannot convert unicode body - "
f"{type(self).__name__} has no encoding"
)
self._body = body.encode(self._encoding)
else:
super()._set_body(body)
@property
def encoding(self) -> str:
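# An encoding passed explicitly to the constructor takes precedence; otherwise
# use the encoding the response declares about itself (BOM, Content-Type
# header, or an encoding declaration in the body), and only then fall back to
# an encoding inferred from the body bytes.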
return self._declared_encoding() or self._body_inferred_encoding()
def _declared_encoding(self) -> str | None:
return (
self._encoding
or self._bom_encoding()
or self._headers_encoding()
or self._body_declared_encoding()
)
def json(self) -> Any:
"""
.. versionadded:: 2.2

Deserialize a JSON document to a Python object.
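
For example, a minimal usage sketch (assuming the response body is a JSON
document such as ``{"title": "Example Domain"}``)::

    data = response.json()
    data["title"]  # "Example Domain"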
"""
if self._cached_decoded_json is _NONE:
self._cached_decoded_json = json.loads(self.body)
return self._cached_decoded_json
@property
def text(self) -> str:
"""Body as unicode"""
# access self.encoding before _cached_ubody to make sure
# _body_inferred_encoding is called
benc = self.encoding
if self._cached_ubody is None:
charset = f"charset={benc}"
self._cached_ubody = html_to_unicode(charset, self.body)[1]
return self._cached_ubody
def urljoin(self, url: str) -> str:
"""Join this Response's url with a possible relative url to form an
absolute interpretation of the latter."""
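# For example, for a response at https://example.com/a/b.html whose body
# declares no <base> element (an illustrative sketch):
#   response.urljoin("c.html")       -> "https://example.com/a/c.html"
#   response.urljoin("/index.html")  -> "https://example.com/index.html"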
return urljoin(get_base_url(self), url)
@memoizemethod_noargs
def _headers_encoding(self) -> str | None:
content_type = cast(bytes, self.headers.get(b"Content-Type", b""))
return http_content_type_encoding(to_unicode(content_type, encoding="latin-1"))
def _body_inferred_encoding(self) -> str:
if self._cached_benc is None:
content_type = to_unicode(
cast(bytes, self.headers.get(b"Content-Type", b"")), encoding="latin-1"
)
benc, ubody = html_to_unicode(
content_type,
self.body,
auto_detect_fun=self._auto_detect_fun,
default_encoding=self._DEFAULT_ENCODING,
)
self._cached_benc = benc
self._cached_ubody = ubody
return self._cached_benc
def _auto_detect_fun(self, text: bytes) -> str | None:
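# Try a few common encodings in order and return the first one that decodes
# the body without errors.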
for enc in (self._DEFAULT_ENCODING, "utf-8", "cp1252"):
try:
text.decode(enc)
except UnicodeError:
continue
return resolve_encoding(enc)
return None
@memoizemethod_noargs
def _body_declared_encoding(self) -> str | None:
return html_body_declared_encoding(self.body)
@memoizemethod_noargs
def _bom_encoding(self) -> str | None:
return read_bom(self.body)[0]
@property
def selector(self) -> Selector:
from scrapy.selector import Selector
if self._cached_selector is None:
self._cached_selector = Selector(self)
return self._cached_selector
def jmespath(self, query: str, **kwargs: Any) -> SelectorList:
from scrapy.selector import SelectorList
if not hasattr(self.selector, "jmespath"):
raise AttributeError(
"Please install parsel >= 1.8.1 to get jmespath support"
)
return cast(SelectorList, self.selector.jmespath(query, **kwargs))
def xpath(self, query: str, **kwargs: Any) -> SelectorList:
from scrapy.selector import SelectorList
return cast(SelectorList, self.selector.xpath(query, **kwargs))
def css(self, query: str) -> SelectorList:
from scrapy.selector import SelectorList
return cast(SelectorList, self.selector.css(query))
def follow(
self,
url: str | Link | parsel.Selector,
callback: CallbackT | None = None,
method: str = "GET",
headers: Mapping[AnyStr, Any] | Iterable[tuple[AnyStr, Any]] | None = None,
body: bytes | str | None = None,
cookies: CookiesT | None = None,
meta: dict[str, Any] | None = None,
encoding: str | None = None,
priority: int = 0,
dont_filter: bool = False,
errback: Callable[[Failure], Any] | None = None,
cb_kwargs: dict[str, Any] | None = None,
flags: list[str] | None = None,
) -> Request:
"""
Return a :class:`~.Request` instance to follow a link ``url``.
It accepts the same arguments as the ``Request.__init__`` method,
but ``url`` can be not only an absolute URL, but also

* a relative URL
* a :class:`~scrapy.link.Link` object, e.g. the result of
  :ref:`topics-link-extractors`
* a :class:`~scrapy.selector.Selector` object for a ``<link>`` or ``<a>`` element, e.g.
  ``response.css('a.my_link')[0]``
* an attribute :class:`~scrapy.selector.Selector` (not SelectorList), e.g.
  ``response.css('a::attr(href)')[0]`` or
  ``response.xpath('//img/@src')[0]``

See :ref:`response-follow-example` for usage examples.
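
For example, a sketch of a spider callback that follows pagination links
(the CSS selector and the ``self.parse`` callback are illustrative
assumptions)::

    for a in response.css("ul.pager a"):
        yield response.follow(a, callback=self.parse)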
"""
if isinstance(url, parsel.Selector):
url = _url_from_selector(url)
elif isinstance(url, parsel.SelectorList):
raise ValueError("SelectorList is not supported")
encoding = self.encoding if encoding is None else encoding
return super().follow(
url=url,
callback=callback,
method=method,
headers=headers,
body=body,
cookies=cookies,
meta=meta,
encoding=encoding,
priority=priority,
dont_filter=dont_filter,
errback=errback,
cb_kwargs=cb_kwargs,
flags=flags,
)
def follow_all(
self,
urls: Iterable[str | Link] | parsel.SelectorList | None = None,
callback: CallbackT | None = None,
method: str = "GET",
headers: Mapping[AnyStr, Any] | Iterable[tuple[AnyStr, Any]] | None = None,
body: bytes | str | None = None,
cookies: CookiesT | None = None,
meta: dict[str, Any] | None = None,
encoding: str | None = None,
priority: int = 0,
dont_filter: bool = False,
errback: Callable[[Failure], Any] | None = None,
cb_kwargs: dict[str, Any] | None = None,
flags: list[str] | None = None,
css: str | None = None,
xpath: str | None = None,
) -> Iterable[Request]:
"""
A generator that produces :class:`~.Request` instances to follow all
links in ``urls``. It accepts the same arguments as the :class:`~.Request`'s
``__init__`` method, except that each ``urls`` element does not need to be
an absolute URL; it can be any of the following:

* a relative URL
* a :class:`~scrapy.link.Link` object, e.g. the result of
  :ref:`topics-link-extractors`
* a :class:`~scrapy.selector.Selector` object for a ``<link>`` or ``<a>`` element, e.g.
  ``response.css('a.my_link')[0]``
* an attribute :class:`~scrapy.selector.Selector` (not SelectorList), e.g.
  ``response.css('a::attr(href)')[0]`` or
  ``response.xpath('//img/@src')[0]``

In addition, ``css`` and ``xpath`` arguments are accepted to perform the link
extraction within the ``follow_all`` method (only one of ``urls``, ``css`` and
``xpath`` is accepted).

Note that when passing a ``SelectorList`` as argument for the ``urls`` parameter
or using the ``css`` or ``xpath`` parameters, this method will not produce
requests for selectors from which links cannot be obtained (for instance,
anchor tags without an ``href`` attribute).
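
For example, a sketch of following all pagination links from a callback
(the selectors and the ``self.parse`` callback are illustrative
assumptions)::

    yield from response.follow_all(css="ul.pager a", callback=self.parse)
    # equivalent, passing the extracted selectors explicitly:
    yield from response.follow_all(response.css("ul.pager a"), callback=self.parse)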
"""
arguments = [x for x in (urls, css, xpath) if x is not None]
if len(arguments) != 1:
raise ValueError(
"Please supply exactly one of the following arguments: urls, css, xpath"
)
if not urls:
if css:
urls = self.css(css)
if xpath:
urls = self.xpath(xpath)
if isinstance(urls, parsel.SelectorList):
selectors = urls
urls = []
for sel in selectors:
with suppress(_InvalidSelector):
urls.append(_url_from_selector(sel))
return super().follow_all(
urls=cast("Iterable[str | Link]", urls),
callback=callback,
method=method,
headers=headers,
body=body,
cookies=cookies,
meta=meta,
encoding=encoding,
priority=priority,
dont_filter=dont_filter,
errback=errback,
cb_kwargs=cb_kwargs,
flags=flags,
)
class _InvalidSelector(ValueError):
"""
Raised when a URL cannot be obtained from a Selector
"""
def _url_from_selector(sel: parsel.Selector) -> str:
if isinstance(sel.root, str):
# e.g. ::attr(href) result
return strip_html5_whitespace(sel.root)
if not hasattr(sel.root, "tag"):
raise _InvalidSelector(f"Unsupported selector: {sel}")
if sel.root.tag not in ("a", "link"):
raise _InvalidSelector(
f"Only <a> and <link> elements are supported; got <{sel.root.tag}>"
)
href = sel.root.get("href")
if href is None:
raise _InvalidSelector(f"<{sel.root.tag}> element has no href attribute: {sel}")
return strip_html5_whitespace(href)