示例:Windows PowerShell 快速哈希与只读复制
# 计算目录内所有文件SHA256
Get-ChildItem -Recurse | Get-FileHash -Algorithm SHA256 | Format-Table Path, Hash
# 复制到证据盘(建议证据盘只读挂载)
robocopy C:\DroneMedia E:\Evidence\DroneMedia /E /COPY:DAT /DCOPY:T /R:1 /W:1下面在现有 FlightDataAnalyzer 基础上扩展事件检测、KML 导出、地理围栏违规检测与电池健康分析。代码对字段名做了“有则用、无则跳过”的鲁棒性处理。
import math
from pathlib import Path
class FlightEventReconstructor(FlightDataAnalyzer):
def point_in_polygon(self, x, y, polygon):
"""射线法判断点是否在多边形内。polygon为[(x1,y1),(x2,y2),...]"""
inside = False
n = len(polygon)
for i in range(n):
x1, y1 = polygon[i]
x2, y2 = polygon[(i+1) % n]
# 检查与边的交点
if ((y1 > y) != (y2 > y)):
xinters = (x2 - x1) * (y - y1) / (y2 - y1 + 1e-12) + x1
if xinters > x:
inside = not inside
return inside
def detect_events(self, alt_threshold=2.0, speed_threshold=0.5):
"""检测起飞/着陆/RTH/模式变化/电池告警等事件"""
events = []
df = self.df
# 起飞:高度从近0到>阈值或总速度突增
for i in range(1, len(df)):
alt_prev = df.iloc[i-1]['altitude'] if 'altitude' in df.columns else 0
alt_curr = df.iloc[i]['altitude'] if 'altitude' in df.columns else 0
v_prev = df.iloc[i-1]['total_speed'] if 'total_speed' in df.columns else 0
v_curr = df.iloc[i]['total_speed'] if 'total_speed' in df.columns else 0
if alt_prev <= alt_threshold and alt_curr > alt_threshold and v_curr - v_prev > speed_threshold:
events.append({
'type': 'takeoff',
'time': df.iloc[i]['timestamp'],
'lat': df.iloc[i]['latitude'] if 'latitude' in df.columns else None,
'lon': df.iloc[i]['longitude'] if 'longitude' in df.columns else None
})
break
# 着陆:高度降至阈值以下且速度接近0
for i in range(len(df)-2, -1, -1):
alt_curr = df.iloc[i]['altitude'] if 'altitude' in df.columns else 0
v_curr = df.iloc[i]['total_speed'] if 'total_speed' in df.columns else 0
if alt_curr <= alt_threshold and v_curr <= speed_threshold:
events.append({
'type': 'landing',
'time': df.iloc[i]['timestamp'],
'lat': df.iloc[i]['latitude'] if 'latitude' in df.columns else None,
'lon': df.iloc[i]['longitude'] if 'longitude' in df.columns else None
})
break
# 模式变化 / RTH:依赖 flight_mode 字段或推断(水平速度急剧变化+返航方向)
if 'flight_mode' in df.columns:
prev_mode = df.iloc[0]['flight_mode']
for i in range(1, len(df)):
mode = df.iloc[i]['flight_mode']
if mode != prev_mode:
events.append({
'type': 'mode_change', 'from': prev_mode, 'to': mode,
'time': df.iloc[i]['timestamp'],
'lat': df.iloc[i]['latitude'] if 'latitude' in df.columns else None,
'lon': df.iloc[i]['longitude'] if 'longitude' in df.columns else None
})
# 识别返航模式
if str(mode).upper() in ['RTH', 'GO_HOME', 'RETURN_TO_HOME']:
events.append({
'type': 'RTH',
'time': df.iloc[i]['timestamp'],
'lat': df.iloc[i]['latitude'] if 'latitude' in df.columns else None,
'lon': df.iloc[i]['longitude'] if 'longitude' in df.columns else None
})
prev_mode = mode
# 电池告警:battery_level 低于阈值或 voltage 值突降
if 'battery_level' in df.columns:
low_batt = df[df['battery_level'] <= 20]
for _, row in low_batt.iterrows():
events.append({
'type': 'battery_low', 'level': row['battery_level'],
'time': row['timestamp'], 'lat': row.get('latitude'), 'lon': row.get('longitude')
})
if 'voltage' in df.columns:
# 简单检测电压突降
dv = df['voltage'].diff()
drops = df[dv < -0.5]
for _, row in drops.iterrows():
events.append({
'type': 'voltage_drop', 'voltage': row['voltage'],
'time': row['timestamp'], 'lat': row.get('latitude'), 'lon': row.get('longitude')
})
return events
def export_kml(self, output_file='flight.kml'):
"""导出KML以在Google Earth中回放轨迹"""
coords = []
if {'longitude','latitude','altitude'} <= set(self.df.columns):
for _, r in self.df.iterrows():
coords.append(f"{r['longitude']},{r['latitude']},{r['altitude']}")
kml = f"""
<?xml version="1.0" encoding="UTF-8"?>
<kml xmlns="http://www.opengis.net/kml/2.2">
<Document>
<name>Drone Flight Path</name>
<Placemark>
<name>Path</name>
<LineString>
<coordinates>
{' '.join(coords)}
</coordinates>
</LineString>
</Placemark>
</Document>
</kml>
"""
Path(output_file).write_text(kml, encoding='utf-8')
print(f"KML exported to {output_file}")
def check_nfz_violations(self, nfz_polygons):
"""检查进入地理围栏/禁飞区事件。nfz_polygons: List[List[(lon,lat),...]]"""
violations = []
if {'longitude','latitude'} <= set(self.df.columns):
for _, r in self.df.iterrows():
x, y = r['longitude'], r['latitude']
for idx, poly in enumerate(nfz_polygons):
# 注意: 多边形点使用(lon,lat)
if self.point_in_polygon(x, y, poly):
violations.append({
'type': 'nfz_violation', 'nfz_id': idx,
'time': r['timestamp'], 'lat': y, 'lon': x
})
break
return violations
def battery_health_analysis(self):
"""估算电池健康: 电压曲线/容量衰减/温度影响(字段存在时分析)"""
result = {}
if 'voltage' in self.df.columns:
result['voltage_mean'] = float(self.df['voltage'].mean())
result['voltage_min'] = float(self.df['voltage'].min())
if 'battery_level' in self.df.columns:
result['level_min'] = float(self.df['battery_level'].min())
if 'temperature' in self.df.columns:
result['temp_max'] = float(self.df['temperature'].max())
return result
# 使用示例(扩展)
def advanced_reconstruction_demo():
recon = FlightEventReconstructor('parsed_flight_data.csv')
recon.preprocess_data()
events = recon.detect_events()
print(f"Detected events: {len(events)}")
for e in events[:10]:
print(e)
# 导出KML
recon.export_kml('flight_path.kml')
# 示例NFZ多边形(经纬度点)
nfz = [[(116.39,39.90),(116.41,39.90),(116.41,39.92),(116.39,39.92)]]
violations = recon.check_nfz_violations(nfz)
print(f"NFZ violations: {len(violations)}")
batt = recon.battery_health_analysis()
print("Battery health:", batt)from PIL import Image, ExifTags
import json
def extract_exif(path_glob='Media/*.JPG', output_json='exif_evidence.json'):
results = []
for p in Path('.').glob(path_glob):
try:
img = Image.open(p)
exif = img._getexif() or {}
mapped = {}
for k, v in exif.items():
tag = ExifTags.TAGS.get(k, str(k))
mapped[tag] = v
# 常见字段提取
gps = mapped.get('GPSInfo', {})
dt = mapped.get('DateTimeOriginal') or mapped.get('DateTime')
make = mapped.get('Make'); model = mapped.get('Model')
results.append({
'file': str(p), 'datetime': dt, 'make': make, 'model': model,
'gps': str(gps)
})
except Exception as e:
results.append({'file': str(p), 'error': str(e)})
Path(output_json).write_text(json.dumps(results, ensure_ascii=False, indent=2), encoding='utf-8')
print(f"EXIF evidence exported to {output_json}")示例时间线结构(JSON):
{
"timeline": [
{"t": "2024-07-20T08:21:03Z", "type": "takeoff", "lat": 39.90, "lon": 116.40},
{"t": "2024-07-20T08:21:35Z", "type": "photo", "file": "DJI_0001.JPG"},
{"t": "2024-07-20T08:25:00Z", "type": "RTH"},
{"t": "2024-07-20T08:26:12Z", "type": "landing"}
]
}