Postprocess Callback #100

Merged: 8 commits, Feb 1, 2024
2 changes: 1 addition & 1 deletion CITATION.cff
@@ -10,7 +10,7 @@ authors:
family-names: Gutiérrez Hermosillo Muriedas
email: [email protected]
affiliation: >-
-Scientific Computing Centre, Karlsruhe Institute für
+Scientific Computing Center, Karlsruhe Institute für
Technologie
orcid: 'https://orcid.org/0000-0001-8439-7145'
repository-code: 'https://github.com/Helmholtz-AI-Energy/perun'
2 changes: 1 addition & 1 deletion README.md
@@ -116,7 +116,7 @@ mpirun -n 8 perun monitor path/to/your/script.py

## Docs

-To get more information, check out our [docs page](https://perun.readthedocs.io/en/latest/).
+To get more information, check out our [docs page](https://perun.readthedocs.io/en/latest/) or check the [examples](https://github.com/Helmholtz-AI-Energy/perun/tree/main/examples).

## Citing perun

6 changes: 5 additions & 1 deletion docs/data.rst
@@ -3,8 +3,12 @@
Data
====

-perun structures the collected data as a tree, with the root node containing the aggregated data of an individual run of the application, and the nodes further down the tree containing the information of the individual compute nodes, devices and *sensors*. *Sensors* are the individual APIs that perun uses to gather measurements; a single API can provide information about multiple devices, and multiple devices can be monitored through multiple APIs.
+perun structures the collected data as a tree, with the root node containing the aggregated data of an individual run of the application, and the nodes further down the tree containing the information of the individual compute nodes, devices and *sensors*. *Sensors* are the individual values that can be collected from the distinct monitoring backends.

.. image:: images/data_structure.png

Each node in the data structure, once the raw data at the bottom has been processed, contains a set of summarized metrics based on the data collected by its sub-nodes, and a metadata dictionary with any information that could be obtained from the application, node, device or API.

Each node thus carries a list of metrics or stats representing the accumulated data, as well as metadata.

The nodeType attribute indicates the type of object in the hierarchy this node represents. At the lowest level, the leaves of the tree, are the individual "sensors", values collected by a single device or interface. Higher up the tree, the data nodes represent groups of devices and computational nodes. The three bottom levels of the tree represent the hardware. Further up the tree, data starts being accumulated over individual runs of the application: a "run" is a single execution of the application, a "multi_run" is the data from multiple runs when perun is run with the ``--rounds N`` option, and at the highest level, the root of the tree is the application itself.
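As a rough sketch of how the processed tree could be traversed, the following walks a `DataNode` and prints each node's type and summarized metrics. Only `nodeType` and the `metrics` dictionary are described in this PR; the `nodes` child container is an assumption for illustration and may be named differently in `perun.data_model.data.DataNode`.

```python
from perun.data_model.data import DataNode


def print_tree(node: DataNode, depth: int = 0) -> None:
    """Recursively print each node's type and its summarized metrics."""
    indent = "  " * depth
    print(f"{indent}{node.nodeType}")
    for metricType, metric in node.metrics.items():
        print(f"{indent}  {metricType.value} = {metric.value}")
    # Assumed child container; the actual attribute name may differ.
    for child in getattr(node, "nodes", {}).values():
        print_tree(child, depth + 1)
```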
Binary file modified docs/images/data_structure.png
File renamed without changes.
16 changes: 16 additions & 0 deletions examples/mlflow/README.md
@@ -0,0 +1,16 @@
# perun + MLflow

If you are already using monitoring tools like MLflow, you might want to add the data collected by perun to enhance the existing tracking data. This can be done easily with the ```@register_callback``` decorator. An example is shown in the train.py file:

```python
@register_callback
def perun2mlflow(node):
    # `active_run` is the MLflow run opened earlier in the script
    # via `with mlflow.start_run() as active_run:` (see train.py).
    mlflow.start_run(active_run.info.run_id)
    for metricType, metric in node.metrics.items():
        name = f"{metricType.value}"
        mlflow.log_metric(name, metric.value)
```

Functions decorated with ```@register_callback``` take only one argument, ```node```. The node object is an instance of ```perun.data_model.data.DataNode```, a tree structure that contains all the data collected while monitoring the current script. Each node contains the accumulated data of its sub-nodes in the ```metrics``` dictionary. Each metric object contains the value itself and all the metadata relevant to it. In the example above, the summarized values for power, energy and hardware utilization are submitted as metrics to the MLflow tracking system.

For more information on the data node object, [check our docs](https://perun.readthedocs.io/en/latest/data.html).
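One way to make the snippet above self-contained is sketched below: the run is opened first so its id can be captured, and the callback reopens it and closes it explicitly after logging. The `mlflow.end_run()` call is an addition for illustration, not part of this PR.

```python
import mlflow

from perun import register_callback

with mlflow.start_run() as active_run:

    @register_callback
    def perun2mlflow(node):
        # Callbacks fire after perun finishes postprocessing, outside the
        # `with` block, so the finished run is reopened by its id.
        mlflow.start_run(active_run.info.run_id)
        for metricType, metric in node.metrics.items():
            mlflow.log_metric(metricType.value, metric.value)
        # Close the resumed run so the tracking server marks it finished.
        mlflow.end_run()
```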
63 changes: 63 additions & 0 deletions examples/mlflow/requirenments.txt
@@ -0,0 +1,63 @@
alembic==1.13.1
blinker==1.7.0
certifi==2023.11.17
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==3.0.0
contourpy==1.1.1
cycler==0.12.1
databricks-cli==0.18.0
docker==6.1.3
entrypoints==0.4
Flask==3.0.0
fonttools==4.47.0
gitdb==4.0.11
GitPython==3.1.40
greenlet==3.0.3
gunicorn==21.2.0
h5py==3.10.0
idna==3.6
importlib-metadata==7.0.1
importlib-resources==6.1.1
itsdangerous==2.1.2
Jinja2==3.1.2
joblib==1.3.2
kiwisolver==1.4.5
Mako==1.3.0
Markdown==3.5.1
MarkupSafe==2.1.3
matplotlib==3.7.4
mlflow==2.9.2
mpi4py==3.1.5
numpy==1.24.4
nvidia-ml-py==12.535.133
oauthlib==3.2.2
packaging==23.2
pandas==2.0.3
-e git+ssh://[email protected]/Helmholtz-AI-Energy/perun.git@e2f23885dc8207838961a4a036583fdc5bd2e2bc#egg=perun
Pillow==10.1.0
protobuf==4.25.1
psutil==5.9.7
py-cpuinfo==9.0.0
pyarrow==14.0.2
PyJWT==2.8.0
pyparsing==3.1.1
python-dateutil==2.8.2
pytz==2023.3.post1
PyYAML==6.0.1
querystring-parser==1.2.4
requests==2.31.0
scikit-learn==1.3.2
scipy==1.10.1
six==1.16.0
smmap==5.0.1
SQLAlchemy==2.0.24
sqlparse==0.4.4
tabulate==0.9.0
threadpoolctl==3.2.0
typing_extensions==4.9.0
tzdata==2023.3
urllib3==2.1.0
websocket-client==1.7.0
Werkzeug==3.0.1
zipp==3.17.0
96 changes: 96 additions & 0 deletions examples/mlflow/train.py
@@ -0,0 +1,96 @@
import logging
import sys
import warnings
from urllib.parse import urlparse

import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

from perun import monitor, register_callback

logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)


@monitor()
def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2


@monitor()
def train(data):
    train, test = train_test_split(data)

    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]

    alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
    l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5

    with mlflow.start_run() as active_run:

        @register_callback
        def perun2mlflow(node):
            # Resume the tracking run once perun is done postprocessing.
            mlflow.start_run(active_run.info.run_id)
            for metricType, metric in node.metrics.items():
                name = f"{metric.type.value}"
                mlflow.log_metric(name, metric.value)

        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)

        predicted_qualities = lr.predict(test_x)

        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

        print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        print("  RMSE: %s" % rmse)
        print("  MAE: %s" % mae)
        print("  R2: %s" % r2)

        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

        # Model registry does not work with file store
        if tracking_url_type_store != "file":
            # Register the model
            # There are other ways to use the Model Registry, which depends on the use case,
            # please refer to the doc for more information:
            # https://mlflow.org/docs/latest/model-registry.html#api-workflow
            mlflow.sklearn.log_model(
                lr, "model", registered_model_name="ElasticnetWineModel"
            )
        else:
            mlflow.sklearn.log_model(lr, "model")


if __name__ == "__main__":
    warnings.filterwarnings("ignore")
    np.random.seed(40)

    # Read the wine-quality csv file from the URL
    csv_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    try:
        data = pd.read_csv(csv_url, sep=";")
    except Exception as e:
        logger.exception(
            "Unable to download training & test CSV, check your internet connection. Error: %s",
            e,
        )
    train(data)
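To try the example, the script would be launched through perun as shown in the README, e.g. `perun monitor train.py` (or under `mpirun` for multiple ranks); `perun2mlflow` then fires once monitoring and postprocessing have finished.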
2 changes: 1 addition & 1 deletion perun/__init__.py
@@ -7,4 +7,4 @@

log = init_logging(config.get("debug", "log_lvl"))

-from perun.api.decorator import monitor
+from perun.api.decorator import monitor, register_callback
17 changes: 16 additions & 1 deletion perun/api/decorator.py
@@ -2,8 +2,9 @@

import functools
import logging
-from typing import Optional
+from typing import Callable, Optional

from perun.data_model.data import DataNode
from perun.perun import Perun

log = logging.getLogger("perun")
@@ -33,3 +34,17 @@ def func_wrapper(*args, **kwargs):
        return func_wrapper

    return inner_function


def register_callback(func: Callable[[DataNode], None]):
    """Register a function to run after perun has finished collecting data.

    Parameters
    ----------
    func : Callable[[DataNode], None]
        Function to be called.
    """
    perun = Perun()  # type: ignore
    if func.__name__ not in perun.postprocess_callbacks:
        log.info(f"Rank {perun.comm.Get_rank()}: Registering callback {func.__name__}")
        perun.postprocess_callbacks[func.__name__] = func
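As a usage sketch for the new decorator (hypothetical file name and workload; the metric attributes follow the train.py example in this PR), a callback can persist the summarized root metrics once monitoring ends:

```python
import json

from perun import monitor, register_callback


@register_callback
def dump_metrics(node):
    # Flatten the root node's summarized metrics into a plain dict;
    # float() guards against non-JSON-serializable numeric types.
    summary = {m.type.value: float(m.value) for m in node.metrics.values()}
    with open("perun_summary.json", "w") as fh:
        json.dump(summary, fh, indent=2)


@monitor()
def workload():
    sum(i * i for i in range(10_000_000))  # stand-in compute load
```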
97 changes: 97 additions & 0 deletions perun/comm.py
@@ -1,6 +1,8 @@
"""Comm module."""

import logging
import sys
import time
from typing import Any, List, Optional

log = logging.getLogger("perun")
@@ -17,6 +19,7 @@ def __init__(self):
        try:
            from mpi4py import MPI

            self._MPI = MPI
            if MPI.COMM_WORLD.Get_size() >= 1:
                self._comm = MPI.COMM_WORLD
                self._enabled = True
@@ -72,3 +75,97 @@ def Abort(self, errorcode: int):
        """MPI Abort operation."""
        if self._enabled:
            self._comm.Abort(errorcode=errorcode)
        else:
            sys.exit(1)

    def gather_from_ranks(
        self, obj: Any, ranks: List[int], root: int = 0
    ) -> Optional[List[Any]]:
        """Collect Python objects from specific ranks at the given root.

        Parameters
        ----------
        obj : Any
            Object to be collected.
        ranks : List[int]
            List of ranks that need to send the object.
        root : int, optional
            Receiver rank, by default 0.

        Returns
        -------
        Optional[List[Any]]
            List with the gathered objects; None on ranks other than root.
        """
        if self._enabled:
            result = None
            if self.Get_rank() != root:
                # Every non-root caller sends; the root only posts receives
                # for the ranks listed in `ranks`, so callers are expected
                # to be covered by that list.
                self._comm.send(obj, root)
            else:
                result = []
                for rank in ranks:
                    if self.Get_rank() != rank:
                        result.append(self._comm.recv(source=rank))
                    else:
                        result.append(obj)

            return result
        else:
            return [obj]

    def check_available_ranks(self) -> List[int]:
        """Return an array with all the ranks that are capable of responding to a single send/recv.

        Returns
        -------
        List[int]
            List with responsive MPI ranks.
        """
        if self._enabled:
            rank = self._comm.Get_rank()
            size = self._comm.Get_size()

            # Create a list to store available ranks
            available_ranks = []

            # Start time for the timeout mechanism
            start_time = time.time()

            # Non-blocking receive requests list
            requests = []

            for target_rank in range(size):
                if target_rank != rank:
                    self._comm.isend(rank, dest=target_rank, tag=0)

            # Initiate non-blocking receive requests from all other ranks
            for target_rank in range(size):
                if target_rank != rank:  # Skip sending to self
                    req = self._comm.irecv(source=target_rank, tag=0)
                    requests.append((target_rank, req))

            # Check for available ranks while handling timeouts
            while time.time() - start_time < 5:  # 5 seconds timeout for demonstration
                for target_rank, req in requests:
                    if target_rank not in available_ranks:
                        if req.Test():  # Check if a request has received a message
                            available_ranks.append(
                                target_rank
                            )  # Add the rank to available list

                if len(available_ranks) == size - 1:  # All ranks are available
                    break

                # Sleep for a short duration before checking again
                time.sleep(0.1)

            # Cancel all remaining requests to prevent potential deadlocks
            for target_rank, req in requests:
                if target_rank not in available_ranks:
                    req.Cancel()

            available_ranks.append(rank)
            sorted_available_ranks = sorted(available_ranks)
            return sorted_available_ranks
        else:
            return [0]
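Taken together, the two helpers support a fault-tolerant gather. A minimal sketch, assuming the singleton access pattern used in perun/api/decorator.py:

```python
from perun.perun import Perun

comm = Perun().comm  # singleton access, as in perun/api/decorator.py

# Handshake first: ranks that do not answer within the ~5 s window are
# treated as unresponsive and excluded from the gather.
alive = comm.check_available_ranks()
gathered = comm.gather_from_ranks({"rank": comm.Get_rank()}, ranks=alive, root=0)
if gathered is not None:  # only the root receives the assembled list
    print(f"collected payloads from ranks {alive}")
```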
2 changes: 1 addition & 1 deletion perun/io/io.py
@@ -76,7 +76,7 @@ def exportTo(
        raise Exception("DataNode needs to be processed before it can be exported.")

    if not output_path.exists():
-        log.info(f"{output_path.parent} does not exists. So lets make it.")
+        log.info(f"{output_path} does not exists. So lets make it.")
        output_path.mkdir()

    if not mr_id and (