Source code for drowsy.parser

"""
    drowsy.parser
    ~~~~~~~~~~~~~

    Functions for parsing query info from url parameters.

"""
# :copyright: (c) 2016-2020 by Nicholas Repole and contributors.
#             See AUTHORS for more details.
# :license: MIT - See LICENSE for more details.
from drowsy.utils import get_error_message
from drowsy.exc import (
    ParseError, FilterParseError, OffsetLimitParseError,
    MISSING_ERROR_MESSAGE)
from drowsy.log import Loggable
import json


[docs] class SortInfo(Loggable): """Used to transport info regarding sorts around."""
[docs] def __init__(self, attr=None, direction="ASC"): """Instantiates a SortInfo object. :param str attr: Name of the attr to be sorted on. :param str direction: Must be ``"ASC"`` or ``"DESC"``. :raise ValueError: If ``direction`` is not ``"ASC"``, ``"DESC"``, or ``None``. :raise TypeError: If any of the provided parameters are not of the specified type. """ if not isinstance(attr, str): raise TypeError("attr must be a string.") if not isinstance(direction, str): raise TypeError("direction must be a string.") if direction != "DESC" and direction != "ASC": raise ValueError("direction must be ASC or DESC.") self.attr = attr self.direction = direction
[docs] class OffsetLimitInfo(Loggable): """Used to transport info regarding offsets and limits around."""
[docs] def __init__(self, offset=None, limit=None): """Instantiates a OffsetLimitInfo object. :param offset: Offset to be applied. :type offset: int or None :param limit: Limit to be applied. :type limit: int or None :raise ValueError: If ``offset`` or ``limit`` is not a positive number or ``None``. :raise TypeError: If any of the provided parameters are not of the specified type. """ self._offset = None self._limit = None self.offset = offset self.limit = limit
@property def offset(self): """Get an offset value. :return: An offset value. :rtype: int or None """ return self._offset @offset.setter def offset(self, value): """Set an offset value. :param value: The value of an offset. If not None, must be a non negative int. :type value: int or None :raise TypeError: If a non int or None value is given. :raise ValueError: If the provided value is a negative integer. :return: None :rtype: None """ if not isinstance(value, int) and value is not None: raise TypeError("offset must be an int or None.") if value is not None and value < 0: raise ValueError("offset must be a positive value.") self._offset = value @property def limit(self): """Get a limit value. :return: An limit value. :rtype: int or None """ return self._limit @limit.setter def limit(self, value): """Set a limit value. :param value: The value of a limit. If not None, must be a non negative int. :type value: int or None :raise TypeError: If a non int or None value is given. :raise ValueError: If the provided value is a negative integer. :return: None :rtype: None """ if not isinstance(value, int) and value is not None: raise TypeError("limit must be an int or None.") if value is not None and value < 0: raise ValueError("limit must be a positive value.") self._limit = value
[docs] class SubfilterInfo(OffsetLimitInfo): """Object used to transport info regarding subqueries around."""
[docs] def __init__(self, offset=None, limit=None, filters=None, sorts=None): """Instantiates a SubfilterInfo object. :param offset: Offset to be applied. :type offset: int or None :param limit: Limit to be applied. :type limit: int or None :param filters: Filters to be applied. :type filters: dict or None :param sorts: Any sorts that are to be applied. :type sorts: list of SortInfo or None :raise ValueError: If ``offset`` or ``limit`` is not a positive number or ``None``. :raise TypeError: If any of the provided parameters are not of the specified type. """ self._filters = None self._sorts = None self.filters = filters self.sorts = sorts super(SubfilterInfo, self).__init__(offset, limit)
@property def filters(self): """Get the filters to be applied to a subresource. :return: A dictionary of filters for a subresource. :rtype: dict or None """ return self._filters @filters.setter def filters(self, value): """Set an offset value. :param value: Filters to be applied to the subresource. :type value: dict or None :raise TypeError: If a non dict or None value is given. :return: None :rtype: None """ if not isinstance(value, dict) and value is not None: raise TypeError("filters must be a dict or None.") self._filters = value @property def sorts(self): """Get the sorts to be applied to a subresource. :return: A collection of sorts for a subresource. :rtype: list or None """ return self._sorts @sorts.setter def sorts(self, value): """Set the sorts to be used for the subresource. :param value: Sorts to be applied to the subresource. :type value: dict or None :raise TypeError: If a non list or None value is given. :return: None :rtype: None """ if not isinstance(value, list) and value is not None: raise TypeError("sorts must be a list or None.") self._sorts = value
[docs] class QueryParamParser(Loggable): """Utility class used to parse query parameters.""" _default_error_messages = { "invalid_limit_value": ("The limit provided (%(limit)s) is not a " "non negative integer."), "invalid_sublimit_value": ("The limit (%(limit)s) provided for the " "subresource (%(subresource)s) is not a " "non negative integer."), "invalid_offset_value": ("The offset provided (%(offset)s) is not a " "non negative integer."), "invalid_suboffset_value": ("The offset (%(offset)s) provided for the " "subresource (%(subresource)s) is not a " "non negative integer."), "limit_too_high": ("The limit provided (%(limit)s) is greater than " "the max page size allowed (%(max_page_size)s)."), "invalid_page_value": ("The page value provided (%(page)s) is not a " "positive integer."), "page_no_max": "Page greater than 1 provided without a page max size.", "page_negative": "Page number can not be less than 1.", "invalid_subsorts_value": ("The sorts provided (%(sort)s) for the " "subresource (%(subresource)s) are not " "valid."), "invalid_complex_filters": ("The complex filters query value for " "%(qparam)s must be set to a valid json " "dict."), "invalid_subresource_path": ("The subresource path provided " "(%(subresource_path)s) is not valid.") }
[docs] def __init__(self, query_params=None, error_messages=None, context=None): """Sets up error messages, translations, and query params. :param query_params: Query params potentially containing filters, embeds, fields, and sorts. :type query_params: dict or None :param error_messages: Optional dictionary of error messages, useful if you want to override the default errors. :type error_messages: dict or None :param context: Optional dictionary of context information. If error messages should be translated, context should include a ``"gettext"`` key set to a callable that takes in a string and any kwargs and returns a translated string. :type context: dict, callable, or None """ self.query_params = query_params or {} self._context = context # Set up error messages messages = {} for cls in reversed(self.__class__.__mro__): messages.update(getattr(cls, '_default_error_messages', {})) messages.update(error_messages or {}) self.error_messages = messages
@property def context(self): """Return the context for this request. :rtype: dict, callable, or None """ if callable(self._context): return self._context() else: return self._context or {} @context.setter def context(self, val): """Set context to the provided value. :param val: Used to set the current context value. :type val: dict, callable, or None """ self._context = val
[docs] def make_error(self, key, **kwargs): """Return an exception based on the ``key`` provided. :param str key: Failure type, used to choose an error message. :param kwargs: Any additional arguments that may be used for generating an error message. :return: `FilterParseError`in cases where there was an issue parsing a filter. `OffsetLimitParseError` in cases where there was an issue parsing the offset, limit, or page value. `ParseError` raised in all other cases. """ offset_limit_parse_keys = { "invalid_limit_value", "limit_too_high", "invalid_offset_value", "invalid_page_value", "page_no_max", "page_negative", "invalid_sublimit_value", "invalid_suboffset_value"} if key in offset_limit_parse_keys: return OffsetLimitParseError( code=key, message=self._get_error_message(key, **kwargs), **kwargs) elif key == "invalid_complex_filters": return FilterParseError( code=key, message=self._get_error_message(key, **kwargs), **kwargs) else: return ParseError( code=key, message=self._get_error_message(key, **kwargs), **kwargs)
def _parse_sorts_helper(self, sorts): """Reusable code for parsing sorts from a string. :param sorts: A comma split string with attrnames and sort directions as + or -. If neither + or - is provided, ASC is assumed. As an example, sorts might look like: ``"+artist.name,-album.name"``. :return: A list of sorts to be applied to a result. :rtype: list of :class:`SortInfo` """ result = [] split_sorts = sorts.split(",") for sort in split_sorts: direction = "ASC" attr_name = sort if sort.startswith("-"): attr_name = sort[1:] direction = "DESC" result.append(SortInfo(attr=attr_name, direction=direction)) return result def _get_error_message(self, key, **kwargs): """Get an error message based on a key name. If the error message is a callable, kwargs are passed to that callable. Assuming the resulting error message is a string, if ``self.context`` includes a ``"gettext"`` callable, it will be passed that string along with any kwargs to potentially translate and fill in any template variables. :param str key: Key used to access the error messages dict. :param dict kwargs: Any additional arguments that may be passed to a callable error message, or used to translate and/or format an error message string. :raise AssertionError: When the ``self.error_message`` dict does not contain the provided ``key``. :return: An error message :rtype: str """ try: return get_error_message( error_messages=self.error_messages, key=key, gettext=self.context.get("gettext", None), **kwargs) except KeyError: class_name = self.__class__.__name__ msg = MISSING_ERROR_MESSAGE.format(class_name=class_name, key=key) raise AssertionError(msg)
[docs] def parse_fields(self, fields_query_name="fields"): """Parse from query params the fields to include in the result. :param str fields_query_name: The name of the key used to check for fields in the provided ``query_params``. :return: A list of fields to be included in the response. :rtype: list of str """ fields = self.query_params.get(fields_query_name) if fields: return fields.split(",") else: return []
[docs] def parse_embeds(self, embeds_query_name="embeds"): """Parse sub-resource embeds from query params. :param str embeds_query_name: The name of the key used to check for an embed in the provided ``query_params``. :return: A list of embeds to include in the response. :rtype: list of str """ embeds = self.query_params.get(embeds_query_name) if embeds: return embeds.split(",") else: return []
[docs] def parse_offset_limit(self, page_max_size=None, page_query_name="page", offset_query_name="offset", limit_query_name="limit", strict=True): """Parse offset and limit from the provided query params. :param page_max_size: If page is provided, ``page_max_size`` limits the number of results returned. Otherwise, if using limit and offset values from the ``query_params``, ``page_max_size`` sets a max number of records to allow. :type page_max_size: int or None :param str page_query_name: The name of the key used to check for a page value in the provided ``query_params``. If page is provided, it is used along with the ``page_max_size`` to determine the offset that should be applied to the query. If a page number other than 1 is provided, a ``page_max_size`` must also be provided. :param str offset_query_name: The name of the key used to check for an offset value in the provided ``query_params``. :param str limit_query_name: The name of the key used to check for a limit value in the provided ``query_params``. :param strict: If ``True``, exceptions will be raised for invalid input. Otherwise, invalid input will be ignored. :raise OffsetLimitParseError: Applicable if using strict mode only. If the provided limit is greater than page_max_size, or an invalid page, offset, or limit value is provided, then an :exc:`OffsetLimitParseError` is raised. :return: An offset and limit value for this query. :rtype: :class:`OffsetLimitInfo` """ # parse limit limit = page_max_size if limit_query_name is not None: if self.query_params.get(limit_query_name): try: limit = int(self.query_params.get(limit_query_name)) if limit < 0: raise ValueError except (ValueError, TypeError): if strict: raise self.make_error( key="invalid_limit_value", limit=self.query_params.get(limit_query_name)) # parse page page = self.query_params.get(page_query_name, None) if page is not None: try: page = int(page) if page < 1: raise ValueError except (ValueError, TypeError): page = None if strict: raise self.make_error( "invalid_page_value", page=self.query_params.get(page_query_name, None)) if page > 1 and page_max_size is None and limit is None: page = None if strict: raise self.make_error("page_no_max") # defaults offset = 0 if offset_query_name is not None: if self.query_params.get(offset_query_name): try: offset = int(self.query_params.get(offset_query_name)) if offset < 0: raise ValueError except (ValueError, TypeError): offset = 0 if strict: raise self.make_error( "invalid_offset_value", offset=self.query_params.get(offset_query_name)) if page_max_size and limit > page_max_size: # make sure an excessively high limit can't be set limit = page_max_size if strict: raise self.make_error( "limit_too_high", limit=self.query_params.get(limit_query_name, None), max_page_size=page_max_size) if page is not None and page > 1: if limit is not None and page_max_size is None: page_max_size = limit offset = (page - 1) * page_max_size return OffsetLimitInfo(limit=limit, offset=offset)
[docs] def parse_sorts(self, sort_query_name="sort"): """Parse sorts from provided the query params. :param str sort_query_name: The name of the key used to check for sorts in the provided ``query_params``. :return: The sorts that should be applied. :rtype: list of :class:`SortInfo` """ if sort_query_name in self.query_params: return self._parse_sorts_helper(self.query_params[sort_query_name]) return []
[docs] class ModelQueryParamParser(QueryParamParser): """Param parser with added ability to parse MQLAlchemy filters.""" def _parser_helper(self, parse_type, subqueries, key, key_parts, key_value, subkey_name, strict=True): """Used to help parse offset, limit, and sorts. The logic is overwhelmingly similar, so rather than repeat code, it's broken out into this helper function. :param str parse_type: Can be one of three values: ``"limit"``, ``"offset"``, or ``"sorts"``. :param dict subqueries: Holds :class:`SubfilterInfo` in a dict where each key is a resource path. :param str key: The name of this query parameter, for example: ``tracks.playlists._limit_`` :param list key_parts: The above key split into a list by ``.``. :param key_value: The value of the query parameter for the provided key. :param str subkey_name: The name of the portion of the key that we're searching for. By default, this can be ``"_limit_"``, ``"_offset_"``, or ``"_sorts_"``. :param bool strict: If ``True``, exceptions will be raised for invalid input. Otherwise, invalid input will be ignored. :raise OffsetLimitParseError: Raised in cases where there was an issue parsing an offset or limit if ``strict`` is ``True``. :raise ParseError: Raised if ``strict`` is ``True`` and the ``parse_type`` is ``"sorts"`` and there is an issue parsing. :return: None, but the subqueries parameter may be modified. :rtype: None """ subquery_path_parts = [] value = key_value try: if parse_type == "limit" or parse_type == "offset": value = int(key_value) elif parse_type == "sorts": value = self._parse_sorts_helper(key_value) except (ValueError, TypeError): if strict: code = "invalid_sub" + parse_type + "_value" kwargs = { parse_type: key_value, "subresource": key } raise self.make_error(code, **kwargs) return while key_parts: key_part = key_parts.pop(0) if not key_part == subkey_name: subquery_path_parts.append(key_part) else: if key_parts: if strict: raise self.make_error( "invalid_subresource_path", subresource_path=key) else: key_parts = [] else: subitem_path = ".".join(subquery_path_parts) if not isinstance( subqueries.get(subitem_path), SubfilterInfo): subqueries[subitem_path] = SubfilterInfo() setattr(subqueries[subitem_path], parse_type, value)
[docs] def parse_subfilters(self, subquery_name="_subquery_", sublimit_name="_limit_", suboffset_name="_offset_", subsorts_name="_sorts_", strict=True): """Parse nested resource subfilters, limits, offsets, and sorts. Note that subquery parsing does limited checking on the validity of the subquery itself. Given a query param "album.artist._subquery_.tracks.track_id" with value "5", the resulting subfilters returned would be: .. code-block:: python result = { "album.artist": SubfilterInfo( filters={ "$and": ["tracks.track_id": {"eq": 5}] } ) } :param str subquery_name: The name of the key used to check for a subquery value in the provided ``query_params``. :param str sublimit_name: The name of the key used to check for a sublimit value in the provided ``query_params``. :param str suboffset_name: The name of the key used to check for a suboffset value in the provided ``query_params``. :param str subsorts_name: The name of the key used to check for a subsorts value in the provided ``query_params``. :param bool strict: If ``True``, exceptions will be raised for invalid input. Otherwise, invalid input will be ignored. :raise FilterParseError: Malformed complex queries or invalid ``query_params`` will result in an :exc:`~drowsy.exc.FilterParseError` being raised if ``strict`` is ``True``. :raise OffsetLimitParseError: Raised in cases where there was an issue parsing an offset or limit if ``strict`` is ``True``. :raise ParseError: Raised if ``strict`` is ``True`` and there is an issue parsing the provided sorts for a subfilter. :return: A dictionary containing subqueries that can be passed to mqlalchemy for query filtering. :rtype: dict of str, SubfilterInfo """ subqueries = {} for key in self.query_params.keys(): value = self.query_params[key] key_parts = key.split(".") subquery_path_parts = [] sub_attr_path_parts = [] subitem_found = False subitem_path = None if key.find(subquery_name) > -1: # walk down the subquery to see how it ends while key_parts: key_part = key_parts.pop(0) if not key_part == subquery_name: if subitem_found: # Given key album.artist.subquery.tracks, # This portion of the code will eventually # produce ["tracks"] sub_attr_path_parts.append(key_part) else: # Given key album.artist.subquery.tracks, # This portion of the code will eventually # produce ["album", "artist"] subquery_path_parts.append(key_part) else: # This is officially a subquery # Create a subquery with a key equal # to the path leading up to this point. # Given key album.artist.subquery.tracks, # the resulting subquery key will be # "album.artist" subitem_path = ".".join(subquery_path_parts) if subqueries.get(subitem_path) is None: subqueries[subitem_path] = SubfilterInfo( filters={"$and": []} ) subitem_found = True # get an individual filter type object for the # subquery child key. Given query param # album.artist.$subquery.tracks.track_id = 5, # the result will be {"tracks.track_id": {"eq": 5}} item_filters = self._get_item_filter( attr_name=".".join(sub_attr_path_parts), value=value, strict=strict ) # returns in list form to enable multiple filters # for a single key if subitem_path: for item in item_filters: subqueries[subitem_path].filters["$and"].append(item) elif key.find(suboffset_name) > -1: self._parser_helper( parse_type="offset", subqueries=subqueries, key=key, key_parts=key_parts, key_value=value, subkey_name=suboffset_name, strict=strict ) elif key.find(sublimit_name) > -1: self._parser_helper( parse_type="limit", subqueries=subqueries, key=key, key_parts=key_parts, key_value=value, subkey_name=sublimit_name, strict=strict ) elif key.find(subsorts_name) > -1: self._parser_helper( parse_type="sorts", subqueries=subqueries, key=key, key_parts=key_parts, key_value=value, subkey_name=subsorts_name, strict=strict ) return subqueries
def _get_item_filter(self, attr_name, value, strict=True): """Parse query param into a set of filters as dictionaries. :param str attr_name: The name of the query param to parse. :param value: The value of that query param. :type value: str or list of str :param bool strict: If ``True``, exceptions will be raised for invalid input. Otherwise, invalid input will be ignored. :raise FilterParseError: Malformed complex queries or invalid ``query_params`` will result in an :exc:`~drowsy.exc.FilterParseError` being raised if ``strict`` is ``True``. :return: List of filters as dictionaries. :rtype: list of dict """ # how much to remove from end of key to get the attr_name. # default values: chop_len = 0 key = attr_name comparator = "$eq" if key.endswith("-gt"): chop_len = 3 comparator = "$gt" elif key.endswith("-gte"): chop_len = 4 comparator = "$gte" elif key.endswith("-eq"): chop_len = 3 comparator = "$eq" elif key.endswith("-lte"): chop_len = 4 comparator = "$lte" elif key.endswith("-lt"): chop_len = 3 comparator = "$lt" elif key.endswith("-ne"): chop_len = 3 comparator = "$ne" elif key.endswith("-like"): chop_len = 5 comparator = "$like" if chop_len != 0: attr_name = key[:(-1 * chop_len)] if not isinstance(value, list): value = [value] result = [] for item in value: if isinstance(item, str) and item.startswith("{"): try: query = json.loads(item) if attr_name: result.append( {attr_name: query}) else: result.append(query) except (TypeError, ValueError): if strict: raise self.make_error("invalid_complex_filters", qparam=attr_name) else: if attr_name: result.append( {attr_name: {comparator: item}}) else: raise self.make_error("invalid_complex_filters", qparam=attr_name) return result
[docs] def parse_filters(self, model_class, complex_query_name="query", only_parse_complex=False, convert_key_names_func=str, subquery_name="_subquery_", sublimit_name="_limit_", suboffset_name="_offset_", subsorts_name="_sorts_", strict=True): """Convert request params into MQLAlchemy friendly search. :param model_class: The SQLAlchemy class being queried. :param str complex_query_name: The name of the key used to check for a complex query value in the provided ``query_params``. Note that the complex query should be a json dumped dictionary value. :param bool only_parse_complex: Set to ``True`` if all simple filters in the query params should be ignored. :param convert_key_names_func: If provided, should take in a dot separated attr name and transform it such that the result is the corresponding dot separated attribute in the ``model_class`` being queried. Useful if, for example, you want to allow users to provide an attr name in one format (say camelCase) and convert it to the naming format used for your model objects (likely underscore). :type convert_key_names_func: callable or None :param subquery_name: Query param name used to trigger a subquery. Query params that include this name will be ignored. :type subquery_name: str or None :param sublimit_name: Query param name used to trigger a subquery resource limit. Query params that include this name will be ignored. :type sublimit_name: str or None :param suboffset_name: Query param name used to trigger a subquery resource offset. Query params that include this name will be ignored. :type suboffset_name: str or None :param subsorts_name: Query param name used to trigger a subquery sort. Query params that include this name will be ignored. :type subsorts_name: str or None :param bool strict: If ``True``, exceptions will be raised for invalid input. Otherwise, invalid input will be ignored. :raise FilterParseError: Malformed complex queries or invalid ``query_params`` will result in an :exc:`~drowsy.exc.FilterParseError` being raised if ``strict`` is ``True``. :return: A dictionary containing filters that can be passed to mqlalchemy for query filtering. :rtype: dict """ # use an $and query to enable multiple queries for the same # attribute. result = {"$and": []} for key in self.query_params.keys(): if (subquery_name in key or sublimit_name in key or suboffset_name in key or subsorts_name in key): continue if key == complex_query_name: complex_query_list = [] if isinstance(self.query_params[key], list): complex_query_list = self.query_params[key] else: complex_query_list.append(self.query_params[key]) for complex_query in complex_query_list: try: query = json.loads(complex_query) if not isinstance(query, dict): raise ValueError result["$and"].append(query) except (TypeError, ValueError): if strict: raise self.make_error("invalid_complex_filters", qparam=key) elif not only_parse_complex: # how much to remove from end of key to get the attr_name. # default values: value = self.query_params[key] item_filters = self._get_item_filter(attr_name=key, value=value) attr_name = list(item_filters[0].keys())[0] attr_check = None try: c_attr_name = convert_key_names_func(attr_name) if c_attr_name: attr_check = c_attr_name.split(".") if attr_check: attr_check = attr_check[0] except AttributeError: attr_check = None if attr_check and hasattr(model_class, attr_check): # ignore any top level invalid params for item in item_filters: result["$and"].append(item) if len(result["$and"]) == 0: return {} return result