Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add utils to create and prepare socket for tcp client #31009

Merged
merged 16 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ grpc_cc_library(
"ref_counted_ptr",
"slice",
"slice_refcount",
"status_helper",
"transport_fwd",
],
)
Expand Down Expand Up @@ -2803,6 +2804,7 @@ grpc_cc_library(
"src/core/lib/event_engine/posix_engine/tcp_socket_utils.h",
],
external_deps = [
"absl/cleanup",
"absl/status",
"absl/status:statusor",
"absl/strings",
Expand All @@ -2818,6 +2820,7 @@ grpc_cc_library(
"ref_counted_ptr",
"resource_quota",
"socket_mutator",
"status_helper",
"useful",
],
)
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@
#include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
#include "src/core/ext/transport/binder/wire_format/binder.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"

#define RETURN_IF_ERROR(expr) \
do { \
const absl::Status status = (expr); \
if (!status.ok()) return status; \
} while (0)
#include "src/core/lib/gprpp/status_helper.h"

namespace grpc_binder {
namespace {
Expand Down
75 changes: 74 additions & 1 deletion src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@

#include <grpc/support/port_platform.h>

#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"

#include <arpa/inet.h>
#include <errno.h>
#include <inttypes.h>
#include <limits.h>

#include "absl/cleanup/cleanup.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
Expand Down Expand Up @@ -49,8 +53,8 @@
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>

#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/status_helper.h"

#ifdef GRPC_HAVE_UNIX_SOCKET
#include <sys/un.h>
Expand Down Expand Up @@ -100,6 +104,32 @@ int CreateSocket(std::function<int(int, int, int)> socket_factory, int family,

const uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};

absl::Status PrepareTcpClientSocket(PosixSocketWrapper sock,
const EventEngine::ResolvedAddress& addr,
const PosixTcpOptions& options) {
bool close_fd = true;
auto sock_cleanup = absl::MakeCleanup([&close_fd, &sock]() -> void {
if (close_fd and sock.Fd() >= 0) {
close(sock.Fd());
}
});
RETURN_IF_ERROR(sock.SetSocketNonBlocking(1));
RETURN_IF_ERROR(sock.SetSocketCloexec(1));

if (reinterpret_cast<const sockaddr*>(addr.address())->sa_family != AF_UNIX) {
// If its not a unix socket address.
RETURN_IF_ERROR(sock.SetSocketLowLatency(1));
RETURN_IF_ERROR(sock.SetSocketReuseAddr(1));
sock.TrySetSocketTcpUserTimeout(options, true);
}
RETURN_IF_ERROR(sock.SetSocketNoSigpipeIfPossible());
RETURN_IF_ERROR(sock.ApplySocketMutatorInOptions(
GRPC_FD_CLIENT_CONNECTION_USAGE, options));
// No errors. Set close_fd to false to ensure the socket is not closed.
close_fd = false;
return absl::OkStatus();
}

#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */

} // namespace
Expand Down Expand Up @@ -771,6 +801,42 @@ absl::StatusOr<PosixSocketWrapper> PosixSocketWrapper::CreateDualStackSocket(
return PosixSocketWrapper(newfd);
}

absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult>
PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
const PosixTcpOptions& options,
const EventEngine::ResolvedAddress& target_addr) {
PosixSocketWrapper::DSMode dsmode;
EventEngine::ResolvedAddress mapped_target_addr;

// Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
// v6.
if (!SockaddrToV4Mapped(&target_addr, &mapped_target_addr)) {
// addr is v4 mapped to v6 or just v6.
mapped_target_addr = target_addr;
}
absl::StatusOr<PosixSocketWrapper> posix_socket_wrapper =
PosixSocketWrapper::CreateDualStackSocket(nullptr, mapped_target_addr,
SOCK_STREAM, 0, dsmode);
if (!posix_socket_wrapper.ok()) {
return posix_socket_wrapper.status();
}

if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) {
// Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4.
if (!SockaddrIsV4Mapped(&target_addr, &mapped_target_addr)) {
mapped_target_addr = target_addr;
}
}

auto error = PrepareTcpClientSocket(*posix_socket_wrapper, mapped_target_addr,
options);
if (!error.ok()) {
return error;
}
return PosixSocketWrapper::PosixSocketCreateResult{*posix_socket_wrapper,
mapped_target_addr};
}

#else /* GRPC_POSIX_SOCKET_UTILS_COMMON */

bool SockaddrIsV4Mapped(const EventEngine::ResolvedAddress* /*resolved_addr*/,
Expand Down Expand Up @@ -875,6 +941,13 @@ PosixSocketWrapper::CreateDualStackSocket(
int /*protocol*/, DSMode& /*dsmode*/) {
GPR_ASSERT(false && "unimplemented");
}

absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult>
PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
const PosixTcpOptions& /*options*/,
const EventEngine::ResolvedAddress& /*target_addr*/) {
GPR_ASSERT(false && "unimplemented");
}
}

#endif /* GRPC_POSIX_SOCKET_UTILS_COMMON */
Expand Down
25 changes: 25 additions & 0 deletions src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class PosixSocketWrapper {
public:
explicit PosixSocketWrapper(int fd) : fd_(fd) { GPR_ASSERT(fd_ > 0); }

PosixSocketWrapper() : fd_(-1){};

~PosixSocketWrapper() = default;

// Instruct the kernel to wait for specified number of bytes to be received on
Expand Down Expand Up @@ -301,10 +303,33 @@ class PosixSocketWrapper {
const experimental::EventEngine::ResolvedAddress& addr, int type,
int protocol, DSMode& dsmode);

struct PosixSocketCreateResult;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not need to be forward-declared. See the style guide advice here: https://google.github.io/styleguide/cppguide.html#Forward_Declarations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't letting me compile otherwise because the PosixSocketWrapper definition is not complete at that point. So I moved the CreateAndPrepareTcpClientSocket function outside the class definition and made it non static.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-introduced it again after our discussion.

// Return a PosixSocketCreateResult which manages a configured, unbound,
// unconnected TCP client fd.
// options: may contain custom tcp settings for the fd.
// target_addr: the destination address.
//
// Returns: Not-OK status on error. Otherwise it returns a
// PosixSocketWrapper::PosixSocketCreateResult type which includes a sock
// of type PosixSocketWrapper and a mapped_target_addr which is
// target_addr mapped to an address appropriate to the type of socket FD
// created. For example, if target_addr is IPv4 and dual stack sockets are
// available, mapped_target_addr will be an IPv4-mapped IPv6 address.
//
static absl::StatusOr<PosixSocketCreateResult>
CreateAndPrepareTcpClientSocket(
const PosixTcpOptions& options,
const EventEngine::ResolvedAddress& target_addr);

private:
int fd_;
};

struct PosixSocketWrapper::PosixSocketCreateResult {
PosixSocketWrapper sock;
EventEngine::ResolvedAddress mapped_target_addr;
};

} // namespace posix_engine
} // namespace grpc_event_engine

Expand Down
6 changes: 6 additions & 0 deletions src/core/lib/gprpp/status_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ struct google_rpc_Status;
struct upb_Arena;
}

#define RETURN_IF_ERROR(expr) \
do { \
const absl::Status status = (expr); \
if (!status.ok()) return status; \
} while (0)

namespace grpc_core {

/// This enum should have the same value of grpc_error_ints
Expand Down