
内容来源:程序员老廖
本项目是一个基于muduo网络库实现的高性能RTMP流媒体服务器,支持:
视频讲解及源码领取:音视频开发找工作要不要学习流媒体服务器?
# 1. 编译RTMP服务器
mkdir build && cd build
# 配置CMake(Debug版本,便于调试)
cmake -DCMAKE_BUILD_TYPE=Debug ..
# 或者(默认debug模式)
cmake ..
# 或者编译Release版本(生产环境)
cmake -DCMAKE_BUILD_TYPE=Release ..
# 编译(使用所有CPU核心)
make -j$(nproc)
# 2.修改配置文件(可以不改,用默认的就行)
配置文件默认路径: 相对于build目录是../config.json, 即是在项目源码根目录
# 3. 运行程序
# 方法1:直接运行 运行服务器,默认使用../config.json配置文件
./bin/rtmp_server
# 方法2:指定配置文件
./bin/rtmp_server --config=/path/to/your/config.json
# 4. 测试(另开终端)
# 推流:
ffmpeg -re -i test_video.mp4 -c copy -f flv rtmp://localhost:1935/live/test
# 拉流:
ffplay rtmp://localhost:1935/live/test
模块 | 职责 | 文件 |
|---|---|---|
RtmpServer | 服务器主框架,管理TCP连接和配置 | rtmp_server.h/cc |
RtmpConnection | 单个RTMP连接处理(握手、命令、媒体数据) | rtmp_connection.h/cc |
StreamManager | 流管理,协调推流端和拉流端 | stream_manager.h/cc |
RtmpProtocolParser | RTMP协议解析和消息序列化 | rtmp_protocol.h/cc |
GopCache | GOP缓存,实现快速首屏 | gop_cache.h/cc |
AmfCodec | AMF数据编解码 | rtmp_protocol.h/cc |
class RtmpServer {
// 核心功能
void start(); // 启动服务器
void stop(); // 停止服务器
void onConnection(); // 处理新连接
void onMessage(); // 处理消息
// 配置管理
RtmpServerConfig config_; // 服务器配置
// 连接管理
std::unordered_map<std::string,
std::shared_ptr<RtmpConnection>> connections_;
// 流管理
std::shared_ptr<StreamManager> streamManager_;
};关键配置参数:
struct RtmpServerConfig {
std::string listenAddress = "0.0.0.0";
uint16_t listenPort = 1935;
int maxConnections = 1000;
int maxStreams = 100;
int gopCacheMaxCount = 1; // GOP缓存数量(优化延迟)
int gopCacheMaxSizeBytes = 2 * 1024 * 1024; // 2MB GOP缓存大小
int chunkSize = 60000; // RTMP块大小
};class RtmpConnection {
// 连接状态
enum class RtmpConnectionType {
kUnknown,
kPublisher, // 推流连接
kSubscriber // 拉流连接
};
// 核心处理流程
void onMessage(); // 接收TCP数据
bool handleHandshake(); // RTMP握手处理
void processMessages(); // 处理RTMP消息
void processCommand(); // 处理RTMP命令
void processAudioData(); // 处理音频数据
void processVideoData(); // 处理视频数据
void processMetaData(); // 处理元数据
};class StreamManager {
// 流管理
std::shared_ptr<Stream> createStream(const std::string& name);
std::shared_ptr<Stream> getStream(const std::string& name);
// 推流管理
std::shared_ptr<Publisher> createPublisher(const std::string& streamName,
TcpConnectionPtr conn);
// 拉流管理
std::shared_ptr<Subscriber> createSubscriber(const std::string& streamName,
TcpConnectionPtr conn);
// 数据转发
void onPublisherData(const std::string& streamName,
std::shared_ptr<MediaPacket> packet);
private:
std::unordered_map<std::string, std::shared_ptr<Stream>> streams_;
};+---------------------------+
| Application Layer | <- RTMP Commands, Media Data
+---------------------------+
| RTMP Messages | <- Audio/Video/Command Messages
+---------------------------+
| RTMP Chunks | <- Chunk分片传输
+---------------------------+
| TCP | <- 可靠传输
+---------------------------+enum class RtmpMessageType : uint8_t {
kSetChunkSize = 1, // 设置Chunk大小
kAbort = 2, // 中止消息
kAck = 3, // 确认消息
kUserControl = 4, // 用户控制消息
kWindowAckSize = 5, // 窗口确认大小
kSetPeerBandwidth = 6, // 设置对等带宽
kAudio = 8, // 音频数据
kVideo = 9, // 视频数据
kDataAMF0 = 18, // AMF0数据消息
kCommandAMF0 = 20, // AMF0命令消息
};enum class RtmpChunkFormat : uint8_t {
kType0 = 0, // 完整消息头 (11字节)
kType1 = 1, // 无消息流ID (7字节)
kType2 = 2, // 仅时间戳增量 (3字节)
kType3 = 3 // 仅数据 (0字节)
};Chunk Header 结构:
Type 0: [Basic Header][Timestamp][Message Length][Message Type ID][Message Stream ID]
Type 1: [Basic Header][Timestamp Delta][Message Length][Message Type ID]
Type 2: [Basic Header][Timestamp Delta]
Type 3: [Basic Header]class AmfCodec {
// 基本类型编解码
static void encodeAmfValue(Buffer* buffer, const AmfValue& value);
static bool decodeAmfValue(Buffer* buffer, AmfValue* value);
// 复合类型编解码
static void encodeAmfObject(Buffer* buffer,
const std::unordered_map<std::string, AmfValue>& object);
static bool decodeAmfObject(Buffer* buffer,
std::unordered_map<std::string, AmfValue>* object);
};AMF数据类型:
enum class AmfType : uint8_t {
kNumber = 0x00, // 64位双精度浮点数
kBoolean = 0x01, // 布尔值
kString = 0x02, // UTF-8字符串
kObject = 0x03, // 对象
kNull = 0x05, // 空值
kUndefined = 0x06, // 未定义
kEcmaArray = 0x08, // ECMA数组
kObjectEnd = 0x09, // 对象结束
kStrictArray = 0x0A, // 严格数组
};
void RtmpConnection::handlePublish(const RtmpCommand& command) {
// 解析流名
std::string streamName = command.getStringArg(0);
std::string publishType = command.getStringArg(1); // "live"
// 设置连接类型
connectionType_ = RtmpConnectionType::kPublisher;
streamName_ = streamName;
// 创建Publisher对象
auto mgr = streamManager_.lock();
auto conn = tcpConnection_.lock();
publisher_ = mgr->createPublisher(streamName, conn);
// 启动推流
publisher_->startPublish();
// 发送响应
sendOnFCPublish("NetStream.Publish.Start", "Started publishing stream.");
sendPublishStatus("NetStream.Publish.Start", "Started publishing stream.");
// 更新状态
parser_.setState(RtmpConnectionState::kPublishing);
}void RtmpConnection::processAudioData(const RtmpMessage& message) {
if (!isPublisher() || !publisher_) return;
// 创建MediaPacket
auto packet = std::make_shared<MediaPacket>(
MediaPacketType::kAudio,
message.payload(),
message.header().timestamp,
message.header().messageStreamId
);
// 转发给Publisher
publisher_->onMediaData(packet);
}
void RtmpConnection::processVideoData(const RtmpMessage& message) {
if (!isPublisher() || !publisher_) return;
// 创建MediaPacket
auto packet = std::make_shared<MediaPacket>(
MediaPacketType::kVideo,
message.payload(),
message.header().timestamp,
message.header().messageStreamId
);
// 转发给Publisher
publisher_->onMediaData(packet);
}void Publisher::onMediaData(std::shared_ptr<MediaPacket> packet) {
if (!packet || !isPublishing_) return;
updateLastActiveTime();
updateStats(packet->data().size(), 1);
// 转发给StreamManager
if (auto mgr = manager_.lock()) {
mgr->onPublisherData(streamName_, packet);
}
}// 检测AVC Sequence Header
bool isAvcSequenceHeader(const MediaPacket& packet) {
return packet.type() == MediaPacketType::kVideo &&
packet.videoCodec() == VideoCodec::kAVC &&
packet.data().size() > 1 &&
static_cast<uint8_t>(packet.data()[1]) == 0;
}
// 检测AAC Sequence Header
bool isAacSequenceHeader(const MediaPacket& packet) {
return packet.type() == MediaPacketType::kAudio &&
packet.audioFormat() == AudioFormat::kAAC &&
packet.data().size() > 1 &&
static_cast<uint8_t>(packet.data()[1]) == 0;
}bool MediaPacket::isKeyFrame() const {
if (type_ != MediaPacketType::kVideo || data_.empty()) {
return false;
}
// FLV视频标签格式:Frame Type (4 bits) + Codec ID (4 bits)
uint8_t firstByte = static_cast<uint8_t>(data_[0]);
uint8_t frameType = (firstByte >> 4) & 0x0F;
return frameType == 1; // 1 = 关键帧
}
void RtmpConnection::handlePlay(const RtmpCommand& command) {
std::string streamName = command.getStringArg(0);
// 设置连接类型
connectionType_ = RtmpConnectionType::kSubscriber;
streamName_ = streamName;
// 创建Subscriber对象
auto mgr = streamManager_.lock();
auto conn = tcpConnection_.lock();
subscriber_ = mgr->createSubscriber(streamName, conn);
// 发送RTMP标准播放响应序列
uint32_t streamId = 1;
// 1. StreamBegin用户控制消息
sendStreamBegin(streamId);
// 2. NetStream.Play.Reset
sendPlayStatus("NetStream.Play.Reset", "Playing and resetting stream.");
// 3. NetStream.Play.Start
sendPlayStatus("NetStream.Play.Start", "Started playing stream.");
// 4. RtmpSampleAccess
sendRtmpSampleAccess();
// 5. NetStream.Data.Start
sendDataStart();
// 启动播放
subscriber_->startPlay();
parser_.setState(RtmpConnectionState::kPlaying);
}void Subscriber::startPlay() {
if (!isPlaying_) {
isPlaying_ = true;
startTime_ = Timestamp::now();
// 重置时间戳映射器
timeStampMapper_.reset();
// 发送缓存数据
if (auto mgr = manager_.lock()) {
auto stream = mgr->getStream(streamName_);
if (stream) {
// 1. 发送序列头(必须)
auto sequenceHeaders = stream->getSequenceHeaders();
if (!sequenceHeaders.empty()) {
sendCachedData(sequenceHeaders);
}
// 2. 发送GOP缓存(可选,快速首屏)
auto gopPackets = stream->gopCache()->getLatestKeyFramePackets();
if (!gopPackets.empty()) {
sendCachedData(gopPackets);
}
}
}
}
}void Stream::broadcastToSubscribers(std::shared_ptr<MediaPacket> packet) {
std::vector<std::shared_ptr<Subscriber>> subscribersCopy;
// 复制订阅者列表(避免在发送时修改)
{
MutexLockGuard lock(mutex_);
subscribersCopy.reserve(subscribers_.size());
for (const auto& pair : subscribers_) {
if (pair.second && pair.second->isConnected()) {
subscribersCopy.push_back(pair.second);
}
}
}
// 发送数据
for (auto& subscriber : subscribersCopy) {
subscriber->sendMediaData(packet);
}
}// Stream类独立管理序列头
class Stream {
private:
std::shared_ptr<MediaPacket> avcSequenceHeader_; // H.264配置
std::shared_ptr<MediaPacket> aacSequenceHeader_; // AAC配置
std::shared_ptr<MediaPacket> metadataPacket_; // 元数据
public:
std::vector<std::shared_ptr<MediaPacket>> getSequenceHeaders() const {
std::vector<std::shared_ptr<MediaPacket>> headers;
MutexLockGuard lock(mutex_);
// 按顺序添加
if (metadataPacket_) headers.push_back(metadataPacket_);
if (avcSequenceHeader_) headers.push_back(avcSequenceHeader_);
if (aacSequenceHeader_) headers.push_back(aacSequenceHeader_);
return headers;
}
};// StreamBegin用户控制消息
void RtmpConnection::sendStreamBegin(uint32_t streamId) {
Buffer buffer;
buffer.appendInt16(0x0000); // 事件类型:StreamBegin
buffer.appendInt32(streamId);
RtmpMessageHeader header;
header.messageTypeId = static_cast<uint8_t>(RtmpMessageType::kUserControl);
header.messageLength = static_cast<uint32_t>(buffer.readableBytes());
RtmpMessage message(header, std::string(buffer.peek(), buffer.readableBytes()));
sendMessage(message);
}
// onStatus消息
void RtmpConnection::sendPlayStatus(const std::string& code, const std::string& description) {
Buffer buffer;
// 命令名:onStatus
AmfCodec::encodeAmfValue(&buffer, AmfValue("onStatus"));
// 事务ID:0
AmfCodec::encodeAmfValue(&buffer, AmfValue(0.0));
// 命令对象:null
AmfCodec::encodeAmfValue(&buffer, AmfValue());
// 信息对象
std::unordered_map<std::string, AmfValue> info;
info["level"] = AmfValue("status");
info["code"] = AmfValue(code);
info["description"] = AmfValue(description);
AmfCodec::encodeAmfObject(&buffer, info);
RtmpMessageHeader header;
header.messageTypeId = static_cast<uint8_t>(RtmpMessageType::kCommandAMF0);
header.messageLength = static_cast<uint32_t>(buffer.readableBytes());
header.messageStreamId = 1;
RtmpMessage message(header, std::string(buffer.peek(), buffer.readableBytes()));
sendMessage(message);
}

