
一、背景与目标(骨架版)
二、系统架构(ROS)

三、控制环节(PID骨架)
import numpy as np
def softly_log(*msg):
print("[soft]", *msg)
class PID:
def __init__(self, Kp, Ki, Kd, imax=1.0):
self.Kp, self.Ki, self.Kd = Kp, Ki, Kd
self.i = 0.0
self.prev = 0.0
self.imax = imax
def step(self, err):
p = self.Kp * err
self.i = np.clip(self.i + err, -self.imax, self.imax)
d = self.Kd * (err - self.prev)
self.prev = err
u = p + self.Ki * self.i + d
if abs(u) < 1e-4:
softly_log("静默输出", u)
return u四、数据链路(最小要点)
五、ROS 2版本选择与依赖
六、节点实现示例(rclpy)
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
import numpy as np, time
class SensorPub(Node):
def __init__(self):
super().__init__('sensor_pub')
self.pub = self.create_publisher(JointState, '/joint_states', 10)
self.t0 = time.time(); self.timer = self.create_timer(0.02, self.tick)
def tick(self):
t = time.time() - self.t0
js = JointState(); js.header.stamp = self.get_clock().now().to_msg()
js.name = ['joint1']; js.position = [0.5*np.sin(0.5*t)]
self.pub.publish(js)
if __name__ == '__main__':
rclpy.init(); node = SensorPub(); rclpy.spin(node); node.destroy_node(); rclpy.shutdown()from collections import deque
class FilterNode(Node):
def __init__(self):
super().__init__('filter_node')
self.win = deque(maxlen=5)
self.sub = self.create_subscription(JointState, '/joint_states', self.on_js, 10)
self.pub = self.create_publisher(JointState, '/joint_states_filtered', 10)
def on_js(self, msg: JointState):
x = msg.position[0]; self.win.append(x)
avg = sum(self.win)/len(self.win)
if abs(x-avg) > 0.2:
softly_log('轻微异常', x, '→', avg)
out = JointState(); out.header = msg.header; out.name = msg.name; out.position = [avg]
self.pub.publish(out)from geometry_msgs.msg import Twist
class PIDCtrl(Node):
def __init__(self):
super().__init__('pid_ctrl')
self.pid = PID(Kp=1.2, Ki=0.03, Kd=0.08, imax=0.5)
self.target = 0.0
self.sub = self.create_subscription(JointState, '/joint_states_filtered', self.on_js, 10)
self.pub = self.create_publisher(Twist, '/cmd_vel', 10)
def on_js(self, msg: JointState):
pos = msg.position[0]; err = self.target - pos
u = self.pid.step(err)
tw = Twist(); tw.angular.z = float(u)
self.pub.publish(tw)from tf_transformations import quaternion_from_euler
from geometry_msgs.msg import TransformStamped
from tf2_ros import TransformBroadcaster
class TFBroadcaster(Node):
def __init__(self):
super().__init__('tf_broadcaster')
self.br = TransformBroadcaster(self)
self.timer = self.create_timer(0.02, self.tick)
def tick(self):
t = TransformStamped()
t.header.stamp = self.get_clock().now().to_msg()
t.header.frame_id = 'base_link'; t.child_frame_id = 'tool0'
t.transform.translation.x = 0.0; t.transform.translation.y = 0.0; t.transform.translation.z = 0.3
q = quaternion_from_euler(0.0, 0.0, 0.0)
t.transform.rotation.x, t.transform.rotation.y, t.transform.rotation.z, t.transform.rotation.w = q
self.br.sendTransform(t)from rclpy.action import ActionClient
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
class TrajClient(Node):
def __init__(self):
super().__init__('traj_client')
self.ac = ActionClient(self, FollowJointTrajectory, '/arm_controller/follow_joint_trajectory')
self.send()
def send(self):
goal = FollowJointTrajectory.Goal()
goal.trajectory.joint_names = ['joint1']
p1 = JointTrajectoryPoint(positions=[0.0], time_from_start=rclpy.duration.Duration(seconds=1.0).to_msg())
p2 = JointTrajectoryPoint(positions=[1.0], time_from_start=rclpy.duration.Duration(seconds=3.0).to_msg())
goal.trajectory.points = [p1, p2]
self.ac.wait_for_server(); self.ac.send_goal_async(goal)七、Launch与参数示例
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(package='pkg', executable='sensor_pub', name='sensor_pub'),
Node(package='pkg', executable='filter_node', name='filter_node'),
Node(package='pkg', executable='pid_ctrl', name='pid_ctrl', parameters=['config/pid.yaml']),
Node(package='pkg', executable='tf_broadcaster', name='tf_broadcaster'),
])pid:
ros__parameters:
Kp: 1.2
Ki: 0.03
Kd: 0.08
imax: 0.5
rate_hz: 50八、调参与试验流程
九、可视化与指标计算
import matplotlib.pyplot as plt
import numpy as np
def metrics(xs, target):
err = np.array(xs) - target
overshoot = max(0.0, (max(xs)-target)/max(1e-6, target))
settle_idx = next((i for i in range(len(err)) if abs(err[i])<0.02), len(err)-1)
return {'overshoot': overshoot, 'settle_step': settle_idx, 'rmse': float(np.sqrt(np.mean(err**2)))}
def plot(xs, target):
m = metrics(xs, target); plt.figure(figsize=(6,3))
plt.plot(xs, label='pos'); plt.axhline(target, color='r', linestyle='--', label='target')
plt.title(f"over={m['overshoot']:.2f}, settle={m['settle_step']}, rmse={m['rmse']:.3f}")
plt.legend(); plt.tight_layout(); plt.show()十、安全与急停
from std_msgs.msg import Bool
class Safety(Node):
def __init__(self):
super().__init__('safety')
self.estop = False
self.sub = self.create_subscription(Bool, '/estop', self.on_estop, 10)
self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
def on_estop(self, msg: Bool):
self.estop = msg.data
if self.estop:
tw = Twist(); tw.angular.z = 0.0; self.cmd_pub.publish(tw)
softly_log('急停触发,输出归零')十一、故障演练清单
十二、中国具身人工智能技术发展现状(2025版,概览)
十三、机械狗(四足机器人)技术最新进展与应用案例
十四、宇树(Unitree)四足机器人与具身AI详解(实践视角)
Mermaid:具身AI分层与数据闭环
flowchart LR
S[传感器(IMU/视觉/深度/雷达/触觉)] --> E[状态估计(EKF/姿态/里程计)]
E --> T[地形理解(高度图/可通行区)]
T --> P[策略层(扩散/IL/规则混合)]
P --> G[步态调度(步频/占空比/抬脚)]
G --> IK[运动学/动力学(IK/MPC/阻抗)]
IK --> A[关节驱动/舵机]
A --> R[机器人执行]
R --> M[指标记录(CSV/稳定性/能耗)]
M --> U[云端/本地优化]
U --> P十五、具身AI代码示例(可运行):步态调度与地形自适应、指标记录
# 运行前:pip install numpy matplotlib
import numpy as np
import time, csv, os
import matplotlib.pyplot as plt
# 1) 四足参数(二维简化)
L1, L2 = 0.1, 0.1
HIP_OFFSET_X, HIP_OFFSET_Z = 0.12, 0.08
LEGS = ["FL","FR","RL","RR"]
PHASE = {"FL":0.0, "RR":0.0, "FR":0.5, "RL":0.5}
# 2) 反解IK
def ik_2d(x, z):
d = np.sqrt(x**2 + z**2)
d = np.clip(d, 1e-6, L1 + L2 - 1e-6)
cos_k = (L1**2 + L2**2 - d**2) / (2 * L1 * L2)
k = np.arccos(np.clip(cos_k, -1.0, 1.0))
cos_h = (L1**2 + d**2 - L2**2) / (2 * L1 * d)
h = np.arccos(np.clip(cos_h, -1.0, 1.0))
base = np.arctan2(z, x)
hip = base - h
knee = np.pi - k
return hip, knee
# 3) 足端轨迹(地形自适应:抬脚高度受pitch影响)
def foot_traj(phase, step_len=0.12, step_h=0.03, pitch_deg=0.0):
# pitch>0 表示前倾上坡,提高抬脚;pitch<0 下坡,略降低抬脚
h_adj = step_h * (1.0 + 0.5 * np.clip(pitch_deg/15.0, -0.5, 1.0))
if phase < 0.5:
s = phase/0.5
x = +step_len/2 - s*step_len
z = 0.0
else:
s = (phase-0.5)/0.5
x = -step_len/2 + s*step_len
z = h_adj * (1 - (2*s - 1)**2)
return x, z, h_adj
# 4) 接触概率估计(简化):摆动期接触概率低、支撑期高,叠加pitch扰动
def contact_prob(phase, pitch_deg=0.0):
base = 0.15 if phase>=0.5 else 0.85
return float(np.clip(base + 0.1 * (pitch_deg/20.0), 0.0, 1.0))
# 5) 稳定性边距(支撑足x投影区间与质心投影的最小边距,单位mm)
def stability_margin(phase_map, com_x=0.0, step_len=0.12):
support = []
for leg in LEGS:
ph = phase_map[leg]
if ph < 0.5:
x,_,_ = foot_traj(ph, step_len, 0.0, 0.0)
support.append(x + (HIP_OFFSET_X if leg in ["FL","FR"] else -HIP_OFFSET_X))
if not support:
return 0.0
left, right = min(support), max(support)
if com_x < left: return (com_x - left) * 1000
if com_x > right: return (right - com_x) * 1000
return min(com_x - left, right - com_x) * 1000
# 6) 指标记录(与模块统一:quad_metrics.csv + 扩展quad_ext_metrics.csv)
def log_metrics(csv_path, event_id, lat_ms, step_freq_hz, duty, clearance_mm, stability_mm):
exists = os.path.exists(csv_path)
with open(csv_path, "a", newline="", encoding="utf-8") as f:
w = csv.writer(f)
if not exists:
w.writerow(["ts","event_id","lat_ms","step_freq_hz","duty","clearance_mm","stability_mm"])
w.writerow([time.strftime("%Y-%m-%d %H:%M:%S"), event_id, round(lat_ms,1), step_freq_hz, duty, round(clearance_mm,1), round(stability_mm,1)])
def log_ext(csv_path, event_id, lat_ms, pitch_deg, contact_p):
exists = os.path.exists(csv_path)
with open(csv_path, "a", newline="", encoding="utf-8") as f:
w = csv.writer(f)
if not exists:
w.writerow(["ts","event_id","lat_ms","pitch_deg","contact_prob"])
w.writerow([time.strftime("%Y-%m-%d %H:%M:%S"), event_id, round(lat_ms,1), round(pitch_deg,2), round(contact_p,2)])
# 7) 模拟器:地形pitch随时间变化,上坡-平地-下坡;步态调度与IK求解
def simulate(step_freq_hz=2.2, duration_s=6.0, step_len=0.13, step_h=0.03, duty=0.5,
csv_base="quad_metrics.csv", csv_ext="quad_ext_metrics.csv"):
dt = 1.0 / (step_freq_hz * 40)
t = 0.0
ph = PHASE.copy()
stabs = []; clears = []; pitches = []
while t < duration_s:
t0 = time.perf_counter()
# 地形pitch:前2秒上坡(10°)、中间2秒平地(0°)、后2秒下坡(-10°)
if t < 2.0: pitch = 10.0
elif t < 4.0: pitch = 0.0
else: pitch = -10.0
pitches.append(pitch)
# 更新相位
for leg in LEGS:
ph[leg] = (ph[leg] + step_freq_hz * dt) % 1.0
# 计算足端、关节角、接触概率与稳定性
clearance = 0.0
for leg in LEGS:
x,z,h_adj = foot_traj(ph[leg], step_len, step_h, pitch)
clearance = max(clearance, z*1000)
xw = x + (HIP_OFFSET_X if leg in ["FL","FR"] else -HIP_OFFSET_X)
zw = z - HIP_OFFSET_Z
hip,knee = ik_2d(xw, zw)
cp = contact_prob(ph[leg], pitch)
# 扩展指标:接触概率
log_ext(csv_ext, event_id=f"{leg}", lat_ms=(time.perf_counter()-t0)*1000, pitch_deg=pitch, contact_p=cp)
sm = stability_margin(ph, com_x=0.0, step_len=step_len)
stabs.append(sm); clears.append(clearance)
lat_ms = (time.perf_counter()-t0) * 1000
log_metrics(csv_base, event_id="sim", lat_ms=lat_ms, step_freq_hz=step_freq_hz, duty=duty, clearance_mm=clearance, stability_mm=sm)
t += dt
# 可视化:稳定性与pitch变化
plt.figure(figsize=(7,4))
plt.subplot(2,1,1); plt.plot(stabs); plt.title("stability margin (mm)")
plt.subplot(2,1,2); plt.plot(pitches); plt.title("terrain pitch (deg)")
plt.tight_layout(); plt.show()
if __name__ == "__main__":
simulate()运行(PowerShell):
cd "c:\Users\LXCXJXHX\Desktop\文章\脑残指导\专栏_技术隐讽升级版\模块02_机械创新"
python -m venv .venv
.venv\Scripts\Activate.ps1
pip install numpy matplotlib
python 001_代码驱动的钢铁梦想与机械臂控制实践——ROS与PID融合的稳定性教学.md快速查看CSV:
Get-Content .\quad_metrics.csv | Select-Object -First 5
Get-Content .\quad_ext_metrics.csv | Select-Object -First 5十六、优化思路与落地建议(与stp-new对齐)
Mermaid:步态调度与安全回退逻辑
stateDiagram-v2
[*] --> Stand
Stand --> Trot: 条件: 地形稳定 & 速度需求
Trot --> SlowWalk: 触发: 支撑边距低 / 接触失败
SlowWalk --> Stand: 触发: 急停 / 障碍严重
Trot --> Stand: 任务完成
Stand --> [*]十七、与真实平台对接的注意事项
小结: