Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 1 | # Copyright 2015 Google Inc. All rights reserved. |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | |
| 15 | """Helper functions for commonly used utilities.""" |
| 16 | |
| 17 | import functools |
| 18 | import inspect |
| 19 | import logging |
Anthonios Partheniou | 9f7b410 | 2021-07-23 12:18:25 -0400 | [diff] [blame] | 20 | import urllib |
Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 21 | |
| 22 | |
| 23 | logger = logging.getLogger(__name__) |
| 24 | |
Bu Sun Kim | 66bb32c | 2019-10-30 10:11:58 -0700 | [diff] [blame] | 25 | POSITIONAL_WARNING = "WARNING" |
| 26 | POSITIONAL_EXCEPTION = "EXCEPTION" |
| 27 | POSITIONAL_IGNORE = "IGNORE" |
| 28 | POSITIONAL_SET = frozenset( |
| 29 | [POSITIONAL_WARNING, POSITIONAL_EXCEPTION, POSITIONAL_IGNORE] |
| 30 | ) |
Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 31 | |
| 32 | positional_parameters_enforcement = POSITIONAL_WARNING |
| 33 | |
Bu Sun Kim | 66bb32c | 2019-10-30 10:11:58 -0700 | [diff] [blame] | 34 | _SYM_LINK_MESSAGE = "File: {0}: Is a symbolic link." |
| 35 | _IS_DIR_MESSAGE = "{0}: Is a directory" |
| 36 | _MISSING_FILE_MESSAGE = "Cannot access {0}: No such file or directory" |
Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 37 | |
| 38 | |
| 39 | def positional(max_positional_args): |
Pramod Kotipalli | f80be18 | 2020-05-18 10:38:05 -0700 | [diff] [blame] | 40 | """A decorator to declare that only the first N arguments may be positional. |
Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 41 | |
| 42 | This decorator makes it easy to support Python 3 style keyword-only |
| 43 | parameters. For example, in Python 3 it is possible to write:: |
| 44 | |
| 45 | def fn(pos1, *, kwonly1=None, kwonly1=None): |
| 46 | ... |
| 47 | |
| 48 | All named parameters after ``*`` must be a keyword:: |
| 49 | |
| 50 | fn(10, 'kw1', 'kw2') # Raises exception. |
| 51 | fn(10, kwonly1='kw1') # Ok. |
| 52 | |
| 53 | Example |
| 54 | ^^^^^^^ |
| 55 | |
| 56 | To define a function like above, do:: |
| 57 | |
| 58 | @positional(1) |
| 59 | def fn(pos1, kwonly1=None, kwonly2=None): |
| 60 | ... |
| 61 | |
| 62 | If no default value is provided to a keyword argument, it becomes a |
| 63 | required keyword argument:: |
| 64 | |
| 65 | @positional(0) |
| 66 | def fn(required_kw): |
| 67 | ... |
| 68 | |
| 69 | This must be called with the keyword parameter:: |
| 70 | |
| 71 | fn() # Raises exception. |
| 72 | fn(10) # Raises exception. |
| 73 | fn(required_kw=10) # Ok. |
| 74 | |
| 75 | When defining instance or class methods always remember to account for |
| 76 | ``self`` and ``cls``:: |
| 77 | |
| 78 | class MyClass(object): |
| 79 | |
| 80 | @positional(2) |
| 81 | def my_method(self, pos1, kwonly1=None): |
| 82 | ... |
| 83 | |
| 84 | @classmethod |
| 85 | @positional(2) |
| 86 | def my_method(cls, pos1, kwonly1=None): |
| 87 | ... |
| 88 | |
| 89 | The positional decorator behavior is controlled by |
| 90 | ``_helpers.positional_parameters_enforcement``, which may be set to |
| 91 | ``POSITIONAL_EXCEPTION``, ``POSITIONAL_WARNING`` or |
| 92 | ``POSITIONAL_IGNORE`` to raise an exception, log a warning, or do |
| 93 | nothing, respectively, if a declaration is violated. |
| 94 | |
| 95 | Args: |
| 96 | max_positional_arguments: Maximum number of positional arguments. All |
| 97 | parameters after the this index must be |
| 98 | keyword only. |
| 99 | |
| 100 | Returns: |
| 101 | A decorator that prevents using arguments after max_positional_args |
| 102 | from being used as positional parameters. |
| 103 | |
| 104 | Raises: |
| 105 | TypeError: if a key-word only argument is provided as a positional |
| 106 | parameter, but only if |
| 107 | _helpers.positional_parameters_enforcement is set to |
| 108 | POSITIONAL_EXCEPTION. |
| 109 | """ |
| 110 | |
| 111 | def positional_decorator(wrapped): |
| 112 | @functools.wraps(wrapped) |
| 113 | def positional_wrapper(*args, **kwargs): |
| 114 | if len(args) > max_positional_args: |
Bu Sun Kim | 66bb32c | 2019-10-30 10:11:58 -0700 | [diff] [blame] | 115 | plural_s = "" |
Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 116 | if max_positional_args != 1: |
Bu Sun Kim | 66bb32c | 2019-10-30 10:11:58 -0700 | [diff] [blame] | 117 | plural_s = "s" |
| 118 | message = ( |
| 119 | "{function}() takes at most {args_max} positional " |
| 120 | "argument{plural} ({args_given} given)".format( |
| 121 | function=wrapped.__name__, |
| 122 | args_max=max_positional_args, |
| 123 | args_given=len(args), |
| 124 | plural=plural_s, |
| 125 | ) |
| 126 | ) |
Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 127 | if positional_parameters_enforcement == POSITIONAL_EXCEPTION: |
| 128 | raise TypeError(message) |
| 129 | elif positional_parameters_enforcement == POSITIONAL_WARNING: |
| 130 | logger.warning(message) |
| 131 | return wrapped(*args, **kwargs) |
Bu Sun Kim | 66bb32c | 2019-10-30 10:11:58 -0700 | [diff] [blame] | 132 | |
Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 133 | return positional_wrapper |
| 134 | |
Anthonios Partheniou | 9f7b410 | 2021-07-23 12:18:25 -0400 | [diff] [blame] | 135 | if isinstance(max_positional_args, int): |
Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 136 | return positional_decorator |
| 137 | else: |
| 138 | args, _, _, defaults = inspect.getargspec(max_positional_args) |
| 139 | return positional(len(args) - len(defaults))(max_positional_args) |
| 140 | |
| 141 | |
| 142 | def parse_unique_urlencoded(content): |
| 143 | """Parses unique key-value parameters from urlencoded content. |
| 144 | |
| 145 | Args: |
| 146 | content: string, URL-encoded key-value pairs. |
| 147 | |
| 148 | Returns: |
| 149 | dict, The key-value pairs from ``content``. |
| 150 | |
| 151 | Raises: |
| 152 | ValueError: if one of the keys is repeated. |
| 153 | """ |
| 154 | urlencoded_params = urllib.parse.parse_qs(content) |
| 155 | params = {} |
Anthonios Partheniou | 9f7b410 | 2021-07-23 12:18:25 -0400 | [diff] [blame] | 156 | for key, value in urlencoded_params.items(): |
Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 157 | if len(value) != 1: |
Bu Sun Kim | 66bb32c | 2019-10-30 10:11:58 -0700 | [diff] [blame] | 158 | msg = "URL-encoded content contains a repeated value:" "%s -> %s" % ( |
| 159 | key, |
| 160 | ", ".join(value), |
| 161 | ) |
Helen Koike | de13e3b | 2018-04-26 16:05:16 -0300 | [diff] [blame] | 162 | raise ValueError(msg) |
| 163 | params[key] = value[0] |
| 164 | return params |
| 165 | |
| 166 | |
| 167 | def update_query_params(uri, params): |
| 168 | """Updates a URI with new query parameters. |
| 169 | |
| 170 | If a given key from ``params`` is repeated in the ``uri``, then |
| 171 | the URI will be considered invalid and an error will occur. |
| 172 | |
| 173 | If the URI is valid, then each value from ``params`` will |
| 174 | replace the corresponding value in the query parameters (if |
| 175 | it exists). |
| 176 | |
| 177 | Args: |
| 178 | uri: string, A valid URI, with potential existing query parameters. |
| 179 | params: dict, A dictionary of query parameters. |
| 180 | |
| 181 | Returns: |
| 182 | The same URI but with the new query parameters added. |
| 183 | """ |
| 184 | parts = urllib.parse.urlparse(uri) |
| 185 | query_params = parse_unique_urlencoded(parts.query) |
| 186 | query_params.update(params) |
| 187 | new_query = urllib.parse.urlencode(query_params) |
| 188 | new_parts = parts._replace(query=new_query) |
| 189 | return urllib.parse.urlunparse(new_parts) |
| 190 | |
| 191 | |
| 192 | def _add_query_parameter(url, name, value): |
| 193 | """Adds a query parameter to a url. |
| 194 | |
| 195 | Replaces the current value if it already exists in the URL. |
| 196 | |
| 197 | Args: |
| 198 | url: string, url to add the query parameter to. |
| 199 | name: string, query parameter name. |
| 200 | value: string, query parameter value. |
| 201 | |
| 202 | Returns: |
| 203 | Updated query parameter. Does not update the url if value is None. |
| 204 | """ |
| 205 | if value is None: |
| 206 | return url |
| 207 | else: |
| 208 | return update_query_params(url, {name: value}) |