Skip to content

Commit

Permalink
SampleBuilder: Add option to return RTPHeaders
Browse files Browse the repository at this point in the history
Replaces fa1f5d9 which returned only a head packet's header.
When the option WithRTPHeaders is set, SampleBuilder returns
RTP headers of the packets forming the sample as Sample.RTPHeaders.
  • Loading branch information
at-wat committed Mar 7, 2024
1 parent 6190b2d commit 3f6d94a
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 29 deletions.
5 changes: 4 additions & 1 deletion pkg/media/media.go
Expand Up @@ -18,7 +18,10 @@ type Sample struct {
PacketTimestamp uint32
PrevDroppedPackets uint16
Metadata interface{}
RTPHeader *rtp.Header

// RTP headers of RTP packets forming this Sample. (Optional)
// Useful for accessing RTP extensions associated to the Sample.
RTPHeaders []*rtp.Header
}

// Writer defines an interface to handle
Expand Down
26 changes: 19 additions & 7 deletions pkg/media/samplebuilder/samplebuilder.go
Expand Up @@ -48,6 +48,9 @@ type SampleBuilder struct {

// allows inspecting head packets of each sample and then returns a custom metadata
packetHeadHandler func(headPacket interface{}) interface{}

// return array of RTP headers as Sample.RTPHeaders
returnRTPHeaders bool
}

// New constructs a new SampleBuilder.
Expand Down Expand Up @@ -277,17 +280,18 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
// merge all the buffers into a sample
data := []byte{}
var metadata interface{}
var rtpHeader rtp.Header
var rtpHeaders []*rtp.Header
for i := consume.head; i != consume.tail; i++ {
p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload)
if err != nil {
return nil
}
if i == consume.head {
if s.packetHeadHandler != nil {
metadata = s.packetHeadHandler(s.depacketizer)
}
rtpHeader = s.buffer[i].Header.Clone()
if i == consume.head && s.packetHeadHandler != nil {
metadata = s.packetHeadHandler(s.depacketizer)
}
if s.returnRTPHeaders {
h := s.buffer[i].Header.Clone()
rtpHeaders = append(rtpHeaders, &h)
}

data = append(data, p...)
Expand All @@ -300,7 +304,7 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
PacketTimestamp: sampleTimestamp,
PrevDroppedPackets: s.droppedPackets,
Metadata: metadata,
RTPHeader: &rtpHeader,
RTPHeaders: rtpHeaders,
}

s.droppedPackets = 0
Expand Down Expand Up @@ -394,3 +398,11 @@ func WithMaxTimeDelay(maxLateDuration time.Duration) Option {
o.maxLateTimestamp = uint32(int64(o.sampleRate) * totalMillis / 1000)
}
}

