ZhangAoCanada / RADDet

Range-Azimuth-Doppler Based Radar Object Detection
MIT License
160 stars 39 forks source link

Can you share the code for processing ADC data? #10

Closed dollarser closed 2 years ago

dollarser commented 2 years ago

Thank you very much for your work. It's great. I want to learn the details of the following process.

image

Can you share the relevant code?

ZhangAoCanada commented 2 years ago

The code for this part was developed during experiments, so it is highly unorganized; that's the reason I didn't share it. I can show you two functions related to it as a heads-up.

You can see all the experiment details inside...


def separateRD_A(radar_RAD_mag, fig = None, axes = None, moving_only=False):
    """
    Function:
        Apply 2D CFAR on Range-Doppler, then go for azimuth.

        The Range-Doppler (RD) image is split at its centre Doppler column:
        that column is zeroed before the 2D CFAR ("moving" branch) and,
        unless moving_only is set, it is run through a 1D CFAR on its own
        ("static" branch) and added to the 2D result. The combined RD mask
        is post-processed with postRD(mode="morpho"), and a 1D CFAR followed
        by azimuthPeaks is then run along the middle axis of radar_RAD_mag
        for every RD cell kept by the mask.

    Args:
        radar_RAD_mag           ->          radar RAD magnitude, indexed here as
                                            [i, :, j] with (i, j) an RD cell
                                            (presumably [range, azimuth, doppler])
        fig                     ->          optional figure; when not None the
                                            RD image and masks are drawn
        axes                    ->          plotting axes, indexed 0..2; only
                                            used when fig is not None
        moving_only             ->          if True, skip the 1D CFAR on the
                                            centre Doppler column

    Returns:
        RAD_mask                ->          per-cell azimuth masks concatenated
                                            over both RD axes (same layout as
                                            radar_RAD_mag, assuming the azimuth
                                            helpers return 1D vectors)
    """
    # setting parameters for CFAR
    range_win = 10 # big 20/256 for large range, lower down if necessary
    doppler_win = 5 # big 10/64 for large doppler, lower down if necessary
    range_guard = 6
    doppler_guard = 3
    s = 10 # 1.23 ish for strict masks
    # kept out of the name `os` so the local cannot shadow the stdlib module
    os_rank = 0.75 # 0.9 ish for strict masks

    RD_img = getLog(getSumDim(radar_RAD_mag, 1), scalar=10, log_10=False)
    center_col = int(RD_img.shape[1]/2)

    # for moving objects: blank the centre Doppler column before the 2D CFAR
    RD_moving = RD_img.copy()
    RD_moving[..., center_col] = 0
    _, RD_mask = twoDCFAR(RD_moving, ref_win=(range_win, doppler_win), \
                guard_win=(range_guard, doppler_guard), scalar=s, os=os_rank, mode="OS-CFAR")
    # for static objects: 1D CFAR on the centre Doppler column only
    if not moving_only:
        RD_static = RD_img[..., center_col]
        _, static_index = oneDCFAR(RD_static, ref_win=range_win,  \
                    guard_win=range_guard, scalar=s, order_statistic=os_rank, mode="OS-CFAR")
        static_mask = np.zeros(RD_img.shape)
        static_mask[..., center_col] = static_index
        RD_mask = RD_mask + static_mask

    ################ plot to see RD cfar ##################
    # Keep the raw CFAR output so the "filter" panel shows the mask BEFORE
    # post-processing and the "mask" panel shows it AFTER.
    RD_raw_mask = RD_mask
    RD_mask = postRD(RD_raw_mask, mode="morpho")
    if fig is not None:
        clearAxes(axes)
        imgPlot(RD_img, axes[0], "viridis", 1, "Range-Doppler")
        imgPlot(RD_raw_mask, axes[1], "viridis", 1, "filter")
        imgPlot(RD_mask, axes[2], "viridis", 1, "mask")
        keepDrawing(fig, 0.1)

    num_azimuth = radar_RAD_mag.shape[1]
    RAD_mask = []
    for i in range(RD_img.shape[0]):
        AD_mask = []
        for j in range(RD_img.shape[1]):
            if RD_mask[i,j] != 0:
                # RD cell kept by the mask: detect along the azimuth axis
                azimuth_FFT_mag = getLog(radar_RAD_mag[i,:,j], scalar=10, log_10=True)
                _, azimuth_cfar_indexes = oneDCFAR(azimuth_FFT_mag, ref_win=100, \
                            guard_win=4, scalar=1.25, order_statistic=0.85, mode="CA-CFAR")
                azimuth_cfar_indexes = azimuthPeaks(azimuth_FFT_mag, azimuth_cfar_indexes, \
                                                    peak_cell_num = 1)
                # reshape the azimuth vector to [1, azimuth, 1] for concatenation
                azimuth_mask = np.expand_dims(np.expand_dims(azimuth_cfar_indexes, -1), 0)
            else:
                azimuth_mask = np.zeros([1, num_azimuth, 1])
            AD_mask.append(azimuth_mask)
        # join along the last (Doppler) axis, then later along the first axis
        RAD_mask.append(np.concatenate(AD_mask, -1))
    return np.concatenate(RAD_mask, 0)

