|
| 1 | +import json |
| 2 | +import math |
| 3 | +import os |
| 4 | + |
| 5 | +from rmf_building_map_msgs.msg import BuildingMap |
| 6 | +from rmf_building_map_msgs.msg import Level |
| 7 | +from rmf_building_map_msgs.msg import Graph |
| 8 | +from rmf_building_map_msgs.msg import GraphNode |
| 9 | +from rmf_building_map_msgs.msg import GraphEdge |
| 10 | +from rmf_building_map_msgs.msg import Place |
| 11 | +from rmf_building_map_msgs.msg import AffineImage |
| 12 | +from rmf_building_map_msgs.msg import Door |
| 13 | +from rmf_building_map_msgs.msg import Lift |
| 14 | +from rmf_building_map_msgs.msg import Param |
| 15 | + |
| 16 | +DEFAULT_CABIN_DOOR_THICKNESS = 0.05 |
| 17 | +DEFAULT_CABIN_WALL_THICKNESS = 0.1 |
| 18 | +DEFAULT_CABIN_GAP = 0.01 |
| 19 | + |
| 20 | +def load_site_json(map_path): |
| 21 | + map_msg = BuildingMap() |
| 22 | + map_dir = os.path.dirname(map_path) # for calculating image paths |
| 23 | + with open(map_path, 'r') as f: |
| 24 | + site = json.load(f) |
| 25 | + map_msg.name = site["properties"]["name"] |
| 26 | + for level_data in site["levels"].values(): |
| 27 | + map_msg.levels.append(parse_level(map_dir, level_data)) |
| 28 | + for lift_data in site.get("lifts", {}).values(): |
| 29 | + lift = parse_lift(lift_data, site) |
| 30 | + if lift is not None: |
| 31 | + map_msg.lifts.append(lift) |
| 32 | + print(map_msg.lifts) |
| 33 | + return map_msg |
| 34 | + |
| 35 | +def parse_angle(angle): |
| 36 | + if "rad" in angle: |
| 37 | + return angle["rad"] |
| 38 | + else: |
| 39 | + return math.pi * angle["deg"] / 180.0 |
| 40 | + |
| 41 | +def parse_lift(data, site): |
| 42 | + lift_msg = Lift() |
| 43 | + props = data["properties"] |
| 44 | + lift_msg.name = props["name"] |
| 45 | + cabin = props["cabin"]["Rect"] |
| 46 | + lift_msg.width = cabin["width"] |
| 47 | + lift_msg.depth = cabin["depth"] |
| 48 | + front_door_thickness = 0.0 |
| 49 | + if "front_door" in cabin: |
| 50 | + # TODO(luca) use custom thickness |
| 51 | + front_door_thickness = DEFAULT_CABIN_DOOR_THICKNESS |
| 52 | + center_x = -cabin["depth"] / 2.0 - DEFAULT_CABIN_WALL_THICKNESS - DEFAULT_CABIN_GAP - front_door_thickness / 2.0 |
| 53 | + center_y = 0.0 |
| 54 | + left_anchor = site["anchors"].get(str(props["reference_anchors"][0])) |
| 55 | + right_anchor = site["anchors"].get(str(props["reference_anchors"][1])) |
| 56 | + if left_anchor is None or right_anchor is None: |
| 57 | + # ERROR |
| 58 | + return None |
| 59 | + if "Translate2D" in left_anchor: |
| 60 | + left_anchor = left_anchor["Translate2D"] |
| 61 | + else: |
| 62 | + return None |
| 63 | + if "Translate2D" in right_anchor: |
| 64 | + right_anchor = right_anchor["Translate2D"] |
| 65 | + else: |
| 66 | + return None |
| 67 | + lift_msg.ref_yaw = math.atan2(left_anchor[1] - right_anchor[1], left_anchor[0] - right_anchor[0]) |
| 68 | + midpoint_x = (left_anchor[0] + right_anchor[0]) / 2.0 |
| 69 | + midpoint_y = (left_anchor[1] + right_anchor[1]) / 2.0 |
| 70 | + lift_msg.ref_x = center_x + midpoint_x |
| 71 | + lift_msg.ref_y = center_y + midpoint_y |
| 72 | + # TODO(luca) cabin doors |
| 73 | + return lift_msg |
| 74 | + |
| 75 | +def parse_level(map_dir, data): |
| 76 | + level_msg = Level() |
| 77 | + level_msg.name = data["properties"]["name"] |
| 78 | + level_msg.elevation = data["properties"]["elevation"] |
| 79 | + |
| 80 | + for drawing in data.get("drawings", {}).values(): |
| 81 | + image = AffineImage() |
| 82 | + props = drawing["properties"] |
| 83 | + image_filename = props["name"] |
| 84 | + if "Local" not in props["source"]: |
| 85 | + # Warn that non local is not supported |
| 86 | + continue |
| 87 | + image_filename = props["source"]["Local"] |
| 88 | + image_path = os.path.join(map_dir, image_filename) |
| 89 | + |
| 90 | + if os.path.exists(image_path): |
| 91 | + #self.get_logger().info(f'opening: {image_path}') |
| 92 | + with open(image_path, 'rb') as image_file: |
| 93 | + image.data = image_file.read() |
| 94 | + #self.get_logger().info(f'read {len(image.data)} byte image') |
| 95 | + image.name = image_filename.split('.')[0] |
| 96 | + image.encoding = image_filename.split('.')[-1] |
| 97 | + image.scale = 1.0 / props["pixels_per_meter"] |
| 98 | + image.x_offset = props["pose"]["trans"][0] |
| 99 | + image.y_offset = props["pose"]["trans"][1] |
| 100 | + if "yaw" in props["pose"]["rot"]: |
| 101 | + yaw = parse_angle(props["pose"]["rot"]["yaw"]) |
| 102 | + level_msg.images.append(image) |
| 103 | + else: |
| 104 | + pass |
| 105 | + #self.get_logger().error(f'unable to open image: {image_path}') |
| 106 | + |
| 107 | + for door in data.get("doors", {}).values(): |
| 108 | + door_msg = Door() |
| 109 | + door_msg.name = door["name"] |
| 110 | + |
| 111 | + v1 = data["anchors"].get(str(door["anchors"][0])) |
| 112 | + v2 = data["anchors"].get(str(door["anchors"][1])) |
| 113 | + if "Translate2D" in v1: |
| 114 | + v1 = v1["Translate2D"] |
| 115 | + else: |
| 116 | + # LOG ERROR |
| 117 | + continue |
| 118 | + if "Translate2D" in v2: |
| 119 | + v2 = v2["Translate2D"] |
| 120 | + else: |
| 121 | + # LOG ERROR |
| 122 | + continue |
| 123 | + # TODO(luca) Change this based on pivot side |
| 124 | + door_msg.v1_x = v1[0] |
| 125 | + door_msg.v1_y = v1[1] |
| 126 | + door_msg.v2_x = v2[0] |
| 127 | + door_msg.v2_y = v2[1] |
| 128 | + |
| 129 | + if "DoubleSwing" in door["kind"]: |
| 130 | + door_msg.door_type = door_msg.DOOR_TYPE_DOUBLE_SWING |
| 131 | + swing = door["kind"]["DoubleSwing"]["swing"] |
| 132 | + # TODO(luca) implement forward and backward |
| 133 | + door_msg.motion_range = parse_angle(next(iter(swing.values()))) |
| 134 | + door_msg.motion_direction = 1 if "Backward" in swing else -1 |
| 135 | + elif "SingleSwing" in door["kind"]: |
| 136 | + door_msg.door_type = door_msg.DOOR_TYPE_SINGLE_SWING |
| 137 | + swing = door["kind"]["SingleSwing"]["swing"] |
| 138 | + # TODO(luca) implement forward and backward |
| 139 | + door_msg.motion_range = parse_angle(next(iter(swing.values()))) |
| 140 | + door_msg.motion_direction = 1 if "Backward" in swing else -1 |
| 141 | + if "DoubleSliding" in door["kind"]: |
| 142 | + door_msg.door_type = door_msg.DOOR_TYPE_DOUBLE_SLIDING |
| 143 | + elif "SingleSliding" in door["kind"]: |
| 144 | + door_msg.door_type = door_msg.DOOR_TYPE_SINGLE_SLIDING |
| 145 | + else: |
| 146 | + # WARN not supported |
| 147 | + pass |
| 148 | + level_msg.doors.append(door_msg) |
| 149 | + |
| 150 | + # TODO(luca), needed for rmf-web frontend, populate |
| 151 | + for wall in data.get("walls", {}).values(): |
| 152 | + pass |
| 153 | + |
| 154 | + |
| 155 | + return level_msg |
0 commit comments