Skip to content

Commit

Permalink
remove the http3.DataStreamer (#3435)
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jun 9, 2022
1 parent e27fa1c commit ccf897e
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 73 deletions.
28 changes: 3 additions & 25 deletions http3/response_writer.go
Expand Up @@ -12,33 +12,21 @@ import (
"github.com/marten-seemann/qpack"
)

// DataStreamer lets the caller take over the stream. After a call to DataStream
// the HTTP server library will not do anything else with the connection.
//
// It becomes the caller's responsibility to manage and close the stream.
//
// After a call to DataStream, the original Request.Body must not be used.
type DataStreamer interface {
DataStream() quic.Stream
}

type responseWriter struct {
conn quic.Connection
stream quic.Stream // needed for DataStream()
bufferedStream *bufio.Writer

header http.Header
status int // status code passed to WriteHeader
headerWritten bool
dataStreamUsed bool // set when DataSteam() is called
header http.Header
status int // status code passed to WriteHeader
headerWritten bool

logger utils.Logger
}

var (
_ http.ResponseWriter = &responseWriter{}
_ http.Flusher = &responseWriter{}
_ DataStreamer = &responseWriter{}
_ Hijacker = &responseWriter{}
)

Expand Down Expand Up @@ -112,16 +100,6 @@ func (w *responseWriter) Flush() {
}
}

func (w *responseWriter) usedDataStream() bool {
return w.dataStreamUsed
}

func (w *responseWriter) DataStream() quic.Stream {
w.dataStreamUsed = true
w.Flush()
return w.stream
}

func (w *responseWriter) StreamID() quic.StreamID {
return w.stream.StreamID()
}
Expand Down
10 changes: 1 addition & 9 deletions http3/server.go
Expand Up @@ -562,11 +562,7 @@ func (s *Server) handleRequest(conn quic.Connection, str quic.Stream, decoder *q
ctx = context.WithValue(ctx, http.LocalAddrContextKey, conn.LocalAddr())
req = req.WithContext(ctx)
r := newResponseWriter(str, conn, s.logger)
defer func() {
if !r.usedDataStream() {
r.Flush()
}
}()
defer r.Flush()
handler := s.Handler
if handler == nil {
handler = http.DefaultServeMux
Expand All @@ -587,10 +583,6 @@ func (s *Server) handleRequest(conn quic.Connection, str quic.Stream, decoder *q
handler.ServeHTTP(r, req)
}()

if r.usedDataStream() {
return requestError{err: errHijacked}
}

if panicked {
r.WriteHeader(500)
} else {
Expand Down
39 changes: 0 additions & 39 deletions http3/server_test.go
Expand Up @@ -222,45 +222,6 @@ var _ = Describe("Server", func() {
Expect(hfs).To(HaveKeyWithValue(":status", []string{"500"}))
})

It("doesn't close the stream if the handler called DataStream()", func() {
s.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
str := w.(DataStreamer).DataStream()
str.Write([]byte("foobar"))
})

rspWritten := make(chan struct{})
setRequest(encodeRequest(exampleGetRequest))
str.EXPECT().Context().Return(reqContext)
str.EXPECT().Write([]byte("foobar")).Do(func(b []byte) (int, error) {
close(rspWritten)
return len(b), nil
})
// don't EXPECT CancelRead()

ctrlStr := mockquic.NewMockStream(mockCtrl)
ctrlStr.EXPECT().Write(gomock.Any()).AnyTimes()
conn.EXPECT().OpenUniStream().Return(ctrlStr, nil)
conn.EXPECT().AcceptUniStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.ReceiveStream, error) {
<-rspWritten
return nil, errors.New("done")
})
conn.EXPECT().AcceptStream(gomock.Any()).Return(str, nil)
conn.EXPECT().AcceptStream(gomock.Any()).DoAndReturn(func(context.Context) (quic.Stream, error) {
<-rspWritten
return nil, errors.New("done")
})

done := make(chan struct{})
go func() {
defer GinkgoRecover()
defer close(done)
s.handleConn(conn)
}()
Eventually(rspWritten).Should(BeClosed())
time.Sleep(50 * time.Millisecond) // make sure that after str.Write there are no further calls to stream methods
Eventually(done).Should(BeClosed())
})

Context("hijacking bidirectional streams", func() {
var conn *mockquic.MockEarlyConnection
testDone := make(chan struct{})
Expand Down

0 comments on commit ccf897e

Please sign in to comment.