Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transparent (content-encoding) compression #456

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions include/hackney.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
partial_headers = [],
version,
clen = nil,
ce = nil,
te = nil,
connection = nil,
method = nil,
Expand Down
3 changes: 3 additions & 0 deletions src/hackney.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ cancel_request(Ref) ->

%% @doc set client options.
%% Options are:
%% - `compress': request compression and transparently decompress
%% - `async': to fetch the response asynchronously
%% - `{async, once}': to receive the response asynchronously one time.
%% To receive the next message use the function `hackney:stream_next/1'.
Expand Down Expand Up @@ -211,6 +212,8 @@ request(Method, URL, Headers, Body) ->
%% directly. The response is `{ok, Status, Headers, Body}'</li>
%% <li>`max_body': sets maximum allowed size of the body if
%% with_body is true</li>
%% <li>`compress': request that the server sends the body compressed
%% and instructs hackney to transparently decompress it</li>
%% <li>`async': receive the response asynchronously
%% The function return {ok, StreamRef}.
%% When {async, once} is used the response will be received only once. To
Expand Down
26 changes: 16 additions & 10 deletions src/hackney_request.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,36 @@ perform(Client0, {Method0, Path0, Headers0, Body0}) ->
%% add host eventually
Headers2 = maybe_add_host(Headers1, Client0#client.netloc),

%% overwrite always as we control Accept-Encoding when 'compress' is set
Headers3 = case proplists:get_value(compress, Options, false) of
true -> hackney_headers_new:store(<<"Accept-Encoding">>, <<"gzip, deflate">>, Headers2);
false -> Headers2
end,

%% get expect headers
Expect = expectation(Headers2),
Expect = expectation(Headers3),

%% build headers with the body.
{FinalHeaders, ReqType, Body, Client1} = case Body0 of
stream ->
{Headers2, ReqType0, stream, Client0};
{Headers3, ReqType0, stream, Client0};
stream_multipart ->
handle_multipart_body(Headers2, ReqType0, Client0);
handle_multipart_body(Headers3, ReqType0, Client0);
{stream_multipart, Size} ->
handle_multipart_body(Headers2, ReqType0, Size, Client0);
handle_multipart_body(Headers3, ReqType0, Size, Client0);
{stream_multipart, Size, Boundary} ->
handle_multipart_body(Headers2, ReqType0,
handle_multipart_body(Headers3, ReqType0,
Size, Boundary, Client0);
<<>> when Method =:= <<"POST">> orelse Method =:= <<"PUT">> ->
handle_body(Headers2, ReqType0, Body0, Client0);
handle_body(Headers3, ReqType0, Body0, Client0);
[] when Method =:= <<"POST">> orelse Method =:= <<"PUT">> ->
handle_body(Headers2, ReqType0, Body0, Client0);
handle_body(Headers3, ReqType0, Body0, Client0);
<<>> ->
{Headers2, ReqType0, Body0, Client0};
{Headers3, ReqType0, Body0, Client0};
[] ->
{Headers2, ReqType0, Body0, Client0};
{Headers3, ReqType0, Body0, Client0};
_ ->
handle_body(Headers2, ReqType0, Body0, Client0)
handle_body(Headers3, ReqType0, Body0, Client0)
end,

%% build final client record
Expand Down
80 changes: 75 additions & 5 deletions src/hackney_response.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,25 @@ wait_headers({headers_complete, Parser}, Client, Status, Headers) ->
[hackney, Client#client.host, response_time],
ResponseTime),
HeadersList = hackney_headers_new:to_list(Headers),
CE = case proplists:get_value(compress, Client#client.options, false) of
true ->
case hackney_headers_new:get_value(<<"content-encoding">>, Headers, nil) of
nil -> nil;
C ->
Z = zlib:open(),
%% inflateInit2 (https://www.zlib.net/manual.html#Advanced)
WindowBits = 15 + if C == <<"gzip">> -> 16; true -> 0 end,
ok = zlib:inflateInit(Z, WindowBits),
ok = case erlang:function_exported(zlib, safeInflate, 2) of
%% OTP-20.0.5 and later
true -> ok;
%% OTP-18.0 and later
false -> zlib:setBufSize(Z, 512 * 1024)
end,
{zlib,Z}
end;
false -> nil
end,
TE = hackney_headers_new:get_value(<<"transfer-encoding">>, Headers, nil),
CLen = case hackney_headers_new:lookup("content-length", Headers) of
[] -> undefined;
Expand All @@ -122,6 +141,7 @@ wait_headers({headers_complete, Parser}, Client, Status, Headers) ->
end,
Client2 = Client#client{parser=Parser,
headers=Headers,
ce=CE,
te=TE,
clen=CLen},
{ok, Status, HeadersList, Client2}.
Expand All @@ -147,17 +167,64 @@ stream_body(Client=#client{parser=Parser, clen=CLen, te=TE}) ->
stream_body(Data, #client{parser=Parser}=Client) ->
stream_body1(hackney_http:execute(Parser, Data), Client).

stream_body1({more, Parser, Buffer}, Client) ->
stream_body1({ok, Data, Parser}, Client = #client{ce={zlib,Z}}) ->
stream_body2(case stream_body_zlib(Z, Data) of
<<>> -> {more, Parser, <<>>};
D when is_binary(D) -> {ok, D, Parser};
E -> {error,E}
end, Client);
stream_body1({done, _Rest}, Client = #client{ce={zlib,_Z}}) ->
stream_body1(done, Client);
stream_body1(done, Client = #client{ce={zlib,Z}, parser=Parser}) ->
stream_body2(case stream_body_zlib(Z, <<>>) of
done -> done;
D when is_binary(D), size(D) > 0 -> {ok, D, Parser};
E -> {error,E}
end, Client);
stream_body1(Result, Client) ->
stream_body2(Result, Client).

stream_body_zlib(Z, Data) ->
case erlang:function_exported(zlib, safeInflate, 2) of
true ->
%% OTP-20.0.5 and later
case zlib:safeInflate(Z, Data) of
{continue, []} when Data == <<>> ->
data_error;
{finished, []} when Data == <<>> ->
case (catch zlib:inflateEnd(Z)) of
ok -> done;
_ -> data_error
end;
{_, Output} ->
iolist_to_binary(Output)
end;
false ->
%% OTP-18.0 and later
case zlib:inflateChunk(Z, Data) of
[] when Data == <<>> ->
case (catch zlib:inflateEnd(Z)) of
ok -> done;
_ -> data_error
end;
{more, Decompressed} ->
iolist_to_binary(Decompressed);
Decompressed ->
iolist_to_binary(Decompressed)
end
end.

stream_body2({more, Parser, Buffer}, Client) ->
stream_body_recv(Buffer, Client#client{parser=Parser});
stream_body1({ok, Data, Parser}, Client) ->
stream_body2({ok, Data, Parser}, Client) ->
{ok, Data, Client#client{parser=Parser}};
stream_body1({done, Rest}, Client) ->
stream_body2({done, Rest}, Client) ->
Client2 = end_stream_body(Rest, Client),
{done, Client2};
stream_body1(done, Client) ->
stream_body2(done, Client) ->
Client2 = end_stream_body(<<>>, Client),
{done, Client2};
stream_body1(Error, _Client) ->
stream_body2(Error, _Client) ->
Error.


Expand Down Expand Up @@ -277,6 +344,9 @@ skip_body(Client) ->
{error, Reason} -> {error, Reason}
end.

end_stream_body(Rest, Client = #client{ce={zlib,Z}}) ->
catch zlib:close(Z),
end_stream_body(Rest, Client#client{ce=nil});
end_stream_body(Rest, Client0) ->
Client = Client0#client{response_state=done,
body_state=done,
Expand Down