Skip to content

Commit

Permalink
[C++]Fix libcurl miss auth header when broker return 307 (#13112)
Browse files Browse the repository at this point in the history
* [C++]Fix libcurl miss auth header when broker return 307

Motivation
when broker restart, it will return 307 to client for lookup request.
but libcurl miss auth heander when follow new url which will issue the follow lookup req fail with 401.

Modifications
don't use libcurl CURLOPT_FOLLOWLOCATION to follow new url, instead use retry request with auth header.

(cherry picked from commit dd8b473)
  • Loading branch information
billowqiu authored and codelipenghui committed Dec 22, 2021
1 parent c1f809f commit 4a406e0
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 108 deletions.
229 changes: 122 additions & 107 deletions pulsar-client-cpp/lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const static int MAX_HTTP_REDIRECTS = 20;
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;

static inline bool needRedirection(long code) { return (code == 307 || code == 302 || code == 301); }

HTTPLookupService::CurlInitializer::CurlInitializer() {
// Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html
curl_global_init(CURL_GLOBAL_ALL);
Expand Down Expand Up @@ -148,132 +150,145 @@ void HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise
}
}

Result HTTPLookupService::sendHTTPRequest(const std::string completeUrl, std::string &responseData) {
CURL *handle;
CURLcode res;
std::string version = std::string("Pulsar-CPP-v") + _PULSAR_VERSION_INTERNAL_;
handle = curl_easy_init();

if (!handle) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
// No curl_easy_cleanup required since handle not initialized
return ResultLookupError;
}
// set URL
curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str());

// Write callback
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData);

// New connection is made for each call
curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L);
curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L);
Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData) {
uint16_t reqCount = 0;
Result retResult = ResultOk;
while (++reqCount <= MAX_HTTP_REDIRECTS) {
CURL *handle;
CURLcode res;
std::string version = std::string("Pulsar-CPP-v") + _PULSAR_VERSION_INTERNAL_;
handle = curl_easy_init();

if (!handle) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
// No curl_easy_cleanup required since handle not initialized
return ResultLookupError;
}
// set URL
curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str());

// Skipping signal handling - results in timeouts not honored during the DNS lookup
curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);
// Write callback
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData);

// Timer
curl_easy_setopt(handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_);
// New connection is made for each call
curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L);
curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L);

// Set User Agent
curl_easy_setopt(handle, CURLOPT_USERAGENT, version.c_str());
// Skipping signal handling - results in timeouts not honored during the DNS lookup
curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);

// Redirects
curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(handle, CURLOPT_MAXREDIRS, MAX_HTTP_REDIRECTS);
// Timer
curl_easy_setopt(handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_);

// Fail if HTTP return code >=400
curl_easy_setopt(handle, CURLOPT_FAILONERROR, 1L);
// Set User Agent
curl_easy_setopt(handle, CURLOPT_USERAGENT, version.c_str());

// Authorization data
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR("Failed to getAuthData: " << authResult);
curl_easy_cleanup(handle);
return authResult;
}
struct curl_slist *list = NULL;
if (authDataContent->hasDataForHttp()) {
list = curl_slist_append(list, authDataContent->getHttpHeaders().c_str());
}
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
// Fail if HTTP return code >=400
curl_easy_setopt(handle, CURLOPT_FAILONERROR, 1L);

