[Tracking in Practice] A Hands-On Guide to Tracking with SFSORT

Author: AI浩 · Published 2025-07-16 17:15:28 · Column: AI智韵

Below is the SFSORT tracker code:

# ******************************************************************** #
# ****************** Sharif University of Technology ***************** #
# *************** Department of Electrical Engineering *************** #
# ************************ Deep Learning Lab ************************* #
# ************************ SFSORT Version 4.2 ************************ #
# ************ Authors: Mehrdad Morsali - Zeinab Sharifi ************* #
# *********** mehrdadmorsali@gmail.com - zsh.5ooo@gmail.com ********** #
# ******************************************************************** #


# ******************************************************************** #
# ********************** Packages and Libraries ********************** #
# ******************************************************************** #
import numpy as np


use_lap=True
try:
    import lap
except ImportError:
    from scipy.optimize import linear_sum_assignment
    use_lap=False
 
# ******************************************************************** #
# ***************************** Classes ****************************** #
# ******************************************************************** # 
class DotAccess(dict):
    """Provides dot.notation access to dictionary attributes"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__
    
    
class TrackState:
    """Enumeration of possible states of a track"""    
    Active = 0
    Lost_Central = 1
    Lost_Marginal = 2
 

class Track:
    """Handles basic track attributes and operations"""
    
    def __init__(self, bbox, frame_id, track_id):
        """Track initialization"""                 
        self.track_id = track_id
        self.bbox = bbox 
        self.state = TrackState.Active                      
        self.last_frame = frame_id
 
    def update(self, box, frame_id):
        """Updates a matched track"""      
        self.bbox = box
        self.state = TrackState.Active
        self.last_frame = frame_id
    

class SFSORT:
    """Multi-Object Tracking System"""
    
    def __init__(self, args):
        """Initialize a tracker with given arguments""" 
        args = DotAccess(args)     

        # Register tracking arguments, setting default values if the argument is not provided      
        if args.high_th is None:
            self.high_th = 0.6
        else:
            self.high_th = self.clamp(args.high_th, 0, 1)
            
        if args.match_th_first is None:
            self.match_th_first = 0.67
        else:
            self.match_th_first = self.clamp(args.match_th_first, 0, 0.67)  
            
        if args.new_track_th is None:
            self.new_track_th = 0.7
        else:
            self.new_track_th = self.clamp(args.new_track_th, self.high_th, 1)          
        
        if args.low_th is None:
            self.low_th = 0.1
        else:
            self.low_th = self.clamp(args.low_th, 0, self.high_th)       
  
        if args.match_th_second is None:
            self.match_th_second = 0.3
        else:
            self.match_th_second = self.clamp(args.match_th_second, 0, 1)     
  
        self.dynamic_tuning = False
        if args.dynamic_tuning is not None:
            self.cth = 0.5
            self.high_th_m = 0.0
            self.new_track_th_m = 0.0
            self.match_th_first_m = 0.0
            if args.dynamic_tuning:
                self.dynamic_tuning = True
                if args.cth is not None:
                    self.cth = self.clamp(args.cth, args.low_th, 1)      
                if args.high_th_m is not None:
                    self.high_th_m = self.clamp(args.high_th_m, 0.02, 0.1)   
                if args.new_track_th_m is not None:
                    self.new_track_th_m = self.clamp(args.new_track_th_m, 0.02, 0.08)                    
                if args.match_th_first_m is not None:
                    self.match_th_first_m = self.clamp(args.match_th_first_m, 0.02, 0.08)   
                    
        if args.marginal_timeout is None:
            self.marginal_timeout = 0
        else:
            self.marginal_timeout = self.clamp(args.marginal_timeout, 0, 500) 
            
        if args.central_timeout is None:
            self.central_timeout = 0
        else:
            self.central_timeout = self.clamp(args.central_timeout, 0, 1000)            
        
        self.l_margin = 0
        self.r_margin = 0
        if args.frame_width:
            self.r_margin = args.frame_width
            if args.horizontal_margin is not None:                             
                self.l_margin = self.clamp(args.horizontal_margin, 0, args.frame_width) 
                self.r_margin = self.clamp(args.frame_width - args.horizontal_margin, 0, args.frame_width) 
        
        self.t_margin = 0
        self.b_margin = 0
        if args.frame_height:
            self.b_margin = args.frame_height
            if args.vertical_margin is not None:                             
                self.t_margin = self.clamp(args.vertical_margin, 0, args.frame_height) 
                self.b_margin = self.clamp(args.frame_height - args.vertical_margin , 0, args.frame_height)   
               
        # Initialize the tracker
        self.frame_no = 0      
        self.id_counter = 0       
        self.active_tracks = []         
        self.lost_tracks = [] 
        
    def update(self, boxes, scores):
        """Updates tracker with new detections"""
        # Adjust dynamic arguments
        hth = self.high_th 
        nth = self.new_track_th 
        mth = self.match_th_first     
        if self.dynamic_tuning:
            count = len(scores[scores>self.cth])     
            if count < 1:
              count = 1
              
            lnc = np.log10(count)           
            hth = self.clamp(hth - (self.high_th_m * lnc), 0, 1)
            nth = self.clamp(nth + (self.new_track_th_m * lnc), hth, 1)    
            mth = self.clamp(mth - (self.match_th_first_m * lnc), 0, 0.67)  
                  
        # Increase frame number
        self.frame_no += 1
        
        # Variable: Active tracks in the next frame
        next_active_tracks = []
        
        # Remove long-time lost tracks      
        all_lost_tracks = self.lost_tracks.copy()
        for track in all_lost_tracks:
            if track.state == TrackState.Lost_Central:
                if self.frame_no - track.last_frame > self.central_timeout:
                    self.lost_tracks.remove(track)                   
            else:
                if self.frame_no - track.last_frame > self.marginal_timeout:
                    self.lost_tracks.remove(track)     
                    
        # Gather all previous tracks
        track_pool = self.active_tracks + self.lost_tracks  
        
        # Try to associate tracks with high score detections
        unmatched_tracks = np.array([])
        high_score = scores > hth 
        if high_score.any():
            definite_boxes = boxes[high_score]
            definite_scores = scores[high_score]  
            if track_pool:           
                cost = self.calculate_cost(track_pool, definite_boxes) 
                matches, unmatched_tracks, unmatched_detections = self.linear_assignment(cost, mth) 
                # Update/Activate matched tracks
                for track_idx, detection_idx in matches:
                    box = definite_boxes[detection_idx]
                    track = track_pool[track_idx]                   
                    track.update(box, self.frame_no)
                    next_active_tracks.append(track)
                    # Remove re-identified tracks from lost list
                    if track in self.lost_tracks:
                        self.lost_tracks.remove(track)
                # Identify eligible unmatched detections as new tracks
                for detection_idx in unmatched_detections:                   
                    if definite_scores[detection_idx] > nth:
                        box = definite_boxes[detection_idx]
                        track = Track(box, self.frame_no, self.id_counter)
                        next_active_tracks.append(track)
                        self.id_counter += 1                   
            else:
             # Associate tracks of the first frame after object-free/null frames
                for detection_idx, score in enumerate(definite_scores):                   
                    if score > nth:
                        box = definite_boxes[detection_idx]
                        track = Track(box, self.frame_no, self.id_counter)
                        next_active_tracks.append(track)
                        self.id_counter += 1  
        
        # Add unmatched tracks to the lost list
        unmatched_track_pool = []                                         
        for track_address in unmatched_tracks:
            unmatched_track_pool.append(track_pool[track_address])   
        next_lost_tracks = unmatched_track_pool.copy()
        
        # Try to associate remained tracks with intermediate score detections 
        intermediate_score = np.logical_and((self.low_th < scores), (scores < hth))      
        if intermediate_score.any(): 
            if len(unmatched_tracks):                 
                possible_boxes = boxes[intermediate_score]
                cost = self.calculate_cost(unmatched_track_pool, possible_boxes, iou_only=True)
                matches, unmatched_tracks, unmatched_detections = self.linear_assignment(cost, self.match_th_second)
                # Update/Activate matched tracks
                for track_idx, detection_idx in matches:
                    box = possible_boxes[detection_idx]
                    track = unmatched_track_pool[track_idx]                  
                    track.update(box, self.frame_no)
                    next_active_tracks.append(track)
                    # Remove re-identified tracks from lost list 
                    if track in self.lost_tracks:
                        self.lost_tracks.remove(track)
                    next_lost_tracks.remove(track)                                  
            
        # All tracks are lost if there are no detections!
        if not (high_score.any() or  intermediate_score.any()):
            next_lost_tracks = track_pool.copy()
            
        # Update the list of lost tracks
        for track in next_lost_tracks:
            if track not in self.lost_tracks:
                self.lost_tracks.append(track)
                u = track.bbox[0] + (track.bbox[2] - track.bbox[0])/2             
                v = track.bbox[1] + (track.bbox[3] - track.bbox[1])/2
                if (self.l_margin < u < self.r_margin) and (self.t_margin < v < self.b_margin):                   
                    track.state = TrackState.Lost_Central                        
                else:
                    track.state = TrackState.Lost_Marginal                      
                           
        # Update the list of active tracks
        self.active_tracks = next_active_tracks.copy()

        return np.asarray([[x.bbox, x.track_id] for x in next_active_tracks], dtype=object)

    @staticmethod
    def clamp(value, min_value, max_value):
        """ Clamps a value within the specified minimum and maximum bounds."""
        return max(min_value, min(value, max_value))

    @staticmethod
    def calculate_cost(tracks, boxes, iou_only=False):
        """Calculates the association cost based on IoU and box similarity"""
        eps = 1e-7
        active_boxes = [track.bbox for track in tracks]   
           
        # Get the coordinates of bounding boxes
        b1_x1, b1_y1, b1_x2, b1_y2 = np.array(active_boxes).T
        b2_x1, b2_y1, b2_x2, b2_y2 = np.array(boxes).T      
        
        h_intersection = (np.minimum(b1_x2[:, None], b2_x2) - np.maximum(b1_x1[:, None], b2_x1)).clip(0)
        w_intersection = (np.minimum(b1_y2[:, None], b2_y2) - np.maximum(b1_y1[:, None], b2_y1)).clip(0)
        
        # Calculate the intersection area
        intersection =  h_intersection * w_intersection
                     
        # Calculate the union area
        box1_height = b1_x2 - b1_x1
        box2_height = b2_x2 - b2_x1
        box1_width = b1_y2 - b1_y1   
        box2_width = b2_y2 - b2_y1

        box1_area = box1_height * box1_width
        box2_area = box2_height * box2_width
        
        union = (box2_area + box1_area[:, None] - intersection + eps)
        
        # Calculate the IoU 
        iou = intersection / union
        
        if iou_only:
            return 1.0 - iou
        
        # Calculate the DIoU                    
        centerx1 = (b1_x1 + b1_x2) / 2.0
        centery1 = (b1_y1 + b1_y2) / 2.0
        centerx2 = (b2_x1 + b2_x2) / 2.0
        centery2 = (b2_y1 + b2_y2) / 2.0        
        inner_diag = np.abs(centerx1[:, None] - centerx2) + np.abs(centery1[:, None] - centery2)
           
        xxc1 = np.minimum(b1_x1[:, None], b2_x1)
        yyc1 = np.minimum(b1_y1[:, None], b2_y1)
        xxc2 = np.maximum(b1_x2[:, None], b2_x2)
        yyc2 = np.maximum(b1_y2[:, None], b2_y2)              
        outer_diag = np.abs(xxc2 - xxc1) + np.abs(yyc2 - yyc1)
       
        diou = iou - (inner_diag / outer_diag)
        
        # Calculate the BBSI 
        delta_w = np.abs(box2_width - box1_width[:, None])
        sw = w_intersection / np.abs(w_intersection + delta_w + eps)
        
        delta_h = np.abs(box2_height - box1_height[:, None])     
        sh = h_intersection / np.abs(h_intersection + delta_h + eps)
               
        bbsi = diou + sh + sw    
        
        # Normalize the BBSI
        cost = (bbsi)/3.0

        return 1.0 - cost 
 
    @staticmethod
    def linear_assignment(cost_matrix, thresh):
        """Linear assignment"""
        if cost_matrix.size == 0:
            return np.empty((0, 2), dtype=int), tuple(range(cost_matrix.shape[0])), tuple(range(cost_matrix.shape[1]))

        if use_lap:
            _, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
            matches = [[ix, mx] for ix, mx in enumerate(x) if mx >= 0]
            unmatched_a = np.where(x < 0)[0]
            unmatched_b = np.where(y < 0)[0]
        else:
            row_ind, col_ind = linear_sum_assignment(cost_matrix)  
            matches = np.array([[row, col] for row, col in zip(row_ind, col_ind) if cost_matrix[row, col] <= thresh])  
            matched_rows = set(row_ind)  
            matched_cols = set(col_ind)  
            unmatched_a = np.array([i for i in range(cost_matrix.shape[0]) if i not in matched_rows])  
            unmatched_b = np.array([j for j in range(cost_matrix.shape[1]) if j not in matched_cols])  
            
        return matches, unmatched_a, unmatched_b
    

SFSORT Multi-Object Tracking Algorithm: Detailed Explanation

1. Core Classes and Functions

DotAccess Class

Purpose: provides dot-notation access to dictionary attributes

Implementation:

class DotAccess(dict):
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__
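
A quick usage sketch (the dictionary contents below are illustrative): because __getattr__ is bound to dict.get, looking up a missing key returns None instead of raising, which is exactly what the constructor's "is None" checks rely on.

args = DotAccess({"high_th": 0.55, "dynamic_tuning": True})
print(args.high_th)       # 0.55 (dot access instead of args["high_th"])
print(args.new_track_th)  # None: missing keys return None rather than raising AttributeError
args.low_th = 0.1         # attribute assignment writes into the underlying dict
print(dict(args))         # {'high_th': 0.55, 'dynamic_tuning': True, 'low_th': 0.1}
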
TrackState Enumeration

Track state definitions:

class TrackState:
    Active = 0         # Actively tracked
    Lost_Central = 1   # Lost in the central region of the frame
    Lost_Marginal = 2  # Lost in the marginal region of the frame
Track Class

Represents a single track:

class Track:
    def __init__(self, bbox, frame_id, track_id):
        self.track_id = track_id
        self.bbox = bbox  # [x1, y1, x2, y2]
        self.state = TrackState.Active
        self.last_frame = frame_id  # Frame number of the last appearance

2. The SFSORT Core Algorithm

Parameter Initialization (a simplified view of the full constructor above)
def __init__(self, args):
    # Clamp each threshold into its valid range; fall back to the default when unset
    self.high_th = 0.6 if args.high_th is None else self.clamp(args.high_th, 0, 1)
    self.match_th_first = 0.67 if args.match_th_first is None else self.clamp(args.match_th_first, 0, 0.67)
    self.new_track_th = 0.7 if args.new_track_th is None else self.clamp(args.new_track_th, self.high_th, 1)
    self.low_th = 0.1 if args.low_th is None else self.clamp(args.low_th, 0, self.high_th)
    self.match_th_second = 0.3 if args.match_th_second is None else self.clamp(args.match_th_second, 0, 1)
    
    # Dynamic threshold tuning parameters
    if args.dynamic_tuning:
        self.dynamic_tuning = True
        # Tuning factors
        self.cth = args.cth or 0.5
        self.high_th_m = args.high_th_m or 0.05
        self.new_track_th_m = args.new_track_th_m or 0.05
        self.match_th_first_m = args.match_th_first_m or 0.05
    
    # Timeouts for lost tracks
    self.marginal_timeout = args.marginal_timeout or 0
    self.central_timeout = args.central_timeout or 0
    
    # Partition the frame into central and marginal regions
    self.l_margin = args.horizontal_margin or 0
    self.r_margin = args.frame_width - self.l_margin
    self.t_margin = args.vertical_margin or 0
    self.b_margin = args.frame_height - self.t_margin
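
As a minimal sketch of how these defaults behave (assuming the full SFSORT class above is available), the tracker can even be constructed from an empty argument dict, because every missing key comes back as None through DotAccess and falls back to its default:

tracker = SFSORT({})  # every threshold falls back to its default
print(tracker.high_th, tracker.new_track_th, tracker.low_th)  # 0.6 0.7 0.1
print(tracker.central_timeout, tracker.marginal_timeout)      # 0 0
# In practice you should at least pass frame_width/frame_height (plus the margins and timeouts),
# otherwise the central/marginal split collapses and lost tracks are dropped immediately.
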
Core Tracking Flow (simplified)
def update(self, boxes, scores):
    # 1. Dynamic threshold adjustment
    hth, nth, mth = self.high_th, self.new_track_th, self.match_th_first
    if self.dynamic_tuning:
        count = len(scores[scores > self.cth])
        lnc = np.log10(count) if count > 0 else 0
        hth = self.clamp(self.high_th - (self.high_th_m * lnc), 0, 1)
        nth = self.clamp(self.new_track_th + (self.new_track_th_m * lnc), hth, 1)
        mth = self.clamp(self.match_th_first - (self.match_th_first_m * lnc), 0, 0.67)
    
    # 2. Drop lost tracks that have timed out
    current_time = self.frame_no
    for track in self.lost_tracks[:]:
        timeout = self.central_timeout if track.state == TrackState.Lost_Central else self.marginal_timeout
        if current_time - track.last_frame > timeout:
            self.lost_tracks.remove(track)
    
    # 3. Merge active and lost tracks into one pool
    track_pool = self.active_tracks + self.lost_tracks
    
    # 4. Match high-confidence detections
    unmatched_tracks = []
    high_mask = scores > hth
    if high_mask.any():
        # Build the association cost matrix
        cost_matrix = self.calculate_cost(track_pool, boxes[high_mask])
        # Hungarian-style assignment
        matches, unmatched_tracks, unmatched_detections = self.linear_assignment(cost_matrix, mth)
        
        # Update matched tracks
        for track_idx, det_idx in matches:
            track = track_pool[track_idx]
            track.update(boxes[high_mask][det_idx], current_time)
        
        # Spawn new tracks from confident unmatched detections
        for det_idx in unmatched_detections:
            if scores[high_mask][det_idx] > nth:
                new_track = Track(boxes[high_mask][det_idx], current_time, self.id_counter)
                self.id_counter += 1
                self.active_tracks.append(new_track)
    
    # 5. Match intermediate-confidence detections against the leftover tracks
    mid_mask = (scores > self.low_th) & (scores < hth)
    if mid_mask.any() and len(unmatched_tracks):
        # Cost based on IoU only
        cost_matrix = self.calculate_cost([track_pool[i] for i in unmatched_tracks], 
                                         boxes[mid_mask], iou_only=True)
        matches, _, _ = self.linear_assignment(cost_matrix, self.match_th_second)
        
        # Update matched tracks
        for track_idx, det_idx in matches:
            track = track_pool[unmatched_tracks[track_idx]]
            track.update(boxes[mid_mask][det_idx], current_time)
    
    # 6. Update track states
    for track in self.active_tracks[:]:
        if track.last_frame < current_time:
            # Decide whether the track was lost centrally or at the margin
            center_x = (track.bbox[0] + track.bbox[2]) / 2
            center_y = (track.bbox[1] + track.bbox[3]) / 2
            if (self.l_margin < center_x < self.r_margin and 
                self.t_margin < center_y < self.b_margin):
                track.state = TrackState.Lost_Central
            else:
                track.state = TrackState.Lost_Marginal
            self.lost_tracks.append(track)
            self.active_tracks.remove(track)
    
    return [(track.bbox, track.track_id) for track in self.active_tracks]
Cost Calculation (simplified)
@staticmethod
def calculate_cost(tracks, boxes, iou_only=False):
    # Extract coordinates
    b1_x1, b1_y1, b1_x2, b1_y2 = np.array([t.bbox for t in tracks]).T
    b2_x1, b2_y1, b2_x2, b2_y2 = np.array(boxes).T
    
    # Intersection extents along each axis, and the intersection area
    inter_w = np.maximum(np.minimum(b1_x2[:, None], b2_x2) - np.maximum(b1_x1[:, None], b2_x1), 0)
    inter_h = np.maximum(np.minimum(b1_y2[:, None], b2_y2) - np.maximum(b1_y1[:, None], b2_y1), 0)
    inter_area = inter_w * inter_h
    
    # Union area
    area1 = (b1_x2 - b1_x1) * (b1_y2 - b1_y1)
    area2 = (b2_x2 - b2_x1) * (b2_y2 - b2_y1)
    union_area = area1[:, None] + area2 - inter_area
    
    # IoU
    iou = inter_area / (union_area + 1e-7)
    
    if iou_only:
        return 1 - iou
    
    # DIoU (distances measured as L1 sums, as in the full implementation above)
    center_x1 = (b1_x1 + b1_x2) / 2
    center_y1 = (b1_y1 + b1_y2) / 2
    center_x2 = (b2_x1 + b2_x2) / 2
    center_y2 = (b2_y1 + b2_y2) / 2
    center_distance = np.abs(center_x1[:, None] - center_x2) + np.abs(center_y1[:, None] - center_y2)
    
    enclose_x1 = np.minimum(b1_x1[:, None], b2_x1)
    enclose_y1 = np.minimum(b1_y1[:, None], b2_y1)
    enclose_x2 = np.maximum(b1_x2[:, None], b2_x2)
    enclose_y2 = np.maximum(b1_y2[:, None], b2_y2)
    enclose_diag = np.abs(enclose_x2 - enclose_x1) + np.abs(enclose_y2 - enclose_y1)
    
    diou = iou - center_distance / (enclose_diag + 1e-7)
    
    # BBSI: width/height similarity built from the 1-D intersection extents
    w1, h1 = b1_x2 - b1_x1, b1_y2 - b1_y1
    w2, h2 = b2_x2 - b2_x1, b2_y2 - b2_y1
    
    w_sim = inter_w / (inter_w + np.abs(w1[:, None] - w2) + 1e-7)
    h_sim = inter_h / (inter_h + np.abs(h1[:, None] - h2) + 1e-7)
    
    bbsi = (diou + w_sim + h_sim) / 3
    return 1 - bbsi

3. Algorithmic Innovations

Dynamic Threshold Adjustment

  • The matching thresholds are adjusted automatically according to the density of detected objects
  • Formula: adjusted threshold = base threshold ± (tuning factor × log10(object count)); a short numeric sketch follows below
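
A back-of-the-envelope sketch of the adjustment, using the tuning factors from the test.py example later in this post (high_th_m = 0.1, new_track_th_m = 0.1, match_th_first_m = 0.05) and a hypothetical count of 50 detections above cth:

import numpy as np

count = 50                              # hypothetical number of detections with score > cth
lnc = np.log10(count)                   # ≈ 1.70
high_th = 0.6 - 0.1 * lnc               # ≈ 0.43: more detections now qualify as high-confidence
new_track_th = 0.7 + 0.1 * lnc          # ≈ 0.87: harder to spawn new tracks in a crowd
match_th_first = 0.67 - 0.05 * lnc      # ≈ 0.59: stricter first-stage matching cost limit
print(round(high_th, 2), round(new_track_th, 2), round(match_th_first, 2))  # 0.43 0.87 0.59
# Each value is then clamped to its valid range, exactly as in the update() code above.
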

Region-Aware Track Management

  • Lost tracks are classified by whether they disappeared in the central or the marginal region of the frame
  • Tracks lost in the central region are kept alive longer
  • Classification (a small helper sketch follows below):
    center_x = (x1 + x2) / 2
    center_y = (y1 + y2) / 2
    is_central = (l_margin < center_x < r_margin) and (t_margin < center_y < b_margin)
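
A small helper sketch of that test (the function name is_central and its argument layout are illustrative, not part of the SFSORT API):

def is_central(bbox, l_margin, r_margin, t_margin, b_margin):
    # bbox is [x1, y1, x2, y2]; the margins are the central-region boundaries
    u = (bbox[0] + bbox[2]) / 2
    v = (bbox[1] + bbox[3]) / 2
    return (l_margin < u < r_margin) and (t_margin < v < b_margin)

# e.g. a 1280x720 frame with 10% margins: central region is x in (128, 1152), y in (72, 648)
print(is_central([600, 300, 700, 400], 128, 1152, 72, 648))  # True  -> would become Lost_Central
print(is_central([5, 5, 60, 120], 128, 1152, 72, 648))       # False -> would become Lost_Marginal
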

Hybrid Cost Metric

  • BBSI (Box and Boundary Similarity Index): BBSI = (DIoU + width_similarity + height_similarity) / 3
  • DIoU (Distance-IoU): DIoU = IoU - (centre-point distance / diagonal of the smallest enclosing box); a hand-worked example follows below
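
A hand-worked example for two made-up boxes, following the arithmetic in calculate_cost above (note that the implementation measures the centre distance and the enclosing-box "diagonal" as L1 sums):

a = [0, 0, 10, 10]   # track box, [x1, y1, x2, y2]
b = [2, 2, 12, 12]   # detection box

inter_x = min(a[2], b[2]) - max(a[0], b[0])                  # 8
inter_y = min(a[3], b[3]) - max(a[1], b[1])                  # 8
iou = (inter_x * inter_y) / (100 + 100 - inter_x * inter_y)  # 64 / 136 ≈ 0.471

center_dist = abs(5 - 7) + abs(5 - 7)                        # 4 (L1 distance between box centres)
enclose_diag = (12 - 0) + (12 - 0)                           # 24 (L1 extent of the enclosing box)
diou = iou - center_dist / enclose_diag                      # ≈ 0.304

w_sim = inter_y / (inter_y + 0)                              # 1.0 (both boxes have identical extents)
h_sim = inter_x / (inter_x + 0)                              # 1.0
bbsi = (diou + w_sim + h_sim) / 3                            # ≈ 0.768
print(round(1 - bbsi, 3))                                    # association cost ≈ 0.232
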

Two-Stage Matching Strategy

  • Stage 1: high-confidence detections are matched using the full BBSI cost
  • Stage 2: intermediate-confidence detections are matched using IoU only

By combining dynamic threshold adjustment, region-aware track management, and a hybrid cost metric, SFSORT achieves robust multi-object tracking in complex scenes, while vectorized computation and careful memory use keep it fast enough for real-time operation.

SFSORT Multi-Object Tracking Flowchart

graph TD
    A[Start] --> B[Initialize parameters]
    B --> C[Read next frame]
    C --> D[Dynamic threshold adjustment]
    D --> E[Remove timed-out lost tracks]
    E --> F[Merge active and lost track pools]
    F --> G{High-confidence detections?}
    G -- Yes --> H[Compute BBSI cost matrix]
    H --> I[Hungarian assignment]
    I --> J[Update matched tracks]
    J --> K[Create new tracks]
    K --> L{Intermediate-confidence detections?}
    G -- No --> L
    L -- Yes --> M[Compute IoU cost matrix]
    M --> N[Hungarian assignment]
    N --> O[Update matched tracks]
    O --> P[Update track states]
    L -- No --> P
    P --> Q[Output active tracks]
    Q --> R{More frames?}
    R -- Yes --> C
    R -- No --> S[End]

    subgraph Dynamic threshold adjustment
    D --> D1[Count high-confidence detections]
    D1 --> D2[Compute log factor]
    D2 --> D3[Adjust high-confidence threshold]
    D3 --> D4[Adjust new-track threshold]
    D4 --> D5[Adjust matching threshold]
    end

    subgraph Track state update
    P --> P1[Iterate over active tracks]
    P1 --> P2{Last seen before current frame?}
    P2 -- Yes --> P3[Compute box centre]
    P3 --> P4{Inside central region?}
    P4 -- Yes --> P5[Mark as lost in central region]
    P4 -- No --> P6[Mark as lost in marginal region]
    P5 --> P7[Move to lost-track list]
    P6 --> P7
    end

    subgraph Cost calculation
    H --> H1[Compute IoU]
    H1 --> H2[Compute DIoU]
    H2 --> H3[Compute width/height similarity]
    H3 --> H4[Combine into BBSI]
    end

Flowchart Notes

1. Initialization

  • Initialize parameters: high/low confidence thresholds, matching thresholds, timeouts, and so on
  • Region partitioning: split the frame into central and marginal regions based on the frame size

2. Per-Frame Processing Loop

  • Read the next frame: obtain the detection boxes and confidence scores for the current frame
  • Dynamic threshold adjustment
    • Count the high-confidence detections
    • Adjust each threshold by a logarithmic factor of that count
  • Remove timed-out lost tracks
    • Tracks lost in the central region: longer timeout
    • Tracks lost in the marginal region: shorter timeout

3. Matching

  • High-confidence detection matching
    • Compute the BBSI cost matrix (combining IoU, DIoU, and width/height similarity)
    • Match with the Hungarian algorithm
    • Update the matched tracks
    • Create new tracks for unmatched high-score detections
  • Intermediate-confidence detection matching
    • Compute an IoU-only cost matrix
    • Run a second Hungarian matching pass
    • Update the matched tracks

4. State Update

  • Track state update
    • Check the last frame in which each active track was seen
    • Compute the track's centre position
    • Classify the loss as central or marginal based on that position
    • Move lost tracks into the corresponding list

5. Output

  • Return the bounding boxes and IDs of all active tracks in the current frame

Key Innovations

Dynamic Threshold System

graph LR
A[Object count] --> B[Log factor]
B --> C[Adjust high-confidence threshold]
B --> D[Adjust new-track threshold]
B --> E[Adjust matching threshold]

Region-Aware Lost-Track Management

graph TD
A[Track lost] --> B{Centre inside the central region?}
B -- Yes --> C[Central loss, long timeout]
B -- No --> D[Marginal loss, short timeout]

Hybrid Cost Metric: BBSI

graph LR
A[DIoU] --> D[BBSI]
B[Width similarity] --> D
C[Height similarity] --> D

The flowchart above traces the complete SFSORT pipeline from initialization through per-frame processing, highlighting the dynamic threshold adjustment, two-stage matching, and region-aware management that give the tracker its strong performance in complex scenes.

Tracking in Practice

Find the ultralytics package folder inside the ultralytics project and copy the whole folder into your project directory. Link: https://github.com/ultralytics/ultralytics

Create a script named SFSORT.py and paste the tracker code above into it, then create a test.py script and paste the code below into it. Besides ultralytics, the script needs numpy and opencv-python; the tracker uses lap if it is installed and otherwise falls back to scipy.

import numpy as np
import cv2

from ultralytics import YOLO
from ultralytics.utils.torch_utils import select_device
from random import randrange

from SFSORT import SFSORT
# Instantiate an object detector
# To use YOLOv8n without fine-tuning, replace 'best.pt' with 'yolov8n.pt'
model = YOLO('yolov8m.pt', 'detect')

# Check for GPU availability
device = select_device('0')
# Devolve the processing to selected devices
model.to(device)

# Load the video file
cap = cv2.VideoCapture('Sample.mp4')

# Get the frame rate, frame width, and frame height
frame_rate = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define the MP4 codec and create a VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output.mp4', fourcc, 30.0, (frame_width, frame_height))

# Organize tracker arguments into standard format
tracker_arguments = {"dynamic_tuning": True, "cth": 0.5,
                      "high_th": 0.6, "high_th_m": 0.1,
                      "match_th_first": 0.67, "match_th_first_m": 0.05,
                      "match_th_second": 0.2, "low_th": 0.1,
                      "new_track_th": 0.7, "new_track_th_m": 0.1,
                      "marginal_timeout": (7 * frame_rate // 10),
                      "central_timeout": frame_rate,
                      "horizontal_margin": frame_width // 10,
                      "vertical_margin": frame_height // 10,
                      "frame_width": frame_width,
                      "frame_height": frame_height}

# Instantiate a tracker
tracker = SFSORT(tracker_arguments)

# Define a color map (track ID -> BGR color) for track visualization
colors = {}

# Process each frame of the video
while cap.isOpened():
   # Load the frame
  ret, frame = cap.read()
  if not ret:
      break

  # Detect people in the frame
  prediction = model.predict(frame, imgsz=(800,1440), conf=0.1, iou=0.45,
                              half=False, device=device, max_det=99, classes=0,
                              verbose=False)

  # Exclude additional information from the predictions
  prediction_results = prediction[0].boxes.cpu().numpy()

  # Update the tracker with the latest detections
  tracks = tracker.update(prediction_results.xyxy, prediction_results.conf)

  # Skip additional analysis if the tracker is not currently tracking anyone
  if len(tracks) == 0:
      continue

  # Extract tracking data from the tracker
  bbox_list = tracks[:, 0]
  track_id_list = tracks[:, 1]

  # Visualize tracks
  for idx, (track_id, bbox) in enumerate(zip(track_id_list, bbox_list)):
    # Define a new color for newly detected tracks
    if track_id not in colors:
        colors[track_id] = (randrange(255), randrange(255), randrange(255))

    color = colors[track_id]

    # Extract the bounding box coordinates
    x0, y0, x1, y1 = map(int, bbox)

    # Draw the bounding boxes on the frame
    annotated_frame = cv2.rectangle(frame, (x0, y0), (x1, y1), color, 2)
    # Put the track label on the frame alongside the bounding box
    cv2.putText(annotated_frame, str(track_id), (x0, y0-5),
                cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

  # Write the frame to the output video file
  out.write(annotated_frame)

# Release everything when done
cap.release()
out.release()

Code Walkthrough

1. Importing Dependencies

import numpy as np  # Numerical computation
import cv2  # OpenCV video I/O and drawing
from ultralytics import YOLO  # YOLOv8 object detection
from ultralytics.utils.torch_utils import select_device  # Device (GPU/CPU) selection
from random import randrange  # Random colors for visualization
from SFSORT import SFSORT  # The tracker implemented above

2. Initializing the YOLO Detector

model = YOLO('yolov8m.pt', 'detect')  # Load the medium YOLOv8 pretrained model
device = select_device('0')  # Select the GPU ('0' = first GPU)
model.to(device)  # Move the model to the selected device
  • yolov8m.pt is the medium model, balancing accuracy and speed
  • select_device('0') explicitly selects the first NVIDIA GPU (pass 'cpu' to run without one)

3. Video Input/Output Setup

cap = cv2.VideoCapture('Sample.mp4')  # Open the input video
frame_rate = cap.get(cv2.CAP_PROP_FPS)  # Frame rate
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))  # Frame width
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))  # Frame height

fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # MP4 codec
out = cv2.VideoWriter('output.mp4', fourcc, 30.0, (frame_width, frame_height))  # Output writer
  • The output video is written at a fixed 30 FPS regardless of the source frame rate (pass frame_rate instead of 30.0 to preserve it)
  • The original resolution is preserved

4. Initializing the SFSORT Tracker

tracker_arguments = {
    "dynamic_tuning": True,  # Enable dynamic threshold tuning
    "cth": 0.5,  # Confidence threshold used when counting detections for dynamic tuning
    "high_th": 0.6,  # High-confidence threshold
    # ... (remaining algorithm parameters)
    "frame_width": frame_width,
    "frame_height": frame_height
}
tracker = SFSORT(tracker_arguments)  # Instantiate the tracker
colors = {}  # Dictionary mapping track IDs to display colors
  • Parameter notes (worked numbers follow below):
    • marginal_timeout = (7 * frame_rate // 10): how many frames a track lost in the marginal region is kept before being dropped
    • central_timeout = frame_rate: the corresponding timeout for tracks lost in the central region
    • horizontal/vertical_margin: define the border between the marginal and central regions
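
Worked numbers for those arguments, assuming a hypothetical 30 FPS, 1920x1080 input video:

frame_rate, frame_width, frame_height = 30, 1920, 1080
marginal_timeout = 7 * frame_rate // 10   # 21 frames ≈ 0.7 s before a marginally lost track is dropped
central_timeout = frame_rate              # 30 frames = 1.0 s for tracks lost in the central region
horizontal_margin = frame_width // 10     # 192 px strip on the left/right counts as marginal
vertical_margin = frame_height // 10      # 108 px strip on the top/bottom counts as marginal
print(marginal_timeout, central_timeout, horizontal_margin, vertical_margin)  # 21 30 192 108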

5. Main Processing Loop

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    
    # YOLO object detection
    prediction = model.predict(
        frame, 
        imgsz=(800,1440),  # Custom input size (instead of the default 640)
        conf=0.1,  # Low confidence threshold (favors recall; the tracker filters later)
        iou=0.45,  # NMS IoU threshold
        classes=0   # Detect only the 'person' class
    )
    
    # Parse detection results
    prediction_results = prediction[0].boxes.cpu().numpy()
    bboxes = prediction_results.xyxy  # Bounding-box coordinates
    confs = prediction_results.conf    # Confidence scores
    
    # Multi-object tracking
    tracks = tracker.update(bboxes, confs)
    
    # Visualize tracking results
    for track in tracks:
        track_id = int(track[1])
        bbox = track[0].astype(int)
        
        # Assign a random color to newly seen IDs
        if track_id not in colors:
            colors[track_id] = (randrange(255), randrange(255), randrange(255))
        
        # Draw the bounding box and track ID
        cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), colors[track_id], 2)
        cv2.putText(frame, str(track_id), (bbox[0], bbox[1]-5), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.9, colors[track_id], 2)
    
    # Write the output frame
    out.write(frame)

6. Key Technical Points

  1. YOLO-specific parameters
    • imgsz=(800,1440): a non-standard input resolution tuned to the scene (the default is 640)
    • conf=0.1: a deliberately low threshold to avoid missed detections (the tracker filters out the rest)
    • classes=0: pedestrian detection only (COCO class 0, 'person')
  2. SFSORT core behavior
    • High-confidence detections are matched first
    • Intermediate-confidence detections are matched in a second pass
    • Region-aware tracking: central and marginal regions are handled differently
    • Dynamic threshold adjustment adapts to scene density
    • Multi-stage matching strategy
  3. Memory management
    • .cpu().numpy(): explicitly moves results back to the CPU, reducing GPU memory pressure
    • Avoid unnecessary copies of the frame during processing

7. Workflow Diagram

graph TD
A[Video frame input] --> B[YOLOv8 person detection]
B --> C{People detected?}
C -- Yes --> D[SFSORT updates track states]
C -- No --> E[Skip post-processing]
D --> F[Assign track IDs]
F --> G[Draw boxes and IDs]
G --> H[Write output frame]
H --> I{More frames?}
I -- Yes --> A
I -- No --> J[Release resources]
Originally published 2025-07-15 on the AI智韵 WeChat public account.
