Skip to content

Commit

Permalink
Mypy happy.
Browse files Browse the repository at this point in the history
  • Loading branch information
emeryberger committed Dec 19, 2023
1 parent a015d15 commit 09ee005
Show file tree
Hide file tree
Showing 13 changed files with 38 additions and 35 deletions.
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ vendor/crdp:
vendor-deps: clear-vendor-dirs vendor/Heap-Layers vendor/printf/printf.cpp vendor/crdp

mypy:
-mypy $(PYTHON_SOURCES)
-mypy --no-warn-unused-ignores $(PYTHON_SOURCES)

format: black clang-format prettier

Expand Down
13 changes: 4 additions & 9 deletions scalene/replacement_sem_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,10 @@
import threading
from multiprocessing.synchronize import Lock
from scalene.scalene_profiler import Scalene
from typing import Any


def _recreate_replacement_sem_lock():
return ReplacementSemLock()

from typing import Any, Callable, Optional, Tuple

class ReplacementSemLock(multiprocessing.synchronize.Lock):
def __init__(self, ctx=None):
def __init__(self, ctx: Optional[multiprocessing.context.DefaultContext] = None) -> None:
# Ensure to use the appropriate context while initializing
if ctx is None:
ctx = multiprocessing.get_context()
Expand All @@ -34,5 +29,5 @@ def __enter__(self) -> bool:
def __exit__(self, *args: Any) -> None:
super().__exit__(*args)

def __reduce__(self):
return (_recreate_replacement_sem_lock, ())
def __reduce__(self) -> Tuple[Callable[[], ReplacementSemLock], Tuple[()]]:
return (lambda: ReplacementSemLock(), ())

Check notice

Code scanning / CodeQL

Unnecessary lambda Note

This 'lambda' is just a simple wrapper around a callable object. Use that object directly.
6 changes: 3 additions & 3 deletions scalene/replacement_signal_fns.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sys

from scalene.scalene_profiler import Scalene

from typing import Any

@Scalene.shim
def replacement_signal_fns(scalene: Scalene) -> None:
Expand All @@ -24,7 +24,7 @@ def old_raise_signal(s):
else:
new_cpu_signal = signal.SIGFPE

def replacement_signal(signum: int, handler): # type: ignore
def replacement_signal(signum: int, handler: Any) -> Any:
all_signals = scalene.get_all_signals_set()
timer_signal, cpu_signal = scalene.get_timer_signals()
timer_signal_str = signal.strsignal(signum)
Expand Down Expand Up @@ -90,7 +90,7 @@ def replacement_setitimer(which, seconds, interval=0.0): # type: ignore
signal.setitimer = replacement_setitimer
signal.siginterrupt = replacement_siginterrupt

signal.signal = replacement_signal
signal.signal = replacement_signal # type: ignore
if sys.version_info >= (3, 8):
signal.raise_signal = replacement_raise_signal
os.kill = replacement_kill
16 changes: 9 additions & 7 deletions scalene/scalene_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import sys

from typing import cast, Dict, List, Tuple
from typing import cast, Any, Dict, List, Tuple

if sys.version_info < (3, 9):
# ast.unparse only supported as of 3.9
Expand All @@ -21,11 +21,12 @@ def is_native(package_name: str) -> bool:
result = False
try:
package = importlib.import_module(package_name)
package_dir = os.path.dirname(package.__file__)
for root, dirs, files in os.walk(package_dir):
for filename in files:
if filename.endswith(".so") or filename.endswith(".pyd"):
return True
if package.__file__:
package_dir = os.path.dirname(package.__file__)
for root, dirs, files in os.walk(package_dir):
for filename in files:
if filename.endswith(".so") or filename.endswith(".pyd"):
return True
result = False
except ImportError:
result = False
Expand Down Expand Up @@ -140,7 +141,7 @@ def find_outermost_loop(src: str) -> Dict[int, Tuple[int, int]]:
tree = ast.parse(src)
regions = {}

def walk(node, current_outermost_region, outer_class):
def walk(node : ast.AST, current_outermost_region : Any, outer_class : Any) -> None:
nonlocal regions
if isinstance(
node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)
Expand Down Expand Up @@ -172,6 +173,7 @@ def walk(node, current_outermost_region, outer_class):
ast.AsyncFunctionDef,
]

assert node.end_lineno
for line in range(node.lineno, node.end_lineno + 1):
# NOTE: we update child nodes first (in the recursive call),
# so what we want this statement to do is attribute any lines that we haven't already
Expand Down
2 changes: 1 addition & 1 deletion scalene/scalene_apple_gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class ScaleneAppleGPU:
"""Wrapper class for Apple integrated GPU statistics."""

