Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support PPE profiling for Local #340

Merged
merged 15 commits into from
Aug 26, 2024
32 changes: 32 additions & 0 deletions docs/source/tips.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,35 @@ A. Default buffer size of file objects from ``pfio.v2.S3`` is 16MB,
with open_url('s3://bucket/very-large-file', 'wb') as dst:
with open('very-large-local-file', 'rb') as src:
shutil.copyfileobj(src, dst, length=16*1024*1024)

Q. How to output tracer results (readable by Chrome Tracing).
=============================================================

A. As shown in the sample program below,
by executing `initialize_writer()` and `flush()` after tracing,
JSON that can be read by Chrome tracing can be output.

.. code-block:: python

import json
import pytorch_pfn_extras as ppe

from pfio.v2 import Local, Path, from_url

tracer = ppe.profiler.get_tracer()

with Local(trace=True) as fs:
for f in fs.list():
if fs.isdir(f.strip()):
dir += 1
continue

fil += 1
len(fs.open(f).read())

w = ppe.writing.SimpleWriter(out_dir="")

# output '['
tracer.initialize_writer("trace.json", w)
# output json dump
tracer.flush("trace.json", w)
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ ignore_missing_imports = True

[mypy-pyarrow.*]
ignore_missing_imports = True

[mypy-pytorch_pfn_extras.*]
ignore_missing_imports = True
21 changes: 21 additions & 0 deletions pfio/v2/_profiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
try:
from pytorch_pfn_extras.profiler import record, record_iterable

except ImportError:

class _DummyRecord:
def __init__(self):
pass

def __enter__(self):
pass

def __exit__(self, exc_type, exc_value, traceback):
pass

# IF PPE is not available, wrap with noop
def record(tag, trace, *args): # type: ignore # NOQA
return _DummyRecord()

def record_iterable(tag, iter, trace, *args): # type: ignore # NOQA
yield from iter
97 changes: 72 additions & 25 deletions pfio/v2/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,32 @@
import shutil
from typing import Optional

from ._profiler import record, record_iterable
from .fs import FS, FileStat, format_repr


class LocalProfileWrapper:
def __init__(self, fp):
self.fp = fp

def __enter__(self):
self.fp.__enter__()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.fp.__exit__(exc_type, exc_value, traceback)

def __getattr__(self, name):
attr = getattr(self.fp, name)
if callable(attr):
def wrapper(*args, **kwargs):
with record(f"pfio.v2.Local:{attr.__name__}", trace=True):
return attr(*args, **kwargs)
return wrapper
else:
return attr


