Skip to content

Commit

Permalink
Support TrackLocal RTX
Browse files Browse the repository at this point in the history
  • Loading branch information
aggresss committed Feb 22, 2024
1 parent dbf2254 commit 9476f01
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 8 deletions.
26 changes: 26 additions & 0 deletions rtpcodec.go
Expand Up @@ -4,6 +4,8 @@
package webrtc

import (
"fmt"
"regexp"
"strings"

"github.com/pion/webrtc/v4/internal/fmtp"
Expand Down Expand Up @@ -123,3 +125,27 @@ func codecParametersFuzzySearch(needle RTPCodecParameters, haystack []RTPCodecPa

return RTPCodecParameters{}, codecMatchNone
}

// Do a fuzzy find for a associated codec in the list of codecs
// Used for lookup up a associated codec in an existing list to find a match
// Returns codecMatchExact, codecMatchPartial, or codecMatchNone
func codecParametersAssociatedSearch(needle RTPCodecParameters, haystack []RTPCodecParameters) (RTPCodecParameters, codecMatchType) {

Check failure on line 132 in rtpcodec.go

View workflow job for this annotation

GitHub Actions / lint / Go

unnecessary leading newline (whitespace)

Check failure on line 133 in rtpcodec.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not `gofumpt`-ed (gofumpt)
// First attempt to match Exact
for _, c := range haystack {
if c.SDPFmtpLine == fmt.Sprintf("apt=%d", needle.PayloadType) {
return c, codecMatchExact
}
}

// Fallback to just has apt codec
if re, err := regexp.Compile(`^apt=\d+$`); err == nil {

Check failure on line 142 in rtpcodec.go

View workflow job for this annotation

GitHub Actions / lint / Go

regexpMust: for const patterns like `^apt=\d+$`, use regexp.MustCompile (gocritic)
for _, c := range haystack {
if re.MatchString(c.SDPFmtpLine) {
return c, codecMatchPartial
}

Check warning on line 146 in rtpcodec.go

View check run for this annotation

Codecov / codecov/patch

rtpcodec.go#L145-L146

Added lines #L145 - L146 were not covered by tests
}
}

return RTPCodecParameters{}, codecMatchNone
}
87 changes: 81 additions & 6 deletions rtpsender.go
Expand Up @@ -20,16 +20,18 @@ import (
)

type trackEncoding struct {
track TrackLocal

srtpStream *srtpWriterFuture
track TrackLocal
context *baseTrackLocalContext

ssrc SSRC
srtpStream *srtpWriterFuture
rtcpInterceptor interceptor.RTCPReader
streamInfo interceptor.StreamInfo

context *baseTrackLocalContext

ssrc SSRC
rtxSsrc SSRC
rtxSrtpStream *srtpWriterFuture
rtxRtcpInterceptor interceptor.RTCPReader
rtxStreamInfo interceptor.StreamInfo
}

// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
Expand Down Expand Up @@ -125,6 +127,7 @@ func (r *RTPSender) getParameters() RTPSendParameters {
RID: rid,
SSRC: trackEncoding.ssrc,
PayloadType: r.payloadType,
RTX: RTPRtxParameters{SSRC: trackEncoding.rtxSsrc},
},
})
}
Expand Down Expand Up @@ -204,6 +207,16 @@ func (r *RTPSender) addEncoding(track TrackLocal) {
ssrc: SSRC(randutil.NewMathRandomGenerator().Uint32()),
}

if r.api.settingEngine.trackLocalRtx {
codecs := r.api.mediaEngine.getCodecsByKind(track.Kind())
for _, c := range codecs {
if _, matchType := codecParametersAssociatedSearch(c, codecs); matchType != codecMatchNone {
trackEncoding.rtxSsrc = SSRC(randutil.NewMathRandomGenerator().Uint32())
break

Check warning on line 215 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L211-L215

Added lines #L211 - L215 were not covered by tests
}
}
}

r.trackEncodings = append(r.trackEncodings, trackEncoding)
}

Expand Down Expand Up @@ -339,6 +352,38 @@ func (r *RTPSender) Send(parameters RTPSendParameters) error {
)

writeStream.interceptor.Store(rtpInterceptor)

if rtxCodec, matchType := codecParametersAssociatedSearch(codec, r.api.mediaEngine.getCodecsByKind(r.kind)); matchType == codecMatchExact &&
parameters.Encodings[idx].RTX.SSRC != 0 {

Check failure on line 357 in rtpsender.go

View workflow job for this annotation

GitHub Actions / lint / Go

unnecessary leading newline (whitespace)

rtxSrtpStream := &srtpWriterFuture{ssrc: parameters.Encodings[idx].RTX.SSRC, rtpSender: r}

trackEncoding.rtxSrtpStream = rtxSrtpStream
trackEncoding.rtxSsrc = parameters.Encodings[idx].RTX.SSRC

trackEncoding.rtxStreamInfo = *createStreamInfo(
r.id+"_rtx",
parameters.Encodings[idx].RTX.SSRC,
rtxCodec.PayloadType,
rtxCodec.RTPCodecCapability,
parameters.HeaderExtensions,
)
trackEncoding.rtxStreamInfo.Attributes.Set("apt_ssrc", uint32(parameters.Encodings[idx].SSRC))

trackEncoding.rtxRtcpInterceptor = r.api.interceptor.BindRTCPReader(
interceptor.RTCPReaderFunc(func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = trackEncoding.rtxSrtpStream.Read(in)
return n, a, err
}),

Check warning on line 377 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L358-L377

Added lines #L358 - L377 were not covered by tests
)

