Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.38.x: backport #4453

Merged
merged 5 commits into from May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 15 additions & 5 deletions internal/xds/env/env.go
Expand Up @@ -37,11 +37,13 @@ const (
// and kept in variable BootstrapFileName.
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"
BootstrapFileContentEnv = "GRPC_XDS_BOOTSTRAP_CONFIG"

circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"
timeoutSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"
faultInjectionSupportEnv = "GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"

c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
Expand All @@ -60,6 +62,7 @@ var (
//
// When both bootstrap FileName and FileContent are set, FileName is used.
BootstrapFileContent = os.Getenv(BootstrapFileContentEnv)

// CircuitBreakingSupport indicates whether circuit breaking support is
// enabled, which can be disabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "false".
Expand All @@ -71,17 +74,24 @@ var (
// FaultInjectionSupport is used to control both fault injection and HTTP
// filter support.
FaultInjectionSupport = !strings.EqualFold(os.Getenv(faultInjectionSupportEnv), "false")
// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// ClientSideSecuritySupport is used to control processing of security
// configuration on the client-side.
//
// Note that there is no env var protection for the server-side because we
// have a brand new API on the server-side and users explicitly need to use
// the new API to get security integration on the server.
ClientSideSecuritySupport = strings.EqualFold(os.Getenv(clientSideSecuritySupportEnv), "true")
// AggregateAndDNSSupportEnv indicates whether processing of aggregated
// cluster and DNS cluster is enabled, which can be enabled by setting the
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")

// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
C2PResolverSupport = strings.EqualFold(os.Getenv(c2pResolverSupportEnv), "true")
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv(c2pResolverTestOnlyTrafficDirectorURIEnv)
)
34 changes: 34 additions & 0 deletions interop/xds/client/Dockerfile
@@ -0,0 +1,34 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Dockerfile for building the xDS interop client. To build the image, run the
# following command from grpc-go directory:
# docker build -t <TAG> -f interop/xds/client/Dockerfile .

FROM golang:1.16-alpine as build

# Make a grpc-go directory and copy the repo into it.
WORKDIR /go/src/grpc-go
COPY . .

# Build a static binary without cgo so that we can copy just the binary in the
# final image, and can get rid of Go compiler and gRPC-Go dependencies.
RUN go build -tags osusergo,netgo interop/xds/client/client.go

# Second stage of the build which copies over only the client binary and skips
# the Go compiler and gRPC repo from the earlier stage. This significantly
# reduces the docker image size.
FROM alpine
COPY --from=build /go/src/grpc-go/client .
ENTRYPOINT ["./client"]
34 changes: 34 additions & 0 deletions interop/xds/server/Dockerfile
@@ -0,0 +1,34 @@
# Copyright 2021 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Dockerfile for building the xDS interop server. To build the image, run the
# following command from grpc-go directory:
# docker build -t <TAG> -f interop/xds/server/Dockerfile .

FROM golang:1.16-alpine as build

# Make a grpc-go directory and copy the repo into it.
WORKDIR /go/src/grpc-go
COPY . .

# Build a static binary without cgo so that we can copy just the binary in the
# final image, and can get rid of the Go compiler and gRPC-Go dependencies.
RUN go build -tags osusergo,netgo interop/xds/server/server.go

# Second stage of the build which copies over only the client binary and skips
# the Go compiler and gRPC repo from the earlier stage. This significantly
# reduces the docker image size.
FROM alpine
COPY --from=build /go/src/grpc-go/server .
ENTRYPOINT ["./server"]
142 changes: 124 additions & 18 deletions interop/xds/server/server.go
@@ -1,6 +1,6 @@
/*
*
* Copyright 2020 gRPC authors.
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,29 +16,37 @@
*
*/

// Binary server for xDS interop tests.
// Binary server is the server used for xDS interop tests.
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"os"
"strconv"

"google.golang.org/grpc"
"google.golang.org/grpc/admin"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/health"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/xds"

xdscreds "google.golang.org/grpc/credentials/xds"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)

var (
port = flag.Int("port", 8080, "The server port")
serverID = flag.String("server_id", "go_server", "Server ID included in response")
hostname = getHostname()
port = flag.Int("port", 8080, "Listening port for test service")
maintenancePort = flag.Int("maintenance_port", 8081, "Listening port for maintenance services like health, reflection, channelz etc when -secure_mode is true. When -secure_mode is false, all these services will be registered on -port")
serverID = flag.String("server_id", "go_server", "Server ID included in response")
secureMode = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")

logger = grpclog.Component("interop")
)
Expand All @@ -51,28 +59,126 @@ func getHostname() string {
return hostname
}

type server struct {
// testServiceImpl provides an implementation of the TestService defined in
// grpc.testing package.
type testServiceImpl struct {
testgrpc.UnimplementedTestServiceServer
hostname string
}

func (s *server) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
grpc.SetHeader(ctx, metadata.Pairs("hostname", hostname))
func (s *testServiceImpl) EmptyCall(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
return &testpb.Empty{}, nil
}

func (s *server) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
grpc.SetHeader(ctx, metadata.Pairs("hostname", hostname))
return &testpb.SimpleResponse{ServerId: *serverID, Hostname: hostname}, nil
func (s *testServiceImpl) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
grpc.SetHeader(ctx, metadata.Pairs("hostname", s.hostname))
return &testpb.SimpleResponse{ServerId: *serverID, Hostname: s.hostname}, nil
}

// xdsUpdateHealthServiceImpl provides an implementation of the
// XdsUpdateHealthService defined in grpc.testing package.
type xdsUpdateHealthServiceImpl struct {
testgrpc.UnimplementedXdsUpdateHealthServiceServer
healthServer *health.Server
}

func (x *xdsUpdateHealthServiceImpl) SetServing(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
x.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
return &testpb.Empty{}, nil

}

func (x *xdsUpdateHealthServiceImpl) SetNotServing(_ context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
x.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
return &testpb.Empty{}, nil
}

func xdsServingModeCallback(addr net.Addr, args xds.ServingModeChangeArgs) {
logger.Infof("Serving mode for xDS server at %s changed to %s", addr.String(), args.Mode)
if args.Err != nil {
logger.Infof("ServingModeCallback returned error: %v", args.Err)
}
}

func main() {
flag.Parse()
p := strconv.Itoa(*port)
lis, err := net.Listen("tcp", ":"+p)

if *secureMode && *port == *maintenancePort {
logger.Fatal("-port and -maintenance_port must be different when -secure_mode is set")
}

testService := &testServiceImpl{hostname: getHostname()}
healthServer := health.NewServer()
updateHealthService := &xdsUpdateHealthServiceImpl{healthServer: healthServer}

// If -secure_mode is not set, expose all services on -port with a regular
// gRPC server.
if !*secureMode {
lis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *port))
if err != nil {
logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *port), err)
}

