Skip to content

Commit

Permalink
SampleBuilder: Add Flush method
Browse files Browse the repository at this point in the history
Flush marks all valid samples in the buffer to be popped.
Useful for graceful shutdown without losing buffered data
as much as possible.
  • Loading branch information
at-wat authored and edaniels committed Feb 28, 2024
1 parent fa1f5d9 commit 09a4f60
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
11 changes: 8 additions & 3 deletions pkg/media/samplebuilder/samplebuilder.go
Expand Up @@ -144,10 +144,10 @@ func (s *SampleBuilder) purgeConsumedLocation(consume sampleSequenceLocation, fo

// purgeBuffers flushes all buffers that are already consumed or those buffers
// that are too late to consume.
func (s *SampleBuilder) purgeBuffers() {
func (s *SampleBuilder) purgeBuffers(flush bool) {
s.purgeConsumedBuffers()

for (s.tooOld(s.filled) || (s.filled.count() > s.maxLate)) && s.filled.hasData() {
for (s.tooOld(s.filled) || (s.filled.count() > s.maxLate) || flush) && s.filled.hasData() {
if s.active.empty() {
// refill the active based on the filled packets
s.active = s.filled
Expand Down Expand Up @@ -188,7 +188,12 @@ func (s *SampleBuilder) Push(p *rtp.Packet) {
case slCompareInside:
break
}
s.purgeBuffers()
s.purgeBuffers(false)
}

// Flush marks all samples in the buffer to be popped.
func (s *SampleBuilder) Flush() {
s.purgeBuffers(true)
}

const secondToNanoseconds = 1000000000
Expand Down
39 changes: 39 additions & 0 deletions pkg/media/samplebuilder/samplebuilder_test.go
Expand Up @@ -509,6 +509,45 @@ func TestSampleBuilderData(t *testing.T) {
assert.Equal(t, j, 0x1FFFF)
}

func TestSampleBuilder_Flush(t *testing.T) {
s := New(50, &fakeDepacketizer{
headChecker: true,
headBytes: []byte{0x01},
}, 1)

s.Push(&rtp.Packet{
Header: rtp.Header{SequenceNumber: 999, Timestamp: 0},
Payload: []byte{0x00},
}) // Invalid packet
// Gap preventing below packets to be processed
s.Push(&rtp.Packet{
Header: rtp.Header{SequenceNumber: 1001, Timestamp: 1, Marker: true},
Payload: []byte{0x01, 0x11},
}) // Valid packet
s.Push(&rtp.Packet{
Header: rtp.Header{SequenceNumber: 1011, Timestamp: 10, Marker: true},
Payload: []byte{0x01, 0x12},
}) // Valid packet

if sample := s.Pop(); sample != nil {
t.Fatal("Unexpected sample is retuned. Test precondition may be broken")
}

s.Flush()

samples := []*media.Sample{}
for sample := s.Pop(); sample != nil; sample = s.Pop() {
samples = append(samples, sample)
}

expected := []*media.Sample{
{Data: []byte{0x01, 0x11}, Duration: 9 * time.Second, PacketTimestamp: 1, PrevDroppedPackets: 2, RTPHeader: &rtp.Header{SequenceNumber: 1001, Timestamp: 1, Marker: true}},
{Data: []byte{0x01, 0x12}, Duration: 0, PacketTimestamp: 10, PrevDroppedPackets: 9, RTPHeader: &rtp.Header{SequenceNumber: 1011, Timestamp: 10, Marker: true}},
}

assert.Equal(t, expected, samples)
}

func BenchmarkSampleBuilderSequential(b *testing.B) {
s := New(100, &fakeDepacketizer{}, 1)
b.ResetTimer()
Expand Down

0 comments on commit 09a4f60

Please sign in to comment.