Skip to content

Commit

Permalink
Merge pull request #766 from roboflow/keypoint-visualizations
Browse files Browse the repository at this point in the history
Keypoint visualization block
  • Loading branch information
PawelPeczek-Roboflow authored Nov 1, 2024
2 parents 1ad728e + a6dbbf5 commit e1803bb
Show file tree
Hide file tree
Showing 5 changed files with 700 additions and 0 deletions.
4 changes: 4 additions & 0 deletions inference/core/workflows/core_steps/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@
from inference.core.workflows.core_steps.visualizations.halo.v1 import (
HaloVisualizationBlockV1,
)
from inference.core.workflows.core_steps.visualizations.keypoint.v1 import (
KeypointVisualizationBlockV1,
)
from inference.core.workflows.core_steps.visualizations.label.v1 import (
LabelVisualizationBlockV1,
)
Expand Down Expand Up @@ -429,6 +432,7 @@ def load_blocks() -> List[Type[WorkflowBlock]]:
VLMAsClassifierBlockV1,
VLMAsDetectorBlockV1,
YoloWorldModelBlockV1,
KeypointVisualizationBlockV1,
DataAggregatorBlockV1,
CSVFormatterBlockV1,
EmailNotificationBlockV1,
Expand Down
Empty file.
266 changes: 266 additions & 0 deletions inference/core/workflows/core_steps/visualizations/keypoint/v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
from typing import Literal, Optional, Type, Union

import numpy as np
import supervision as sv
from pydantic import ConfigDict, Field

from inference.core.workflows.core_steps.visualizations.common.base import (
OUTPUT_IMAGE_KEY,
VisualizationBlock,
VisualizationManifest,
)
from inference.core.workflows.core_steps.visualizations.common.utils import str_to_color
from inference.core.workflows.execution_engine.entities.base import WorkflowImageData
from inference.core.workflows.execution_engine.entities.types import (
FLOAT_KIND,
INTEGER_KIND,
KEYPOINT_DETECTION_PREDICTION_KIND,
STRING_KIND,
StepOutputSelector,
WorkflowParameterSelector,
)
from inference.core.workflows.prototypes.block import BlockResult, WorkflowBlockManifest

TYPE: str = "roboflow_core/keypoint_visualization@v1"
SHORT_DESCRIPTION = "Draws keypoints on detected objects in an image."
LONG_DESCRIPTION = """
The `KeypointVisualization` block uses a detections from an
keypoint detection model to draw keypoints on objects using
`sv.VertexAnnotator`.
"""


class KeypointManifest(VisualizationManifest):
type: Literal[f"{TYPE}", "KeypointVisualization"]
model_config = ConfigDict(
json_schema_extra={
"name": "Keypoint Visualization",
"version": "v1",
"short_description": SHORT_DESCRIPTION,
"long_description": LONG_DESCRIPTION,
"license": "Apache-2.0",
"block_type": "visualization",
"ui_manifest": {
"section": "visualization",
"icon": "far fa-braille",
"blockPriority": 20,
},
}
)

predictions: StepOutputSelector(
kind=[
KEYPOINT_DETECTION_PREDICTION_KIND,
]
) = Field( # type: ignore
description="Predictions",
examples=["$steps.keypoint_detection_model.predictions"],
)

annotator_type: Literal["edge", "vertex", "vertex_label"] = Field(
description="Type of annotator to be used for keypoint visualization.",
default="edge",
json_schema_extra={"always_visible": True},
)

color: Union[str, WorkflowParameterSelector(kind=[STRING_KIND])] = Field( # type: ignore
description="Color of the keypoint.",
default="#A351FB",
examples=["#A351FB", "green", "$inputs.color"],
)

text_color: Union[str, WorkflowParameterSelector(kind=[STRING_KIND])] = Field( # type: ignore
description="Text color of the keypoint.",
default="black",
examples=["black", "$inputs.text_color"],
json_schema_extra={
"relevant_for": {
"annotator_type": {
"values": ["vertex_label"],
},
},
},
)
text_scale: Union[float, WorkflowParameterSelector(kind=[FLOAT_KIND])] = Field( # type: ignore
description="Scale of the text.",
default=0.5,
examples=[0.5, "$inputs.text_scale"],
json_schema_extra={
"relevant_for": {
"annotator_type": {
"values": ["vertex_label"],
},
},
},
)

text_thickness: Union[int, WorkflowParameterSelector(kind=[INTEGER_KIND])] = Field( # type: ignore
description="Thickness of the text characters.",
default=1,
examples=[1, "$inputs.text_thickness"],
json_schema_extra={
"relevant_for": {
"annotator_type": {
"values": ["vertex_label"],
},
},
},
)

text_padding: Union[int, WorkflowParameterSelector(kind=[INTEGER_KIND])] = Field( # type: ignore
description="Padding around the text in pixels.",
default=10,
examples=[10, "$inputs.text_padding"],
json_schema_extra={
"relevant_for": {
"annotator_type": {
"values": ["vertex_label"],
},
},
},
)

