diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8fca81a..d918edd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-20.04 strategy: matrix: - python-version: [3.5, 3.6, 3.7, 3.8, 3.9, '3.10', 3.11, 3.12-dev, pypy-3.6] + python-version: [3.8, 3.9, '3.10', 3.11, 3.12] steps: - uses: actions/checkout@v2 @@ -31,7 +31,7 @@ jobs: pip install mypy==0.910 python -m mypy executing --exclude=executing/_position_node_finder.py # fromJson because https://github.community/t/passing-an-array-literal-to-contains-function-causes-syntax-error/17213/3 - if: ${{ !contains(fromJson('["pypy-3.6", "3.11","3.12-dev"]'), matrix.python-version) }} + if: ${{ !contains(fromJson('["pypy-3.6", "3.11","3.12"]'), matrix.python-version) }} # pypy < 3.8 very doesn't work - name: Mypy testing (3.11) run: | diff --git a/executing/_position_node_finder.py b/executing/_position_node_finder.py index 8ca21a6..61e5684 100644 --- a/executing/_position_node_finder.py +++ b/executing/_position_node_finder.py @@ -5,6 +5,7 @@ from typing import Any, Callable, Iterator, Optional, Sequence, Set, Tuple, Type, Union, cast from .executing import EnhancedAST, NotOneValueFound, Source, only, function_node_types, assert_ from ._exceptions import KnownIssue, VerifierFailure +from ._utils import mangled_name from functools import lru_cache @@ -25,51 +26,6 @@ def node_and_parents(node: EnhancedAST) -> Iterator[EnhancedAST]: yield from parents(node) -def mangled_name(node: EnhancedAST) -> str: - """ - - Parameters: - node: the node which should be mangled - name: the name of the node - - Returns: - The mangled name of `node` - """ - if isinstance(node, ast.Attribute): - name = node.attr - elif isinstance(node, ast.Name): - name = node.id - elif isinstance(node, (ast.alias)): - name = node.asname or node.name.split(".")[0] - elif isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.AsyncFunctionDef)): - name = node.name - elif isinstance(node, ast.ExceptHandler): - assert node.name - name = node.name - elif sys.version_info >= (3,12) and isinstance(node,ast.TypeVar): - name=node.name - else: - raise TypeError("no node to mangle for type "+repr(type(node))) - - if name.startswith("__") and not name.endswith("__"): - - parent,child=node.parent,node - - while not (isinstance(parent,ast.ClassDef) and child not in parent.bases): - if not hasattr(parent,"parent"): - break # pragma: no mutate - - parent,child=parent.parent,parent - else: - class_name=parent.name.lstrip("_") - if class_name!="": - return "_" + class_name + name - - - - return name - - @lru_cache(128) # pragma: no mutate def get_instructions(code: CodeType) -> list[dis.Instruction]: return list(dis.get_instructions(code, show_caches=True)) diff --git a/executing/_types.py b/executing/_types.py new file mode 100644 index 0000000..e69de29 diff --git a/executing/_utils.py b/executing/_utils.py new file mode 100644 index 0000000..59ac7f5 --- /dev/null +++ b/executing/_utils.py @@ -0,0 +1,98 @@ + +import ast +import sys +import dis +from typing import cast, Any,Iterator +import types + + +def assert_(condition, message=""): + # type: (Any, str) -> None + """ + Like an assert statement, but unaffected by -O + :param condition: value that is expected to be truthy + :type message: Any + """ + if not condition: + raise AssertionError(str(message)) + + +# noinspection PyUnresolvedReferences +_get_instructions = dis.get_instructions +from dis import Instruction as _Instruction + +class Instruction(_Instruction): + lineno = None # type: int + +def get_instructions(co): + # type: (types.CodeType) -> Iterator[EnhancedInstruction] + lineno = co.co_firstlineno + for inst in _get_instructions(co): + inst = cast(EnhancedInstruction, inst) + lineno = inst.starts_line or lineno + assert_(lineno) + inst.lineno = lineno + yield inst + + +# Type class used to expand out the definition of AST to include fields added by this library +# It's not actually used for anything other than type checking though! +class EnhancedAST(ast.AST): + parent = None # type: EnhancedAST + +# Type class used to expand out the definition of AST to include fields added by this library +# It's not actually used for anything other than type checking though! +class EnhancedInstruction(Instruction): + _copied = None # type: bool + + + + + +def mangled_name(node): + # type: (EnhancedAST) -> str + """ + + Parameters: + node: the node which should be mangled + name: the name of the node + + Returns: + The mangled name of `node` + """ + + function_class_types =( ast.FunctionDef, ast.ClassDef,ast.AsyncFunctionDef ) + + if isinstance(node, ast.Attribute): + name = node.attr + elif isinstance(node, ast.Name): + name = node.id + elif isinstance(node, (ast.alias)): + name = node.asname or node.name.split(".")[0] + elif isinstance(node, function_class_types): + name = node.name + elif isinstance(node, ast.ExceptHandler): + assert node.name + name = node.name + elif sys.version_info >= (3,12) and isinstance(node,ast.TypeVar): + name=node.name + else: + raise TypeError("no node to mangle") + + if name.startswith("__") and not name.endswith("__"): + + parent,child=node.parent,node + + while not (isinstance(parent,ast.ClassDef) and child not in parent.bases): + if not hasattr(parent,"parent"): + break # pragma: no mutate + + parent,child=parent.parent,parent + else: + class_name=parent.name.lstrip("_") + if class_name!="" and child not in parent.decorator_list: + return "_" + class_name + name + + + + return name diff --git a/executing/executing.py b/executing/executing.py index 7727c42..9a0579a 100644 --- a/executing/executing.py +++ b/executing/executing.py @@ -40,8 +40,9 @@ from pathlib import Path from threading import RLock from tokenize import detect_encoding -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, \ - Type, TypeVar, Union, cast +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, Type, TypeVar, Union, cast +from ._utils import mangled_name,assert_, EnhancedAST,EnhancedInstruction,Instruction,get_instructions +from ._exceptions import KnownIssue if TYPE_CHECKING: # pragma: no cover from asttokens import ASTTokens, ASTText @@ -52,51 +53,11 @@ cache = lru_cache(maxsize=None) -# Type class used to expand out the definition of AST to include fields added by this library -# It's not actually used for anything other than type checking though! -class EnhancedAST(ast.AST): - parent = None # type: EnhancedAST - - -class Instruction(dis.Instruction): - lineno = None # type: int - - -# Type class used to expand out the definition of AST to include fields added by this library -# It's not actually used for anything other than type checking though! -class EnhancedInstruction(Instruction): - _copied = None # type: bool - - - -def assert_(condition, message=""): - # type: (Any, str) -> None - """ - Like an assert statement, but unaffected by -O - :param condition: value that is expected to be truthy - :type message: Any - """ - if not condition: - raise AssertionError(str(message)) - - -def get_instructions(co): - # type: (types.CodeType) -> Iterator[EnhancedInstruction] - lineno = co.co_firstlineno - for inst in dis.get_instructions(co): - inst = cast(EnhancedInstruction, inst) - lineno = inst.starts_line or lineno - assert_(lineno) - inst.lineno = lineno - yield inst - - TESTING = 0 - class NotOneValueFound(Exception): def __init__(self,msg,values=[]): - # type: (str, Sequence) -> None + # type: (str, Sized) -> None self.values=values super(NotOneValueFound,self).__init__(msg) @@ -107,11 +68,15 @@ def only(it): # type: (Iterable[T]) -> T if isinstance(it, Sized): if len(it) != 1: - raise NotOneValueFound('Expected one value, found %s' % len(it)) + raise NotOneValueFound('Expected one value, found %s' % len(it),it) # noinspection PyTypeChecker return list(it)[0] - lst = tuple(islice(it, 2)) + if TESTING: + lst=tuple(it) + else: + lst = tuple(islice(it, 2)) + if len(lst) == 0: raise NotOneValueFound('Expected one value, found 0') if len(lst) > 1: @@ -582,11 +547,12 @@ def __init__(self, frame, stmts, tree, lasti, source): elif op_name in ('LOAD_ATTR', 'LOAD_METHOD', 'LOOKUP_METHOD'): typ = ast.Attribute ctx = ast.Load - extra_filter = lambda e: attr_names_match(e.attr, instruction.argval) + extra_filter = lambda e:mangled_name(e) == instruction.argval elif op_name in ('LOAD_NAME', 'LOAD_GLOBAL', 'LOAD_FAST', 'LOAD_DEREF', 'LOAD_CLASSDEREF'): typ = ast.Name ctx = ast.Load - extra_filter = lambda e: e.id == instruction.argval + if sys.version_info[0] == 3 or instruction.argval: + extra_filter =lambda e:mangled_name(e) == instruction.argval elif op_name in ('COMPARE_OP', 'IS_OP', 'CONTAINS_OP'): typ = ast.Compare extra_filter = lambda e: len(e.ops) == 1 @@ -596,9 +562,10 @@ def __init__(self, frame, stmts, tree, lasti, source): elif op_name.startswith('STORE_ATTR'): ctx = ast.Store typ = ast.Attribute - extra_filter = lambda e: attr_names_match(e.attr, instruction.argval) + extra_filter = lambda e:mangled_name(e) == instruction.argval else: - raise RuntimeError(op_name) + raise KnownIssue("can not map "+op_name) + with lock: exprs = { @@ -611,6 +578,7 @@ def __init__(self, frame, stmts, tree, lasti, source): if statement_containing_node(node) == stmt } + if ctx == ast.Store: # No special bytecode tricks here. # We can handle multiple assigned attributes with different names, @@ -674,14 +642,12 @@ def get_original_clean_instructions(self): # inserts JUMP_IF_NOT_DEBUG instructions in bytecode # If they're not present in our compiled instructions, # ignore them in the original bytecode - if not any( + if any(inst.opname == "JUMP_IF_NOT_DEBUG" for inst in result): + if not any( inst.opname == "JUMP_IF_NOT_DEBUG" for inst in self.compile_instructions() - ): - result = [ - inst for inst in result - if inst.opname != "JUMP_IF_NOT_DEBUG" - ] + ): + result = [inst for inst in result if inst.opname != "JUMP_IF_NOT_DEBUG"] return result @@ -1127,19 +1093,6 @@ def find_node_ipython(frame, lasti, stmts, source): return decorator, node -def attr_names_match(attr, argval): - # type: (str, str) -> bool - """ - Checks that the user-visible attr (from ast) can correspond to - the argval in the bytecode, i.e. the real attribute fetched internally, - which may be mangled for private attributes. - """ - if attr == argval: - return True - if not attr.startswith("__"): - return False - return bool(re.match(r"^_\w+%s$" % attr, argval)) - def node_linenos(node): # type: (ast.AST) -> Iterator[int] diff --git a/setup.cfg b/setup.cfg index fdf901f..ed446d0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -11,9 +11,6 @@ classifiers = License :: OSI Approved :: MIT License Programming Language :: Python Programming Language :: Python :: 3 - Programming Language :: Python :: 3.5 - Programming Language :: Python :: 3.6 - Programming Language :: Python :: 3.7 Programming Language :: Python :: 3.8 Programming Language :: Python :: 3.9 Programming Language :: Python :: 3.10 @@ -25,7 +22,7 @@ packages = executing zip_safe = False include_package_data = True setup_requires = setuptools; setuptools_scm[toml] -python_requires = >=3.5 +python_requires = >=3.8 [options.extras_require] tests= diff --git a/tests/analyse.py b/tests/analyse.py index 865e951..7a19d7f 100644 --- a/tests/analyse.py +++ b/tests/analyse.py @@ -2,9 +2,10 @@ import pathlib import dis import types -import inspect from executing import Source +from executing._exceptions import KnownIssue,VerifierFailure +from executing import NotOneValueFound import executing executing.executing.TESTING = 1 @@ -12,131 +13,245 @@ from rich import print as rprint import rich - -class Frame: - pass - - -if len(sys.argv) <= 1 or sys.argv[1] in ("--help", "-h"): - print( - """ -analyse.py [line | first_line:last_line] - -Analyses a range in the given source in the specified range -and maps every bytecode to the node found by executing. -""" - ) - sys.exit(0) - -filename = pathlib.Path(sys.argv[1]) - -if ":" in sys.argv[2]: - start, end = sys.argv[2].split(":") - start = int(start) - end = int(end) -else: - start = end = int(sys.argv[2]) - - -code = filename.read_text() - - -def inspect_opcode(bytecode, index, lineno): - frame = Frame() - frame.f_lasti = index - frame.f_code = bytecode - frame.f_globals = globals() - frame.f_lineno = lineno - source = Source.for_frame(frame) - - try: - ex = Source.executing(frame) - except RuntimeError: - raise - except Exception as e: - return "[red]" + type(e).__name__ + ": " + str(e).split("\n")[0] - - result = "[green]" + type(ex.node).__name__ - if hasattr(ex.node, "name"): - result += "(" + str(ex.node.name) + ")" - elif hasattr(ex.node, "id"): - result += "(" + ex.node.id + ")" - elif hasattr(ex.node, "attr"): - result += "(." + ex.node.attr + ")" - elif hasattr(ex.node, "value"): - result += f"({ex.node.value})" - - if ex.decorator: - result += " @%s" % ex.decorator - return result - - +import traceback import rich.syntax from rich.console import Console from rich.table import Table, Column from rich.highlighter import ReprHighlighter -console = Console() +import ast +from deadcode import is_deadcode -console.print( - rich.syntax.Syntax(code, "python", line_numbers=True, line_range=(start, end)) -) +from cProfile import Profile -print("all bytecodes in this range:") +class Frame: + pass -bc = compile(code, filename, "exec") +class App: + def inspect_opcode(self, bytecode, index, lineno): + frame = Frame() + frame.f_lasti = index + frame.f_code = bytecode + frame.f_globals = globals() + frame.f_lineno = lineno + + self.profile.enable() + source = Source.for_frame(frame) + + try: + ex = Source.executing(frame) + result = "[green]" + type(ex.node).__name__ + if hasattr(ex.node, "name"): + result += "(" + str(ex.node.name) + ")" + elif hasattr(ex.node, "id"): + result += "(" + ex.node.id + ")" + elif hasattr(ex.node, "attr"): + result += "(." + ex.node.attr + ")" + elif hasattr(ex.node, "value"): + result += f"({ex.node.value})" + + if ex.decorator: + result += " @%s" % ex.decorator + return result + except RuntimeError: + raise + except (KnownIssue,VerifierFailure,NotOneValueFound,AssertionError) as e: + color= "[yellow]" if isinstance(e,KnownIssue) else "[red]" + return color + type(e).__name__ + ": " + str(e).split("\n")[0] + finally: + self.profile.disable() + + def inspect(self, bc): + first = True + table = Table( + title=bc.co_name + ":", + box=None, + title_style="blue", + title_justify="left", + ) + + table.add_column("offset", justify="right") + + if sys.version_info >= (3, 11): + table.add_column("start") + table.add_column("end") + else: + table.add_column("line") + + table.add_column("instruction") + table.add_column("ast-node") + + highlighter = ReprHighlighter() + + starts_line = None + + for i in dis.get_instructions( + bc, **({"show_caches": True} if sys.version_info >= (3, 11) else {}) + ): + if i.starts_line is not None: + starts_line = i.starts_line + + if sys.version_info >= (3, 11): + in_range = ( + i.positions.lineno is not None + and i.positions.lineno <= self.end + and self.start <= i.positions.end_lineno + ) + else: + in_range = ( + starts_line is not None and self.start <= starts_line <= self.end + ) + + if in_range: + if first: + first = False + + ex = self.inspect_opcode(bc, i.offset, starts_line) + + if sys.version_info >= (3, 11): + positions = ( + "%s:%s" % (i.positions.lineno, i.positions.col_offset), + "%s:%s" % (i.positions.end_lineno, i.positions.end_col_offset), + ) + else: + positions = (str(starts_line),) + + table.add_row( + str(i.offset), + *positions, + highlighter("%s(%s)" % (i.opname, i.argval)), + ex, + style="on grey19" if i.opname == "CACHE" else "on grey30" + # **({"style":"on white" } if i.opname=="CACHE" else {}) + ) + + if first == False: + self.console.print() + self.console.print(table) + + for const in bc.co_consts: + if isinstance(const, types.CodeType): + self.inspect(const) + + def dump_deadcode(self, filename): + from rich import print as rprint + from rich.tree import Tree + from rich.syntax import Syntax + + print(filename) + with open(filename) as file: + code = file.read() + tree = ast.parse(code) + + for node in ast.walk(tree): + for child in ast.iter_child_nodes(node): + child.parent = node + + node = tree + + def report(node, tree): + + if isinstance( + node, (ast.expr_context, ast.operator, ast.unaryop, ast.cmpop) + ): + return + + if isinstance(node, (ast.stmt, ast.expr)): + deadcode = is_deadcode(node) + else: + deadcode = None + + if deadcode is None: + deadcode = "[red]" + else: + deadcode = "[red]dead" if deadcode else "[blue]used" + + name = type(node).__name__ + + if isinstance(node, ast.Name): + name += "(%s)" % node.id + + if isinstance(node, ast.Attribute): + name += "(.%s)" % node.attr + + if hasattr(node, "_Deadcode__static_value"): + name += " == %r" % getattr(node, "_Deadcode__static_value") + + t = tree.add("%s %s" % (name, deadcode) + (" %s:%s"%(node.lineno,node.end_lineno) if hasattr(node,"lineno") else "")) + dots = False + + for child in ast.iter_child_nodes(node): + if ( + hasattr(child, "lineno") + and ( child.lineno > self.end + or self.start > child.end_lineno) + ): + if not dots: + tree.add("...") + dots = True + continue + + report(child, t) + dots = False + + tree = Tree("ast") + report(node, tree) + rprint(tree) + + def main(self): + import sys + + if len(sys.argv) <= 1 or sys.argv[1] in ("--help", "-h"): + print( + """ + analyse.py [line | first_line:last_line] + + Analyses a range in the given source in the specified range + and maps every bytecode to the node found by executing. + """ + ) + sys.exit(0) -def inspect(bc): - first = True - table = Table( - title=bc.co_name + ":", - box=None, - title_style="blue", - title_justify="left", - ) - - table.add_column("offset", justify="right") - table.add_column("start") - table.add_column("end") - table.add_column("instruction") - table.add_column("ast-node") + self.profile=Profile() - highlighter=ReprHighlighter() + filename = pathlib.Path(sys.argv[1]) - for i in dis.get_instructions(bc, show_caches=True): + if ":" in sys.argv[2]: + self.start, self.end = sys.argv[2].split(":") + self.start = int(self.start) + self.end = int(self.end) + else: + self.start = self.end = int(sys.argv[2]) - if ( - i.positions.lineno is not None - and i.positions.lineno <= end - and start <= i.positions.end_lineno - ): - if first: - first = False + code = filename.read_text() + self.console = Console() + self.console.print( + rich.syntax.Syntax( + code, "python", line_numbers=True, line_range=(self.start, self.end) + ) + ) - ex = inspect_opcode(bc, i.offset, i.positions.lineno) + print("all bytecodes in this range:") - table.add_row( - str(i.offset), - "%s:%s" % (i.positions.lineno, i.positions.col_offset), - "%s:%s" % (i.positions.end_lineno, i.positions.end_col_offset), - highlighter("%s(%s)" % (i.opname, i.argrepr)), - ex, - style="on grey19" if i.opname=="CACHE" else "on grey30" - #**({"style":"on white" } if i.opname=="CACHE" else {}) - ) + if 0: + code=ast.parse(code,filename,"exec") + for i,node in enumerate(ast.walk(code)): + node.lineno=i*2 + node.end_lineno=i*2+1 + + bc = compile(code, filename, "exec") + self.inspect(bc) + + self.dump_deadcode(filename) - if first == False: - console.print() - console.print(table) - for const in bc.co_consts: - if isinstance(const, types.CodeType): - inspect(const) + #self.profile.print_stats(sort="cumtime") -inspect(bc) +if __name__ == "__main__": + App().main() diff --git a/tests/deadcode.py b/tests/deadcode.py index efb322d..31e76c8 100644 --- a/tests/deadcode.py +++ b/tests/deadcode.py @@ -1,140 +1,430 @@ import ast -import copy -import dis import sys +import operator +py11=sys.version_info>=(3,11) -sentinel_rep = 2 -# generate sentinel at runtime to keep it out of the bytecode -# this allows the algorithm to check also this file -sentinel = "xsglegahghegflgfaih" * sentinel_rep +def contains_break(node_or_list): + "search all child nodes except other loops for a break statement" -def constant(value): - if sys.version_info >= (3, 6): - return ast.Constant(value=value) - elif isinstance(value, int): - return ast.Num(value) - elif isinstance(value, str): - return ast.Str(value) + if isinstance(node_or_list, ast.AST): + childs = ast.iter_child_nodes(node_or_list) + elif isinstance(node_or_list, list): + childs = node_or_list else: - raise TypeError + raise TypeError(node_or_list) - -def index(value): - if sys.version_info >= (3, 9): - return constant(value) - else: - return ast.Index(constant(value)) - - -class DeadcodeTransformer(ast.NodeTransformer): - def visit(self, node): - constant_type = ( - ast.Constant - if sys.version_info >= (3, 6) - else (ast.Num, ast.Str, ast.Bytes, ast.NameConstant, ast.Ellipsis) - ) - - if getattr(node, "_check_is_deadcode", False): - if isinstance(node, constant_type) and isinstance(node.value, str): - # docstring for example - return constant(sentinel) - - elif isinstance(node, ast.stmt): - return ast.Expr( - value=ast.Call( - func=ast.Name(id="foo", ctx=ast.Load()), - args=[constant(sentinel)], - keywords=[], + for child in childs: + if isinstance(child, (ast.For, ast.While, ast.AsyncFor)): + if contains_break(child.orelse): + return True + elif isinstance(child, ast.Break): + return True + elif contains_break(child): + return True + + return False + + + +class Deadcode: + @staticmethod + def annotate(tree): + deadcode = Deadcode() + + deadcode.annotate_static_values(tree) + deadcode.walk_deadcode(tree, False) + + def __init__(self): + self.future_annotations = False + + operator_map = { + # binary + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.FloorDiv: operator.floordiv, + ast.Mod: operator.mod, + ast.Pow: operator.pow, + ast.LShift: operator.lshift, + ast.RShift: operator.rshift, + ast.BitOr: operator.or_, + ast.BitXor: operator.xor, + ast.BitAnd: operator.and_, + # unary + ast.UAdd: operator.pos, + ast.USub: operator.neg, + ast.Not: operator.not_, + ast.Invert: operator.invert, + } + if hasattr(ast,"MatMult"): + operator_map[ast.MatMult]=operator.matmul + + def annotate_static_values(self, node): + for n in ast.iter_child_nodes(node): + self.annotate_static_values(n) + + try: + if sys.version_info >= (3,6) and isinstance(node, ast.Constant): + node.__static_value = node.value + + if not sys.version_info >= (3,8): + if isinstance(node,ast.Str): + node.__static_value = node.s + if isinstance(node,ast.Bytes): + node.__static_value = node.s + if isinstance(node,ast.Num): + node.__static_value = node.n + if isinstance(node,ast.NameConstant): + node.__static_value = node.value + + + if isinstance(node, ast.Name) and node.id == "__debug__": + node.__static_value = True + + elif isinstance(node, ast.UnaryOp): + try: + node.__static_value = self.operator_map[type(node.op)]( + node.operand.__static_value ) - ) - elif isinstance(node, ast.expr): - if hasattr(node, "ctx") and isinstance(node.ctx, (ast.Store, ast.Del)): - return ast.Subscript( - value=ast.Name(id="foo", ctx=ast.Load()), - slice=index(sentinel), - ctx=node.ctx, + except Exception: + pass + + elif isinstance(node, ast.BinOp): + try: + node.__static_value = self.operator_map[type(node.op)]( + node.left.__static_value, node.right.__static_value ) - else: + if ( + isinstance(node.__static_value, (str, bytes)) + and len(node.__static_value) > 4000 + ): + # do not perform big string operations + # TODO: check if this constraint is correct + del node.__static_value + except Exception: + pass + + elif isinstance(node, ast.Subscript): + try: + node.__static_value = node.value.__static_value[ + node.slice.__static_value + ] + except Exception: + pass + + elif isinstance(node, ast.IfExp): + cnd = self.static_cnd(node.test) + if cnd is True: + node.__static_value = node.body.__static_value + + elif cnd is False: + node.__static_value = node.orelse.__static_value + + elif isinstance(node, ast.BoolOp) and isinstance(node.op, ast.And): + if all(self.static_cnd(n) is True for n in node.values): + node.__static_value = True + + if any(self.static_cnd(n) is False for n in node.values): + node.__static_value = False + + elif isinstance(node, ast.BoolOp) and isinstance(node.op, ast.Or): + if all(self.static_cnd(n) is False for n in node.values): + node.__static_value = False + + if any(self.static_cnd(n) is True for n in node.values): + node.__static_value = True + + except AttributeError as e: + if "_Deadcode__static_value" not in str(e): + raise + + def static_cnd(self, node): + try: + return bool(node.__static_value) + except AttributeError: + return None + + def has_static_value(self,node): + try: + node.__static_value + except AttributeError: + return False + return True + + + def static_value(self, node, deadcode): + self.walk_deadcode(node, deadcode) + return self.static_cnd(node) + + def check_stmts(self, stmts, deadcode): + """ + used to check the body: of a function, if, ... + """ + for stmt in stmts: + stmt.deadcode = deadcode + + if self.walk_deadcode(stmt, deadcode): + deadcode = True + return deadcode + + def check_childs(self, childs, deadcode): + """ + used to check childs: function arguments + """ + for child in childs: + self.walk_deadcode(child, deadcode) + + def walk_annotation(self, annotation, deadcode): + if self.future_annotations: + deadcode = True + self.walk_deadcode(annotation, deadcode) + + def walk_deadcode(self, node, deadcode): + "returns True if this statement will never return" + + # this check is not perfect but better than nothing + # it tries to prevent a lot of "node without associated Bytecode" errors + + # They were generated test driven. + # Every case represented here is derived from a error where python performed dead code elimination. + + if node is None: + return + + if isinstance(node, list): + for child in node: + self.walk_deadcode(child, deadcode) + return + + node.deadcode = deadcode or getattr(node, "deadcode", False) + + if isinstance(node, ast.Module): + for stmt in node.body: + if isinstance(stmt, ast.ImportFrom): + if stmt.module == "__future__" and any( + "annotations" == alias.name for alias in stmt.names + ): + self.future_annotations = True + + self.check_stmts(node.body, deadcode) + elif isinstance(node, (ast.With, ast.AsyncWith)): + self.check_childs(node.items, deadcode) + self.check_stmts(node.body, deadcode) + + elif isinstance(node, (ast.Return, ast.Break, ast.Continue, ast.Raise)): + if isinstance(node, ast.Raise): + self.walk_deadcode(node.exc, deadcode) + self.walk_deadcode(node.cause, deadcode) + + if isinstance(node, ast.Return): + self.walk_deadcode(node.value, deadcode) + + deadcode = True + + elif isinstance(node, ast.Assert): + cnd = self.static_value(node.test, deadcode) + + if cnd is False: + node.deadcode = deadcode + self.walk_deadcode(node.msg, deadcode) + deadcode = True + + elif cnd is True: + node.deadcode = deadcode + self.walk_deadcode(node.msg, True) - return ast.Subscript( - value=ast.Tuple( - elts=[node, constant(sentinel)], - ctx=ast.Load(), - ), - slice=index(0), - ctx=ast.Load(), - ) else: - raise TypeError(node) + node.deadcode = deadcode + self.walk_deadcode(node.msg, deadcode) - else: - return super().visit(node) + elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + if sys.version_info >= (3,8): + self.walk_annotation(node.args.posonlyargs, deadcode) + self.walk_annotation(node.args.args, deadcode) + self.walk_annotation(node.args.kwonlyargs, deadcode) -def is_deadcode(node): + self.walk_annotation(node.args.vararg, deadcode) + self.walk_annotation(node.args.kwarg, deadcode) - if isinstance(node, ast.withitem): - node = node.context_expr + self.check_childs(node.args.kw_defaults, deadcode) + self.check_childs(node.args.defaults, deadcode) - if isinstance(node, ast.ExceptHandler): - node = node.body[0] + self.walk_annotation(node.returns, deadcode) + self.check_childs(node.decorator_list, deadcode) - if sys.version_info >= (3,8) and isinstance(node.parent, ast.NamedExpr) and node.parent.target is node: - node = node.parent + self.check_stmts(node.body, deadcode) - if sys.version_info >= (3,12) and isinstance(node.parent,ast.TypeAlias): - node = node.parent + elif isinstance(node, ast.ClassDef): + self.check_childs(node.decorator_list, deadcode) - if ( - sys.version_info >= (3, 6) - and isinstance(node.parent, ast.AnnAssign) - and node.parent.target is node - ): - # AnnAssign.target has to be ast.Name - node = node.parent + self.check_childs(node.bases, deadcode) - if hasattr(node, "_is_deadcode"): - return node._is_deadcode + self.check_childs(node.keywords, deadcode) - node._check_is_deadcode = True + self.check_stmts(node.body, deadcode) - module = node - while hasattr(module, "parent"): - module = module.parent + elif isinstance(node, ast.If): - assert isinstance(module, ast.Module) + test_value = self.static_value(node.test, deadcode) - # create working copy of the ast - module2 = copy.deepcopy(module) - del node._check_is_deadcode + if_is_dead = self.check_stmts(node.body, deadcode or (test_value is False)) + else_is_dead = self.check_stmts( + node.orelse, deadcode or (test_value is True) + ) - module2 = ast.fix_missing_locations(DeadcodeTransformer().visit(module2)) + self.walk_deadcode(node.test, deadcode) - try: - code = compile(module2, "", "exec") - except: - print(ast.dump(module2)) - raise + deadcode = if_is_dead and else_is_dead - visited = set() + elif sys.version_info >= (3,10) and isinstance(node, ast.Match): + self.walk_deadcode(node.subject, deadcode) + for case_ in node.cases: + case_.deadcode = deadcode + self.walk_deadcode(case_.pattern, deadcode) + self.walk_deadcode(case_.guard, deadcode) - def contains_sentinel(code): - if code in visited: - return False + dead_cases = all( + [self.check_stmts(case_.body, deadcode or self.static_cnd(case_.guard) is False ) for case_ in node.cases] + ) - for inst in dis.get_instructions(code): - arg = inst.argval - if isinstance(arg, type(code)) and contains_sentinel(arg): - return True - if arg == sentinel: - return True + if any( + isinstance(case_.pattern, ast.MatchAs) and case_.pattern.pattern is None + for case_ in node.cases + ): + # case _: + deadcode = dead_cases + + elif isinstance(node, (ast.For, ast.AsyncFor)): + self.walk_deadcode(node.target, deadcode) + self.walk_deadcode(node.iter, deadcode) + self.check_stmts(node.body, deadcode) + + else_is_dead = self.check_stmts(node.orelse, deadcode) + + if else_is_dead and not contains_break(node.body): + # for a in l: + # something() + # else: + # return None + # deadcode() + deadcode = True + + elif isinstance(node, ast.comprehension): + self.walk_deadcode(node.target, deadcode) + self.walk_deadcode(node.iter, deadcode) + + for if_ in node.ifs: + deadcode = self.static_value(if_, deadcode) is False or deadcode + + elif isinstance(node, (ast.ListComp, ast.GeneratorExp, ast.SetComp)): + branch_dead = self.check_stmts(node.generators, deadcode) + self.walk_deadcode(node.elt, branch_dead) + + elif isinstance(node, ast.DictComp): + branch_dead = self.check_stmts(node.generators, deadcode) + self.walk_deadcode(node.key, branch_dead) + self.walk_deadcode(node.value, branch_dead) + + elif isinstance(node, ast.IfExp): + + test_value = self.static_value(node.test, deadcode) + + self.walk_deadcode( + node.body, deadcode or (test_value is False) + ) + + self.walk_deadcode( + node.orelse, deadcode or (test_value is True) + ) + + elif isinstance(node, (ast.While)): + cnd = self.static_value(node.test, deadcode) + + self.check_stmts(node.body, deadcode or cnd is False) + else_is_dead = self.check_stmts(node.orelse, deadcode or cnd is True) + + if cnd is True and not contains_break(node): + # while True: ... no break + deadcode = True + + if else_is_dead and not contains_break(node.body): + # for a in l: + # something() + # else: + # return None + # deadcode() + deadcode = True + + elif isinstance(node, (ast.Try, ast.TryStar if py11 else ())): + try_dead = self.check_stmts(node.body, deadcode) + + for handler in node.handlers: + handler.deadcode = deadcode + self.walk_deadcode(handler.type, deadcode) + + handlers_dead = all( + [self.check_stmts(h.body, deadcode) for h in node.handlers] + ) + else_dead = self.check_stmts(node.orelse, try_dead) + final_dead = self.check_stmts(node.finalbody, deadcode) + + deadcode = (handlers_dead and else_dead) or final_dead + + elif isinstance(node, ast.BoolOp) and isinstance(node.op, ast.And): + dead_op = deadcode + for v in node.values: + if self.static_value(v, dead_op) is False: + dead_op = True + + elif isinstance(node, ast.BoolOp) and isinstance(node.op, ast.Or): + dead_op = deadcode + for v in node.values: + if self.static_value(v, dead_op) is True: + dead_op = True + + elif isinstance(node, ast.Expr): + # dead expressions: + # > 5+5 + # for example + dead_expr = self.has_static_value(node.value) + if ( + isinstance( + node.parent, + (ast.Module, ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef), + ) + and node.parent.body[0] == node + and( isinstance(node.value, ast.Constant) + and isinstance(node.value.value, str) if sys.version_info >= (3,6) else isinstance(node.value,ast.Str) ) + ): + # docstring + dead_expr = False + + self.walk_deadcode(node.value, dead_expr or deadcode) + + else: + + for n in ast.iter_child_nodes(node): + self.walk_deadcode(n, deadcode) + + return deadcode + + +def is_deadcode(node): + if hasattr(node,"deadcode"): + return node.deadcode + + module=node + while hasattr(module,"parent"): + module=module.parent + + Deadcode().annotate(module) + + return node.deadcode - visited.add(code) - return False - node._is_deadcode = not contains_sentinel(code) - return node._is_deadcode diff --git a/tests/small_samples/3717efb20196ee4311a7f06bef2c4efd4c7e4e34cead5141fde62b74213cb46b.py b/tests/small_samples/3717efb20196ee4311a7f06bef2c4efd4c7e4e34cead5141fde62b74213cb46b.py new file mode 100644 index 0000000..eda6b81 --- /dev/null +++ b/tests/small_samples/3717efb20196ee4311a7f06bef2c4efd4c7e4e34cead5141fde62b74213cb46b.py @@ -0,0 +1,2 @@ + +_Screen._root = self._root = _Root diff --git a/tests/small_samples/471ff7c2daa37eded70f016214f3ebe3915659b25fd0baffa6104bcc10155fa1.py b/tests/small_samples/471ff7c2daa37eded70f016214f3ebe3915659b25fd0baffa6104bcc10155fa1.py new file mode 100644 index 0000000..a514cff --- /dev/null +++ b/tests/small_samples/471ff7c2daa37eded70f016214f3ebe3915659b25fd0baffa6104bcc10155fa1.py @@ -0,0 +1,2 @@ + +(dict(((k.lower(), v) for (k, v) in self.itermerged())) == (k.lower for (k, v) in something)) diff --git a/tests/small_samples/7b58785f82ffd529129e17fe47185e930f038a61e1d085d02b4e5baf81ed91ff.py b/tests/small_samples/7b58785f82ffd529129e17fe47185e930f038a61e1d085d02b4e5baf81ed91ff.py new file mode 100644 index 0000000..7cda7de --- /dev/null +++ b/tests/small_samples/7b58785f82ffd529129e17fe47185e930f038a61e1d085d02b4e5baf81ed91ff.py @@ -0,0 +1,4 @@ + + +async def __anext__(): + StopAsyncIteration diff --git a/tests/small_samples/955cd43e55d0c496fb67acb0e1a02152e8aec4236efef947b8390b3882856122.py b/tests/small_samples/955cd43e55d0c496fb67acb0e1a02152e8aec4236efef947b8390b3882856122.py new file mode 100644 index 0000000..9360e09 --- /dev/null +++ b/tests/small_samples/955cd43e55d0c496fb67acb0e1a02152e8aec4236efef947b8390b3882856122.py @@ -0,0 +1,7 @@ + + +async def __aenter__(): + try: + return (await self.gen.__anext__()) + except StopAsyncIteration: + pass diff --git a/tests/small_samples/a2adbc31e443660c85febd6928843e8d07c64c5bdaa9c0d75830951bd28ce7e7.py b/tests/small_samples/a2adbc31e443660c85febd6928843e8d07c64c5bdaa9c0d75830951bd28ce7e7.py new file mode 100644 index 0000000..ded52a0 --- /dev/null +++ b/tests/small_samples/a2adbc31e443660c85febd6928843e8d07c64c5bdaa9c0d75830951bd28ce7e7.py @@ -0,0 +1,7 @@ + + +def to_list(): + + async def iterate(): + async for i in gen: + pass diff --git a/tests/small_samples/d8987afe0f74653bcddfe56e18c616dbd52d934cdff78644c5221935b35186f2.py b/tests/small_samples/d8987afe0f74653bcddfe56e18c616dbd52d934cdff78644c5221935b35186f2.py new file mode 100644 index 0000000..bcf19bd --- /dev/null +++ b/tests/small_samples/d8987afe0f74653bcddfe56e18c616dbd52d934cdff78644c5221935b35186f2.py @@ -0,0 +1,3 @@ + +with self: + self([len for x in obj], [len for x in unpickled]) diff --git a/tests/small_samples/ffd93515dbe0bc61779aafb3cdf11e4c32d229e120139bfc38d3ea54b95a76e3.py b/tests/small_samples/ffd93515dbe0bc61779aafb3cdf11e4c32d229e120139bfc38d3ea54b95a76e3.py new file mode 100644 index 0000000..971b0a8 --- /dev/null +++ b/tests/small_samples/ffd93515dbe0bc61779aafb3cdf11e4c32d229e120139bfc38d3ea54b95a76e3.py @@ -0,0 +1 @@ +# -*- coding: uft-8 -*- diff --git a/tests/test_main.py b/tests/test_main.py index bc015cd..b26278a 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -20,6 +20,7 @@ from collections import defaultdict, namedtuple from random import shuffle import pytest +from executing._utils import mangled_name sys.path.append(os.path.dirname(os.path.dirname(__file__))) @@ -715,11 +716,7 @@ def sample_files(samples): @pytest.mark.skipif(sys.version_info<(3,),reason="no 2.7 support") def test_small_samples(full_filename, result_filename): skip_sentinel = [ - "load_deref", "4851dc1b626a95e97dbe0c53f96099d165b755dd1bd552c6ca771f7bca6d30f5", - "508ccd0dcac13ecee6f0cea939b73ba5319c780ddbb6c496be96fe5614871d4a", - "fc6eb521024986baa84af2634f638e40af090be4aa70ab3c22f3d022e8068228", - "42a37b8a823eb2e510b967332661afd679c82c60b7177b992a47c16d81117c8a", "206e0609ff0589a0a32422ee902f09156af91746e27157c32c9595d12072f92a", ] @@ -830,7 +827,13 @@ def check_filename(self, filename, check_names): # increase the recursion limit in testing mode, because there are files out there with large ast-nodes # example: tests/small_samples/1656dc52edd2385921104de7bb255ca369713f4b8c034ebeba5cf946058109bc.py sys.setrecursionlimit(3000) - source = Source.for_filename(filename) + try: + source = Source.for_filename(filename) + except SyntaxError: + # wrong encoding + print("skip %s"%filename) + return + if source.tree is None: # we could not parse this file (maybe wrong python version) @@ -1023,6 +1026,29 @@ def check_filename(self, filename, check_names): # type alias names have no associated bytecode continue + if sys.version_info < (3,11) and not values: + # self([len for x in obj], [len for x in unpickled]) + # a node in one list-expression is not found because the code for both + # list expressions is the same and the bytecodes gets only mapped to one list-expression + def inside(node,typ): + while hasattr(node,"parent") and not isinstance(node.parent,typ): + node=node.parent + + return getattr(node,"parent",None) + + list_comp=inside(node,ast.ListComp) + stmt=inside(list_comp,ast.stmt) + if stmt is not None: + line=list_comp.lineno + # check if there is more than one ListComp in the statement in the same line + if len([e for e in ast.walk(stmt) if isinstance(e,ast.ListComp) and e.lineno==line])>1: + continue + + + + + + if sys.version_info >= (3, 10): correct = len(values) >= 1 elif sys.version_info >= (3, 9) and in_finally(node): @@ -1046,6 +1072,7 @@ def p(*args): p() p("ast node:") + p(mangled_name(node)) p(ast_dump(node, indent=4)) parents = [] @@ -1196,6 +1223,7 @@ def check_code(self, code, nodes, decorators, check_names): # convert list to tuple continue + frame = C() frame.f_lasti = inst.offset frame.f_code = code @@ -1322,6 +1350,28 @@ def check_code(self, code, nodes, decorators, check_names): ): continue + if sys.version_info < (3,8): + if inst.opname == "LOAD_GLOBAL" and inst.argval=="StopAsyncIteration": + continue + + if sys.version_info < (3,11): + if ( + isinstance(e, NotOneValueFound) + and all(isinstance(v, ast.Attribute) for v in e.values) + and len({v.attr for v in e.values}) == 1 + ): + # problem: + # x.a = y.a = 5 + continue + + if ( + isinstance(e, NotOneValueFound) + and all(isinstance(v, types.CodeType) for v in e.values) + ): + # problem: + # self([len for x in obj], [len for x in unpickled]) + continue + if ( sys.version_info >= (3, 12) and inst.positions.col_offset == inst.positions.end_col_offset == 0 @@ -1336,9 +1386,12 @@ def check_code(self, code, nodes, decorators, check_names): print(e) if isinstance(e, NotOneValueFound): for value in e.values: - print( + if isinstance(value, ast.expr): + print( "value:", ast_dump(value, indent=4, include_attributes=True) ) + else: + print("value:",value) print("search bytecode", inst) print("in file", source.filename) @@ -1390,11 +1443,8 @@ def check_code(self, code, nodes, decorators, check_names): raise - # `argval` isn't set for all relevant instructions in python 2 - # The relation between `ast.Name` and `argval` is already - # covered by the verifier and much more complex in python 3.11 - if isinstance(node, ast.Name) and not py11: - assert inst.argval == node.id, (inst, ast.dump(node)) + if isinstance(node, ast.Name) and inst.opname != "CALL_INTRINSIC_1" and inst.argval not in ("__classdict__",): + assert mangled_name(node) == inst.argval , (inst, ast.dump(node)) if ex.decorator: decorators[(node.lineno, node.name)].append(ex.decorator) diff --git a/tests/test_pytest.py b/tests/test_pytest.py index b89aed5..dad99af 100644 --- a/tests/test_pytest.py +++ b/tests/test_pytest.py @@ -12,7 +12,8 @@ import executing.executing from executing import Source, NotOneValueFound from executing._exceptions import KnownIssue -from executing.executing import is_ipython_cell_code, attr_names_match, is_rewritten_by_pytest +from executing.executing import is_ipython_cell_code, is_rewritten_by_pytest +from executing._utils import get_instructions sys.path.append(os.path.dirname(os.path.dirname(__file__))) @@ -55,20 +56,6 @@ def test_ipython_cell_code(): ) -def test_attr_names_match(): - assert attr_names_match("foo", "foo") - - assert not attr_names_match("foo", "_foo") - assert not attr_names_match("foo", "__foo") - assert not attr_names_match("_foo", "foo") - assert not attr_names_match("__foo", "foo") - - assert attr_names_match("__foo", "_Class__foo") - assert not attr_names_match("_Class__foo", "__foo") - assert not attr_names_match("__foo", "Class__foo") - assert not attr_names_match("__foo", "_Class_foo") - - def test_source_file_text_change(tmpdir): # Check that Source.for_filename notices changes in file contents # (assuming that linecache can notice) @@ -162,12 +149,17 @@ def test_bad_linecache(): assert ex.source.text == fake_text -if sys.version_info >= (3, 11): - from executing._position_node_finder import mangled_name +from executing._utils import mangled_name +import dis + +if sys.version_info < (3,): + def indent(s,prefix): + return prefix + s.replace("\n","\n"+prefix) +else: from textwrap import indent - import dis - def test_mangled_name(): +@pytest.mark.skipif(sys.version_info < (3,), reason="argval of some Instructions is None") +def test_mangled_name(): def result(*code_levels): code = "" for i, level in enumerate(code_levels): @@ -179,25 +171,31 @@ def result(*code_levels): for child in ast.iter_child_nodes(parent): child.parent = parent - tree_names = { - mangled_name(n) - for n in ast.walk(tree) - if isinstance( - n, - ( + + ast_types=( ast.Name, ast.Attribute, ast.alias, ast.FunctionDef, ast.ClassDef, - ast.AsyncFunctionDef, ast.ExceptHandler, - ), + ) + if sys.version_info>=(3,): + ast_types+=( ast.AsyncFunctionDef,) + + tree_names = { + mangled_name(n) + for n in ast.walk(tree) + if isinstance( + n, + ast_types + , ) } def collect_names(code): - for instruction in dis.get_instructions(code): + before=None + for instruction in get_instructions(code): if instruction.opname in ( "STORE_NAME", "LOAD_NAME", @@ -214,11 +212,15 @@ def collect_names(code): # IMPORT_FROM(_Test__submodule11c) # STORE_NAME(_Test__subc11) + if instruction.opname=="LOAD_ATTR" and before is not None and before.opname == "IMPORT_NAME": + continue + name = instruction.argval if name in ("__module__", "__qualname__", "__name__"): continue yield name + before=instruction for const in code.co_consts: if isinstance(const, type(code)): @@ -396,6 +398,14 @@ def collect_names(code): ) == {"Test","_","a", "self", "__thing"} + assert result( + "@__thing\n" + "class Test:\n" + " pass" + )== {"Test","__thing"} + + + def test_pytest_rewrite(): frame = inspect.currentframe() diff --git a/tests/utils.py b/tests/utils.py index 0e20ecf..1c129ba 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -4,8 +4,9 @@ from collections import namedtuple import executing.executing +from executing._exceptions import KnownIssue -from executing.executing import attr_names_match, Instruction +from executing.executing import mangled_name, Instruction try: from dis import Instruction as DisInstruction except ImportError: @@ -104,7 +105,7 @@ def __setattr__(self, name, value): assert name == "_{self.__class__.__name__}{node.attr}".format(self=self, node=node) else: assert name == node.attr - assert attr_names_match(node.attr, name) + assert mangled_name(node) == name return self def __delattr__(self, name): @@ -149,7 +150,7 @@ def __bool__(self): else: try: self.get_node(None) - except RuntimeError: + except KnownIssue: return False assert 0 diff --git a/tox.ini b/tox.ini index 3566691..97d8b28 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py35,py36,py37,py38,py39,py310,py311,py312,pypy35,pypy36 +envlist = {mypy-,}{py38,py39,py310,py311,py312,pypy35,pypy36} [testenv] commands = @@ -10,7 +10,7 @@ passenv = ADD_EXECUTING_TESTS EXECUTING_SLOW_TESTS -[testenv:generate_small_sample-py{35,36,37,38,39,310,311}] +[testenv:generate_small_sample-py{38,39,310,311,312}] extras = tests deps = pysource-minimize commands = @@ -26,3 +26,17 @@ deps= mutmut commands= python tests/mutmut_workflow.py + + +[testenv:mypy-py{38,39,310}] +deps= + mypy==0.910 +commands= + python -m mypy executing --exclude=executing/_position_node_finder.py + + +[testenv:mypy-py{311}] +deps= + mypy==0.971 +commands= + python -m mypy executing