Add new http transport params to storage drivers #4160

Open · wants to merge 1 commit into `main`
1 change: 1 addition & 0 deletions docs/content/storage-drivers/azure.md
@@ -16,6 +16,7 @@ An implementation of the `storagedriver.StorageDriver` interface which uses [Mic
| `realm` | no | Domain name suffix for the Storage Service API endpoint. For example realm for "Azure in China" would be `core.chinacloudapi.cn` and realm for "Azure Government" would be `core.usgovcloudapi.net`. By default, this is `core.windows.net`. |
| `copy_status_poll_max_retry` | no | Max retry number for polling of copy operation status. Retries use a simple backoff algorithm where each retry number is multiplied by `copy_status_poll_delay`, and this number is used as the delay. Set to -1 to disable retries and abort if the copy does not complete immediately. Defaults to 5. |
| `copy_status_poll_delay` | no | Time to wait between retries when polling copy operation status. This time is multiplied by N on each retry, where N is the retry number. Defaults to 100ms. |
| `disablekeepalives` | no | Disables HTTP keep-alives on the HTTP transport when `true`. Each connection to the server is used for a single HTTP request. The default is `false`. |
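
For illustration, the new key would sit alongside the driver's existing parameters in the registry's YAML configuration. A hedged sketch (account values are placeholders, not from this PR):

```yaml
storage:
  azure:
    accountname: <account-name>
    accountkey: <base64-encoded-account-key>
    container: registry
    disablekeepalives: true
```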


## Related information
1 change: 1 addition & 0 deletions docs/content/storage-drivers/gcs.md
@@ -14,6 +14,7 @@ An implementation of the `storagedriver.StorageDriver` interface which uses Goog
| `keyfile` | no | A private service account key file in JSON format used for [Service Account Authentication](https://cloud.google.com/storage/docs/authentication#service_accounts). |
| `rootdirectory` | no | The root directory tree in which all registry files are stored. Defaults to the empty string (bucket root). If a prefix is used, the path `bucketname/<prefix>` has to be pre-created before starting the registry. The prefix is applied to all Google Cloud Storage keys to allow you to segment data in your bucket if necessary.|
| `chunksize` | no (default 5242880) | This is the chunk size used for uploading large blobs, must be a multiple of 256*1024. |
| `disablekeepalives` | no | Disables HTTP keep-alives on the HTTP transport when `true`. Each connection to the server is used for a single HTTP request. The default is `false`. |

{{< hint type=note >}}
Instead of a key file you can use [Google Application Default Credentials](https://developers.google.com/identity/protocols/application-default-credentials).
4 changes: 4 additions & 0 deletions docs/content/storage-drivers/s3.md
@@ -21,6 +21,7 @@ Amazon S3 or S3 compatible services for object storage.
| `keyid` | no | Optional KMS key ID to use for encryption (encrypt must be true, or this parameter is ignored). The default is `none`. |
| `secure` | no | Indicates whether to use HTTPS instead of HTTP. A boolean value. The default is `true`. |
| `skipverify` | no | Skips TLS verification when the value is set to `true`. The default is `false`. |
| `disablekeepalives` | no | Disables HTTP keep-alives on the HTTP transport when `true`. Each connection to the server is used for a single HTTP request. The default is `false`. |
| `v4auth` | no | Indicates whether the registry uses Version 4 of AWS's authentication. The default is `true`. |
| `chunksize` | no | The S3 API requires multipart upload chunks to be at least 5MB. This value should be a number that is larger than 5 * 1024 * 1024.|
| `multipartcopychunksize` | no | Default chunk size for all but the last S3 Multipart Upload part when copying stored objects. |
@@ -32,6 +33,9 @@ Amazon S3 or S3 compatible services for object storage.
| `usedualstack` | no | Use AWS dual-stack API endpoints. |
| `accelerate` | no | Enable S3 Transfer Acceleration. |
| `objectacl` | no | The S3 Canned ACL for objects. The default value is "private". |
| `usedualstack` | no | Use AWS S3 dual-stack endpoints, which support both IPv6 and IPv4, when set to `true`. The default is `false`. |
> **Review comment (Collaborator):** `usedualstack` is already documented, three rows up.

| `multipartcombinesmallpart` | no | Combines small pending uploads with the ready part when `true`. The default is `true`. |
> **Review comment (Collaborator):** Where is this option implemented? I don't see it anywhere else in the PR, nor could I find any evidence in the code base that it was already implemented.

| `accelerate` | no | Use AWS S3 Transfer Acceleration endpoints when set to `true`. The default is `false`. |
> **Review comment (Collaborator):** `accelerate` is already documented, three rows up.

| `loglevel` | no | The log level for the S3 client. The default value is `off`. |

> **Note** You can provide empty strings for your access and secret keys to run the driver
Expand Down
33 changes: 32 additions & 1 deletion registry/storage/driver/azure/azure_auth.go
@@ -2,6 +2,7 @@ package azure

import (
"context"
"net/http"
"sync"
"time"

@@ -45,6 +46,14 @@ type azureClient struct {
signer signer
}

type azureTransporter struct {
roundTripper http.RoundTripper
}

func (a *azureTransporter) Do(req *http.Request) (*http.Response, error) {
return a.roundTripper.RoundTrip(req)
}
> tvdfly marked this conversation as resolved.

func newAzureClient(params *Parameters) (*azureClient, error) {
if params.AccountKey != "" {
cred, err := azblob.NewSharedKeyCredential(params.AccountName, params.AccountKey)
@@ -76,7 +85,29 @@ func newAzureClient(params *Parameters) (*azureClient, error) {
return nil, err
}

httpTransportModified := false
httpTransport := http.DefaultTransport.(*http.Transport).Clone()

if params.DisableKeepAlives {
httpTransport.DisableKeepAlives = true
httpTransportModified = true
}

var httpRoundTripper http.RoundTripper = httpTransport

var azClientOpts *azblob.ClientOptions

if httpTransportModified {
azClientOpts = &azblob.ClientOptions{
ClientOptions: azcore.ClientOptions{
Transport: &azureTransporter{
roundTripper: httpRoundTripper,
},
},
}
}

client, err := azblob.NewClient(params.ServiceURL, cred, azClientOpts)
if err != nil {
return nil, err
}
1 change: 1 addition & 0 deletions registry/storage/driver/azure/parser.go
@@ -31,6 +31,7 @@ type Parameters struct {
ServiceURL string `mapstructure:"serviceurl"`
CopyStatusPollMaxRetry int `mapstructure:"copy_status_poll_max_retry"`
CopyStatusPollDelay string `mapstructure:"copy_status_poll_delay"`
DisableKeepAlives bool `mapstructure:"disablekeepalives"`
}

func NewParameters(parameters map[string]interface{}) (*Parameters, error) {
30 changes: 30 additions & 0 deletions registry/storage/driver/gcs/gcs.go
@@ -151,11 +151,41 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (sto
}
}

disableKeepAlivesParam := false
disableKeepAlives := parameters["disablekeepalives"]
switch disableKeepAlives := disableKeepAlives.(type) {
case string:
b, err := strconv.ParseBool(disableKeepAlives)
if err != nil {
return nil, fmt.Errorf("the disablekeepalives parameter should be a boolean")
}
disableKeepAlivesParam = b
case bool:
disableKeepAlivesParam = disableKeepAlives
case nil:
// do nothing
default:
return nil, fmt.Errorf("the disablekeepalives parameter should be a boolean")
}
> **Review comment (Collaborator), on lines +154 to +169:** This is very repetitive. Given that the same validation rules apply to all the boolean parameters, how about extracting the common logic to a function or closure? E.g.:
>
>     boolParam := func(name string) (bool, error) { /* ... */ }
>
>     disableKeepAlives, err := boolParam("disablekeepalives")
>     if err != nil {
>         return nil, err
>     }
>
> If you want to get really fancy, you could apply the panic-recover error handling pattern to elide having to write `if err != nil` when parsing each parameter, e.g.:
>
>     if boolParam("disablekeepalives") {
>         /* ... */
>     }
> **Author reply:** Should that kind of refactoring be a separate PR? I was trying to maintain the pattern in nearby code. I'm happy to do that here for GCS.
>
> The Azure driver's config handling does seem easier to work with than the more manual extraction the GCS and AWS drivers do. Is there any reason we shouldn't move toward that for GCS and AWS?
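
To make the reviewer's suggestion concrete, here is a hedged, standalone sketch of such a helper. The name `boolParam` and the closure shape come from the review comment above; none of this is code from the PR:

```go
package main

import (
	"fmt"
	"strconv"
)

// boolParam returns a closure that parses an optional boolean out of a
// loosely typed parameters map, accepting bool, string ("true"/"false"),
// or a missing key (which yields the given default).
func boolParam(parameters map[string]interface{}) func(name string, def bool) (bool, error) {
	return func(name string, def bool) (bool, error) {
		switch v := parameters[name].(type) {
		case nil:
			return def, nil
		case bool:
			return v, nil
		case string:
			b, err := strconv.ParseBool(v)
			if err != nil {
				return false, fmt.Errorf("the %s parameter should be a boolean", name)
			}
			return b, nil
		default:
			return false, fmt.Errorf("the %s parameter should be a boolean", name)
		}
	}
}

func main() {
	params := map[string]interface{}{"disablekeepalives": "true", "skipverify": false}
	parse := boolParam(params)
	dka, _ := parse("disablekeepalives", false) // string "true" -> true
	skv, _ := parse("skipverify", true)         // bool passes through
	sec, _ := parse("secure", true)             // missing key -> default
	fmt.Println(dka, skv, sec) // true false true
}
```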


httpTransportModified := false
httpTransport := http.DefaultTransport.(*http.Transport).Clone()

if disableKeepAlivesParam {
httpTransport.DisableKeepAlives = true
httpTransportModified = true
}

var ts oauth2.TokenSource
jwtConf := new(jwt.Config)
var err error
var gcs *storage.Client
var options []option.ClientOption
if httpTransportModified {
options = append(options, option.WithHTTPClient(&http.Client{
Transport: httpTransport,
}))
}
if keyfile, ok := parameters["keyfile"]; ok {
jsonKey, err := os.ReadFile(fmt.Sprint(keyfile))
if err != nil {
36 changes: 34 additions & 2 deletions registry/storage/driver/s3-aws/s3.go
@@ -106,6 +106,7 @@ type DriverParameters struct {
KeyID string
Secure bool
SkipVerify bool
DisableKeepAlives bool
V4Auth bool
ChunkSize int64
MultipartCopyChunkSize int64
@@ -290,6 +291,23 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Dr
return nil, fmt.Errorf("the skipVerify parameter should be a boolean")
}

disableKeepAlivesBool := false
disableKeepAlives := parameters["disablekeepalives"]
switch disableKeepAlives := disableKeepAlives.(type) {
case string:
b, err := strconv.ParseBool(disableKeepAlives)
if err != nil {
return nil, fmt.Errorf("the disablekeepalives parameter should be a boolean")
}
disableKeepAlivesBool = b
case bool:
disableKeepAlivesBool = disableKeepAlives
case nil:
// do nothing
default:
return nil, fmt.Errorf("the disablekeepalives parameter should be a boolean")
}

v4Bool := true
v4auth := parameters["v4auth"]
switch v4auth := v4auth.(type) {
@@ -433,6 +451,7 @@ func FromParameters(ctx context.Context, parameters map[string]interface{}) (*Dr
fmt.Sprint(keyID),
secureBool,
skipVerifyBool,
disableKeepAlivesBool,
v4Bool,
chunkSize,
multipartCopyChunkSize,
@@ -539,11 +558,24 @@ func New(ctx context.Context, params DriverParameters) (*Driver, error) {
awsConfig.UseDualStackEndpoint = endpoints.DualStackEndpointStateEnabled
}

httpTransportModified := false
httpTransport := http.DefaultTransport.(*http.Transport).Clone()

if params.SkipVerify {
httpTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
httpTransportModified = true
}

if params.DisableKeepAlives {
httpTransport.DisableKeepAlives = true
httpTransportModified = true
}

var httpRoundTripper http.RoundTripper = httpTransport

if httpTransportModified {
awsConfig.WithHTTPClient(&http.Client{
Transport: httpRoundTripper,
})
}

42 changes: 26 additions & 16 deletions registry/storage/driver/s3-aws/s3_test.go
@@ -29,22 +29,23 @@ var (

func init() {
var (
accessKey = os.Getenv("AWS_ACCESS_KEY")
secretKey = os.Getenv("AWS_SECRET_KEY")
bucket = os.Getenv("S3_BUCKET")
encrypt = os.Getenv("S3_ENCRYPT")
keyID = os.Getenv("S3_KEY_ID")
secure = os.Getenv("S3_SECURE")
skipVerify = os.Getenv("S3_SKIP_VERIFY")
disableKeepAlives = os.Getenv("S3_DISABLE_KEEP_ALIVES")
v4Auth = os.Getenv("S3_V4_AUTH")
region = os.Getenv("AWS_REGION")
objectACL = os.Getenv("S3_OBJECT_ACL")
regionEndpoint = os.Getenv("REGION_ENDPOINT")
forcePathStyle = os.Getenv("AWS_S3_FORCE_PATH_STYLE")
sessionToken = os.Getenv("AWS_SESSION_TOKEN")
useDualStack = os.Getenv("S3_USE_DUALSTACK")
accelerate = os.Getenv("S3_ACCELERATE")
logLevel = os.Getenv("S3_LOGLEVEL")
)

var err error
@@ -73,6 +74,14 @@ }
}
}

disableKeepAlivesBool := false
if disableKeepAlives != "" {
disableKeepAlivesBool, err = strconv.ParseBool(disableKeepAlives)
if err != nil {
return nil, err
}
}

v4Bool := true
if v4Auth != "" {
v4Bool, err = strconv.ParseBool(v4Auth)
@@ -112,6 +121,7 @@
keyID,
secureBool,
skipVerifyBool,
disableKeepAlivesBool,
v4Bool,
minChunkSize,
defaultMultipartCopyChunkSize,