def postRD(RD_mask, mode):
    """
    Function:
        RD mask post processing.

        The mask is split into instances with InstancizeRDMask; each instance
        is processed on its own according to mode, and the processed
        instances are merged back into one binary mask.

    Args:
        RD_mask             ->          RD mask derived from 2D cfar
        mode                ->          4 options: "dilated", "morpho", "polygon",
                                        "dilated+morpho"

    Returns:
        output              ->          float array with the shape of RD_mask,
                                        1. where any processed instance is set
                                        and 0. elsewhere (all zeros when no
                                        instance is found)

    Raises:
        ValueError          ->          if mode is not one of the options above
    """
    # Validate first so a mistyped mode is reported even when the mask
    # happens to contain no instances.
    if mode not in ("dilated", "morpho", "polygon", "dilated+morpho"):
        raise ValueError("Mode not recognized.")

    output = np.zeros(RD_mask.shape)
    for instance in InstancizeRDMask(RD_mask):
        # instance[0] is the 0/1 mask of this instance; scale to an 8-bit image
        RD_cv_try = (instance[0] * 255.).astype(np.uint8)
        if mode == "morpho":
            rect_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 5))
            processed = cv2.morphologyEx(RD_cv_try, cv2.MORPH_CLOSE, rect_kernel)
        else:
            _, thresh = cv2.threshold(RD_cv_try, 0, 255, cv2.THRESH_BINARY)
            if mode == "dilated":
                kernel = np.ones((2,2), np.uint8)
                processed = cv2.dilate(thresh, kernel, iterations=1)
            elif mode == "polygon":
                # findContours returns (contours, hierarchy) on OpenCV 2.x/4.x
                # and (image, contours, hierarchy) on 3.x; the contours are
                # the second-to-last element in both layouts.
                cnts = cv2.findContours(thresh, cv2.RETR_EXTERNAL, \
                                    cv2.CHAIN_APPROX_SIMPLE)[-2]
                processed = np.zeros(RD_cv_try.shape, np.uint8)
                for cnt in cnts:
                    epsilon = 0.00001 * cv2.arcLength(cnt, True)
                    approx = cv2.approxPolyDP(cnt, epsilon, True)
                    cv2.drawContours(processed, [approx], 0, (255,255,255), 3)
            else:
                # "dilated+morpho": dilate, then close, both with a 3x3 cross
                cross_kernel = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
                dilated = cv2.dilate(thresh, cross_kernel, iterations=1)
                processed = cv2.morphologyEx(dilated, cv2.MORPH_CLOSE, cross_kernel)

        output += np.where(processed > 0, 1., 0.)
    # overlapping instances sum above 1, so binarize the merged mask
    return np.where(output > 0, 1., 0.)
dollarser commented 2 years ago

Thank you very much for sharing; this is very helpful to me. Forgive my ignorance, but could you also provide the specific implementations of twoDCFAR and InstancizeRDMask?

RD_threshold, RD_mask = twoDCFAR(RD_moving, ref_win=(range_win, doppler_win), guard_win=(range_guard, doppler_guard), scalar=s, os=os, mode="OS-CFAR")

RD_instances = InstancizeRDMask(RD_mask)

ZhangAoCanada commented 2 years ago

I can give you InstancizeRDMask, which is as follows. For twoDCFAR, since Sensor Cortek is still using it, I may need their permission.

def InstancizeRDMask(RD_mask):
    """
    Function:
        Separate RD mask w.r.t each instance.

        The indices of all non-zero cells are grouped by DbscanDenoise, and
        one binary mask is built per returned group.

    Args:
        RD_mask                 ->          2D RD mask; non-zero cells are
                                            treated as detections

    Returns:
        instances_masks         ->          list of [instance_mask, indexes],
                                            where instance_mask has the shape
                                            of RD_mask with 1. at the cells of
                                            that instance, and indexes is the
                                            group as returned by DbscanDenoise;
                                            empty list when RD_mask has no
                                            non-zero cell
    """
    # (N, 2) array of [row, col] indices of non-zero cells, in row-major order
    indexes_clutters = np.argwhere(RD_mask != 0)
    if len(indexes_clutters) == 0:
        return []

    indexes_instances = DbscanDenoise(indexes_clutters, epsilon=5, minimum_samples=3)
    instances_masks = []
    for current_instance in indexes_instances:
        instance_mask = np.zeros(RD_mask.shape)
        for pnt_ind in current_instance:
            instance_mask[pnt_ind[0], pnt_ind[1]] = 1.
        instances_masks.append([instance_mask, current_instance])
    return instances_masks
Zber5 commented 4 weeks ago

Hi, Ao,

Great work on the project! I'm trying to reproduce the auto-annotation code, but I noticed there's a part I'm unclear about. Could you please explain what the azimuthPeaks function does in the following code?

azimuth_cfar_indexes = azimuthPeaks(azimuth_FFT_mag, azimuth_cfar_indexes, peak_cell_num = 1)