Source code for server.api_v1.requests_to_models.request_parser

from ..requests_to_models.request import Request, GetRequest, CountRequest

import string
import copy
import re


[docs]class RequestParser: ALLOWED_CHARS = string.ascii_letters + string.digits + "/: !=><,-*" COMMA_SEPARATED_KEYS = {"fields", "order_by"} ALLOWED_KEYS = {'model', 'method', 'fields', 'filter', 'order_by', 'limit', 'offset'} MAXIMUM_KEY_LENGTH = 64 MAXIMUM_VALUE_LENGTH = 512 # TODO: move to settings MINIMUM_LIMIT_VALUE = 1 MAXIMUM_LIMIT_VALUE = 1024 def __init__(self, config): self.config = config self._validate_config()
[docs] def parse(self, request: dict): for key in request.keys(): request[key] = str(request[key]) self.validate_key(key) if key in self.COMMA_SEPARATED_KEYS: request[key] = self.comma_separated_field_to_list(request[key]) if key in {'limit', 'offset'}: try: request[key] = int(request[key]) except ValueError: raise ValidationError('Value of key "{}" should be integer'.format(key)) self.validate_value(key, request[key]) if 'limit' not in request: request['limit'] = self.MAXIMUM_LIMIT_VALUE return self.parse_dict(request)
[docs] def validate_value(self, key: str, value): if type(value) not in [str, int, list]: raise ValidationError('Value type should be string, integer or list') if type(value) in [str, list] and len(value) > self.MAXIMUM_VALUE_LENGTH: raise ValidationError( 'Some value is too big. Maximum allowed length is {}'.format(self.MAXIMUM_VALUE_LENGTH)) # validate list types if type(value) is list: for value_item in value: self.validate_value(key, value_item) return if key == 'order_by': self._validate_value_type(key, value, str) self._validate_value_regex(key, value, r'^-?[a-zA-Z][a-zA-Z0-9_]+$') elif key in {'model', 'method', 'fields'}: self._validate_value_type(key, value, str) self._validate_value_regex(key, value, r'^[a-zA-Z][a-zA-Z0-9_]+$') elif key in {'filter'}: self._validate_value_type(key, value, str) self._validate_value_regex(key, value, r'^[a-zA-Z0-9_]+$') elif key in {'limit', 'offset'}: self._validate_value_type(key, value, int) if value < 0: raise ValidationError('Value of key "{}" should be positive'.format(key)) if key == 'limit' and (value < self.MINIMUM_LIMIT_VALUE or value > self.MAXIMUM_LIMIT_VALUE): raise ValidationError( 'Value of key limit should be from {} to {} (inclusive)'.format( self.MINIMUM_LIMIT_VALUE, self.MAXIMUM_LIMIT_VALUE ) ) else: # It means I forget to add validation of field raise ValidationError('Server Error')
def _validate_value_regex(self, key, value, pattern): if not re.match(pattern, value): raise ValidationError("Value of key '{}' doesn't match to pattern {}".format(key, pattern)) def _validate_value_type(self, key, value, expected_type): if type(value) is not expected_type: raise ValidationError('Value of key "{}" should be {}'.format(key, expected_type))
[docs] def validate_key(self, key: str): if type(key) is not str: raise ValidationError("Key {} is not string".format(key)) if len(key) > self.MAXIMUM_KEY_LENGTH: raise ValidationError( 'Some key is too big. Maximum allowed length is {}'.format(key, self.MAXIMUM_KEY_LENGTH)) if not re.match(r'^[a-zA-Z][a-zA-Z0-9_]+$', key): raise ValidationError('Key "{}" doesn\'t match to pattern ^[a-zA-Z][a-zA-Z0-9_]+$'.format(key)) if key not in self.ALLOWED_KEYS: raise ValidationError('Key "{}" isn\'t allowed'.format(key))
[docs] def comma_separated_field_to_list(self, string_field): result = [] for val in string_field.split(','): val = val.strip() if val: result.append(val) return result
[docs] def parse_dict(self, req_dict): if 'model' not in req_dict: raise ParseError('You should specify "model"') if req_dict['model'] not in self.config: raise ParseError("Model \"{}\" doesn't exist or isn't allowed".format(req_dict['model'])) config = self.config[req_dict['model']] result_request = Request(config['model_class']) if 'method' not in req_dict: raise ParseError('You should specify "method"') method = req_dict['method'] if method not in config['methods']: raise ParseError("Method doesn't exist or isn't allowed") config = config['methods'][method] return { 'get': self.method_get, 'count': self.method_count, }[method](req_dict, config, result_request)
[docs] def method_get(self, req_dict, config, result_request): result_request = GetRequest.from_request(result_request) result_request.fields = self.parse_fields(req_dict, config) result_request.order_by = self.parse_order_by_fields(req_dict, config) if 'limit' in req_dict: result_request.limit = req_dict['limit'] if 'offset' in req_dict: result_request.offset = req_dict['offset'] return result_request
[docs] def method_count(self, req_dict, config, result_request): result_request = CountRequest.from_request(result_request) result_request.fields = self.parse_fields(req_dict, config) result_request.order_by = self.parse_order_by_fields(req_dict, config) if 'limit' in req_dict: result_request.limit = req_dict['limit'] if 'offset' in req_dict: result_request.offset = req_dict['offset'] return result_request
[docs] def parse_fields(self, req_dict, config): return self.parse_list(req_dict, config, "fields", "fields", config['fields'])
[docs] def parse_order_by_fields(self, req_dict, config): return self.parse_list(req_dict, config, "order_by", "order_by_fields", config['default_order_by_fields'])
[docs] def parse_list(self, req_dict, config, request_key, config_key, default_value): if request_key not in req_dict: return copy.copy(default_value) result = [] for field in req_dict[request_key]: if config_key == 'order_by_fields': if (field[1:] if field[0] == '-' else field) not in config[config_key]: raise ParseError("Field \"{}\" doesn't exist or isn't allowed1".format(field)) else: if field not in config[config_key]: raise ParseError("Field \"{}\" doesn't exist or isn't allowed".format(field)) result.append(field) return result
def _validate_config(self): # TODO: check fields for existence and so on if False: raise ConfigFormatError()
[docs]class ParseError(Exception): pass
[docs]class ValidationError(ParseError): pass
[docs]class ConfigFormatError(Exception): pass