def __init__(self, sampling_frequency=100) -> None:
def __init__(self, sampling_frequency: int = 100) -> None:
assert platform.system() == "Darwin"
self.cmd = (
'DYLD_INSERT_LIBRARIES="" ioreg -r -d 1 -w 0 -c "IOAccelerator"'
Expand Down
2 changes: 1 addition & 1 deletion scalene/scalene_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(self) -> None:

def compress_samples(
self, samples: List[Any], max_footprint: float
) -> List[Any]:
) -> Any:
if len(samples) <= self.max_sparkline_samples:
return samples
# Try to reduce the number of samples with the
Expand Down
2 changes: 1 addition & 1 deletion scalene/scalene_jupyter.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def do_GET(self) -> None:
except FileNotFoundError:
print("Scalene error: profile file not found.")
elif self.path == "/shutdown":
self.server.should_shutdown = True
self.server.should_shutdown = True # type: ignore
self.send_response(204)
# self._send_response("Server is shutting down...")
else:
Expand Down
2 changes: 1 addition & 1 deletion scalene/scalene_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def output_profile_line(
fname=fname,
fname_print=fname,
line_no=line_no,
line=line,
line=str(line),
stats=stats,
profile_this_code=profile_this_code,
force_print=force_print,
Expand Down
11 changes: 8 additions & 3 deletions scalene/scalene_parseargs.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ def parse_args() -> Tuple[argparse.Namespace, List[str]]:
# Parse out all Scalene arguments.
# https://stackoverflow.com/questions/35733262/is-there-any-way-to-instruct-argparse-python-2-7-to-remove-found-arguments-fro
args, left = parser.parse_known_args()

# Hack to simplify functionality for Windows platforms.
if sys.platform == "win32":
args.on = True
Expand Down Expand Up @@ -394,7 +395,11 @@ def parse_args() -> Tuple[argparse.Namespace, List[str]]:
print(f"Scalene version {scalene_version} ({scalene_date})")
if not args.ipython:
sys.exit(-1)
args = (
[]
) # We use this to indicate that we should not run further in IPython.
# Clear out the namespace. We do this to indicate that we should not run further in IPython.
for arg in list(args.__dict__):
delattr(args, arg)
# was:
# args = (
# []
# ) # We use this to indicate that we should not run further in IPython.
return args, left
4 changes: 2 additions & 2 deletions scalene/scalene_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1702,7 +1702,7 @@ def profile_code(
exec(code, the_globals, the_locals)
except SystemExit as se:
# Intercept sys.exit and propagate the error code.
exit_status = se.code
exit_status = se.code if type(se.code) == int else 1
except KeyboardInterrupt:
# Cleanly handle keyboard interrupts (quits execution and dumps the profile).
print("Scalene execution interrupted.")
Expand Down Expand Up @@ -1992,7 +1992,7 @@ def run_profiler(
print("Scalene: no input file specified.")
sys.exit(1)
except SystemExit as e:
exit_status = e.code
exit_status = e.code if type(e.code) == int else 1

except StopJupyterExecution:
pass
Expand Down
2 changes: 1 addition & 1 deletion scalene/scalene_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ def merge_stats(self, the_dir_name: pathlib.Path) -> None:
for filename in self.per_line_footprint_samples:
for lineno in self.per_line_footprint_samples[filename]:
self.per_line_footprint_samples[filename][lineno].sort(
key=lambda x: x[0] # type: ignore
key=lambda x: x[0]
)
self.increment_per_line_samples(
self.memory_malloc_count, x.memory_malloc_count
Expand Down
4 changes: 2 additions & 2 deletions scalene/test_runningstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import statistics

from hypothesis import given

from typing import List

@given(
st.lists(
Expand All @@ -15,7 +15,7 @@
min_size=2,
)
)
def test_running_stats(values):
def test_running_stats(values: List[float]) -> None:
rstats = runningstats.RunningStats()
for value in values:
rstats.push(value)
Expand Down
7 changes: 4 additions & 3 deletions scalene/test_scalene_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from hypothesis import given
from hypothesis.strategies import floats, lists

from typing import Any, List

class TestScaleneJSON:
# Define strategies for the input variables
Expand All @@ -17,7 +18,7 @@ class TestScaleneJSON:
)

@given(size_in_mb)
def test_memory_consumed_str(self, size_in_mb):
def test_memory_consumed_str(self, size_in_mb: int) -> None:
formatted = scalene_json.ScaleneJSON().memory_consumed_str(size_in_mb)
assert isinstance(formatted, str)
if size_in_mb < 1024:
Expand All @@ -28,7 +29,7 @@ def test_memory_consumed_str(self, size_in_mb):
assert formatted.endswith("TB")

@given(time_in_ms)
def test_time_consumed_str(self, time_in_ms):
def test_time_consumed_str(self, time_in_ms: int) -> None:
formatted = scalene_json.ScaleneJSON().time_consumed_str(time_in_ms)
assert isinstance(formatted, str)
if time_in_ms < 1000:
Expand All @@ -42,7 +43,7 @@ def test_time_consumed_str(self, time_in_ms):
assert not formatted.startswith("0")

@given(samples, max_footprint)
def test_compress_samples(self, samples, max_footprint):
def test_compress_samples(self, samples : List[Any], max_footprint: int) -> None:
compressed = scalene_json.ScaleneJSON().compress_samples(
samples, max_footprint
)
Expand Down

0 comments on commit 09ee005

Please sign in to comment.