Spaces:
Runtime error
Runtime error
| # SPDX-License-Identifier: MIT | |
| import inspect | |
| import platform | |
| import sys | |
| import threading | |
| import types | |
| import warnings | |
| from collections.abc import Mapping, Sequence # noqa | |
| from typing import _GenericAlias | |
# Interpreter / version feature flags, evaluated once at import time.

# True when the running interpreter reports itself as PyPy.
PYPY = platform.python_implementation() == "PyPy"

# "At least this version" switches.  Comparing the full version_info tuple
# against a (major, minor) pair orders exactly like comparing its first two
# fields.
PY_3_9_PLUS = sys.version_info >= (3, 9)
PY310 = sys.version_info >= (3, 10)
PY_3_12_PLUS = sys.version_info >= (3, 12)
def just_warn(*args, **kw):
    """
    Emit a RuntimeWarning instead of setting a closure cell.

    Accepts and ignores any positional and keyword arguments so that it can
    be called in place of a real ``(cell, value)`` setter.  Returns None.
    """
    message = (
        "Running interpreter doesn't sufficiently support code object "
        "introspection. Some features like bare super() or accessing "
        "__class__ will not work with slotted classes."
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(message, RuntimeWarning, stacklevel=2)
class _AnnotationExtractor:
    """
    Read type annotations off a callable's signature.

    Each accessor returns None when the requested annotation is missing, and
    also when the callable's signature could not be obtained at all.
    """

    __slots__ = ["sig"]

    def __init__(self, callable):
        # inspect.signature() raises ValueError or TypeError for objects it
        # cannot introspect; remember that as "no signature".
        try:
            signature = inspect.signature(callable)
        except (ValueError, TypeError):
            signature = None
        self.sig = signature

    def get_first_param_type(self):
        """
        Return the annotation of the first parameter, or None if there is no
        signature, no parameter, or no annotation on that parameter.
        """
        if not self.sig:
            return None

        first = next(iter(self.sig.parameters.values()), None)
        if first is None or first.annotation is inspect.Parameter.empty:
            return None
        return first.annotation

    def get_return_type(self):
        """
        Return the return annotation, or None if there is no signature or no
        return annotation.
        """
        if not self.sig:
            return None

        annotation = self.sig.return_annotation
        return None if annotation is inspect.Signature.empty else annotation
def make_set_closure_cell():
    """
    Build the interpreter-specific helper that overwrites a closure cell.

    The returned callable takes ``(cell, value)`` and stores ``value`` in
    ``cell``.  If setting up or verifying a strategy raises on the running
    interpreter, ``just_warn`` is returned in its place.
    """
    if PYPY:
        # PyPy offers a direct route.  The generic logic further down is
        # supported there as well, but this is the easy and fast option.
        def set_closure_cell(cell, value):
            cell.__setstate__((value,))

        return set_closure_cell

    # Every other interpreter: pick a strategy by version, then verify it.
    try:
        if sys.version_info >= (3, 8):
            # Assign the cell's contents directly.
            def set_closure_cell(cell, value):
                cell.cell_contents = value

        else:
            # Template function whose only job is to store its argument in
            # its first cellvar.
            def set_first_cellvar_to(value):
                x = value
                return

                # Never executed and eliminated as dead code -- but not
                # before its reference to `x` forces `x` to be represented
                # as a closure cell rather than a local.
                def force_x_to_be_a_cell():  # pragma: no cover
                    return x

            # Check that the template's closure layout is what the trick
            # below relies on.
            template = set_first_cellvar_to.__code__
            if template.co_cellvars != ("x",) or template.co_freevars != ():
                raise AssertionError  # pragma: no cover

            # Derive a code object that sets the function's first _freevar_
            # (not cellvar) to the argument.
            set_first_freevar_code = types.CodeType(
                template.co_argcount,
                template.co_kwonlyargcount,
                template.co_nlocals,
                template.co_stacksize,
                template.co_flags,
                template.co_code,
                template.co_consts,
                template.co_names,
                template.co_varnames,
                template.co_filename,
                template.co_name,
                template.co_firstlineno,
                template.co_lnotab,
                # These two arguments are intentionally reversed:
                template.co_cellvars,
                template.co_freevars,
            )

            def set_closure_cell(cell, value):
                # Wrap the derived code in a function whose first closure
                # cell is `cell`; calling it changes that cell's value.
                setter = types.FunctionType(
                    set_first_freevar_code, {}, "setter", (), (cell,)
                )
                setter(value)

        # Smoke-test the chosen strategy against a real closure cell.
        def build_closure():
            x = None

            def read_x():
                return x  # pragma: no cover

            return read_x

        probe = build_closure().__closure__[0]
        set_closure_cell(probe, 100)
        if probe.cell_contents != 100:
            raise AssertionError  # pragma: no cover
    except Exception:
        # Any failure above means the setter cannot be relied upon here.
        return just_warn

    return set_closure_cell
# Resolve the closure-cell setter once, at import time.  The result is either
# a working (cell, value) setter or `just_warn`, depending on what
# make_set_closure_cell() managed to set up on this interpreter.
set_closure_cell = make_set_closure_cell()
# Thread-local global to track attrs instances which are already being repr'd.
# This is needed because there is no other (thread-safe) way to pass info
# about the instances that are already being repr'd through the call stack
# in order to ensure we don't perform infinite recursion.
#
# For instance, if an instance contains a dict which contains that instance,
# we need to know that we're already repr'ing the outside instance from within
# the dict's repr() call.
#
# This lives here rather than in _make.py so that the functions in _make.py
# don't have a direct reference to the thread-local in their globals dict.
# If they have such a reference, it breaks cloudpickle.
repr_context = threading.local()
def get_generic_base(cl):
    """
    Return the generic base of a parametrized generic such as ``A[str]``.

    Only objects whose class is exactly ``typing._GenericAlias`` are
    recognized; for anything else the result is None.
    """
    return cl.__origin__ if cl.__class__ is _GenericAlias else None