// WithRTPHeaders enables to collect RTP headers forming a Sample.
// Useful for accessing RTP extensions associated to the Sample.
func WithRTPHeaders(enable bool) Option {
return func(o *SampleBuilder) {
o.returnRTPHeaders = enable
}
}
67 changes: 46 additions & 21 deletions pkg/media/samplebuilder/samplebuilder_test.go
Expand Up @@ -17,6 +17,7 @@ type sampleBuilderTest struct {
message string
packets []*rtp.Packet
withHeadChecker bool
withRTPHeader bool
headBytes []byte
samples []*media.Sample
maxLate uint16
Expand Down Expand Up @@ -84,8 +85,8 @@ func TestSampleBuilder(t *testing.T) {
{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 7}, Payload: []byte{0x03}},
},
samples: []*media.Sample{
{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 5, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 5}},
{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 6, RTPHeader: &rtp.Header{SequenceNumber: 5001, Timestamp: 6}},
{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 5},
{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 6},
},
maxLate: 50,
maxLateTimestamp: 0,
Expand All @@ -102,7 +103,7 @@ func TestSampleBuilder(t *testing.T) {
{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 17}, Payload: []byte{0x07}},
},
samples: []*media.Sample{
{Data: []byte{0x01}, Duration: time.Second * 2, PacketTimestamp: 5, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 5, Marker: true}},
{Data: []byte{0x01}, Duration: time.Second * 2, PacketTimestamp: 5},
},
maxLate: 5,
maxLateTimestamp: 0,
Expand Down Expand Up @@ -134,8 +135,8 @@ func TestSampleBuilder(t *testing.T) {
{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 17}, Payload: []byte{0x07}},
},
samples: []*media.Sample{
{Data: []byte{0x01}, Duration: time.Second * 2, PacketTimestamp: 5, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 5, Marker: true}},
{Data: []byte{0x02}, Duration: time.Second * 2, PacketTimestamp: 7, PrevDroppedPackets: 1, RTPHeader: &rtp.Header{SequenceNumber: 5002, Timestamp: 7, Marker: true}},
{Data: []byte{0x01}, Duration: time.Second * 2, PacketTimestamp: 5},
{Data: []byte{0x02}, Duration: time.Second * 2, PacketTimestamp: 7, PrevDroppedPackets: 1},
},
maxLate: 5,
maxLateTimestamp: 0,
Expand All @@ -149,8 +150,8 @@ func TestSampleBuilder(t *testing.T) {
{Header: rtp.Header{SequenceNumber: 5003, Timestamp: 7}, Payload: []byte{0x04}},
},
samples: []*media.Sample{
{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 5, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 5}},
{Data: []byte{0x02, 0x03}, Duration: time.Second, PacketTimestamp: 6, RTPHeader: &rtp.Header{SequenceNumber: 5001, Timestamp: 6}},
{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 5},
{Data: []byte{0x02, 0x03}, Duration: time.Second, PacketTimestamp: 6},
},
maxLate: 50,
maxLateTimestamp: 0,
Expand Down Expand Up @@ -203,11 +204,11 @@ func TestSampleBuilder(t *testing.T) {
{Header: rtp.Header{SequenceNumber: 5005, Timestamp: 6}, Payload: []byte{0x06}},
},
samples: []*media.Sample{
{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 1, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 1}},
{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 2, RTPHeader: &rtp.Header{SequenceNumber: 5001, Timestamp: 2}},
{Data: []byte{0x03}, Duration: time.Second, PacketTimestamp: 3, RTPHeader: &rtp.Header{SequenceNumber: 5002, Timestamp: 3}},
{Data: []byte{0x04}, Duration: time.Second, PacketTimestamp: 4, RTPHeader: &rtp.Header{SequenceNumber: 5003, Timestamp: 4}},
{Data: []byte{0x05}, Duration: time.Second, PacketTimestamp: 5, RTPHeader: &rtp.Header{SequenceNumber: 5004, Timestamp: 5}},
{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 1},
{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 2},
{Data: []byte{0x03}, Duration: time.Second, PacketTimestamp: 3},
{Data: []byte{0x04}, Duration: time.Second, PacketTimestamp: 4},
{Data: []byte{0x05}, Duration: time.Second, PacketTimestamp: 5},
},
maxLate: 50,
maxLateTimestamp: 0,
Expand All @@ -225,7 +226,7 @@ func TestSampleBuilder(t *testing.T) {
{Header: rtp.Header{SequenceNumber: 5017, Timestamp: 7001}, Payload: []byte{0x05}},
},
samples: []*media.Sample{
{Data: []byte{0x04, 0x05}, Duration: time.Second * time.Duration(2), PacketTimestamp: 4000, PrevDroppedPackets: 13, RTPHeader: &rtp.Header{SequenceNumber: 5013, Timestamp: 4000}},
{Data: []byte{0x04, 0x05}, Duration: time.Second * time.Duration(2), PacketTimestamp: 4000, PrevDroppedPackets: 13},
},
withHeadChecker: true,
headBytes: []byte{0x04},
Expand All @@ -247,7 +248,7 @@ func TestSampleBuilder(t *testing.T) {
withHeadChecker: true,
headBytes: []byte{1},
samples: []*media.Sample{
{Data: []byte{1, 2, 3}, Duration: 0, PacketTimestamp: 1, PrevDroppedPackets: 0, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 1}}, // first sample
{Data: []byte{1, 2, 3}, Duration: 0, PacketTimestamp: 1, PrevDroppedPackets: 0}, // first sample
},
maxLate: 50,
maxLateTimestamp: 2000,
Expand All @@ -265,11 +266,32 @@ func TestSampleBuilder(t *testing.T) {
withHeadChecker: true,
headBytes: []byte{1},
samples: []*media.Sample{
{Data: []byte{1, 2}, Duration: 0, PacketTimestamp: 1, PrevDroppedPackets: 0, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 1}}, // 1st sample
{Data: []byte{1, 2}, Duration: 0, PacketTimestamp: 1, PrevDroppedPackets: 0}, // 1st sample
},
maxLate: 50,
maxLateTimestamp: 2000,
},
{
message: "SampleBuilder should emit samples with RTP headers when WithRTPHeaders option is enabled",
packets: []*rtp.Packet{
{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 5}, Payload: []byte{0x01}},
{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 6}, Payload: []byte{0x02}},
{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 6}, Payload: []byte{0x03}},
{Header: rtp.Header{SequenceNumber: 5003, Timestamp: 7}, Payload: []byte{0x04}},
},
samples: []*media.Sample{
{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 5, RTPHeaders: []*rtp.Header{
{SequenceNumber: 5000, Timestamp: 5},
}},
{Data: []byte{0x02, 0x03}, Duration: time.Second, PacketTimestamp: 6, RTPHeaders: []*rtp.Header{
{SequenceNumber: 5001, Timestamp: 6},
{SequenceNumber: 5002, Timestamp: 6},
}},
},
maxLate: 50,
maxLateTimestamp: 0,
withRTPHeader: true,
},
}

