Below is the tracking code for SFSORT:
# ******************************************************************** #
# ****************** Sharif University of Technology ***************** #
# *************** Department of Electrical Engineering *************** #
# ************************ Deep Learning Lab ************************* #
# ************************ SFSORT Version 4.2 ************************ #
# ************ Authors: Mehrdad Morsali - Zeinab Sharifi ************* #
# *********** mehrdadmorsali@gmail.com - zsh.5ooo@gmail.com ********** #
# ******************************************************************** #
# ******************************************************************** #
# ********************** Packages and Libraries ********************** #
# ******************************************************************** #
import numpy as np
use_lap=True
try:
import lap
except ImportError:
from scipy.optimize import linear_sum_assignment
use_lap=False
# ******************************************************************** #
# ***************************** Classes ****************************** #
# ******************************************************************** #
class DotAccess(dict):
"""Provides dot.notation access to dictionary attributes"""
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
class TrackState:
"""Enumeration of possible states of a track"""
Active = 0
Lost_Central = 1
Lost_Marginal = 2
class Track:
"""Handles basic track attributes and operations"""
def __init__(self, bbox, frame_id, track_id):
"""Track initialization"""
self.track_id = track_id
self.bbox = bbox
self.state = TrackState.Active
self.last_frame = frame_id
def update(self, box, frame_id):
"""Updates a matched track"""
self.bbox = box
self.state = TrackState.Active
self.last_frame = frame_id
class SFSORT:
"""Multi-Object Tracking System"""
def __init__(self, args):
"""Initialize a tracker with given arguments"""
args = DotAccess(args)
# Register tracking arguments, setting default values if the argument is not provided
if args.high_th is None:
self.high_th = 0.6
else:
self.high_th = self.clamp(args.high_th, 0, 1)
if args.match_th_first is None:
self.match_th_first = 0.67
else:
self.match_th_first = self.clamp(args.match_th_first, 0, 0.67)
if args.new_track_th is None:
self.new_track_th = 0.7
else:
self.new_track_th = self.clamp(args.new_track_th, self.high_th, 1)
if args.low_th is None:
self.low_th = 0.1
else:
self.low_th = self.clamp(args.low_th, 0, self.high_th)
if args.match_th_second is None:
self.match_th_second = 0.3
else:
self.match_th_second = self.clamp(args.match_th_second, 0, 1)
self.dynamic_tuning = False
if args.dynamic_tuning is not None:
self.cth = 0.5
self.high_th_m = 0.0
self.new_track_th_m = 0.0
self.match_th_first_m = 0.0
if args.dynamic_tuning:
self.dynamic_tuning = True
if args.cth is not None:
self.cth = self.clamp(args.cth, args.low_th, 1)
if args.high_th_m is not None:
self.high_th_m = self.clamp(args.high_th_m, 0.02, 0.1)
if args.new_track_th_m is not None:
self.new_track_th_m = self.clamp(args.new_track_th_m, 0.02, 0.08)
if args.match_th_first_m is not None:
self.match_th_first_m = self.clamp(args.match_th_first_m, 0.02, 0.08)
if args.marginal_timeout is None:
self.marginal_timeout = 0
else:
self.marginal_timeout = self.clamp(args.marginal_timeout, 0, 500)
if args.central_timeout is None:
self.central_timeout = 0
else:
self.central_timeout = self.clamp(args.central_timeout, 0, 1000)
self.l_margin = 0
self.r_margin = 0
if args.frame_width:
self.r_margin = args.frame_width
if args.horizontal_margin is not None:
self.l_margin = self.clamp(args.horizontal_margin, 0, args.frame_width)
self.r_margin = self.clamp(args.frame_width - args.horizontal_margin, 0, args.frame_width)
self.t_margin = 0
self.b_margin = 0
if args.frame_height:
self.b_margin = args.frame_height
if args.vertical_margin is not None:
self.t_margin = self.clamp(args.vertical_margin, 0, args.frame_height)
self.b_margin = self.clamp(args.frame_height - args.vertical_margin , 0, args.frame_height)
# Initialize the tracker
self.frame_no = 0
self.id_counter = 0
self.active_tracks = []
self.lost_tracks = []
def update(self, boxes, scores):
"""Updates tracker with new detections"""
# Adjust dynamic arguments
hth = self.high_th
nth = self.new_track_th
mth = self.match_th_first
if self.dynamic_tuning:
count = len(scores[scores>self.cth])
if count < 1:
count = 1
lnc = np.log10(count)
hth = self.clamp(hth - (self.high_th_m * lnc), 0, 1)
nth = self.clamp(nth + (self.new_track_th_m * lnc), hth, 1)
mth = self.clamp(mth - (self.match_th_first_m * lnc), 0, 0.67)
# Increase frame number
self.frame_no += 1
# Variable: Active tracks in the next frame
next_active_tracks = []
# Remove long-time lost tracks
all_lost_tracks = self.lost_tracks.copy()
for track in all_lost_tracks:
if track.state == TrackState.Lost_Central:
if self.frame_no - track.last_frame > self.central_timeout:
self.lost_tracks.remove(track)
else:
if self.frame_no - track.last_frame > self.marginal_timeout:
self.lost_tracks.remove(track)
# Gather out all previous tracks
track_pool = self.active_tracks + self.lost_tracks
# Try to associate tracks with high score detections
unmatched_tracks = np.array([])
high_score = scores > hth
if high_score.any():
definite_boxes = boxes[high_score]
definite_scores = scores[high_score]
if track_pool:
cost = self.calculate_cost(track_pool, definite_boxes)
matches, unmatched_tracks, unmatched_detections = self.linear_assignment(cost, mth)
# Update/Activate matched tracks
for track_idx, detection_idx in matches:
box = definite_boxes[detection_idx]
track = track_pool[track_idx]
track.update(box, self.frame_no)
next_active_tracks.append(track)
# Remove re-identified tracks from lost list
if track in self.lost_tracks:
self.lost_tracks.remove(track)
# Identify eligible unmatched detections as new tracks
for detection_idx in unmatched_detections:
if definite_scores[detection_idx] > nth:
box = definite_boxes[detection_idx]
track = Track(box, self.frame_no, self.id_counter)
next_active_tracks.append(track)
self.id_counter += 1
else:
# Associate tracks of the first frame after object-free/null frames
for detection_idx, score in enumerate(definite_scores):
if score > nth:
box = definite_boxes[detection_idx]
track = Track(box, self.frame_no, self.id_counter)
next_active_tracks.append(track)
self.id_counter += 1
# Add unmatched tracks to the lost list
unmatched_track_pool = []
for track_address in unmatched_tracks:
unmatched_track_pool.append(track_pool[track_address])
next_lost_tracks = unmatched_track_pool.copy()
# Try to associate remained tracks with intermediate score detections
intermediate_score = np.logical_and((self.low_th < scores), (scores < hth))
if intermediate_score.any():
if len(unmatched_tracks):
possible_boxes = boxes[intermediate_score]
cost = self.calculate_cost(unmatched_track_pool, possible_boxes, iou_only=True)
matches, unmatched_tracks, unmatched_detections = self.linear_assignment(cost, self.match_th_second)
# Update/Activate matched tracks
for track_idx, detection_idx in matches:
box = possible_boxes[detection_idx]
track = unmatched_track_pool[track_idx]
track.update(box, self.frame_no)
next_active_tracks.append(track)
# Remove re-identified tracks from lost list
if track in self.lost_tracks:
self.lost_tracks.remove(track)
next_lost_tracks.remove(track)
# All tracks are lost if there are no detections!
if not (high_score.any() or intermediate_score.any()):
next_lost_tracks = track_pool.copy()
# Update the list of lost tracks
for track in next_lost_tracks:
if track not in self.lost_tracks:
self.lost_tracks.append(track)
u = track.bbox[0] + (track.bbox[2] - track.bbox[0])/2
v = track.bbox[1] + (track.bbox[3] - track.bbox[1])/2
if (self.l_margin < u < self.r_margin) and (self.t_margin < v < self.b_margin):
track.state = TrackState.Lost_Central
else:
track.state = TrackState.Lost_Marginal
# Update the list of active tracks
self.active_tracks = next_active_tracks.copy()
return np.asarray([[x.bbox, x.track_id] for x in next_active_tracks], dtype=object)
@staticmethod
def clamp(value, min_value, max_value):
""" Clamps a value within the specified minimum and maximum bounds."""
return max(min_value, min(value, max_value))
@staticmethod
def calculate_cost(tracks, boxes, iou_only=False):
"""Calculates the association cost based on IoU and box similarity"""
eps = 1e-7
active_boxes = [track.bbox for track in tracks]
# Get the coordinates of bounding boxes
b1_x1, b1_y1, b1_x2, b1_y2 = np.array(active_boxes).T
b2_x1, b2_y1, b2_x2, b2_y2 = np.array(boxes).T
h_intersection = (np.minimum(b1_x2[:, None], b2_x2) - np.maximum(b1_x1[:, None], b2_x1)).clip(0)
w_intersection = (np.minimum(b1_y2[:, None], b2_y2) - np.maximum(b1_y1[:, None], b2_y1)).clip(0)
# Calculate the intersection area
intersection = h_intersection * w_intersection
# Calculate the union area
box1_height = b1_x2 - b1_x1
box2_height = b2_x2 - b2_x1
box1_width = b1_y2 - b1_y1
box2_width = b2_y2 - b2_y1
box1_area = box1_height * box1_width
box2_area = box2_height * box2_width
union = (box2_area + box1_area[:, None] - intersection + eps)
# Calculate the IoU
iou = intersection / union
if iou_only:
return 1.0 - iou
# Calculate the DIoU
centerx1 = (b1_x1 + b1_x2) / 2.0
centery1 = (b1_y1 + b1_y2) / 2.0
centerx2 = (b2_x1 + b2_x2) / 2.0
centery2 = (b2_y1 + b2_y2) / 2.0
inner_diag = np.abs(centerx1[:, None] - centerx2) + np.abs(centery1[:, None] - centery2)
xxc1 = np.minimum(b1_x1[:, None], b2_x1)
yyc1 = np.minimum(b1_y1[:, None], b2_y1)
xxc2 = np.maximum(b1_x2[:, None], b2_x2)
yyc2 = np.maximum(b1_y2[:, None], b2_y2)
outer_diag = np.abs(xxc2 - xxc1) + np.abs(yyc2 - yyc1)
diou = iou - (inner_diag / outer_diag)
# Calculate the BBSI
delta_w = np.abs(box2_width - box1_width[:, None])
sw = w_intersection / np.abs(w_intersection + delta_w + eps)
delta_h = np.abs(box2_height - box1_height[:, None])
sh = h_intersection / np.abs(h_intersection + delta_h + eps)
bbsi = diou + sh + sw
# Normalize the BBSI
cost = (bbsi)/3.0
return 1.0 - cost
@staticmethod
def linear_assignment(cost_matrix, thresh):
"""Linear assignment"""
if cost_matrix.size == 0:
return np.empty((0, 2), dtype=int), tuple(range(cost_matrix.shape[0])), tuple(range(cost_matrix.shape[1]))
if use_lap:
_, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
matches = [[ix, mx] for ix, mx in enumerate(x) if mx >= 0]
unmatched_a = np.where(x < 0)[0]
unmatched_b = np.where(y < 0)[0]
else:
row_ind, col_ind = linear_sum_assignment(cost_matrix)
matches = np.array([[row, col] for row, col in zip(row_ind, col_ind) if cost_matrix[row, col] <= thresh])
matched_rows = set(row_ind)
matched_cols = set(col_ind)
unmatched_a = np.array([i for i in range(cost_matrix.shape[0]) if i not in matched_rows])
unmatched_b = np.array([j for j in range(cost_matrix.shape[1]) if j not in matched_cols])
return matches, unmatched_a, unmatched_b
Purpose: provides dot-notation access to dictionary attributes.
Implementation:
class DotAccess(dict):
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
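A minimal usage sketch (the keys shown are a few of the tracker arguments): attribute reads go through dict.get, so missing keys return None instead of raising AttributeError:
args = DotAccess({"high_th": 0.6, "frame_width": 1920})
print(args.high_th)     # 0.6, read with attribute syntax
print(args.low_th)      # None, because __getattr__ is dict.get
args.low_th = 0.1       # attribute assignment stores a new key
del args.frame_width    # attribute deletion removes the key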
Track state definitions:
class TrackState:
    Active = 0         # track is currently active
    Lost_Central = 1   # lost while centered in the central region of the frame
    Lost_Marginal = 2  # lost while centered near the frame margins
Represents a single track:
class Track:
    def __init__(self, bbox, frame_id, track_id):
        self.track_id = track_id
        self.bbox = bbox               # [x1, y1, x2, y2]
        self.state = TrackState.Active
        self.last_frame = frame_id     # frame number of the most recent match
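A small illustrative sketch of the Track life cycle (the box coordinates are made up):
t = Track(bbox=[100, 50, 180, 210], frame_id=1, track_id=0)
t.update([104, 52, 184, 212], frame_id=2)   # matched again in frame 2
print(t.track_id, t.last_frame, t.state)    # 0 2 0  (TrackState.Active)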
def __init__(self, args):
    # Clamp each argument to its valid range, falling back to a default when it is not provided
    self.high_th = self.clamp(args.high_th, 0, 1) if args.high_th is not None else 0.6
    self.match_th_first = self.clamp(args.match_th_first, 0, 0.67) if args.match_th_first is not None else 0.67
    self.new_track_th = self.clamp(args.new_track_th, self.high_th, 1) if args.new_track_th is not None else 0.7
    self.low_th = self.clamp(args.low_th, 0, self.high_th) if args.low_th is not None else 0.1
    self.match_th_second = self.clamp(args.match_th_second, 0, 1) if args.match_th_second is not None else 0.3
    # Dynamic threshold tuning
    self.dynamic_tuning = bool(args.dynamic_tuning)
    if self.dynamic_tuning:
        # Adjustment factors (each is clamped to a narrow range when provided)
        self.cth = self.clamp(args.cth, self.low_th, 1) if args.cth is not None else 0.5
        self.high_th_m = self.clamp(args.high_th_m, 0.02, 0.1) if args.high_th_m is not None else 0.0
        self.new_track_th_m = self.clamp(args.new_track_th_m, 0.02, 0.08) if args.new_track_th_m is not None else 0.0
        self.match_th_first_m = self.clamp(args.match_th_first_m, 0.02, 0.08) if args.match_th_first_m is not None else 0.0
    # Timeouts for lost tracks
    self.marginal_timeout = args.marginal_timeout or 0
    self.central_timeout = args.central_timeout or 0
    # Partition of the frame into a central region and margins
    self.l_margin = args.horizontal_margin or 0
    self.r_margin = args.frame_width - self.l_margin
    self.t_margin = args.vertical_margin or 0
    self.b_margin = args.frame_height - self.t_margin
    # Tracker state
    self.frame_no = 0
    self.id_counter = 0
    self.active_tracks = []
    self.lost_tracks = []
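As a quick sketch of this default handling (the argument values are illustrative), any key left out of the dictionary falls back to its built-in default:
tracker = SFSORT({"high_th": 0.65, "frame_width": 1920, "frame_height": 1080})
print(tracker.high_th)         # 0.65, kept because it already lies in [0, 1]
print(tracker.new_track_th)    # 0.7, default since it was not provided
print(tracker.dynamic_tuning)  # False, dynamic tuning is off unless requested
print(tracker.r_margin)        # 1920, no horizontal_margin was given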
def update(self, boxes, scores):
    # 1. Dynamic threshold adjustment
    hth, nth, mth = self.high_th, self.new_track_th, self.match_th_first
    if self.dynamic_tuning:
        count = len(scores[scores > self.cth])
        lnc = np.log10(count) if count > 0 else 0
        hth = self.clamp(self.high_th - (self.high_th_m * lnc), 0, 1)
        nth = self.clamp(self.new_track_th + (self.new_track_th_m * lnc), hth, 1)
        mth = self.clamp(self.match_th_first - (self.match_th_first_m * lnc), 0, 0.67)
    # 2. Remove lost tracks that have timed out
    self.frame_no += 1
    current_time = self.frame_no
    for track in self.lost_tracks[:]:
        timeout = self.central_timeout if track.state == TrackState.Lost_Central else self.marginal_timeout
        if current_time - track.last_frame > timeout:
            self.lost_tracks.remove(track)
    # 3. Merge the track pools
    track_pool = self.active_tracks + self.lost_tracks
    # 4. Match high-confidence detections
    unmatched_tracks = []
    high_mask = scores > hth
    if high_mask.any():
        # Build the cost matrix
        cost_matrix = self.calculate_cost(track_pool, boxes[high_mask])
        # Hungarian-style assignment
        matches, unmatched_tracks, unmatched_detections = self.linear_assignment(cost_matrix, mth)
        # Update matched tracks
        for track_idx, det_idx in matches:
            track = track_pool[track_idx]
            track.update(boxes[high_mask][det_idx], current_time)
        # Create new tracks from confident unmatched detections
        for det_idx in unmatched_detections:
            if scores[high_mask][det_idx] > nth:
                new_track = Track(boxes[high_mask][det_idx], current_time, self.id_counter)
                self.id_counter += 1
                self.active_tracks.append(new_track)
    # 5. Match intermediate-confidence detections
    mid_mask = (scores > self.low_th) & (scores < hth)
    if mid_mask.any() and len(unmatched_tracks):
        # Use an IoU-only cost for this stage
        cost_matrix = self.calculate_cost([track_pool[i] for i in unmatched_tracks],
                                          boxes[mid_mask], iou_only=True)
        matches, _, _ = self.linear_assignment(cost_matrix, self.match_th_second)
        # Update matched tracks
        for track_idx, det_idx in matches:
            track = track_pool[unmatched_tracks[track_idx]]
            track.update(boxes[mid_mask][det_idx], current_time)
    # 6. Update track states
    for track in self.active_tracks[:]:
        if track.last_frame < current_time:
            # Decide which region the track was lost in
            center_x = (track.bbox[0] + track.bbox[2]) / 2
            center_y = (track.bbox[1] + track.bbox[3]) / 2
            if (self.l_margin < center_x < self.r_margin and
                    self.t_margin < center_y < self.b_margin):
                track.state = TrackState.Lost_Central
            else:
                track.state = TrackState.Lost_Marginal
            self.lost_tracks.append(track)
            self.active_tracks.remove(track)
    return [(track.bbox, track.track_id) for track in self.active_tracks]
@staticmethod
def calculate_cost(tracks, boxes, iou_only=False):
    # Extract coordinates
    b1_x1, b1_y1, b1_x2, b1_y2 = np.array([t.bbox for t in tracks]).T
    b2_x1, b2_y1, b2_x2, b2_y2 = np.array(boxes).T
    # Intersection area
    inter_x1 = np.maximum(b1_x1[:, None], b2_x1)
    inter_y1 = np.maximum(b1_y1[:, None], b2_y1)
    inter_x2 = np.minimum(b1_x2[:, None], b2_x2)
    inter_y2 = np.minimum(b1_y2[:, None], b2_y2)
    inter_area = np.maximum(inter_x2 - inter_x1, 0) * np.maximum(inter_y2 - inter_y1, 0)
    # Union area
    area1 = (b1_x2 - b1_x1) * (b1_y2 - b1_y1)
    area2 = (b2_x2 - b2_x1) * (b2_y2 - b2_y1)
    union_area = area1[:, None] + area2 - inter_area
    # IoU
    iou = inter_area / (union_area + 1e-7)
    if iou_only:
        return 1 - iou
    # DIoU
    center_x1 = (b1_x1 + b1_x2) / 2
    center_y1 = (b1_y1 + b1_y2) / 2
    center_x2 = (b2_x1 + b2_x2) / 2
    center_y2 = (b2_y1 + b2_y2) / 2
    center_distance = np.abs(center_x1[:, None] - center_x2) + np.abs(center_y1[:, None] - center_y2)
    enclose_x1 = np.minimum(b1_x1[:, None], b2_x1)
    enclose_y1 = np.minimum(b1_y1[:, None], b2_y1)
    enclose_x2 = np.maximum(b1_x2[:, None], b2_x2)
    enclose_y2 = np.maximum(b1_y2[:, None], b2_y2)
    enclose_diag = np.abs(enclose_x2 - enclose_x1) + np.abs(enclose_y2 - enclose_y1)
    diou = iou - center_distance / (enclose_diag + 1e-7)
    # BBSI: DIoU plus width- and height-similarity terms
    w1 = b1_y2 - b1_y1
    h1 = b1_x2 - b1_x1
    w2 = b2_y2 - b2_y1
    h2 = b2_x2 - b2_x1
    w_sim = inter_area / (inter_area + np.abs(w1[:, None] - w2) + 1e-7)
    h_sim = inter_area / (inter_area + np.abs(h1[:, None] - h2) + 1e-7)
    bbsi = (diou + w_sim + h_sim) / 3
    return 1 - bbsi
Dynamic threshold adjustment:
adjusted_threshold = base_threshold ± (adjustment_coefficient × log10(object_count))
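A small numeric sketch of this rule (the object count and the 0.05 coefficients are illustrative; the base thresholds are the defaults from the code above), assuming 20 detections score above cth in the current frame:
import numpy as np

count = 20
lnc = np.log10(count)                          # ≈ 1.30
hth = max(0.0, min(0.60 - 0.05 * lnc, 1.0))    # ≈ 0.535: more detections count as high-confidence
nth = max(hth, min(0.70 + 0.05 * lnc, 1.0))    # ≈ 0.765: unmatched detections need higher scores to start new tracks
mth = max(0.0, min(0.67 - 0.05 * lnc, 0.67))   # ≈ 0.605: the assignment cost limit tightens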
Region-aware track management:
center_x = (x1 + x2)/2
center_y = (y1 + y2)/2
is_central = (l_margin < center_x < r_margin) and (t_margin < center_y < b_margin)
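For example, with a 1920×1080 frame and margins of one tenth of each dimension (the values used later in test.py), the central region spans 192 < center_x < 1728 and 108 < center_y < 972; a small sketch with a made-up box:
l_margin, r_margin = 192, 1920 - 192    # horizontal_margin = frame_width // 10
t_margin, b_margin = 108, 1080 - 108    # vertical_margin = frame_height // 10
bbox = [60, 500, 140, 660]              # a hypothetical track lost near the left edge
center_x = (bbox[0] + bbox[2]) / 2      # 100.0
center_y = (bbox[1] + bbox[3]) / 2      # 580.0
is_central = (l_margin < center_x < r_margin) and (t_margin < center_y < b_margin)
print(is_central)                       # False -> Lost_Marginal, governed by the shorter marginal_timeout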
Hybrid cost metric:
BBSI = (DIoU + width_similarity + height_similarity) / 3
DIoU = IoU - (center-point distance / diagonal of the minimum enclosing box)
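A minimal sketch of this metric using the calculate_cost staticmethod from the full implementation above (the two boxes are invented); the method returns 1 − BBSI, so smaller values mean better matches:
import numpy as np

track = Track([100, 100, 200, 300], frame_id=1, track_id=0)   # the track's previous box
detections = np.array([[110, 105, 205, 310],                  # almost the same box
                       [400, 400, 480, 560]])                 # a distant box
cost = SFSORT.calculate_cost([track], detections)
print(cost.shape)   # (1, 2): one track scored against two detections
print(cost)         # roughly [[0.11, 1.23]]: only the first pairing beats the default match_th_first of 0.67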
Two-stage matching strategy:
High-confidence detections are first matched against all existing tracks using the BBSI cost; the tracks left unmatched are then matched against intermediate-confidence detections using an IoU-only cost.
Through dynamic threshold adjustment, region-aware track management, and a hybrid cost metric, SFSORT achieves robust multi-object tracking in complex scenes, while vectorized computation and careful memory use keep it real-time.
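Putting the two stages together, here is a minimal synthetic sketch that drives the tracker for two frames (all boxes and scores are invented; no detector is involved):
import numpy as np

tracker = SFSORT({"frame_width": 640, "frame_height": 480,
                  "central_timeout": 30, "marginal_timeout": 10})
# Frame 1: two confident detections start two new tracks
tracks = tracker.update(np.array([[50, 60, 120, 200], [300, 80, 380, 240]]),
                        np.array([0.90, 0.85]))
print(sorted(tid for _, tid in tracks))   # [0, 1]
# Frame 2: the first object drops to an intermediate score, the second moves slightly
tracks = tracker.update(np.array([[52, 62, 122, 202], [305, 82, 385, 242]]),
                        np.array([0.40, 0.90]))
print(sorted(tid for _, tid in tracks))   # [0, 1] again: the second stage recovers the low-score object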
graph TD
A[Start] --> B[Initialize parameters]
B --> C[Read next frame]
C --> D[Dynamic threshold adjustment]
D --> E[Remove timed-out lost tracks]
E --> F[Merge active and lost track pools]
F --> G{High-confidence detections?}
G -- Yes --> H[Compute BBSI cost matrix]
H --> I[Hungarian-style assignment]
I --> J[Update matched tracks]
J --> K[Create new tracks]
K --> L{Intermediate-confidence detections?}
G -- No --> L
L -- Yes --> M[Compute IoU cost matrix]
M --> N[Hungarian-style assignment]
N --> O[Update matched tracks]
O --> P[Update track states]
L -- No --> P
P --> Q[Output active tracks]
Q --> R{More frames?}
R -- Yes --> C
R -- No --> S[End]
subgraph SG1[Dynamic threshold adjustment]
D --> D1[Count high-confidence detections]
D1 --> D2[Compute logarithmic factor]
D2 --> D3[Adjust high-confidence threshold]
D3 --> D4[Adjust new-track threshold]
D4 --> D5[Adjust matching threshold]
end
subgraph SG2[Track state update]
P --> P1[Iterate over active tracks]
P1 --> P2{Last seen before current frame?}
P2 -- Yes --> P3[Compute box center]
P3 --> P4{Center inside central region?}
P4 -- Yes --> P5[Mark as lost in central region]
P4 -- No --> P6[Mark as lost near margins]
P5 --> P7[Move to lost-track list]
P6 --> P7
end
subgraph SG3[Cost computation]
H --> H1[Compute IoU]
H1 --> H2[Compute DIoU]
H2 --> H3[Compute width and height similarity]
H3 --> H4[Combine into BBSI]
end
Dynamic threshold system:
graph LR
A[Object count] --> B[Logarithmic factor]
B --> C[Adjust high-confidence threshold]
B --> D[Adjust new-track threshold]
B --> E[Adjust matching threshold]
Region-aware lost-track management:
graph TD
A[Track lost] --> B{Center inside the central region?}
B -- Yes --> C[Central loss - long timeout]
B -- No --> D[Marginal loss - short timeout]
Hybrid cost metric BBSI:
graph LR
A[DIoU] --> D[BBSI]
B[Width similarity] --> D
C[Height similarity] --> D
These flowcharts show the complete SFSORT pipeline from initialization through per-frame processing, highlighting the dynamic threshold adjustment, two-stage matching, and region-aware management that give the tracker its strong performance in complex scenes.
In the ultralytics repository, locate the ultralytics folder and copy the entire folder into your project directory; link: https://github.com/ultralytics/ultralytics
Create a new script SFSORT.py and paste the code above into it, then create a test.py script and paste the code below into it.
import numpy as np
import cv2
from ultralytics import YOLO
from ultralytics.utils.torch_utils import select_device
from random import randrange
from SFSORT import SFSORT
# Instantiate an object detector
# Replace 'yolov8m.pt' with another checkpoint, e.g. 'yolov8n.pt' or a fine-tuned 'best.pt'
model = YOLO('yolov8m.pt', 'detect')
# Check for GPU availability
device = select_device('0')
# Devolve the processing to selected devices
model.to(device)
# Load the video file
cap = cv2.VideoCapture('Sample.mp4')
# Get the frame rate, frame width, and frame height
frame_rate = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Define the MP4 codec and create a VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output.mp4', fourcc, 30.0, (frame_width, frame_height))
# Organize tracker arguments into standard format
tracker_arguments = {"dynamic_tuning": True, "cth": 0.5,
"high_th": 0.6, "high_th_m": 0.1,
"match_th_first": 0.67, "match_th_first_m": 0.05,
"match_th_second": 0.2, "low_th": 0.1,
"new_track_th": 0.7, "new_track_th_m": 0.1,
"marginal_timeout": (7 * frame_rate // 10),
"central_timeout": frame_rate,
"horizontal_margin": frame_width // 10,
"vertical_margin": frame_height // 10,
"frame_width": frame_width,
"frame_height": frame_height}
# Instantiate a tracker
tracker = SFSORT(tracker_arguments)
# Define a color list for track visualization
colors = {}
# Process each frame of the video
while cap.isOpened():
# Load the frame
ret, frame = cap.read()
if not ret:
break
# Detect people in the frame
prediction = model.predict(frame, imgsz=(800,1440), conf=0.1, iou=0.45,
half=False, device=device, max_det=99, classes=0,
verbose=False)
# Exclude additional information from the predictions
prediction_results = prediction[0].boxes.cpu().numpy()
# Update the tracker with the latest detections
tracks = tracker.update(prediction_results.xyxy, prediction_results.conf)
# Skip additional analysis if the tracker is not currently tracking anyone
if len(tracks) == 0:
continue
# Extract tracking data from the tracker
bbox_list = tracks[:, 0]
track_id_list = tracks[:, 1]
# Visualize tracks
for idx, (track_id, bbox) in enumerate(zip(track_id_list, bbox_list)):
# Define a new color for newly detected tracks
if track_id not in colors:
colors[track_id] = (randrange(255), randrange(255), randrange(255))
color = colors[track_id]
# Extract the bounding box coordinates
x0, y0, x1, y1 = map(int, bbox)
# Draw the bounding boxes on the frame
annotated_frame = cv2.rectangle(frame, (x0, y0), (x1, y1), color, 2)
# Put the track label on the frame alongside the bounding box
cv2.putText(annotated_frame, str(track_id), (x0, y0-5),
cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
# Write the frame to the output video file
out.write(annotated_frame)
# Release everything when done
cap.release()
out.release()
import numpy as np                                       # numerical computation
import cv2                                               # OpenCV image and video handling
from ultralytics import YOLO                             # YOLOv8 object detection
from ultralytics.utils.torch_utils import select_device  # GPU device selection
from random import randrange                             # random colors for visualization
from SFSORT import SFSORT                                # the tracker implemented above
model = YOLO('yolov8m.pt', 'detect')  # load the medium YOLOv8 pretrained model
device = select_device('0')           # select the GPU device ('0' = first GPU)
model.to(device)                      # move the model to the GPU
yolov8m.pt: the medium model, a balance between accuracy and speed.
select_device('0'): explicitly selects the first NVIDIA GPU.
cap = cv2.VideoCapture('Sample.mp4')  # open the input video
frame_rate = cap.get(cv2.CAP_PROP_FPS)                  # frame rate
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))    # frame width
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))  # frame height
fourcc = cv2.VideoWriter_fourcc(*'mp4v')                # MP4 codec
out = cv2.VideoWriter('output.mp4', fourcc, 30.0, (frame_width, frame_height))  # output video writer
tracker_arguments = {
    "dynamic_tuning": True,  # enable dynamic threshold tuning
    "cth": 0.5,              # confidence threshold used to count objects for dynamic tuning
    "high_th": 0.6,          # high-confidence threshold
    # ... (remaining algorithm parameters)
    "frame_width": frame_width,
    "frame_height": frame_height
}
tracker = SFSORT(tracker_arguments)  # instantiate the tracker
colors = {}                          # dict mapping track IDs to display colors
marginal_timeout = (7 * frame_rate // 10): number of frames a track lost near the margins is kept before being dropped (about 0.7 s).
central_timeout = frame_rate: number of frames a track lost in the central region is kept (about 1 s).
horizontal_margin / vertical_margin: define the boundary between the central region and the margins.
while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    # YOLO object detection
    prediction = model.predict(
        frame,
        imgsz=(800, 1440),  # custom input size (instead of the default 640)
        conf=0.1,           # low confidence threshold (higher recall)
        iou=0.45,           # NMS IoU threshold
        classes=0           # detect only the 'person' class
    )
    # Parse the detection results
    prediction_results = prediction[0].boxes.cpu().numpy()
    bboxes = prediction_results.xyxy  # bounding-box coordinates
    confs = prediction_results.conf   # confidence scores
    # Multi-object tracking
    tracks = tracker.update(bboxes, confs)
    # Visualize the tracking results
    for track in tracks:
        track_id = int(track[1])
        bbox = track[0].astype(int)
        # Assign a random color to each new ID
        if track_id not in colors:
            colors[track_id] = (randrange(255), randrange(255), randrange(255))
        # Draw the bounding box and ID
        cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), colors[track_id], 2)
        cv2.putText(frame, str(track_id), (bbox[0], bbox[1]-5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.9, colors[track_id], 2)
    # Write the annotated frame to the output video
    out.write(frame)
imgsz=(800, 1440): non-standard input resolution adapted to this scene.
conf=0.1: low threshold so detections are rarely missed (the tracker filters them afterwards).
classes=0: focuses on pedestrian detection (COCO class 0).
.cpu().numpy(): explicitly moves the results back to the CPU, reducing GPU memory usage.
graph TD
A[Video frame input] --> B[YOLOv8 person detection]
B --> C{People detected?}
C -- Yes --> D[SFSORT updates track states]
C -- No --> E[Skip post-processing]
D --> F[Assign track IDs]
F --> G[Draw bounding boxes and IDs]
G --> H[Write frame to output video]
H --> I{Next frame?}
I -- Yes --> A
I -- No --> J[Release resources]