server := grpc.NewServer()
testgrpc.RegisterTestServiceServer(server, testService)
healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(server, healthServer)
testgrpc.RegisterXdsUpdateHealthServiceServer(server, updateHealthService)
reflection.Register(server)
cleanup, err := admin.Register(server)
if err != nil {
logger.Fatalf("Failed to register admin services: %v", err)
}
defer cleanup()
if err := server.Serve(lis); err != nil {
logger.Errorf("Serve() failed: %v", err)
}
return
}

// Create a listener on -port to expose the test service.
testLis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *port))
if err != nil {
logger.Fatalf("failed to listen: %v", err)
logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *port), err)
}

// Create server-side xDS credentials with a plaintext fallback.
creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
if err != nil {
logger.Fatalf("Failed to create xDS credentials: %v", err)
}

// Create an xDS enabled gRPC server, register the test service
// implementation and start serving.
testServer := xds.NewGRPCServer(grpc.Creds(creds), xds.ServingModeCallback(xdsServingModeCallback))
testgrpc.RegisterTestServiceServer(testServer, testService)
go func() {
if err := testServer.Serve(testLis); err != nil {
logger.Errorf("test server Serve() failed: %v", err)
}
}()
defer testServer.Stop()

// Create a listener on -maintenance_port to expose other services.
maintenanceLis, err := net.Listen("tcp4", fmt.Sprintf(":%d", *maintenancePort))
if err != nil {
logger.Fatalf("net.Listen(%s) failed: %v", fmt.Sprintf(":%d", *maintenancePort), err)
}

// Create a regular gRPC server and register the maintenance services on
// it and start serving.
maintenanceServer := grpc.NewServer()
healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(maintenanceServer, healthServer)
testgrpc.RegisterXdsUpdateHealthServiceServer(maintenanceServer, updateHealthService)
reflection.Register(maintenanceServer)
cleanup, err := admin.Register(maintenanceServer)
if err != nil {
logger.Fatalf("Failed to register admin services: %v", err)
}
defer cleanup()
if err := maintenanceServer.Serve(maintenanceLis); err != nil {
logger.Errorf("maintenance server Serve() failed: %v", err)
}
s := grpc.NewServer()
testgrpc.RegisterTestServiceServer(s, &server{})
s.Serve(lis)
}
19 changes: 15 additions & 4 deletions rpc_util.go
Expand Up @@ -429,9 +429,10 @@ func (o ContentSubtypeCallOption) before(c *callInfo) error {
}
func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {}

// ForceCodec returns a CallOption that will set codec to be
// used for all request and response messages for a call. The result of calling
// Name() will be used as the content-subtype in a case-insensitive manner.
// ForceCodec returns a CallOption that will set codec to be used for all
// request and response messages for a call. The result of calling Name() will
// be used as the content-subtype after converting to lowercase, unless
// CallContentSubtype is also used.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
Expand Down Expand Up @@ -853,7 +854,17 @@ func toRPCErr(err error) error {
// setCallInfoCodec should only be called after CallOptions have been applied.
func setCallInfoCodec(c *callInfo) error {
if c.codec != nil {
// codec was already set by a CallOption; use it.
// codec was already set by a CallOption; use it, but set the content
// subtype if it is not set.
if c.contentSubtype == "" {
// c.codec is a baseCodec to hide the difference between grpc.Codec and
// encoding.Codec (Name vs. String method name). We only support
// setting content subtype from encoding.Codec to avoid a behavior
// change with the deprecated version.
if ec, ok := c.codec.(encoding.Codec); ok {
c.contentSubtype = strings.ToLower(ec.Name())
}
}
return nil
}

Expand Down