t.Run("Pop", func(t *testing.T) {
Expand All @@ -282,6 +304,9 @@ func TestSampleBuilder(t *testing.T) {
time.Millisecond*time.Duration(int64(t.maxLateTimestamp)),
))
}
if t.withRTPHeader {
opts = append(opts, WithRTPHeaders(true))
}

d := &fakeDepacketizer{
headChecker: t.withHeadChecker,
Expand Down Expand Up @@ -309,18 +334,18 @@ func TestSampleBuilderMaxLate(t *testing.T) {
s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 0, Timestamp: 1}, Payload: []byte{0x01}})
s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1, Timestamp: 2}, Payload: []byte{0x01}})
s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 2, Timestamp: 3}, Payload: []byte{0x01}})
assert.Equal(&media.Sample{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 1, RTPHeader: &rtp.Header{SequenceNumber: 0, Timestamp: 1}}, s.Pop(), "Failed to build samples before gap")
assert.Equal(&media.Sample{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 1}, s.Pop(), "Failed to build samples before gap")

s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})

assert.Equal(&media.Sample{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 2, RTPHeader: &rtp.Header{SequenceNumber: 1, Timestamp: 2}}, s.Pop(), "Failed to build samples after large gap")
assert.Equal(&media.Sample{Data: []byte{0x01}, Duration: time.Second, PacketTimestamp: 2}, s.Pop(), "Failed to build samples after large gap")
assert.Equal((*media.Sample)(nil), s.Pop(), "Failed to build samples after large gap")

s.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 6000, Timestamp: 600}, Payload: []byte{0x03}})
assert.Equal(&media.Sample{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 500, PrevDroppedPackets: 4998, RTPHeader: &rtp.Header{SequenceNumber: 5000, Timestamp: 500}}, s.Pop(), "Failed to build samples after large gap")
assert.Equal(&media.Sample{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 501, RTPHeader: &rtp.Header{SequenceNumber: 5001, Timestamp: 501}}, s.Pop(), "Failed to build samples after large gap")
assert.Equal(&media.Sample{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 500, PrevDroppedPackets: 4998}, s.Pop(), "Failed to build samples after large gap")
assert.Equal(&media.Sample{Data: []byte{0x02}, Duration: time.Second, PacketTimestamp: 501}, s.Pop(), "Failed to build samples after large gap")
}

func TestSeqnumDistance(t *testing.T) {
Expand Down Expand Up @@ -541,8 +566,8 @@ func TestSampleBuilder_Flush(t *testing.T) {
}

expected := []*media.Sample{
{Data: []byte{0x01, 0x11}, Duration: 9 * time.Second, PacketTimestamp: 1, PrevDroppedPackets: 2, RTPHeader: &rtp.Header{SequenceNumber: 1001, Timestamp: 1, Marker: true}},
{Data: []byte{0x01, 0x12}, Duration: 0, PacketTimestamp: 10, PrevDroppedPackets: 9, RTPHeader: &rtp.Header{SequenceNumber: 1011, Timestamp: 10, Marker: true}},
{Data: []byte{0x01, 0x11}, Duration: 9 * time.Second, PacketTimestamp: 1, PrevDroppedPackets: 2},
{Data: []byte{0x01, 0x12}, Duration: 0, PacketTimestamp: 10, PrevDroppedPackets: 9},
}

assert.Equal(t, expected, samples)
Expand Down

0 comments on commit 3f6d94a

Please sign in to comment.