// TLS
if (isUseTls_) {
if (curl_easy_setopt(handle, CURLOPT_SSLENGINE, NULL) != CURLE_OK) {
LOG_ERROR("Unable to load SSL engine for url " << completeUrl);
// Authorization data
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR("Failed to getAuthData: " << authResult);
curl_easy_cleanup(handle);
return ResultConnectError;
return authResult;
}
if (curl_easy_setopt(handle, CURLOPT_SSLENGINE_DEFAULT, 1L) != CURLE_OK) {
LOG_ERROR("Unable to load SSL engine as default, for url " << completeUrl);
curl_easy_cleanup(handle);
return ResultConnectError;
struct curl_slist *list = NULL;
if (authDataContent->hasDataForHttp()) {
list = curl_slist_append(list, authDataContent->getHttpHeaders().c_str());
}
curl_easy_setopt(handle, CURLOPT_SSLCERTTYPE, "PEM");
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);

// TLS
if (isUseTls_) {
if (curl_easy_setopt(handle, CURLOPT_SSLENGINE, NULL) != CURLE_OK) {
LOG_ERROR("Unable to load SSL engine for url " << completeUrl);
curl_easy_cleanup(handle);
return ResultConnectError;
}
if (curl_easy_setopt(handle, CURLOPT_SSLENGINE_DEFAULT, 1L) != CURLE_OK) {
LOG_ERROR("Unable to load SSL engine as default, for url " << completeUrl);
curl_easy_cleanup(handle);
return ResultConnectError;
}
curl_easy_setopt(handle, CURLOPT_SSLCERTTYPE, "PEM");

if (tlsAllowInsecure_) {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
} else {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
}
if (tlsAllowInsecure_) {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
} else {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
}

if (!tlsTrustCertsFilePath_.empty()) {
curl_easy_setopt(handle, CURLOPT_CAINFO, tlsTrustCertsFilePath_.c_str());
}
if (!tlsTrustCertsFilePath_.empty()) {
curl_easy_setopt(handle, CURLOPT_CAINFO, tlsTrustCertsFilePath_.c_str());
}

curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, tlsValidateHostname_ ? 1L : 0L);
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, tlsValidateHostname_ ? 1L : 0L);

if (authDataContent->hasDataForTls()) {
curl_easy_setopt(handle, CURLOPT_SSLCERT, authDataContent->getTlsCertificates().c_str());
curl_easy_setopt(handle, CURLOPT_SSLKEY, authDataContent->getTlsPrivateKey().c_str());
if (authDataContent->hasDataForTls()) {
curl_easy_setopt(handle, CURLOPT_SSLCERT, authDataContent->getTlsCertificates().c_str());
curl_easy_setopt(handle, CURLOPT_SSLKEY, authDataContent->getTlsPrivateKey().c_str());
}
}
}

LOG_INFO("Curl Lookup Request sent for " << completeUrl);

// Make get call to server
res = curl_easy_perform(handle);

// Free header list
curl_slist_free_all(list);

Result retResult = ResultOk;

switch (res) {
case CURLE_OK:
long response_code;
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
LOG_INFO("Response received for url " << completeUrl << " code " << response_code);
if (response_code == 200) {
retResult = ResultOk;
} else {
LOG_INFO("Curl [" << reqCount << "] Lookup Request sent for " << completeUrl);

// Make get call to server
res = curl_easy_perform(handle);

long response_code = -1;
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
LOG_INFO("Response received for url " << completeUrl << " response_code " << response_code
<< " curl res " << res);

// Free header list
curl_slist_free_all(list);

switch (res) {
case CURLE_OK:
long response_code;
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
LOG_INFO("Response received for url " << completeUrl << " code " << response_code);
if (response_code == 200) {
retResult = ResultOk;
} else if (needRedirection(response_code)) {
char *url = NULL;
curl_easy_getinfo(handle, CURLINFO_REDIRECT_URL, &url);
LOG_INFO("Response from url " << completeUrl << " to new url " << url);
completeUrl = url;
retResult = ResultLookupError;
} else {
retResult = ResultLookupError;
}
break;
case CURLE_COULDNT_CONNECT:
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultConnectError;
break;
case CURLE_READ_ERROR:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultReadError;
break;
case CURLE_OPERATION_TIMEDOUT:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultTimeout;
break;
default:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultLookupError;
}
break;
case CURLE_COULDNT_CONNECT:
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultConnectError;
break;
case CURLE_READ_ERROR:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultReadError;
break;
case CURLE_OPERATION_TIMEDOUT:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultTimeout;
break;
default:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultLookupError;
break;
}
curl_easy_cleanup(handle);
if (!needRedirection(response_code)) {
break;
}
}
curl_easy_cleanup(handle);

return retResult;
}

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/HTTPLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class HTTPLookupService : public LookupService, public std::enable_shared_from_t
void handleLookupHTTPRequest(LookupPromise, const std::string, RequestType);
void handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise, const std::string completeUrl);

Result sendHTTPRequest(const std::string completeUrl, std::string& responseData);
Result sendHTTPRequest(std::string completeUrl, std::string& responseData);

public:
HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&);
Expand Down

0 comments on commit 4a406e0

Please sign in to comment.