diff --git a/donkeycar/parts/object_detector/detector.py b/donkeycar/parts/object_detector/detector.py new file mode 100755 index 000000000..9ba5bb943 --- /dev/null +++ b/donkeycar/parts/object_detector/detector.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +# Object Detection from Tensorflow Lite +# Find an object +# https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi +# installation: +# pip install tflite-support +# +# Models +# https://www.tensorflow.org/lite/api_docs/python/tflite_model_maker +# +# Stop Sign Tflite model (complete set of COCO images) from tensorflow +# efficientdet_lite0.tflite - https://github.com/tensorflow/examples/tree/master/tensorflow_examples/lite/model_maker +# Traffic cone images source from Roboflow +# custom model created by Craig Fox using images from Roboflow with +# conemodel.tflite - https://universe.roboflow.com/robotica-xftin/traffic-cones-4laxg +# +# This is a general purpose detection class that uses a model to recognize an object. + + +import os +import time + +from tflite_support.task import core +from tflite_support.task import processor +from tflite_support.task import vision + +import logging + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +class Object_Detector: + # Trained TFLITE object detection models + coco_object = 'efficientdet_lite0.tflite' + coco_object_edgetpu = 'efficientdet_lite0_edgetpu.tflite' + cone_object = 'conemodel.tflite' + cone_object_edgetpu = 'conemodel_edgetpu.tflite' + + def __init__ (self, + category = None, # 'person', 'stop sign', 'cone' if None, use category_id to specify object to detect + category_id = 0, # person + enable_edgetpu = False, # Coral Edge TPU (USB Accelerator, Dev Board) + max_results = 3, + score_threshold = 0.3, + num_threads = 4, + ): + + # Initialize the object detection model and category_id + model = self.coco_object_edgetpu if enable_edgetpu else self.coco_object + self.category_id = category_id + + if category == None or category == 'person': + pass + elif category == 'cone': + model = self.cone_object_edgetpu if enable_edgetpu else self.cone_object + elif category == 'stop sign': + self.category_id = 12 + else: + raise(Exception(f'Category value is invalid: {category}')) + logger.debug(f'Detecting category: {category}') + + model = os.path.join(os.path.dirname(__file__), model) + base_options = core.BaseOptions(file_name=model, use_coral=enable_edgetpu, num_threads=num_threads) + detection_options = processor.DetectionOptions(max_results=max_results, score_threshold=score_threshold) + options = vision.ObjectDetectorOptions(base_options=base_options, detection_options=detection_options) + + self.detector = vision.ObjectDetector.create_from_options(options) + + # Performance timer + self.loops = 0 + self.total_time = 0 + + def average_perf(self): + p = 0 if self.loops == 0 else self.total_time / self.loops + return p + + def detect(self, image): + # Create a TensorImage object from the RGB image. + input_tensor = vision.TensorImage.create_from_array(image) + + # Detect objects + start = time.time() + detection_result = self.detector.detect(input_tensor) + self.loops += 1 + self.total_time += (time.time() - start) + + score = 0 + name = None + bbox = None + for detection in detection_result.detections: + category = detection.categories[0] + if category.index == self.category_id: + if category.score > score: +# logger.info(f'{category.category_name} : {category.score}') + score = category.score + name = category.category_name + bbox = detection.bounding_box + + return bbox, score, name + \ No newline at end of file diff --git a/donkeycar/parts/object_detector/od_protocol.py b/donkeycar/parts/object_detector/od_protocol.py new file mode 100644 index 000000000..f3532bc63 --- /dev/null +++ b/donkeycar/parts/object_detector/od_protocol.py @@ -0,0 +1,251 @@ +""" +od_protocol.py +Donkeycar Parts to manage a sequence of events based upon object detection + author: cfox570 March 2023 + The class Detection_Protocol is a base class that provides functions to manages calling the + Object Detector class. Run in pilot mode only. + Four protocols are defined: + 1. Stop_and_Go - First stops the car, then pauses for delay, then passes the Stop Sign. Mimics + the behavior of a human driver on a road. Then the pilot takes over + 2. Pass_Object - Swerves around a Traffic Cone and then resumes driving around the track + + Threaded mode is likely only usable when running with the coGPUprocessor Google Coral Edge TPU. Without the + coprocessor, two Tensorflow Lite processes interfere with each other and the main Keras detection slows down. + + In non threaded mode, the run_hz parameter throttles the objection detection. It is only necessary to set to + run 1 or 2 times a vehicle loop. The object detection algorithm takes about 80m secs to run. So if nothing is detected, + very little delay is added to the vehicle loop over one second. When an object is detected, the detection method + is run for each cycle until the protocol ends. Examples: Stop: The Stop Sign is not seen. + Stop and Go: The Stop Sign is passed; Pass Object: The cone is not detected. The method self.reset_run_counter() is + called to restart the run counter to only run the algorithm run_hz times a second. + +""" +import time +import logging +from donkeycar.parts.object_detector.detector import Object_Detector + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +class Detection_Protocol: + + def __init__ (self, + category = 0, + use_edgetpu = False, + score = 0.5, + image_width = 160, + run_hz = 1, # seconds; run 1 per second + vehicle_hz = 20): + + self.on = True + + self.width = image_width + self.img_center = self.width / 2 + + self.run_trigger = int(vehicle_hz / run_hz) + self.run_counter = 0 + self.run_inprogress = False + + self.newimageready = False + self.detector_ready = True + + self.found = False + self.not_found = True + + self.image = None + self.detector_image = None + self.bbox = None + self.score = 0 + self.name = None + self.position = 0.0 + + self.detector = Object_Detector(category=category, enable_edgetpu = use_edgetpu, score_threshold=score) + + def mark(self, image, bbox): + import cv2 + # top left corner of rectangle + start_point = (bbox.origin_x, bbox.origin_y) + # bottom right corner of rectangle + end_point = (bbox.origin_x + bbox.width, bbox.origin_y + bbox.height) + color = (255, 0, 0) # Red color + thickness = 1 + image = cv2.rectangle(image, start_point, end_point, color, thickness) + + def ready(self): + self.found = False + self.not_found = False + self.detector_image = self.image + if self.detector_ready: + if self.bbox is not None: + self.found = True + self.mark(self.detector_image, self.bbox) + else: + self.not_found = True + + def next_image(self, image): + if self.detector_ready: + self.image = image + self.newimageready = True + + def reset_run_counter(self): + self.run_counter = 0 + + def detect(self): + self.newimageready = False + self.detector_ready = False + self.bbox = None + self.score = 0 + self.name = None + self.position = 0.0 + if self.image is not None: + self.bbox, self.score, self.name = self.detector.detect(self.image) + if self.bbox != None: + self.position = ((self.bbox.origin_x + (self.bbox.width / 2)) - self.img_center) / self.img_center + self.detector_ready = True + + def update(self): + while self.on: + if self.newimageready: + self.detect() + + def manage(self, angle, throttle): + # template routine + return angle, throttle + + def run_threaded(self, angle, throttle, image): + self.ready() + angle, throttle = self.manage(angle, throttle) + self.next_image(image) + if self.detector_image is None: + self.detector_image = image + return angle, throttle, self.detector_image + + def run(self, angle, throttle, image): + self.run_counter += 1 + if self.run_counter >= self.run_trigger: + self.image = image + self.detect() + self.ready() + angle, throttle = self.manage(angle, throttle) + return angle, throttle, self.detector_image + else: + return angle, throttle, image + + def shutdown(self): + logger.info(f'Detector - average detection time {self.detector.average_perf():5.3f}') + self.on = False + +class Stop_Manager(): + # Stop states + IDLE = 0 + INITIATE = 1 + POS_ONE = 3 + NEG_ONE = 2 + NEG_TWO = 4 + THROTTLE_INC = 0.2 + + def __init__(self): + self.stop_state = self.IDLE + self.last_throttle = 0.0 + + def stop(self): + if self.stop_state == self.IDLE: + self.stop_state = self.INITIATE + + def is_idle(self): + return self.stop_state == self.IDLE + + def throttle(self): +# if self.stop_state == self.IDLE: +# pass + throttle = 0.0 + if self.stop_state == self.INITIATE: + self.stop_state = self.NEG_ONE + throttle = -1.0 + elif self.stop_state == self.NEG_ONE: + self.stop_state = self.POS_ONE + throttle = 0.0 + elif self.stop_state == self.POS_ONE: + self.stop_state = self.NEG_TWO + throttle = -1.0 + elif self.stop_state == self.NEG_TWO: + throttle = self.last_throttle + self.THROTTLE_INC + if throttle >= 0.0: + throttle = 0.0 + self.stop_state = self.IDLE + self.last_throttle = throttle + return throttle + +class Stop_And_Go(Detection_Protocol): + # Stop and Go protocol States + RUNNING = 0 + STOPPING = 1 + PAUSING = 2 + PASSING = 3 + + def __init__(self, pause_time=2.0, **kwargs): + super().__init__(category = 'stop sign', **kwargs) + self.pause = pause_time + self.state = self.RUNNING + self.timeout = 0.0 + self.stopper = Stop_Manager() + + def manage(self, angle, throttle): + if self.state == self.RUNNING: + if self.found: + self.state = self.STOPPING + self.stopper.stop() + else: + self.reset_run_counter() + if self.state == self.STOPPING: + throttle = self.stopper.throttle() + if self.stopper.is_idle(): + self.state = self.PAUSING + self.timeout = time.time() + self.pause + elif self.state == self.PAUSING: + if time.time() < self.timeout: + throttle = 0.0 + else: + self.state = self.PASSING + elif self.state == self.PASSING: + if self.not_found: + self.state = self.RUNNING + self.reset_run_counter() + + return angle, throttle + +class Pass_Object(Detection_Protocol): + + def __init__(self, speedup_multiplier=1.0, tolerance=.25, max_angle=.75, **kwargs): + super().__init__(category = 'cone', **kwargs) + self.speedup = speedup_multiplier + self.max_angle = max_angle + self.tolerance = tolerance + + def avoid(self, angle): + cone_tol = self.position + self.tolerance + cone_tol_neg = self.position - self.tolerance + + if angle >= self.position: + if cone_tol <= self.max_angle: + new_angle = max(angle, cone_tol) + else: + new_angle = angle if cone_tol <= angle else cone_tol_neg + else: # ai angle < cone position + if cone_tol_neg >= -self.max_angle: + new_angle = min(angle, cone_tol_neg) + else: + new_angle = angle if cone_tol_neg >= angle else cone_tol + + return new_angle + + def manage(self, angle, throttle): + new_angle = angle + if self.detector_ready: + if not self.found: + self.reset_run_counter() + else: # found + throttle *= self.speedup + new_angle = self.avoid(angle) + return new_angle, throttle + diff --git a/donkeycar/templates/cfg_complete.py b/donkeycar/templates/cfg_complete.py index ac790f202..8ff7065a3 100644 --- a/donkeycar/templates/cfg_complete.py +++ b/donkeycar/templates/cfg_complete.py @@ -750,6 +750,9 @@ REALSENSE_D435_IMU = False # True to capture IMU data (D435i only) REALSENSE_D435_ID = None # serial number of camera or None if you only have one camera (it will autodetect) +# Only enable one of the following object detector protocols +# 1) STOP_SIGN_DETECTOR 2) OD_STOPANDGO 3) OD_PASS_CONE + # Stop Sign Detector STOP_SIGN_DETECTOR = False STOP_SIGN_MIN_SCORE = 0.2 @@ -757,6 +760,22 @@ STOP_SIGN_MAX_REVERSE_COUNT = 10 # How many times should the car reverse when detected a stop sign, set to 0 to disable reversing STOP_SIGN_REVERSE_THROTTLE = -0.5 # Throttle during reversing when detected a stop sign +# Object Detector - Common +# Unlike the Stop Sign Detector, the object detector works with or without Coral Edge TPU. This part +# leverages the Tensorflow Lite object detection model. +OD_USE_EDGETPU = False # If True, Coral Edge TPU connected. Use the edgeTPU model +OD_SCORE = 0.5 # Set the score threshold for detection. +OD_RUN_HZ = 1 # Run detection algorithm n times per drive_loop_hz ex. 1 time every 20 drive loop + # Recommend 1 for Stop And Go protocol and 3 or 4 for Pass Cone protocol +# Object Detector - Stop and Go protocol +OD_STOPANDGO = False # enable stop and go; stop and pause, then proceed past the sign before looking again +OD_PAUSE_TIME = 2.0 # after stop sequence completes, pause for n seconds +#Object Detector - Pass a Cone protocol +OD_PASS_CONE = False # enable detection to swerve around a cone in roadway +OD_SPEEDUP_MULTIPLIER = 1.0 # scale throttle of the car as it passes the object (cone) +OD_MAX_ANGLE = 0.75 # maximum and minimum drive angle target +OD_TOLERANCE = 0.45 # drive angle is set to be larger than TOLERANCE + CONE ANGLE or smaller than TOL - CONE angle + # FPS counter SHOW_FPS = False FPS_DEBUG_INTERVAL = 10 # the interval in seconds for printing the frequency info into the shell diff --git a/donkeycar/templates/complete.py b/donkeycar/templates/complete.py index 32c4a7168..f92d1bc0f 100644 --- a/donkeycar/templates/complete.py +++ b/donkeycar/templates/complete.py @@ -429,6 +429,38 @@ def run(self, *components): V.add(ThrottleFilter(), inputs=['pilot/throttle'], outputs=['pilot/throttle']) + + # Stop at a stop sign and pause for n seconds then proceed + elif cfg.OD_STOPANDGO: + from donkeycar.parts.object_detector.od_protocol import Stop_And_Go + V.add(Stop_And_Go( + pause_time = cfg.OD_PAUSE_TIME, + use_edgetpu = cfg.OD_USE_EDGETPU, + score = cfg.OD_SCORE, + image_width = cfg.IMAGE_W, + run_hz = cfg.OD_RUN_HZ, + vehicle_hz = cfg.DRIVE_LOOP_HZ + ), + inputs=['pilot/angle', 'pilot/throttle', 'cam/image_array'], + outputs=['pilot/angle', 'pilot/throttle', 'cam/image_array'], + run_condition="run_pilot") + + # Swerve around a traffic cone + elif cfg.OD_PASS_CONE: + from donkeycar.parts.object_detector.od_protocol import Pass_Object + V.add(Pass_Object( + speedup_multiplier = cfg.OD_SPEEDUP_MULTIPLIER, + max_angle = cfg.OD_MAX_ANGLE, + tolerance = cfg.OD_TOLERANCE, + use_edgetpu = cfg.OD_USE_EDGETPU, + score = cfg.OD_SCORE, + image_width = cfg.IMAGE_W, + run_hz = cfg.OD_RUN_HZ, + vehicle_hz = cfg.DRIVE_LOOP_HZ + ), + inputs=['pilot/angle', 'pilot/throttle', 'cam/image_array'], + outputs=['pilot/angle', 'pilot/throttle', 'cam/image_array'], + run_condition="run_pilot") # # to give the car a boost when starting ai mode in a race.