class StreamManager {
private:
std::unordered_map<std::string, std::shared_ptr<Stream>> streams_;
public:
// 创建或获取流
std::shared_ptr<Stream> getOrCreateStream(const std::string& streamName) {
MutexLockGuard lock(mutex_);
auto it = streams_.find(streamName);
if (it != streams_.end()) {
return it->second;
}
// 创建新流
auto stream = std::make_shared<Stream>(streamName);
stream->initialize();
streams_[streamName] = stream;
return stream;
}
// 数据转发
void onPublisherData(const std::string& streamName,
std::shared_ptr<MediaPacket> packet) {
auto stream = getStream(streamName);
if (stream) {
stream->onMediaData(packet); // 广播给所有订阅者
}
}
};class Stream {
private:
std::unordered_map<std::string, std::shared_ptr<Subscriber>> subscribers_;
public:
void addSubscriber(std::shared_ptr<Subscriber> subscriber) {
MutexLockGuard lock(mutex_);
subscribers_[subscriber->id()] = subscriber;
LOG_INFO << "Added subscriber: " << subscriber->id()
<< " to stream: " << name_
<< ", total subscribers: " << subscribers_.size();
}
void removeSubscriber(const std::string& subscriberId) {
MutexLockGuard lock(mutex_);
auto it = subscribers_.find(subscriberId);
if (it != subscribers_.end()) {
subscribers_.erase(it);
LOG_INFO << "Removed subscriber: " << subscriberId
<< " from stream: " << name_
<< ", remaining subscribers: " << subscribers_.size();
}
}
};void Stream::broadcastToSubscribers(std::shared_ptr<MediaPacket> packet) {
// 使用副本避免长时间持锁
std::vector<std::shared_ptr<Subscriber>> subscribersCopy;
{
MutexLockGuard lock(mutex_);
subscribersCopy.reserve(subscribers_.size());
for (const auto& pair : subscribers_) {
if (pair.second && pair.second->isConnected()) {
subscribersCopy.push_back(pair.second);
}
}
}
// 并发发送数据
for (auto& subscriber : subscribersCopy) {
subscriber->sendMediaData(packet);
}
}
class GopCache {
private:
std::shared_ptr<Gop> currentGop_; // 当前GOP
size_t maxCacheSize_; // 最大缓存大小
size_t currentCacheSize_; // 当前缓存大小
// 音视频对齐缓冲区
std::deque<std::shared_ptr<MediaPacket>> audioBuffer_;
uint32_t lastKeyFrameTimestamp_ = 0;
static const uint32_t AUDIO_ALIGN_WINDOW = 200; // 200ms对齐窗口
public:
void addPacket(std::shared_ptr<MediaPacket> packet);
std::vector<std::shared_ptr<MediaPacket>> getCachedPackets() const;
std::vector<std::shared_ptr<MediaPacket>> getLatestKeyFramePackets() const;
};bool GopCache::needNewGop(std::shared_ptr<MediaPacket> packet) const {
// 没有当前GOP
if (!currentGop_) {
return true;
}
// 检测到新的关键帧
if (packet->type() == MediaPacketType::kVideo && packet->isKeyFrame()) {
return true;
}
return false;
}
void GopCache::addPacket(std::shared_ptr<MediaPacket> packet) {
MutexLockGuard lock(mutex_);
// 音频帧:维护音频缓冲区
if (packet->type() == MediaPacketType::kAudio) {
maintainAudioBuffer(packet);
}
// 检查是否需要新GOP
bool isNewGop = needNewGop(packet);
if (isNewGop) {
startNewGop();
// 关键帧:进行音视频对齐
if (packet->type() == MediaPacketType::kVideo && packet->isKeyFrame()) {
lastKeyFrameTimestamp_ = packet->timestamp();
alignAudioWithKeyFrame(lastKeyFrameTimestamp_);
}
}
// 添加到当前GOP
if (!currentGop_) {
startNewGop();
}
currentGop_->addPacket(packet);
updateCacheSize();
}void GopCache::alignAudioWithKeyFrame(uint32_t keyFrameTimestamp) {
if (audioBuffer_.empty() || !currentGop_) {
return;
}
std::vector<std::shared_ptr<MediaPacket>> alignedAudio;
// 查找关键帧前后的音频帧
for (const auto& audioPacket : audioBuffer_) {
uint32_t audioTimestamp = audioPacket->timestamp();
int32_t timeDiff = static_cast<int32_t>(audioTimestamp) -
static_cast<int32_t>(keyFrameTimestamp);
// 收集关键帧前200ms到后50ms的音频帧
if (timeDiff >= -static_cast<int32_t>(AUDIO_ALIGN_WINDOW) &&
timeDiff <= 50) {
alignedAudio.push_back(audioPacket);
}
}
// 按时间戳排序
std::sort(alignedAudio.begin(), alignedAudio.end(),
[](const std::shared_ptr<MediaPacket>& a,
const std::shared_ptr<MediaPacket>& b) {
return a->timestamp() < b->timestamp();
});
// 添加到GOP开头
for (const auto& audioPacket : alignedAudio) {
currentGop_->addPacket(audioPacket);
}
LOG_INFO << "Aligned " << alignedAudio.size()
<< " audio frames with key frame at " << keyFrameTimestamp;
}std::vector<std::shared_ptr<MediaPacket>>
GopCache::getLatestKeyFramePackets() const {
MutexLockGuard lock(mutex_);
std::vector<std::shared_ptr<MediaPacket>> packets;
if (currentGop_ && !currentGop_->empty()) {
const auto& gopPackets = currentGop_->packets();
// 找到关键帧位置
size_t keyFrameIndex = 0;
bool foundKeyFrame = false;
for (size_t i = 0; i < gopPackets.size(); ++i) {
if (gopPackets[i]->type() == MediaPacketType::kVideo &&
gopPackets[i]->isKeyFrame()) {
keyFrameIndex = i;
foundKeyFrame = true;
break;
}
}
if (foundKeyFrame) {
// 从关键帧开始,最多取10个包(约1秒内容)
size_t endIndex = std::min(keyFrameIndex + 10, gopPackets.size());
for (size_t i = keyFrameIndex; i < endIndex; ++i) {
packets.push_back(gopPackets[i]);
}
}
}
return packets;
}struct GopInfo {
uint32_t audioDuration = 0; // 音频时长
uint32_t videoDuration = 0; // 视频时长
size_t audioPackets = 0; // 音频包数
size_t videoPackets = 0; // 视频包数
size_t totalSize = 0; // 总大小
uint32_t firstTimestamp = 0; // 第一个时间戳
uint32_t lastTimestamp = 0; // 最后一个时间戳
bool hasKeyFrame = false; // 是否有关键帧
uint32_t audioSampleRate = 44100; // 音频采样率
double audioFrameDuration = 0.0; // 音频帧时长
};
GopInfo Gop::getGopInfo() const {
GopInfo info;
if (packets_.empty()) {
return info;
}
// 统计音视频帧数和时长
uint32_t minTimestamp = packets_[0]->timestamp();
uint32_t maxTimestamp = packets_[0]->timestamp();
for (const auto& packet : packets_) {
uint32_t timestamp = packet->timestamp();
minTimestamp = std::min(minTimestamp, timestamp);
maxTimestamp = std::max(maxTimestamp, timestamp);
info.totalSize += packet->data().size();
if (packet->type() == MediaPacketType::kAudio) {
info.audioPackets++;
} else if (packet->type() == MediaPacketType::kVideo) {
info.videoPackets++;
if (packet->isKeyFrame()) {
info.hasKeyFrame = true;
}
}
}
info.firstTimestamp = minTimestamp;
info.lastTimestamp = maxTimestamp;
// 计算音频帧时长(基于44.1kHz采样率,1024采样点/帧)
info.audioFrameDuration = 1024.0 / 44100.0 * 1000.0; // 23.22ms
info.audioDuration = static_cast<uint32_t>(info.audioPackets * info.audioFrameDuration);
return info;
}
struct TimestampMapper {
// 统一基准时间戳
uint32_t baseTimestamp = 0;
bool initialized = false;
// 音频同步
uint32_t audioBaseTimestamp = 0;
uint32_t audioFrameNum = 0;
uint32_t audioFrameInterval = 0; // 动态计算
bool audioInitialized = false;
// 视频同步
uint32_t videoBaseTimestamp = 0;
uint32_t lastVideoTimestamp = 0;
bool videoInitialized = false;
// 同步阈值
static const uint32_t SYNC_THRESHOLD = 300; // 300ms
uint32_t mapTimestamp(uint32_t originalTimestamp, MediaPacketType type);
void reset();
};uint32_t TimestampMapper::mapAudioTimestamp(uint32_t originalTimestamp) {
// 初始化统一基准
initializeBase(originalTimestamp);
if (originalTimestamp < baseTimestamp) {
return lastAudioTimestamp; // 防止时间戳倒退
}
uint32_t mappedTimestamp = originalTimestamp - baseTimestamp;
// 动态计算音频帧间隔
calculateAudioFrameInterval(originalTimestamp);
// 初始化音频基准
if (!audioInitialized) {
audioBaseTimestamp = mappedTimestamp;
audioFrameNum = 0;
audioInitialized = true;
lastAudioTimestamp = mappedTimestamp;
return mappedTimestamp;
}
// 如果还没有稳定的帧间隔,直接返回
if (!audioFrameIntervalCalculated) {
lastAudioTimestamp = mappedTimestamp;
return mappedTimestamp;
}
// 计算期望的音频时间戳
uint32_t expectedTimestamp = audioBaseTimestamp + audioFrameNum * audioFrameInterval;
// 检查时间戳差异
int32_t timeDiff = static_cast<int32_t>(mappedTimestamp) -
static_cast<int32_t>(expectedTimestamp);
// 如果在同步阈值内,使用期望时间戳
if (abs(timeDiff) <= SYNC_THRESHOLD) {
lastAudioTimestamp = expectedTimestamp;
audioFrameNum++;
return expectedTimestamp;
} else {
// 超出阈值,重新校准
audioBaseTimestamp = mappedTimestamp;
audioFrameNum = 0;
lastAudioTimestamp = mappedTimestamp;
return mappedTimestamp;
}
}uint32_t TimestampMapper::mapVideoTimestamp(uint32_t originalTimestamp) {
// 初始化统一基准
initializeBase(originalTimestamp);
if (originalTimestamp < baseTimestamp) {
return lastVideoTimestamp; // 防止时间戳倒退
}
uint32_t mappedTimestamp = originalTimestamp - baseTimestamp;
// 初始化视频基准
if (!videoInitialized) {
videoBaseTimestamp = mappedTimestamp;
videoInitialized = true;
lastVideoTimestamp = mappedTimestamp;
return mappedTimestamp;
}
// 检查时间戳单调性
if (mappedTimestamp < lastVideoTimestamp) {
// 时间戳倒退,使用上一个时间戳
return lastVideoTimestamp;
}
// 检查时间戳跳跃
uint32_t timeDiff = mappedTimestamp - lastVideoTimestamp;
if (timeDiff > 2000) { // 超过2秒的跳跃
LOG_WARN << "Video timestamp jump detected: " << timeDiff << "ms";
// 可以选择重新校准或者限制跳跃
mappedTimestamp = lastVideoTimestamp + 33; // 假设30fps
}
lastVideoTimestamp = mappedTimestamp;
return mappedTimestamp;
}void TimestampMapper::calculateAudioFrameInterval(uint32_t currentTimestamp) {
if (!audioFrameIntervalCalculated) {
if (lastRawAudioTimestamp != 0) {
uint32_t interval = currentTimestamp - lastRawAudioTimestamp;
// 过滤异常值(正常音频帧间隔应该在10-50ms之间)
if (interval >= 10 && interval <= 50) {
audioFrameIntervalSum += interval;
audioFrameIntervalCount++;
// 收集足够样本后计算平均值
if (audioFrameIntervalCount >= FRAME_INTERVAL_SAMPLES) {
audioFrameInterval = audioFrameIntervalSum / audioFrameIntervalCount;
audioFrameIntervalCalculated = true;
LOG_INFO << "Audio frame interval calculated: " << audioFrameInterval << "ms";
}
}
}
lastRawAudioTimestamp = currentTimestamp;
}
}bool TimestampMapper::checkSync() const {
if (!initialized || !audioInitialized || !videoInitialized) {
return true; // 未初始化完成,暂不检查
}
// 计算音视频时间戳差异
int32_t avDiff = static_cast<int32_t>(lastAudioTimestamp) -
static_cast<int32_t>(lastVideoTimestamp);
bool inSync = abs(avDiff) <= SYNC_THRESHOLD;
if (!inSync) {
LOG_WARN << "Audio-Video sync issue detected: " << avDiff << "ms";
}
return inSync;
}class RtmpProtocolParser {
public:
// 生成C0+C1握手数据
std::string generateHandshakeC0C1() {
std::string c0c1;
c0c1.resize(1 + kRtmpHandshakeSize);
// C0: 版本号
c0c1[0] = 0x03;
// C1: 握手数据
char* c1 = &c0c1[1];
// 时间戳 (4字节)
uint32_t timestamp = static_cast<uint32_t>(time(nullptr));
c1[0] = (timestamp >> 24) & 0xFF;
c1[1] = (timestamp >> 16) & 0xFF;
c1[2] = (timestamp >> 8) & 0xFF;
c1[3] = timestamp & 0xFF;
// 版本 (4字节)
c1[4] = c1[5] = c1[6] = c1[7] = 0x00;
// 随机数据 (1528字节)
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, 255);
for (int i = 8; i < kRtmpHandshakeSize; ++i) {
c1[i] = static_cast<char>(dis(gen));
}
return c0c1;
}
// 生成C2握手数据
std::string generateHandshakeC2(const std::string& s1) {
std::string c2;
c2.resize(kRtmpHandshakeSize);
// C2是S1的回显
std::memcpy(&c2[0], s1.data(), kRtmpHandshakeSize);
return c2;
}
};bool RtmpProtocolParser::parseChunkHeader(Buffer* buffer, RtmpChunkHeader* header) {
if (buffer->readableBytes() < 1) {
return false;
}
// 解析Basic Header
uint8_t basicHeader = static_cast<uint8_t>(buffer->peekInt8());
header->format = static_cast<RtmpChunkFormat>((basicHeader >> 6) & 0x03);
uint32_t chunkStreamId = basicHeader & 0x3F;
// 处理扩展Chunk Stream ID
size_t basicHeaderSize = 1;
if (chunkStreamId == 0) {
if (buffer->readableBytes() < 2) return false;
chunkStreamId = static_cast<uint8_t>(buffer->peek()[1]) + 64;
basicHeaderSize = 2;
} else if (chunkStreamId == 1) {
if (buffer->readableBytes() < 3) return false;
chunkStreamId = static_cast<uint8_t>(buffer->peek()[1]) * 256 +
static_cast<uint8_t>(buffer->peek()[2]) + 64;
basicHeaderSize = 3;
}
header->chunkStreamId = chunkStreamId;
// 计算Message Header大小
size_t messageHeaderSize = 0;
switch (header->format) {
case RtmpChunkFormat::kType0: messageHeaderSize = 11; break;
case RtmpChunkFormat::kType1: messageHeaderSize = 7; break;
case RtmpChunkFormat::kType2: messageHeaderSize = 3; break;
case RtmpChunkFormat::kType3: messageHeaderSize = 0; break;
}
// 检查数据是否足够
if (buffer->readableBytes() < basicHeaderSize + messageHeaderSize) {
return false;
}
// 跳过Basic Header
buffer->retrieve(basicHeaderSize);
// 解析Message Header
if (messageHeaderSize > 0) {
parseMessageHeader(buffer, header, messageHeaderSize);
}
return true;
}// AMF Number编码
void AmfCodec::encodeNumber(Buffer* buffer, double number) {
char data[8];
uint64_t bits;
std::memcpy(&bits, &number, 8);
// 大端序写入
for (int i = 0; i < 8; ++i) {
data[i] = static_cast<char>((bits >> (56 - i * 8)) & 0xFF);
}
buffer->append(data, 8);
}
// AMF String编码
void AmfCodec::encodeString(Buffer* buffer, const std::string& str) {
// 字符串长度(2字节大端序)
buffer->appendInt16(static_cast<int16_t>(str.size()));
// 字符串内容
buffer->append(str);
}
// AMF Object编码
void AmfCodec::encodeAmfObject(Buffer* buffer,
const std::unordered_map<std::string, AmfValue>& object) {
// 对象类型标识
buffer->appendInt8(static_cast<int8_t>(AmfType::kObject));
// 编码键值对
for (const auto& pair : object) {
// 键(不包含类型标识)
buffer->appendInt16(static_cast<int16_t>(pair.first.size()));
buffer->append(pair.first);
// 值(包含类型标识)
encodeAmfValue(buffer, pair.second);
}
// 对象结束标记
buffer->appendInt16(0); // 空键
buffer->appendInt8(static_cast<int8_t>(AmfType::kObjectEnd));
}void RtmpProtocolParser::serializeMessage(const RtmpMessage& message, Buffer* buffer) {
const auto& header = message.header();
const auto& payload = message.payload();
// 选择Chunk Stream ID
uint32_t chunkStreamId = selectChunkStreamId(header.messageTypeId);
// 计算需要的Chunk数量
uint32_t chunkCount = (payload.size() + chunkSize_ - 1) / chunkSize_;
for (uint32_t i = 0; i < chunkCount; ++i) {
// 计算当前Chunk的payload大小
uint32_t chunkPayloadSize = std::min(chunkSize_,
static_cast<uint32_t>(payload.size() - i * chunkSize_));
// 写入Chunk Header
if (i == 0) {
// 第一个Chunk使用Type 0
writeChunkHeader(buffer, RtmpChunkFormat::kType0, chunkStreamId, header);
} else {
// 后续Chunk使用Type 3
writeChunkHeader(buffer, RtmpChunkFormat::kType3, chunkStreamId, header);
}
// 写入Chunk Payload
buffer->append(&payload[i * chunkSize_], chunkPayloadSize);
}
}class MediaPacket {
public:
// 检测视频帧类型
VideoFrameType videoFrameType() const {
if (type_ != MediaPacketType::kVideo || data_.empty()) {
return VideoFrameType::kInterFrame;
}
// FLV视频标签:Frame Type (4 bits) + Codec ID (4 bits)
uint8_t firstByte = static_cast<uint8_t>(data_[0]);
uint8_t frameType = (firstByte >> 4) & 0x0F;
return static_cast<VideoFrameType>(frameType);
}
// 检测视频编码格式
VideoCodec videoCodec() const {
if (type_ != MediaPacketType::kVideo || data_.empty()) {
return VideoCodec::kAVC;
}
uint8_t firstByte = static_cast<uint8_t>(data_[0]);
uint8_t codecId = firstByte & 0x0F;
return static_cast<VideoCodec>(codecId);
}
// 检测音频格式
AudioFormat audioFormat() const {
if (type_ != MediaPacketType::kAudio || data_.empty()) {
return AudioFormat::kAAC;
}
uint8_t firstByte = static_cast<uint8_t>(data_[0]);
uint8_t format = (firstByte >> 4) & 0x0F;
return static_cast<AudioFormat>(format);
}
// 检测AAC序列头
bool isAacSequenceHeader() const {
if (type_ != MediaPacketType::kAudio || data_.size() < 2) {
return false;
}
// AAC序列头:音频格式=AAC && AAC包类型=0
uint8_t firstByte = static_cast<uint8_t>(data_[0]);
uint8_t secondByte = static_cast<uint8_t>(data_[1]);
uint8_t format = (firstByte >> 4) & 0x0F;
uint8_t aacPacketType = secondByte;
return (format == static_cast<uint8_t>(AudioFormat::kAAC)) &&
(aacPacketType == 0);
}
};// 设置日志级别
Logger::setLogLevel(Logger::LogLevel::DEBUG);
// 关键日志输出
LOG_INFO << "RTMP Server started on " << config_.listenAddress << ":" << config_.listenPort;
LOG_DEBUG << "Received media packet: " << packet->data().size() << " bytes";
LOG_WARN << "Connection timeout: " << connectionId;
LOG_ERROR << "Failed to parse RTMP message";struct ServerStatistics {
std::atomic<uint64_t> totalConnections{0};
std::atomic<uint64_t> currentConnections{0};
std::atomic<uint64_t> totalStreams{0};
std::atomic<uint64_t> currentStreams{0};
std::atomic<uint64_t> totalBytesReceived{0};
std::atomic<uint64_t> totalBytesSent{0};
std::atomic<uint64_t> totalPacketsReceived{0};
std::atomic<uint64_t> totalPacketsSent{0};
};
// 定期输出统计信息
void printStatistics() {
auto stats = server->getStatistics();
LOG_INFO << "=== RTMP Server Statistics ===";
LOG_INFO << "Connections: " << stats.currentConnections.load()
<< "/" << stats.totalConnections.load();
LOG_INFO << "Streams: " << stats.currentStreams.load()
<< "/" << stats.totalStreams.load();
LOG_INFO << "Bytes: RX=" << stats.totalBytesReceived.load()
<< " TX=" << stats.totalBytesSent.load();
LOG_INFO << "Packets: RX=" << stats.totalPacketsReceived.load()
<< " TX=" << stats.totalPacketsSent.load();
}本RTMP推拉流项目实现了一个完整的流媒体服务器,主要特点包括:
通过深入理解RTMP协议和流媒体技术,可以进一步扩展功能,如:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。