diff --git a/buffer.go b/buffer.go index 19486bd6f..575134c62 100644 --- a/buffer.go +++ b/buffer.go @@ -21,22 +21,44 @@ const defaultBufSize = 4096 // In other words, we can't write and read simultaneously on the same connection. // The buffer is similar to bufio.Reader / Writer but zero-copy-ish // Also highly optimized for this particular use case. +// This buffer is backed by two byte slices in a double-buffering scheme type buffer struct { buf []byte // buf is a byte buffer who's length and capacity are equal. nc net.Conn idx int length int timeout time.Duration + dbuf [2][]byte // dbuf is an array with the two byte slices that back this buffer + dbufn int // dbufn is the current buffer counter for double-buffering } // newBuffer allocates and returns a new buffer. func newBuffer(nc net.Conn) buffer { + fg := make([]byte, defaultBufSize) + bg := make([]byte, defaultBufSize) return buffer{ - buf: make([]byte, defaultBufSize), - nc: nc, + buf: fg, + nc: nc, + dbuf: [2][]byte{fg, bg}, } } +// swap replaces the active buffer with the background buffer +func (b *buffer) swap() { + b.dbufn += 1 + dest := b.dbuf[b.dbufn&1] + if b.length > 0 { + // existing buffer is too large for double-buffering; + // we must allocate a temporary buffer that will be discarded + if b.length > len(dest) { + dest = make([]byte, b.length) + } + copy(dest[0:b.length], b.buf[b.idx:]) + } + b.buf = dest + b.idx = 0 +} + // fill reads into the buffer until at least _need_ bytes are in it func (b *buffer) fill(need int) error { n := b.length @@ -47,8 +69,6 @@ func (b *buffer) fill(need int) error { } // grow buffer if necessary - // TODO: let the buffer shrink again at some point - // Maybe keep the org buf slice and swap back? if need > len(b.buf) { // Round up to the next multiple of the default size newBuf := make([]byte, ((need/defaultBufSize)+1)*defaultBufSize) diff --git a/rows.go b/rows.go index d3b1e2822..a853ee4f9 100644 --- a/rows.go +++ b/rows.go @@ -111,6 +111,8 @@ func (rows *mysqlRows) Close() (err error) { return err } + mc.buf.swap() + // Remove unread packets from stream if !rows.rs.done { err = mc.readUntilEOF()