Add a max proxy blob size option (#526)
* Add a max proxy blob size option

When the proxy backend is only reachable over a slow network link, it is sometimes better not to download large blobs from it, and to recompile them locally instead.

With the new maxProxyBlobSize setting, remote blobs larger than the limit are not read from the proxy.
flode committed Feb 28, 2022
1 parent f5d834b commit 8a93ff7
Showing 8 changed files with 78 additions and 13 deletions.
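Before the per-file diff, here is a minimal sketch of what the new option looks like at the API level, based on the disk.New and WithProxyMaxBlobSize signatures in this commit. The import path and the 512 MiB threshold are illustrative assumptions, not part of the change:

```go
package main

import (
	"log"

	"github.com/buchgr/bazel-remote/cache/disk" // assumed import path
)

func main() {
	// Cap proxy downloads at 512 MiB (illustrative value): blobs larger
	// than this are not fetched from the proxy backend, and are rebuilt
	// locally instead.
	diskCache, err := disk.New("/opt/cache-dir", 100*1024*1024*1024, // 100 GiB local cache
		disk.WithProxyMaxBlobSize(512*1024*1024),
		// disk.WithProxyBackend(someProxy) would attach the backend itself.
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = diskCache
}
```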
5 changes: 5 additions & 0 deletions README.md
@@ -221,6 +221,11 @@ OPTIONS:
preexisting blobs in the cache. (default: 9223372036854775807)
[$BAZEL_REMOTE_MAX_BLOB_SIZE]
+   --max_proxy_blob_size value  The maximum logical/uncompressed blob size that will
+                                be downloaded from proxies. Note that this limit is not applied to
+                                preexisting blobs in the cache. (default: 9223372036854775807)
+                                [$BAZEL_REMOTE_MAX_PROXY_BLOB_SIZE]
--num_uploaders value When using proxy backends, sets the number of
Goroutines to process parallel uploads to backend. (default: 100)
[$BAZEL_REMOTE_NUM_UPLOADERS]
28 changes: 17 additions & 11 deletions cache/disk/disk.go
@@ -69,12 +69,13 @@ type lruItem struct {
// diskCache is a filesystem-based LRU cache, with an optional backend proxy.
// It is safe for concurrent use.
type diskCache struct {
-	dir             string
-	proxy           cache.Proxy
-	storageMode     casblob.CompressionType
-	maxBlobSize     int64
-	accessLogger    *log.Logger
-	containsQueue   chan proxyCheck
+	dir              string
+	proxy            cache.Proxy
+	storageMode      casblob.CompressionType
+	maxBlobSize      int64
+	maxProxyBlobSize int64
+	accessLogger     *log.Logger
+	containsQueue    chan proxyCheck

// Limit the number of simultaneous file removals.
fileRemovalSem *semaphore.Weighted
@@ -123,8 +124,9 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
dir: dir,

// Not using config here, to avoid test import cycles.
-		storageMode: casblob.Zstandard,
-		maxBlobSize: math.MaxInt64,
+		storageMode:      casblob.Zstandard,
+		maxBlobSize:      math.MaxInt64,
+		maxProxyBlobSize: math.MaxInt64,

// Go defaults to a limit of 10,000 operating system threads.
// We probably don't need half of those for file removals at
@@ -815,7 +817,7 @@ func (c *diskCache) availableOrTryProxy(kind cache.EntryKind, hash string, size
}
err = nil

-	if c.proxy != nil {
+	if c.proxy != nil && size <= c.maxProxyBlobSize {
if size > 0 {
// If we know the size, attempt to reserve that much space.
if !locked {
@@ -944,6 +946,10 @@ func (c *diskCache) get(ctx context.Context, kind cache.EntryKind, hash string,
if r == nil {
return nil, -1, nil
}
+	if foundSize > c.maxProxyBlobSize {
+		r.Close()
+		return nil, -1, nil
+	}

if isSizeMismatch(size, foundSize) || foundSize < 0 {
return nil, -1, nil
@@ -1038,9 +1044,9 @@ func (c *diskCache) Contains(ctx context.Context, kind cache.EntryKind, hash str
return true, foundSize
}

-	if c.proxy != nil {
+	if c.proxy != nil && size <= c.maxProxyBlobSize {
exists, foundSize = c.proxy.Contains(ctx, kind, hash)
-		if exists && !isSizeMismatch(size, foundSize) {
+		if exists && foundSize <= c.maxProxyBlobSize && !isSizeMismatch(size, foundSize) {
return true, foundSize
}
}
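Taken together, the disk.go changes enforce the limit at every point that consults the proxy: availableOrTryProxy, get, and Contains. Factored out, the common guard amounts to the sketch below (a summary of the condition used above, not a helper that exists in the code):

```go
// shouldQueryProxy reports whether a proxy lookup is worthwhile for a blob
// of the given logical size. A negative size (size unknown) always passes,
// which is why get() re-checks foundSize once the proxy reports the real
// size, closing the reader and falling back to a local rebuild if the blob
// turns out to be too large.
func shouldQueryProxy(c *diskCache, size int64) bool {
	return c.proxy != nil && size <= c.maxProxyBlobSize
}
```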
19 changes: 18 additions & 1 deletion cache/disk/disk_test.go
@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"net/url"
@@ -913,8 +914,24 @@ func TestHttpProxyBackend(t *testing.T) {
t.Fatal("Expected testCache to be empty")
}

-	// Add the proxy backend and check that we can Get the item.
+	// Add the proxy backend
testCache.proxy = proxy
+	testCache.maxProxyBlobSize = blobSize - 1
+	found, _ = testCache.Contains(ctx, cache.CAS, casHash, blobSize)
+	if found {
+		t.Fatalf("Expected the cache to not contain %s (via the proxy)", casHash)
+	}
+
+	r, _, err = testCache.Get(ctx, cache.CAS, casHash, blobSize, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if r != nil {
+		t.Fatal("Expected the Get to fail")
+	}
+
+	// Set a larger max proxy blob size and check that we can Get the item.
+	testCache.maxProxyBlobSize = math.MaxInt64

found, _ = testCache.Contains(ctx, cache.CAS, casHash, blobSize)
if !found {
11 changes: 11 additions & 0 deletions cache/disk/options.go
@@ -57,6 +57,17 @@ func WithProxyBackend(proxy cache.Proxy) Option {
}
}

+func WithProxyMaxBlobSize(maxProxyBlobSize int64) Option {
+	return func(c *CacheConfig) error {
+		if maxProxyBlobSize <= 0 {
+			return fmt.Errorf("Invalid MaxProxyBlobSize: %d", maxProxyBlobSize)
+		}
+
+		c.diskCache.maxProxyBlobSize = maxProxyBlobSize
+		return nil
+	}
+}

func WithAccessLogger(logger *log.Logger) Option {
return func(c *CacheConfig) error {
c.diskCache.accessLogger = logger
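WithProxyMaxBlobSize follows the same shape as the other options and rejects non-positive limits. A short usage sketch, assuming disk.New surfaces option errors as its (Cache, error) signature suggests:

```go
// A non-positive limit is rejected when the option is applied.
if _, err := disk.New("/opt/cache-dir", 10*1024*1024*1024,
	disk.WithProxyMaxBlobSize(-1),
); err != nil {
	log.Printf("expected error: %v", err) // Invalid MaxProxyBlobSize: -1
}
```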
11 changes: 10 additions & 1 deletion config/config.go
@@ -62,6 +62,7 @@ type Config struct {
HTTPWriteTimeout time.Duration `yaml:"http_write_timeout"`
AccessLogLevel string `yaml:"access_log_level"`
MaxBlobSize int64 `yaml:"max_blob_size"`
+	MaxProxyBlobSize int64 `yaml:"max_proxy_blob_size"`

// Fields that are created by combinations of the flags above.
ProxyBackend cache.Proxy
@@ -112,7 +113,8 @@ func newFromArgs(dir string, maxSize int, storageMode string,
httpReadTimeout time.Duration,
httpWriteTimeout time.Duration,
accessLogLevel string,
-	maxBlobSize int64) (*Config, error) {
+	maxBlobSize int64,
+	maxProxyBlobSize int64) (*Config, error) {

c := Config{
HTTPAddress: httpAddress,
@@ -142,6 +144,7 @@ func newFromArgs(dir string, maxSize int, storageMode string,
HTTPWriteTimeout: httpWriteTimeout,
AccessLogLevel: accessLogLevel,
MaxBlobSize: maxBlobSize,
+		MaxProxyBlobSize: maxProxyBlobSize,
}

err := validateConfig(&c)
@@ -176,6 +179,7 @@ func newFromYaml(data []byte) (*Config, error) {
NumUploaders: 100,
MaxQueuedUploads: 1000000,
MaxBlobSize: math.MaxInt64,
+		MaxProxyBlobSize: math.MaxInt64,
MetricsDurationBuckets: defaultDurationBuckets,
AccessLogLevel: "all",
},
@@ -300,6 +304,10 @@ func validateConfig(c *Config) error {
return errors.New("The 'max_blob_size' flag/key must be a positive integer")
}

+	if c.MaxProxyBlobSize <= 0 {
+		return errors.New("The 'max_proxy_blob_size' flag/key must be a positive integer")
+	}

if c.GoogleCloudStorage != nil && c.HTTPBackend != nil && c.S3CloudStorage != nil {
return errors.New("One can specify at most one proxying backend")
}
@@ -457,5 +465,6 @@ func get(ctx *cli.Context) (*Config, error) {
ctx.Duration("http_write_timeout"),
ctx.String("access_log_level"),
ctx.Int64("max_blob_size"),
ctx.Int64("max_proxy_blob_size"),
)
}
9 changes: 9 additions & 0 deletions config/config_test.go
@@ -51,6 +51,7 @@ access_log_level: none
NumUploaders: 100,
MaxQueuedUploads: 1000000,
MaxBlobSize: math.MaxInt64,
+		MaxProxyBlobSize: math.MaxInt64,
MetricsDurationBuckets: []float64{.5, 1, 2.5, 5, 10, 20, 40, 80, 160, 320},
AccessLogLevel: "none",
}
@@ -90,6 +91,7 @@ gcs_proxy:
NumUploaders: 100,
MaxQueuedUploads: 1000000,
MaxBlobSize: math.MaxInt64,
+		MaxProxyBlobSize: math.MaxInt64,
MetricsDurationBuckets: []float64{.5, 1, 2.5, 5, 10, 20, 40, 80, 160, 320},
AccessLogLevel: "all",
}
@@ -126,6 +128,7 @@ http_proxy:
NumUploaders: 100,
MaxQueuedUploads: 1000000,
MaxBlobSize: math.MaxInt64,
+		MaxProxyBlobSize: math.MaxInt64,
MetricsDurationBuckets: []float64{.5, 1, 2.5, 5, 10, 20, 40, 80, 160, 320},
AccessLogLevel: "all",
}
@@ -199,6 +202,7 @@ s3_proxy:
NumUploaders: 100,
MaxQueuedUploads: 1000000,
MaxBlobSize: math.MaxInt64,
+		MaxProxyBlobSize: math.MaxInt64,
MetricsDurationBuckets: []float64{.5, 1, 2.5, 5, 10, 20, 40, 80, 160, 320},
AccessLogLevel: "all",
}
@@ -229,6 +233,7 @@ profile_address: :7070
NumUploaders: 100,
MaxQueuedUploads: 1000000,
MaxBlobSize: math.MaxInt64,
+		MaxProxyBlobSize: math.MaxInt64,
MetricsDurationBuckets: []float64{.5, 1, 2.5, 5, 10, 20, 40, 80, 160, 320},
AccessLogLevel: "all",
}
@@ -273,6 +278,7 @@ endpoint_metrics_duration_buckets: [.005, .1, 5]
NumUploaders: 100,
MaxQueuedUploads: 1000000,
MaxBlobSize: math.MaxInt64,
+		MaxProxyBlobSize: math.MaxInt64,
MetricsDurationBuckets: []float64{0.005, 0.1, 5},
AccessLogLevel: "all",
}
@@ -287,6 +293,7 @@ func TestMetricsDurationBucketsNoDuplicates(t *testing.T) {
HTTPAddress: "localhost:8080",
MaxSize: 42,
MaxBlobSize: 200,
+		MaxProxyBlobSize: math.MaxInt64,
Dir: "/opt/cache-dir",
StorageMode: "uncompressed",
MetricsDurationBuckets: []float64{1, 2, 3, 3},
@@ -398,6 +405,7 @@ storage_mode: zstd
NumUploaders: 100,
MaxQueuedUploads: 1000000,
MaxBlobSize: math.MaxInt64,
+		MaxProxyBlobSize: math.MaxInt64,
MetricsDurationBuckets: []float64{.5, 1, 2.5, 5, 10, 20, 40, 80, 160, 320},
AccessLogLevel: "all",
}
@@ -428,6 +436,7 @@ storage_mode: zstd
NumUploaders: 100,
MaxQueuedUploads: 1000000,
MaxBlobSize: math.MaxInt64,
+		MaxProxyBlobSize: math.MaxInt64,
MetricsDurationBuckets: []float64{.5, 1, 2.5, 5, 10, 20, 40, 80, 160, 320},
AccessLogLevel: "all",
}
1 change: 1 addition & 0 deletions main.go
@@ -88,6 +88,7 @@ func run(ctx *cli.Context) error {
opts := []disk.Option{
disk.WithStorageMode(c.StorageMode),
disk.WithMaxBlobSize(c.MaxBlobSize),
+		disk.WithProxyMaxBlobSize(c.MaxProxyBlobSize),
disk.WithAccessLogger(c.AccessLogger),
}
if c.ProxyBackend != nil {
7 changes: 7 additions & 0 deletions utils/flags/flags.go
@@ -158,6 +158,13 @@ func GetCliFlags() []cli.Flag {
DefaultText: strconv.FormatInt(math.MaxInt64, 10),
EnvVars: []string{"BAZEL_REMOTE_MAX_BLOB_SIZE"},
},
&cli.Int64Flag{
Name: "max_proxy_blob_size",
Value: math.MaxInt64,
Usage: "The maximum logical/uncompressed blob size that will be downloaded from proxies. Note that this limit is not applied to preexisting blobs in the cache.",
DefaultText: strconv.FormatInt(math.MaxInt64, 10),
EnvVars: []string{"BAZEL_REMOTE_MAX_PROXY_BLOB_SIZE"},
},
&cli.IntFlag{
Name: "num_uploaders",
Value: 100,