class LocalFileStat(FileStat):
"""Detailed information of a POSIX file

Expand Down Expand Up @@ -45,9 +68,11 @@ def __init__(self, _stat, filename):


class Local(FS):
def __init__(self, cwd=None, create=False, **_):
def __init__(self, cwd=None, trace=False, create=False, **_):
super().__init__()

self.trace = trace

if cwd is None:
self._cwd = ''
else:
Expand Down Expand Up @@ -87,13 +112,28 @@ def open(self, file_path, mode='r',
buffering=-1, encoding=None, errors=None,
newline=None, closefd=True, opener=None):

path = os.path.join(self.cwd, file_path)
return io.open(path, mode,
buffering, encoding, errors,
newline, closefd, opener)
with record("pfio.v2.Local:open", trace=self.trace):
path = os.path.join(self.cwd, file_path)

fp = io.open(path, mode,
buffering, encoding, errors,
newline, closefd, opener)

# Add ppe recorder to io class methods (e.g. read, write)
if self.trace:
return LocalProfileWrapper(fp)
else:
return fp

def list(self, path: Optional[str] = '', recursive=False,
detail=False):
for e in record_iterable("pfio.v2.Local:list",
self._list(path, recursive, detail),
trace=self.trace):
yield e

def _list(self, path: Optional[str] = '', recursive=False,
detail=False):
path_or_prefix = os.path.join(self.cwd,
"" if path is None else path)

Expand Down Expand Up @@ -129,43 +169,50 @@ def _recursive_list(self, prefix_end_index: int, path: str,
e.path, detail)

def stat(self, path):
path = os.path.join(self.cwd, path)
return LocalFileStat(os.stat(path), path)
with record("pfio.v2.Local:stat", trace=self.trace):
path = os.path.join(self.cwd, path)
return LocalFileStat(os.stat(path), path)

def isdir(self, path: str):
path = os.path.join(self.cwd, path)
return os.path.isdir(path)

def mkdir(self, file_path: str, mode=0o777, *args, dir_fd=None):
path = os.path.join(self.cwd, file_path)
return os.mkdir(path, mode, *args, dir_fd=None)
with record("pfio.v2.Local:mkdir", trace=self.trace):
path = os.path.join(self.cwd, file_path)
return os.mkdir(path, mode, *args, dir_fd=None)

def makedirs(self, file_path: str, mode=0o777, exist_ok=False):
path = os.path.join(self.cwd, file_path)
return os.makedirs(path, mode, exist_ok)
with record("pfio.v2.Local:makedirs", trace=self.trace):
path = os.path.join(self.cwd, file_path)
return os.makedirs(path, mode, exist_ok)

def exists(self, file_path: str):
path = os.path.join(self.cwd, file_path)
return os.path.exists(path)
with record("pfio.v2.Local:exists", trace=self.trace):
path = os.path.join(self.cwd, file_path)
return os.path.exists(path)

def rename(self, src, dst):
s = os.path.join(self.cwd, src)
d = os.path.join(self.cwd, dst)
return os.rename(s, d)
with record("pfio.v2.Local:rename", trace=self.trace):
s = os.path.join(self.cwd, src)
d = os.path.join(self.cwd, dst)
return os.rename(s, d)

def remove(self, file_path: str, recursive=False):
file_path = os.path.join(self.cwd, file_path)
if recursive:
return shutil.rmtree(file_path)
if os.path.isdir(file_path):
return os.rmdir(file_path)
with record("pfio.v2.Local:remove", trace=self.trace):
file_path = os.path.join(self.cwd, file_path)
if recursive:
return shutil.rmtree(file_path)
if os.path.isdir(file_path):
return os.rmdir(file_path)

return os.remove(file_path)
return os.remove(file_path)

def glob(self, pattern: str):
return [
str(item.relative_to(self.cwd))
for item in pathlib.Path(self.cwd).glob(pattern)]
with record("pfio.v2.Local:glob", trace=self.trace):
return [
str(item.relative_to(self.cwd))
for item in pathlib.Path(self.cwd).glob(pattern)]

def _canonical_name(self, file_path: str) -> str:
return "file:/" + os.path.normpath(os.path.join(self.cwd, file_path))
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
'doc': ['sphinx', 'sphinx_rtd_theme'],
'bench': ['numpy>=1.19.5', 'torch>=1.9.0', 'Pillow<=8.2.0'],
'hdfs': ['pyarrow>=6.0.0'],
'trace': ['pytorch-pfn-extras'],
},
# When updating install requires, docs/requirements.txt should be updated too
install_requires=['boto3', 'deprecation', 'urllib3'],
Expand Down
44 changes: 44 additions & 0 deletions tests/v2_tests/test_local.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import io
import json
import os
import pickle
import tempfile
Expand Down Expand Up @@ -292,5 +293,48 @@ def test_from_url_create_option(self):
assert os.path.exists(path) and os.path.isdir(path)


def test_local_rw_profiling():
ppe = pytest.importorskip("pytorch_pfn_extras")

with Local(trace=True) as fs:
ppe.profiler.clear_tracer()

with fs.open('foo.txt', 'w') as fp:
fp.write('bar')

dict = ppe.profiler.get_tracer().state_dict()
keys = [event["name"] for event in json.loads(dict['_event_list'])]

assert "pfio.v2.Local:open" in keys
assert "pfio.v2.Local:write" in keys

with Local(trace=True) as fs:
ppe.profiler.clear_tracer()

with fs.open('foo.txt', 'r') as fp:
tmp = fp.read()
assert tmp == 'bar'

dict = ppe.profiler.get_tracer().state_dict()
keys = [event["name"] for event in json.loads(dict['_event_list'])]

assert "pfio.v2.Local:open" in keys
assert "pfio.v2.Local:read" in keys

# no profile case
with Local(trace=False) as fs:
ppe.profiler.clear_tracer()

with fs.open('foo.txt', 'r') as fp:
tmp = fp.read()
assert tmp == 'bar'

dict = ppe.profiler.get_tracer().state_dict()
keys = [event["name"] for event in json.loads(dict['_event_list'])]

assert "pfio.v2.Local:open" not in keys
assert "pfio.v2.Local:read" not in keys


if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
envlist = py38,py39,py310,py311,py312,doc

[testenv]
deps = .[test]
deps = .[test, trace]
skipsdist = True
commands =
flake8 pfio tests
Expand Down