文章目录
- MediaSoup
- RTP处理
MediaSoup
2019年3月写的文章了,后续也没有再跟进这个项目有没有新变化。
MediaSoup是一个开源的SFU库,分为客户端和服务端。服务端分为JS层和C++层,C++层用于处理媒体和SDP等数据。我个人主要关注媒体相关的处理,也就是RTP和RTCP相关的处理。我们的项目不会用到这个项目,看它的代码主要是解决我的两个疑问:
- 多人会议,它是如何能保证每一个接收端都能流畅?
- 它是如何处理各个发送端以及接收端的RTCP包?
第一个问题,它不做任何处理,接收端仅仅是做了接收端拥塞控制该做的那部分接收端带宽估计的工作而已。它不会根据其他端的接收情况(下行带宽)调节发送端的带宽。
第二个问题,服务端类似一个客户端接收端和一个发送端,接收端需要做好的工作就是保证自己收到完整的包(NACK/FIR等)、给到发送端接收端的统计信息(RR)。发送端就是接收每一个接收端的RTCP请求并做相应的处理,但是它仅仅支持丢包处理工作,不会处理码率相关的工作,理论上来说它应该要根据每一个接收端的接收能力去得出一个合理的码率,并把这个码率告诉真实的发送端。
RTP处理
WebRtcTransport::OnRtpDataRecv
- 数据从JS层传到C++层的OnRtpDataRecv函数
- 先对srtp进行解密,得到rtp包
- 如果存在一个镜像(mirror)直接把解压后的rtp数据转发一份给它
- 解析rtp包,得到RtpPacket(rtp的头部信息包括扩展部分)
- 判断是否存在绝对发送时间的rtp扩展,如果存在则把这份数据直接交付给接收端带宽估计。MediaSoup仅仅支持一种远端带宽估计(RemoteBitrateEstimatorAbsSendTime),也就是依赖rtp扩展的绝对发送时间这种。
- 找到关联的生产者(Producer),并把数据交付给它
- 删除数据,结束一次处理
// 只保留媒体相关的处理逻辑,方便阅读
inline void WebRtcTransport::OnRtpDataRecv(RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
{
// Decrypt the SRTP packet.
if (!this->srtpRecvSession->DecryptSrtp(data, &len)) {
return;
}
// Mirror RTP if needed.
if (this->mirrorTuple != nullptr && this->mirroringOptions.recvRtp)
this->mirrorTuple->Send(data, len);
RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, len);
if (packet == nullptr) {
return;
}
// Apply the Transport RTP header extension ids so the RTP listener can use them.
if (this->headerExtensionIds.absSendTime != 0u) {
packet->AddExtensionMapping(RtpHeaderExtensionUri::Type::ABS_SEND_TIME, this->headerExtensionIds.absSendTime);
}
if (this->headerExtensionIds.mid != 0u) {
packet->AddExtensionMapping(RtpHeaderExtensionUri::Type::MID, this->headerExtensionIds.mid);
}
if (this->headerExtensionIds.rid != 0u) {
packet->AddExtensionMapping(RtpHeaderExtensionUri::Type::RTP_STREAM_ID, this->headerExtensionIds.rid);
}
// Feed the remote bitrate estimator (REMB).
uint32_t absSendTime;
if (packet->ReadAbsSendTime(&absSendTime)) {
this->remoteBitrateEstimator->IncomingPacket(DepLibUV::GetTime(), packet->GetPayloadLength(), *packet, absSendTime);
}
// Get the associated Producer.
RTC::Producer* producer = this->rtpListener.GetProducer(packet);
if (producer == nullptr) {
delete packet;
return;
}
// Pass the RTP packet to the corresponding Producer.
producer->ReceiveRtpPacket(packet);
delete packet;
}
Producer::ReceiveRtpPacket
- 判断是否需要创建一个新的RtpStreamRecv,如果不存在它会根据RtpParameters信息创建一个,RtpParameters是发送端在发送媒体数据之前向MediaSoup发送的能力集合信息,包括了ssrc,编解码器名字,编码器负载类型,是否支持nack,是否支持fec以及fec的ssrc,是否支持rtx以及rtx的ssrc,最大帧率,分辨率缩放比例,rtp扩展等。
- 判断收到的是rtp包还是rtx包并交付给RtpStreamRecv对应的处理函数
- RtpStreamRecv接收到RTX包,会先解出RTP包,计算jitter和判断Nack
- RtpStreamRecv接收到RTP包,计算jitter和判断Nack
- RtpStreamRecv还会对包进行nal解析,判断是否是关键帧
- RtpStreamRecv还会计算码率,判断当前的方式端的状态
- 根据发送端协商的rtp扩展,重新填充扩展
- 交付给OnProducerRtpPacket,从代码上看之后Router注册了,所以只有实际上仅仅是调用了Router::OnProducerRtpPacket
- OnProducerRtpPacket调用注册到此生产者的所有消费者(SendRtpPacket)
void Producer::ReceiveRtpPacket(RTC::RtpPacket* packet)
{
// May need to create a new RtpStreamRecv.
MayNeedNewStream(packet);
// Find the corresponding RtpStreamRecv.
uint32_t ssrc = packet->GetSsrc();
RTC::RtpStreamRecv* rtpStream{ nullptr };
RTC::RtpEncodingParameters::Profile profile;
std::unique_ptr<RTC::RtpPacket> clonedPacket;
// Media RTP stream found.
if (this->mapSsrcRtpStreamInfo.find(ssrc) != this->mapSsrcRtpStreamInfo.end()) {
rtpStream = this->mapSsrcRtpStreamInfo[ssrc].rtpStream;
auto& info = this->mapSsrcRtpStreamInfo[ssrc];
rtpStream = info.rtpStream;
profile = info.profile;
clonedPacket.reset(packet->Clone(ClonedPacketBuffer));
packet = clonedPacket.get();
// Process the packet.
if (!rtpStream->ReceivePacket(packet))
return;
} else {
for (auto& kv : this->mapSsrcRtpStreamInfo) {
auto& info = kv.second;
if (info.rtxSsrc != 0u && info.rtxSsrc == ssrc) {
rtpStream = info.rtpStream;
profile = info.profile;
clonedPacket.reset(packet->Clone(ClonedPacketBuffer));
packet = clonedPacket.get();
// Process the packet.
if (!rtpStream->ReceiveRtxPacket(packet))
return;
// Packet repaired after applying RTX.
rtpStream->packetsRepaired++;
break;
}
}
}
ApplyRtpMapping(packet);
for (auto& listener : this->listeners) {
listener->OnProducerRtpPacket(this, packet, profile);
}
}
Consumer::SendRtpPacket
- 重写rtp的ssrc,序号和时间戳,并存储此包(看了代码还是不明白为什么要这样做)
- 发送此包(最后通过Transport模块发送,实际上就是通过WebRTCTransport模块发送的,一个生产者有一个WebRTCTransport实例)
- 恢复原始的ssrc,序号和时间戳
void Consumer::SendRtpPacket(RTC::RtpPacket* packet, RTC::RtpEncodingParameters::Profile profile)
{
// Map the payload type.
auto payloadType = packet->GetPayloadType();
// NOTE: This may happen if this Consumer supports just some codecs of those in the corresponding Producer.
if (this->supportedCodecPayloadTypes.find(payloadType) == this->supportedCodecPayloadTypes.end())
return;
// Check whether this is the key frame we are waiting for in order to update the effective profile.
if (this->effectiveProfile != this->targetProfile && profile == this->targetProfile) {
bool isKeyFrame = false;
bool canBeKeyFrame = Codecs::CanBeKeyFrame(this->rtpStream->GetMimeType());
if (canBeKeyFrame && packet->IsKeyFrame()) {
isKeyFrame = true;
if (isKeyFrame || !canBeKeyFrame) {
SetEffectiveProfile(this->targetProfile);
// Resynchronize the stream.
this->syncRequired = true;
// Clear RTP retransmission buffer to avoid congesting the receiver by
// sending useless retransmissions (now that we are sending a newer key frame).
this->rtpStream->ClearRetransmissionBuffer();
// Stop probation if probing profile is the new effective profile.
if (IsProbing() && this->probingProfile == this->effectiveProfile)
StopProbation();
}
}
bool isSyncPacket = false;
if (this->syncRequired) {
isSyncPacket = true;
this->rtpSeqManager.Sync(packet->GetSequenceNumber());
this->rtpTimestampManager.Sync(packet->GetTimestamp());
// Calculate RTP timestamp diff between now and last sent RTP packet.
if (this->rtpStream->GetMaxPacketMs() != 0u) {
auto now = DepLibUV::GetTime();
auto diffMs = now - this->rtpStream->GetMaxPacketMs();
auto diffTs = diffMs * this->rtpStream->GetClockRate() / 1000;
this->rtpTimestampManager.Offset(diffTs);
}
this->syncRequired = false;
if (this->encodingContext)
this->encodingContext->SyncRequired();
}
// Rewrite payload if needed. Drop packet if necessary.
if (this->encodingContext && !packet->EncodePayload(this->encodingContext.get())) {
this->rtpSeqManager.Drop(packet->GetSequenceNumber());
this->rtpTimestampManager.Drop(packet->GetTimestamp());
return;
}
// Update RTP seq number and timestamp.
uint16_t rtpSeq;
uint32_t rtpTimestamp;
this->rtpSeqManager.Input(packet->GetSequenceNumber(), rtpSeq);
this->rtpTimestampManager.Input(packet->GetTimestamp(), rtpTimestamp);
auto origSsrc = packet->GetSsrc();
auto origSeq = packet->GetSequenceNumber();
auto origTimestamp = packet->GetTimestamp();
// Rewrite packet
packet->SetSsrc(this->rtpParameters.encodings[0].ssrc);
packet->SetSequenceNumber(rtpSeq);
packet->SetTimestamp(rtpTimestamp);
// Process the packet.
if (this->rtpStream->ReceivePacket(packet)) {
// Send the packet.
this->transport->SendRtpPacket(packet);
// Retransmit the RTP packet if probing.
if (IsProbing())
SendProbation(packet);
}
// Restore
packet->SetSsrc(origSsrc);
packet->SetSequenceNumber(origSeq);
packet->SetTimestamp(origTimestamp);
// Restore the original payload if needed.
if (this->encodingContext)
packet->RestorePayload();
// Run probation if needed.
if (this->kind == RTC::Media::Kind::VIDEO && --this->rtpPacketsBeforeProbation == 0) {
this->rtpPacketsBeforeProbation = RtpPacketsBeforeProbation;
MayRunProbation();
}
}