r.api.interceptor.BindLocalStream(
&trackEncoding.rtxStreamInfo,
interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
return rtxSrtpStream.WriteRTP(header, payload)
}),

Check warning on line 384 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L380-L384

Added lines #L380 - L384 were not covered by tests
)
}
}

close(r.sendCalled)
Expand Down Expand Up @@ -402,6 +447,36 @@ func (r *RTPSender) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
return pkts, attributes, nil
}

// ReadRtx reads incoming RTX Stream RTCP for this RTPSender
func (r *RTPSender) ReadRtx(b []byte) (n int, a interceptor.Attributes, err error) {
if r.trackEncodings[0].rtxRtcpInterceptor == nil {
return 0, nil, io.ErrNoProgress
}

Check warning on line 454 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L451-L454

Added lines #L451 - L454 were not covered by tests

select {
case <-r.sendCalled:
return r.trackEncodings[0].rtxRtcpInterceptor.Read(b, a)
case <-r.stopCalled:
return 0, nil, io.ErrClosedPipe

Check warning on line 460 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L456-L460

Added lines #L456 - L460 were not covered by tests
}
}

// ReadRtxRTCP is a convenience method that wraps ReadRtx and unmarshals for you.
func (r *RTPSender) ReadRtxRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
i, attributes, err := r.ReadRtx(b)
if err != nil {
return nil, nil, err
}

Check warning on line 470 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L465-L470

Added lines #L465 - L470 were not covered by tests

pkts, err := rtcp.Unmarshal(b[:i])
if err != nil {
return nil, nil, err
}

Check warning on line 475 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L472-L475

Added lines #L472 - L475 were not covered by tests

return pkts, attributes, nil

Check warning on line 477 in rtpsender.go

View check run for this annotation

Codecov / codecov/patch

rtpsender.go#L477

Added line #L477 was not covered by tests
}

// ReadSimulcast reads incoming RTCP for this RTPSender for given rid
func (r *RTPSender) ReadSimulcast(b []byte, rid string) (n int, a interceptor.Attributes, err error) {
select {
Expand Down
6 changes: 6 additions & 0 deletions sdp.go
Expand Up @@ -389,7 +389,13 @@ func addSenderSDP(

sendParameters := sender.GetParameters()
for _, encoding := range sendParameters.Encodings {
if encoding.RTX.SSRC != 0 {
media = media.WithValueAttribute(sdp.AttrKeySSRCGroup, fmt.Sprintf("FID %d %d", encoding.SSRC, encoding.RTX.SSRC))
}

Check warning on line 394 in sdp.go

View check run for this annotation

Codecov / codecov/patch

sdp.go#L393-L394

Added lines #L393 - L394 were not covered by tests
media = media.WithMediaSource(uint32(encoding.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
if encoding.RTX.SSRC != 0 {
media = media.WithMediaSource(uint32(encoding.RTX.SSRC), track.StreamID() /* cname */, track.StreamID() /* streamLabel */, track.ID())
}

Check warning on line 398 in sdp.go

View check run for this annotation

Codecov / codecov/patch

sdp.go#L397-L398

Added lines #L397 - L398 were not covered by tests
if !isPlanB {
media = media.WithPropertyAttribute("msid:" + track.StreamID() + " " + track.ID())
}
Expand Down
6 changes: 6 additions & 0 deletions settingengine.go
Expand Up @@ -92,6 +92,7 @@ type SettingEngine struct {
srtpProtectionProfiles []dtls.SRTPProtectionProfile
receiveMTU uint
iceMaxBindingRequests *uint16
trackLocalRtx bool
}

// getReceiveMTU returns the configured MTU. If SettingEngine's MTU is configured to 0 it returns the default
Expand Down Expand Up @@ -437,3 +438,8 @@ func (e *SettingEngine) SetSCTPMaxReceiveBufferSize(maxReceiveBufferSize uint32)
func (e *SettingEngine) SetDTLSCustomerCipherSuites(customCipherSuites func() []dtls.CipherSuite) {
e.dtls.customCipherSuites = customCipherSuites
}

// SetTrackLocalRtx allows track local use RTX.
func (e *SettingEngine) SetTrackLocalRtx(enable bool) {
e.trackLocalRtx = enable

Check warning on line 444 in settingengine.go

View check run for this annotation

Codecov / codecov/patch

settingengine.go#L443-L444

Added lines #L443 - L444 were not covered by tests
}
5 changes: 3 additions & 2 deletions track_local.go
Expand Up @@ -44,8 +44,9 @@ type TrackLocalContext interface {
}

type baseTrackLocalContext struct {
id string
params RTPParameters
id string
params RTPParameters

ssrc SSRC
writeStream TrackLocalWriter
rtcpInterceptor interceptor.RTCPReader
Expand Down

0 comments on commit 9476f01

Please sign in to comment.