-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathroctx.py
More file actions
180 lines (140 loc) · 6.22 KB
/
roctx.py
File metadata and controls
180 lines (140 loc) · 6.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
from ctypes import cdll, c_int, c_char_p, c_uint64
from functools import wraps
# Context wrappers are snatched and modified from
# https://github.com/ROCm/rocprofiler-sdk/blob/72b97a8b7e8322e78314f8b9164b96bec7d1c044/source/lib/python/roctx/context_decorators.py#L30
class RoctxRange:
"""Provides decorators and context-manager for roctx range"""
def __init__(self, R, msg:str=None):
"""Initialize with a message. R should have an interface of Roctx class, see below"""
self.r = R
self.msg = msg
def __call__(self, func):
"""Decorator"""
@wraps(func)
def wrapper(*args, **kwargs):
self.r.rangePush(self.msg)
try:
return func(*args, **kwargs)
finally:
self.r.rangePop()
return wrapper
def __enter__(self):
"""Context manager start function"""
if self.msg is not None:
self.a = self.r.rangePush(self.msg)
return self.a
return self
def __exit__(self, exc_type, exc_value, tb):
"""Context manager stop function"""
if self.msg is not None:
self.r.rangePop()
if exc_type is not None and exc_value is not None and tb is not None:
import traceback
traceback.print_exception(exc_type, exc_value, tb, limit=5)
class RoctxEnabler:
"""Provides decorators and context-manager for roctx profiler"""
def __init__(self, R, tid=0):
"""Initialize with a tid. R should have an interface of Roctx class, see below"""
self.r = R
self.tid = tid
def __call__(self, func):
"""Decorator"""
@wraps(func)
def wrapper(*args, **kwargs):
self.r.profilerResume(self.tid)
try:
return func(*args, **kwargs)
finally:
self.r.profilerPause(self.tid)
return wrapper
def __enter__(self):
"""Context manager start function"""
self.a = self.r.profilerResume(self.tid)
return self.a
def __exit__(self, exc_type, exc_value, tb):
"""Context manager stop function"""
self.r.profilerPause(self.tid)
if exc_type is not None and exc_value is not None and tb is not None:
import traceback
traceback.print_exception(exc_type, exc_value, tb, limit=5)
class Roctx:
"""An interface to ROCTx.
Here's how to import it without messing with PYTHONPATH, assuming that your
current __file__ is 2 levels below a sibling of this repo root:
def importRoctxHelper():
import os
import importlib
mod_path = os.path.realpath(os.path.dirname(__file__) + "/../../tools/ROCTx/roctx.py")
mod_name = os.path.splitext(os.path.basename(mod_path))[0].capitalize()
spec = importlib.util.spec_from_file_location(mod_name, mod_path)
module = importlib.util.module_from_spec(spec)
sys.modules[mod_name] = module
spec.loader.exec_module(module)
helper = getattr(module, mod_name)
return helper # Roctx class
RoctxHlpr = importRoctxHelper()
roctx = RoctxHlpr(os.environ.setdefault("ARECH_TRACE", "0")) # instance of Roctx class
# roctx.profilerPause()
When the snippet is executed with environment variable ARECH_TRACE having a
compatible value (1, or 2, or 3), roctx will be an working interface to
ROCTx, otherwise it’ll be a mock interface.
"""
Range = RoctxRange
Enabler = RoctxEnabler
def __init__(self, prof_ver=None, rocm_path=None):
"""Initializer.
prof_ver: str|int - is a profiler version to be used, any of (1,2,3). Any other value
disables all functionality (all functions are replaced by stubs with compatible API that do
nothing). Version number affects on which implementation library is loaded. Version 3 has
the most features"""
if isinstance(prof_ver, str):
prof_ver = int(prof_ver)
if prof_ver in (1, 2, 3):
if rocm_path is None:
rocm_path = "/opt/rocm"
if prof_ver == 3:
lib_path = rocm_path + "/lib/librocprofiler-sdk-roctx.so"
else:
# API is restricted compared to sdk-roctx
lib_path = rocm_path + "/lib/libroctx64.so"
print(f"Loading {lib_path}...")
lib = cdll.LoadLibrary(lib_path)
# See the docs at
# https://github.com/ROCm/rocprofiler-sdk/blob/amd-staging/source/include/rocprofiler-sdk-roctx/roctx.h
self.mark = lambda x: lib.roctxMarkA(x.encode("utf-8"))
self.mark.argtypes = [c_char_p]
self.mark.restype = None
self.rangePush = lambda x: lib.roctxRangePushA(x.encode("utf-8"))
self.rangePush.argtypes = [c_char_p]
self.rangePush.restype = c_int
self.rangePop = lib.roctxRangePop
self.rangePop.restype = c_int
self.rangeStart = lambda x: lib.roctxRangeStartA(x.encode("utf-8"))
self.rangeStart.argtypes = [c_char_p]
self.rangeStart.restype = c_uint64
self.rangeStop = lib.roctxRangeStop
self.rangeStop.argtypes = [c_uint64]
self.rangeStop.restype = None
if prof_ver == 3:
self.profilerPause = lambda x=None: lib.roctxProfilerPause(
0 if x is None else x
)
self.profilerPause.argtypes = [c_uint64]
self.profilerPause.restype = c_int
self.profilerResume = lambda x=None: lib.roctxProfilerResume(
0 if x is None else x
)
self.profilerResume.argtypes = [c_uint64]
self.profilerResume.restype = c_int
else:
self.profilerPause = lambda x=None: -1
self.profilerResume = lambda x=None: -1
else:
print("Making fake Roctx interface")
self.mark = lambda x: None
self.rangePush = lambda x: -1
self.rangePop = lambda: -1
self.rangeStart = lambda x: -1
self.rangeStop = lambda x: None
self.profilerPause = lambda x=None: -1
self.profilerResume = lambda x=None: -1