Skip to content

Commit

Permalink
Merge pull request #797 from vasi-stripe/vasi-socket-guards
Browse files Browse the repository at this point in the history
Move read_buffer guards in Socket
  • Loading branch information
geemus committed Nov 8, 2022
2 parents 68452a7 + 464b7d5 commit 0a8b9e5
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 61 deletions.
69 changes: 8 additions & 61 deletions tests/basic_tests.rb
Expand Up @@ -24,69 +24,16 @@
end

Shindo.tests('Excon streaming basics') do
pending if RUBY_PLATFORM == 'java' # need to find suitable server for jruby
with_unicorn('streaming.ru') do
# expected values: the response, in pieces, and a timeout after each piece
res = %w{Hello streamy world}
timeout = 0.1

# expect the full response as a string
# and expect it to take a (timeout * pieces) seconds
tests('simple blocking request on streaming endpoint').returns([res.join(''),'response time ok']) do
start = Time.now
ret = Excon.get('http://127.0.0.1:9292/streamed/simple').body

if Time.now - start <= timeout*3
[ret, 'streaming response came too quickly']
else
[ret, 'response time ok']
end
end

# expect the full response as a string and expect it to
# take a (timeout * pieces) seconds (with fixed Content-Length header)
tests('simple blocking request on streaming endpoint with fixed length').returns([res.join(''),'response time ok']) do
start = Time.now
ret = Excon.get('http://127.0.0.1:9292/streamed/fixed_length').body

if Time.now - start <= timeout*3
[ret, 'streaming response came too quickly']
else
[ret, 'response time ok']
end
tests('http') do
pending if RUBY_PLATFORM == 'java' # need to find suitable server for jruby
with_unicorn('streaming.ru') do
streaming_tests('http')
end

# expect each response piece to arrive to the body right away
# and wait for timeout until next one arrives
def timed_streaming_test(endpoint, timeout)
ret = []
timing = 'response times ok'
start = Time.now
Excon.get(endpoint, :response_block => lambda do |c,r,t|
# add the response
ret.push(c)
# check if the timing is ok
# each response arrives after timeout and before timeout + 1
cur_time = Time.now - start
if cur_time < ret.length * timeout or cur_time > (ret.length+1) * timeout
timing = 'response time not ok!'
end
end)
# validate the final timing
if Time.now - start <= timeout*3
timing = 'final timing was not ok!'
end
[ret, timing]
end

tests('simple request with response_block on streaming endpoint').returns([res,'response times ok']) do
timed_streaming_test('http://127.0.0.1:9292/streamed/simple', timeout)
end

tests('simple request with response_block on streaming endpoint with fixed length').returns([res,'response times ok']) do
timed_streaming_test('http://127.0.0.1:9292/streamed/fixed_length', timeout)
end
tests('https') do
with_ssl_streaming(9292, STREAMING_PIECES, STREAMING_TIMEOUT) do
streaming_tests('https')
end

end
end

Expand Down
119 changes: 119 additions & 0 deletions tests/test_helper.rb
Expand Up @@ -3,6 +3,7 @@
require 'excon'
require 'delorean'
require 'open4'
require 'webrick'

require './spec/helpers/warning_helpers.rb'

Expand Down Expand Up @@ -213,6 +214,76 @@ def basic_tests(url = 'http://127.0.0.1:9292', options = {})
end


# expected values: the response, in pieces, and a timeout after each piece
STREAMING_PIECES = %w{Hello streamy world}
STREAMING_TIMEOUT = 0.1

def streaming_tests(protocol)
conn = nil
test do
conn = Excon.new("#{protocol}://127.0.0.1:9292/", :ssl_verify_peer => false)
true
end

# expect the full response as a string
# and expect it to take a (timeout * pieces) seconds
tests('simple blocking request on streaming endpoint').returns([STREAMING_PIECES.join(''),'response time ok']) do
start = Time.now
ret = conn.request(:method => :get, :path => '/streamed/simple').body

if Time.now - start <= STREAMING_TIMEOUT*3
[ret, 'streaming response came too quickly']
else
[ret, 'response time ok']
end
end

# expect the full response as a string and expect it to
# take a (timeout * pieces) seconds (with fixed Content-Length header)
tests('simple blocking request on streaming endpoint with fixed length').returns([STREAMING_PIECES.join(''),'response time ok']) do
start = Time.now
ret = conn.request(:method => :get, :path => '/streamed/fixed_length').body

if Time.now - start <= STREAMING_TIMEOUT*3
[ret, 'streaming response came too quickly']
else
[ret, 'response time ok']
end
end

# expect each response piece to arrive to the body right away
# and wait for timeout until next one arrives
def timed_streaming_test(conn, path, timeout)
ret = []
timing = 'response times ok'
start = Time.now
conn.request(:method => :get, :path => path, :response_block => lambda do |c,r,t|
# add the response
ret.push(c)
# check if the timing is ok
# each response arrives after timeout and before timeout + 1
cur_time = Time.now - start
if cur_time < ret.length * timeout or cur_time > (ret.length+1) * timeout
timing = 'response time not ok!'
end
end)
# validate the final timing
if Time.now - start <= timeout*3
timing = 'final timing was not ok!'
end
[ret, timing]
end

tests('simple request with response_block on streaming endpoint').returns([STREAMING_PIECES,'response times ok']) do
timed_streaming_test(conn, '/streamed/simple', STREAMING_TIMEOUT)
end

tests('simple request with response_block on streaming endpoint with fixed length').returns([STREAMING_PIECES,'response times ok']) do
timed_streaming_test(conn, '/streamed/fixed_length', STREAMING_TIMEOUT)
end
end


PROXY_ENV_VARIABLES = %w{http_proxy https_proxy no_proxy} # All lower-case

def env_init(env={})
Expand Down Expand Up @@ -329,3 +400,51 @@ def with_server(name)
ensure
cleanup_process(pid)
end

# A tiny fake SSL streaming server
def with_ssl_streaming(port, pieces, delay)
key_file = File.join(File.dirname(__FILE__), 'data', '127.0.0.1.cert.key')
cert_file = File.join(File.dirname(__FILE__), 'data', '127.0.0.1.cert.crt')

ctx = OpenSSL::SSL::SSLContext.new
ctx.key = OpenSSL::PKey::RSA.new(File.read(key_file))
ctx.cert = OpenSSL::X509::Certificate.new(File.read(cert_file))

tcp = TCPServer.new(port)
ssl = OpenSSL::SSL::SSLServer.new(tcp, ctx)

Thread.new do
loop do
begin
conn = ssl.accept
rescue IOError => e
# we're closing the socket from another thread, which makes `accept` complain
break if /stream closed/ =~ e.to_s
raise
end

Thread.new do
begin
req = WEBrick::HTTPRequest.new(WEBrick::Config::HTTP)
req.parse(conn)

conn << "HTTP/1.1 200 OK\r\n\r\n"
if req.path == "streamed/fixed_length"
conn << "Content-Length: #{pieces.join.length}\r\n"
end
conn.flush

pieces.each do |piece|
sleep(delay)
conn.write(piece)
conn.flush
end
ensure
conn.close
end
end
end
end
yield
ssl.close
end

0 comments on commit 0a8b9e5

Please sign in to comment.