thickness: Union[int, WorkflowParameterSelector(kind=[INTEGER_KIND])] = Field( # type: ignore
description="Thickness of the outline in pixels.",
default=2,
examples=[2, "$inputs.thickness"],
json_schema_extra={
"relevant_for": {
"annotator_type": {
"values": ["edge"],
},
},
},
)

radius: Union[int, WorkflowParameterSelector(kind=[INTEGER_KIND])] = Field( # type: ignore
description="Radius of the keypoint in pixels.",
default=10,
examples=[10, "$inputs.radius"],
json_schema_extra={
"relevant_for": {
"annotator_type": {
"values": ["vertex", "vertex_label"],
},
},
},
)

@classmethod
def get_execution_engine_compatibility(cls) -> Optional[str]:
return ">=1.2.0,<2.0.0"


class KeypointVisualizationBlockV1(VisualizationBlock):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.annotatorCache = {}

@classmethod
def get_manifest(cls) -> Type[WorkflowBlockManifest]:
return KeypointManifest

def getAnnotator(
self,
color: str,
text_color: str,
text_scale: float,
text_thickness: int,
text_padding: int,
thickness: int,
radius: int,
annotator_type: str,
) -> sv.annotators.base.BaseAnnotator:
key = "_".join(
map(
str,
[
color,
text_color,
text_scale,
text_thickness,
text_padding,
thickness,
radius,
annotator_type,
],
)
)

if key not in self.annotatorCache:
color = str_to_color(color)
text_color = str_to_color(text_color)

if annotator_type == "edge":
self.annotatorCache[key] = sv.EdgeAnnotator(
color=color,
thickness=thickness,
)
elif annotator_type == "vertex":
self.annotatorCache[key] = sv.VertexAnnotator(
color=color,
radius=radius,
)
elif annotator_type == "vertex_label":
self.annotatorCache[key] = sv.VertexLabelAnnotator(
color=color,
text_color=text_color,
text_scale=text_scale,
text_thickness=text_thickness,
text_padding=text_padding,
border_radius=radius,
)

return self.annotatorCache[key]

# Function to convert detections to keypoints
def convert_detections_to_keypoints(self, detections):
keypoints_xy = detections.data["keypoints_xy"]
keypoints_confidence = detections.data["keypoints_confidence"]
keypoints_class_name = detections.data["keypoints_class_name"]
class_id = detections.class_id

keypoints = sv.KeyPoints(
xy=np.array(keypoints_xy, dtype=np.float32),
confidence=np.array(keypoints_confidence, dtype=np.float32),
class_id=np.array(class_id, dtype=int),
data={"class_name": np.array(keypoints_class_name, dtype=object)},
)
return keypoints

def run(
self,
image: WorkflowImageData,
predictions: sv.Detections,
copy_image: bool,
annotator_type: Optional[str],
color: Optional[str],
text_color: Optional[str],
text_scale: Optional[float],
text_thickness: Optional[int],
text_padding: Optional[int],
thickness: Optional[int],
radius: Optional[int],
) -> BlockResult:
annotator = self.getAnnotator(
color,
text_color,
text_scale,
text_thickness,
text_padding,
thickness,
radius,
annotator_type,
)

keypoints = self.convert_detections_to_keypoints(predictions)

annotated_image = annotator.annotate(
scene=image.numpy_image.copy() if copy_image else image.numpy_image,
key_points=keypoints,
)
return {
OUTPUT_IMAGE_KEY: WorkflowImageData.copy_and_replace(
origin_image_data=image, numpy_image=annotated_image
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import numpy as np
import cv2

from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS
from inference.core.managers.base import ModelManager
from inference.core.workflows.core_steps.common.entities import StepExecutionMode
from inference.core.workflows.execution_engine.core import ExecutionEngine


WORKFLOW_KEYPOINT_VISUALIZATION = {
"version": "1.1",
"inputs": [
{"type": "WorkflowImage", "name": "image"},
{
"type": "WorkflowParameter",
"name": "model_id",
"default_value": "yolov8n-pose-640",
},
],
"steps": [
{
"type": "KeypointsDetectionModel",
"name": "model",
"image": "$inputs.image",
"model_id": "$inputs.model_id",
},
{
"type": "roboflow_core/keypoint_visualization@v1",
"name": "visualization",
"image": "$inputs.image",
"predictions": "$steps.model.predictions",
},
],
"outputs": [
{
"type": "JsonField",
"name": "visualization",
"selector": "$steps.visualization.image",
},
],
}


def test_workflow_keypoint_visualization(
model_manager: ModelManager,
crowd_image: np.ndarray,
) -> None:
# given
workflow_init_parameters = {
"workflows_core.model_manager": model_manager,
"workflows_core.api_key": None,
"workflows_core.step_execution_mode": StepExecutionMode.LOCAL,
}
execution_engine = ExecutionEngine.init(
workflow_definition=WORKFLOW_KEYPOINT_VISUALIZATION,
init_parameters=workflow_init_parameters,
max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS,
)

# when
result = execution_engine.run(
runtime_parameters={
"image": crowd_image,
}
)

assert isinstance(result, list)
assert len(result) == 1
assert "visualization" in result[0]
assert result[0]["visualization"].numpy_image.shape == crowd_image.shape
Loading

0 comments on commit e1803bb

Please sign in to comment.