Paper: Learning Lane Graph Representations for Motion Forecasting
The goal of motion forecasting is to predict a vehicle's future trajectories from its past states.
Image-based (rasterized) motion forecasting
Compared with rasterized approaches that render the past trajectories and the HD map into bird's-eye-view images, the vectorized lane graph method proposed by Uber minimizes the loss of the HD map's geometric and semantic information and extracts features from the graph-structured data more efficiently.
LaneGCN Model
LaneGCN Model details
MapNet extracts structured map representations from the vectorized HD map data. It consists of two main steps: encoding a feature for each map node, and then fusing lane topology information with LaneGCN.
When encoding a map node, the method in the paper takes the node's shape, orientation, and location into account. The node feature is computed as $x_i = \text{MLP}_{\text{shape}}(v_i^{\text{end}} - v_i^{\text{start}}) + \text{MLP}_{\text{loc}}(v_i)$, where $v_i$ is the coordinate of the $i$-th node, i.e. the midpoint of two adjacent points on the lane centerline, and $v_i^{\text{end}} - v_i^{\text{start}}$ captures the segment's shape and orientation.
The corresponding code in the paper's implementation:
ctrs = torch.cat(graph["ctrs"], 0)   # node center coordinates
feat = self.input(ctrs)              # location embedding
feat += self.seg(graph["feats"])     # add shape/orientation embedding
feat = self.relu(feat)
The two MLPs (self.input for the node location, self.seg for the segment shape) are implemented as follows:
self.input = nn.Sequential(
    nn.Linear(2, n_map),
    nn.ReLU(inplace=True),
    Linear(n_map, n_map, norm=norm, ng=ng, act=False),
)
self.seg = nn.Sequential(
    nn.Linear(2, n_map),
    nn.ReLU(inplace=True),
    Linear(n_map, n_map, norm=norm, ng=ng, act=False),
)
A node feature only captures the local information of a single lane segment, so LaneConv is used to further encode the lane topology.
The multi-scale LaneConv designed in the paper is as follows:
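Consistent with the aggregation code below (this is my reconstruction; the notation only approximates the paper's), a multi-scale LaneConv with dilation sizes $k_1, \dots, k_C$ can be written as
$$Y = XW_0 + \sum_{c=1}^{C}\big(A_{pre}^{k_c}XW_{pre,c} + A_{suc}^{k_c}XW_{suc,c}\big) + A_{left}XW_{left} + A_{right}XW_{right}$$
where $X$ holds the node features, $A_{pre}$ and $A_{suc}$ are the predecessor and successor adjacency matrices (raised to the $k_c$-th power for dilation), and $A_{left}$, $A_{right}$ connect left and right neighboring lanes.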
The LaneGCN proposed in the paper is built by stacking four such multi-scale LaneConv residual blocks.
The LaneGCN implementation:
keys = ["ctr", "norm", "ctr2", "left", "right"]
for i in range(config["num_scales"]):
keys.append("pre" + str(i))
keys.append("suc" + str(i))
fuse = dict()
for key in keys:
fuse[key] = []
for i in range(4):
for key in fuse:
if key in ["norm"]:
fuse[key].append(nn.GroupNorm(gcd(ng, n_map), n_map))
elif key in ["ctr2"]:
fuse[key].append(Linear(n_map, n_map, norm=norm, ng=ng, act=False))
else:
fuse[key].append(nn.Linear(n_map, n_map, bias=False))
for key in fuse:
fuse[key] = nn.ModuleList(fuse[key])
self.fuse = nn.ModuleDict(fuse)
...
res = feat
for i in range(len(self.fuse["ctr"])):
    # Node's own transform.
    temp = self.fuse["ctr"][i](feat)
    # Aggregate predecessor/successor features at every dilation scale.
    for key in self.fuse:
        if key.startswith("pre") or key.startswith("suc"):
            k1 = key[:3]
            k2 = int(key[3:])
            temp.index_add_(
                0,
                graph[k1][k2]["u"],
                self.fuse[key][i](feat[graph[k1][k2]["v"]]),
            )
    # Aggregate features from left/right neighboring lanes.
    if len(graph["left"]["u"]) > 0:
        temp.index_add_(
            0,
            graph["left"]["u"],
            self.fuse["left"][i](feat[graph["left"]["v"]]),
        )
    if len(graph["right"]["u"]) > 0:
        temp.index_add_(
            0,
            graph["right"]["u"],
            self.fuse["right"][i](feat[graph["right"]["v"]]),
        )
    # Output projection plus residual connection.
    feat = self.fuse["norm"][i](temp)
    feat = self.relu(feat)
    feat = self.fuse["ctr2"][i](feat)
    feat += res
    feat = self.relu(feat)
    res = feat
The overall flow of ActorNet:
Because a 1D CNN extracts multi-scale features well and is efficient to compute in parallel, the authors choose it to process the trajectory data. The 1D CNN uses residual blocks as its basic building unit.
The implementation:
class Res1d(nn.Module):
    def __init__(self, n_in, n_out, kernel_size=3, stride=1, norm='GN', ng=32, act=True):
        super(Res1d, self).__init__()
        assert norm in ['GN', 'BN', 'SyncBN']
        padding = (int(kernel_size) - 1) // 2
        self.conv1 = nn.Conv1d(n_in, n_out, kernel_size=kernel_size, stride=stride, padding=padding, bias=False)
        self.conv2 = nn.Conv1d(n_out, n_out, kernel_size=kernel_size, padding=padding, bias=False)
        self.relu = nn.ReLU(inplace=True)

        # All use name bn1 and bn2 to load imagenet pretrained weights
        if norm == 'GN':
            self.bn1 = nn.GroupNorm(gcd(ng, n_out), n_out)
            self.bn2 = nn.GroupNorm(gcd(ng, n_out), n_out)
        elif norm == 'BN':
            self.bn1 = nn.BatchNorm1d(n_out)
            self.bn2 = nn.BatchNorm1d(n_out)
        else:
            exit('SyncBN has not been added!')

        if stride != 1 or n_out != n_in:
            if norm == 'GN':
                self.downsample = nn.Sequential(
                    nn.Conv1d(n_in, n_out, kernel_size=1, stride=stride, bias=False),
                    nn.GroupNorm(gcd(ng, n_out), n_out))
            elif norm == 'BN':
                self.downsample = nn.Sequential(
                    nn.Conv1d(n_in, n_out, kernel_size=1, stride=stride, bias=False),
                    nn.BatchNorm1d(n_out))
            else:
                exit('SyncBN has not been added!')
        else:
            self.downsample = None

        self.act = act

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            x = self.downsample(x)

        out += x
        if self.act:
            out = self.relu(out)
        return out
FPN (Feature Pyramid Network) is a classic network design; ActorNet uses it to fuse the 1D features across scales.
The code that extracts features group by group in ActorNet:
n_in = 3
n_out = [32, 64, 128]
blocks = [Res1d, Res1d, Res1d]
num_blocks = [2, 2, 2]

groups = []
for i in range(len(num_blocks)):
    group = []
    if i == 0:
        group.append(blocks[i](n_in, n_out[i], norm=norm, ng=ng))
    else:
        # Later groups downsample the temporal dimension by 2.
        group.append(blocks[i](n_in, n_out[i], stride=2, norm=norm, ng=ng))
    for j in range(1, num_blocks[i]):
        group.append(blocks[i](n_out[i], n_out[i], norm=norm, ng=ng))
    groups.append(nn.Sequential(*group))
    n_in = n_out[i]
self.groups = nn.ModuleList(groups)
...
out = actors
outputs = []
for i in range(len(self.groups)):
    out = self.groups[i](out)
    outputs.append(out)
Then the multi-scale outputs are upsampled and fused with lateral convolutions:
n = config["n_actor"]
lateral = []
for i in range(len(n_out)):
lateral.append(Conv1d(n_out[i], n, norm=norm, ng=ng, act=False))
self.lateral = nn.ModuleList(lateral)
...
out = self.lateral[-1](outputs[-1])
for i in range(len(outputs) - 2, -1, -1):
out = F.interpolate(out, scale_factor=2, mode="linear", align_corners=False)
out += self.lateral[i](outputs[i])
FusionNet captures the interactions between lanes and actors through four modules: Actor-to-Lane (A2L), Lane-to-Lane (L2L), Lane-to-Actor (L2A), and Actor-to-Actor (A2A); a sketch of how they are chained follows the list below.
A2L: propagates real-time traffic information to the lane features, e.g. whether a lane is occupied.
L2L: propagates the traffic information along the lane graph.
L2A: fuses the updated map features, which now carry real-time traffic information, back into the actor features.
A2A: models the interaction between actors.
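A rough sketch of how the four modules are applied in sequence (the function and variable names below are placeholders chosen for illustration, not the exact API of the LaneGCN repository):

nodes = a2l(nodes, node_ctrs, actors, actor_ctrs)    # A2L: actors -> lane nodes
nodes = l2l(nodes, graph)                            # L2L: propagate along the lane graph
actors = l2a(actors, actor_ctrs, nodes, node_ctrs)   # L2A: lane nodes -> actors
actors = a2a(actors, actor_ctrs)                     # A2A: actor-actor interaction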
L2L reuses the same LaneGCN as MapNet. A2L, L2A, and A2A all use the same spatial attention layer to aggregate features from their context; they differ only in the context distance threshold.
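Reconstructed from the code below (the notation is mine and only approximates the paper's), the attention layer updates a feature $x_i$ roughly as
$$y_i = x_i W_0 + \sum_{j:\ \lVert v_j - v_i \rVert \le \tau} \psi\big(\text{concat}(\text{MLP}(v_i - v_j),\ x_i W_1,\ x_j)\big)$$
where $v_i$, $v_j$ are the 2D centers, $\tau$ is the context distance threshold, and $\psi$ is a small learned transform (self.ctx in the code).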
The corresponding implementation:
# Build the (agent, context) index pairs whose centers are within dist_th of each other.
for i in range(batch_size):
    dist = agt_ctrs[i].view(-1, 1, 2) - ctx_ctrs[i].view(1, -1, 2)
    dist = torch.sqrt((dist ** 2).sum(2))
    mask = dist <= dist_th
    idcs = torch.nonzero(mask, as_tuple=False)
    if len(idcs) == 0:
        continue
    hi.append(idcs[:, 0] + hi_count)   # agent indices
    wi.append(idcs[:, 1] + wi_count)   # context indices
    hi_count += len(agt_idcs[i])
    wi_count += len(ctx_idcs[i])
hi = torch.cat(hi, 0)
wi = torch.cat(wi, 0)

agt_ctrs = torch.cat(agt_ctrs, 0)
ctx_ctrs = torch.cat(ctx_ctrs, 0)
dist = agt_ctrs[hi] - ctx_ctrs[wi]
dist = self.dist(dist)                  # distance embedding
query = self.query(agts[hi])            # query from the agent feature
ctx = ctx[wi]
ctx = torch.cat((dist, query, ctx), 1)  # concat(distance, query, context feature)
ctx = self.ctx(ctx)
agts = self.agt(agts)
agts.index_add_(0, hi, ctx)             # sum the messages into each agent
Finally, the update is wrapped in a residual connection:
agts = self.agt(agts)
agts.index_add_(0, hi, ctx)
agts = self.norm(agts)
agts = self.relu(agts)
agts = self.linear(agts)
agts += res
agts = self.relu(agts)
LaneGCN's prediction head has two branches: regression and classification. The regression branch outputs K predicted trajectories for each actor; the classification branch assigns a confidence score to each predicted trajectory.
The regression branch uses a residual block followed by a linear layer to generate the coordinate sequence of each predicted trajectory. In the code, config["num_mods"] is the number of predicted trajectories per actor, 6 here.
pred = []
for i in range(config["num_mods"]):
    pred.append(
        nn.Sequential(
            LinearRes(n_actor, n_actor, norm=norm, ng=ng),
            nn.Linear(n_actor, 2 * config["num_preds"]),
        )
    )
self.pred = nn.ModuleList(pred)

preds = []
for i in range(len(self.pred)):
    preds.append(self.pred[i](actors))
reg = torch.cat([x.unsqueeze(1) for x in preds], 1)
reg = reg.view(reg.size(0), reg.size(1), -1, 2)
The output reg has shape [number of actors, number of predicted trajectories, number of points per trajectory, BEV coordinate] (a quick shape check follows the list below).
number of actors: how many vehicles we are predicting trajectories for;
number of predicted trajectories: how many trajectories are predicted per vehicle, 6 here;
number of points per trajectory: how many points each predicted trajectory contains; 30 points per trajectory is the common setting on the Argoverse dataset;
BEV coordinate: each trajectory point is represented by 2D coordinates (x, y).
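A quick sanity check of the reshaping above (the sizes 4/6/30 are only illustrative, matching the 6-mode, 30-step Argoverse setup):

import torch

num_actors, num_mods, num_preds = 4, 6, 30
preds = [torch.randn(num_actors, 2 * num_preds) for _ in range(num_mods)]
reg = torch.cat([x.unsqueeze(1) for x in preds], 1)
reg = reg.view(reg.size(0), reg.size(1), -1, 2)
print(reg.shape)  # torch.Size([4, 6, 30, 2])

The repo then shifts these actor-relative offsets by each actor's center: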
for i in range(len(actor_idcs)):
    idcs = actor_idcs[i]
    ctrs = actor_ctrs[i].view(-1, 1, 1, 2)
    reg[idcs] = reg[idcs] + ctrs
This converts the predicted trajectory coordinates from actor-relative coordinates into the absolute (map) coordinate frame.
The classification branch is slightly more involved. Following the implementation, the distance between each actor's current location and the endpoint of each of its predicted trajectories is embedded, concatenated with the actor feature, and then scored.
The corresponding code is as follows. First, the distance embedding and concatenation:
n_agt = agts.size(1)
num_mods = dest_ctrs.size(1)
# dest_ctrs holds the endpoint of each predicted trajectory; embed its offset from the actor center.
dist = (agt_ctrs.unsqueeze(1) - dest_ctrs).view(-1, 2)
dist = self.dist(dist)
agts = agts.unsqueeze(1).repeat(1, num_mods, 1).view(-1, n_agt)
agts = torch.cat((dist, agts), 1)
agts = self.agt(agts)
Then, a residual block and a linear layer produce the score of each predicted trajectory.
self.cls = nn.Sequential(
    LinearRes(n_actor, n_actor, norm=norm, ng=ng),
    nn.Linear(n_actor, 1))
...
cls = self.cls(feats).view(-1, self.config["num_mods"])
Finally, the predicted trajectories from the regression branch are sorted by their classification scores.
cls, sort_idcs = cls.sort(1, descending=True)
row_idcs = torch.arange(len(sort_idcs)).long().to(sort_idcs.device)
row_idcs = row_idcs.view(-1, 1).repeat(1, sort_idcs.size(1)).view(-1)
sort_idcs = sort_idcs.view(-1)
reg = reg[row_idcs, sort_idcs].view(cls.size(0), cls.size(1), -1, 2)
last = has_preds.float() + 0.1 * torch.arange(num_preds).float().to(
    has_preds.device) / float(num_preds)
max_last, last_idcs = last.max(1)
mask = max_last > 1.0
cls = cls[mask]
reg = reg[mask]
This code is quite clever. has_preds marks whether each time step holds a real (non-padded) ground-truth coordinate. Through the computation of last, padded positions are excluded and, at the same time, the index of the last valid time step, i.e. the predicted location at the final time step, is found for every actor.
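A minimal sketch of this trick with 6 prediction steps instead of 30 (the numbers are only illustrative):

import torch

num_preds = 6
# Actor 0: all steps valid; actor 1: only the first 4 steps valid (the rest are padding).
has_preds = torch.tensor([[1, 1, 1, 1, 1, 1],
                          [1, 1, 1, 1, 0, 0]], dtype=torch.bool)
last = has_preds.float() + 0.1 * torch.arange(num_preds).float() / float(num_preds)
max_last, last_idcs = last.max(1)
print(last_idcs)       # tensor([5, 3]): index of the last valid step for each actor
print(max_last > 1.0)  # tensor([True, True]): keeps actors with at least one valid step beyond t=0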
For each actor, we want the trajectory with the minimum final displacement error (minFDE) to always receive the highest score among the K predictions.
Therefore, whenever the minFDE trajectory's score does not exceed another trajectory's score, the difference is non-positive and the loss pushes the optimizer to widen the gap; once the minFDE trajectory's score is clearly higher, the hinge term is clipped to 0 and contributes no loss, which is the desired state.
ε acts as a margin that strengthens this loss: a gap of barely more than 0 between the minFDE trajectory's score and the others is still not considered good enough; only when the gap exceeds ε do we stop optimizing that pair.
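Putting this together (my formulation, matching the code below up to the normalization constant), the classification loss is a max-margin loss over the K modes:
$$\mathcal{L}_{cls} = \frac{1}{N_{cls}} \sum_{n} \sum_{k \neq \hat{k}_n} \max\big(0,\ c_{n,k} + \epsilon - c_{n,\hat{k}_n}\big)$$
where $c_{n,k}$ is the score of actor $n$'s $k$-th trajectory, $\hat{k}_n$ is its minFDE mode, $\epsilon$ is config["mgn"], and $N_{cls}$ counts the terms that are actually penalized.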
In the actual implementation, actors with minFDE > self.config["cls_th"] are filtered out, presumably because actors whose best prediction is already far off would only add training noise. In addition, trajectories satisfying dist - min_dist.view(-1, 1) < self.config["cls_ignore"], i.e. those whose endpoint error is within a small threshold of the minFDE, are also excluded from the margin loss, to avoid the predicted trajectories becoming too similar.
gt_preds = gt_preds[mask]
has_preds = has_preds[mask]
last_idcs = last_idcs[mask]

# Final displacement error (FDE) of every mode, measured at the last valid step.
row_idcs = torch.arange(len(last_idcs)).long().to(last_idcs.device)
dist = []
for j in range(num_mods):
    dist.append(
        torch.sqrt(
            ((reg[row_idcs, j, last_idcs] - gt_preds[row_idcs, last_idcs]) ** 2).sum(1)
        )
    )
dist = torch.cat([x.unsqueeze(1) for x in dist], 1)
min_dist, min_idcs = dist.min(1)
row_idcs = torch.arange(len(min_idcs)).long().to(min_idcs.device)

# Max-margin classification loss: score of the minFDE mode minus the other scores.
mgn = cls[row_idcs, min_idcs].unsqueeze(1) - cls
mask0 = (min_dist < self.config["cls_th"]).view(-1, 1)
mask1 = dist - min_dist.view(-1, 1) > self.config["cls_ignore"]
mgn = mgn[mask0 * mask1]
mask = mgn < self.config["mgn"]
coef = self.config["cls_coef"]
loss_out["cls_loss"] += coef * (self.config["mgn"] * mask.sum() - mgn[mask].sum())
loss_out["num_cls"] += mask.sum().item()

# Regression loss is applied only to the minFDE mode, on the valid (non-padded) steps.
reg = reg[row_idcs, min_idcs]
coef = self.config["reg_coef"]
loss_out["reg_loss"] += coef * self.reg_loss(
    reg[has_preds], gt_preds[has_preds]
)
loss_out["num_reg"] += has_preds.sum().item()