Skip to content

Commit

Permalink
Merge branch 'master' into regen_gocloud
Browse files Browse the repository at this point in the history
  • Loading branch information
gcf-merge-on-green[bot] committed Aug 13, 2021
2 parents f8f4c33 + ee2756f commit 0bf53a7
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 18 deletions.
9 changes: 5 additions & 4 deletions internal/carver/cmd/main.go
Expand Up @@ -365,10 +365,11 @@ func (c *carver) GitCommit() error {
}
}
log.Println("Successfully carved out module. Please run the following commands after your change is merged:")
fmt.Fprintf(os.Stdout, "git tag -a %s <COMMIT-SHA>\n", c.rootMod.Tag())
fmt.Fprintf(os.Stdout, "git tag -a %s <COMMIT-SHA>\n", c.childMod.Tag())
fmt.Fprintf(os.Stdout, "git push origin ref/tags/%s\n", c.rootMod.Tag())
fmt.Fprintf(os.Stdout, "git push origin ref/tags/%s\n", c.childMod.Tag())
fmt.Fprintf(os.Stdout, "git tag %s <COMMIT-SHA>\n", c.rootMod.Tag())
fmt.Fprintf(os.Stdout, "git tag %s <COMMIT-SHA>\n", c.childMod.Tag())
fmt.Fprintf(os.Stdout, "git push origin refs/tags/%s\n", c.rootMod.Tag())
fmt.Fprintf(os.Stdout, "git push origin refs/tags/%s\n", c.childMod.Tag())
fmt.Fprintf(os.Stdout, "Once tags have propagated open a new PR tidying the new child mods go.sum entries.\n")

return nil
}
Expand Down
6 changes: 2 additions & 4 deletions pubsub/subscription.go
Expand Up @@ -48,7 +48,6 @@ type Subscription struct {
mu sync.Mutex
receiveActive bool

once sync.Once
enableOrdering bool
}

Expand Down Expand Up @@ -785,6 +784,8 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
s.mu.Unlock()
defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()

s.checkOrdering()

maxCount := s.ReceiveSettings.MaxOutstandingMessages
if maxCount == 0 {
maxCount = DefaultReceiveSettings.MaxOutstandingMessages
Expand Down Expand Up @@ -897,9 +898,6 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
}
for i, msg := range msgs {
msg := msg
if msg.OrderingKey != "" {
s.once.Do(s.checkOrdering)
}
// TODO(jba): call acquire closer to when the message is allocated.
if err := fc.acquire(ctx, len(msg.Data)); err != nil {
// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
Expand Down
8 changes: 4 additions & 4 deletions storage/storage.go
Expand Up @@ -90,8 +90,6 @@ type Client struct {
raw *raw.Service
// Scheme describes the scheme under the current host.
scheme string
// EnvHost is the host set on the STORAGE_EMULATOR_HOST variable.
envHost string
// ReadHost is the default host used on the reader.
readHost string
}
Expand Down Expand Up @@ -141,18 +139,20 @@ func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error
if err != nil {
return nil, fmt.Errorf("storage client: %v", err)
}
// Update readHost with the chosen endpoint.
// Update readHost and scheme with the chosen endpoint.
u, err := url.Parse(ep)
if err != nil {
return nil, fmt.Errorf("supplied endpoint %q is not valid: %v", ep, err)
}
readHost = u.Host
if u.Scheme != "" {
scheme = u.Scheme
}

return &Client{
hc: hc,
raw: rawService,
scheme: scheme,
envHost: host,
readHost: readHost,
}, nil
}
Expand Down
27 changes: 24 additions & 3 deletions storage/storage_test.go
Expand Up @@ -1252,38 +1252,59 @@ func TestAttrToFieldMapCoverage(t *testing.T) {
func TestWithEndpoint(t *testing.T) {
originalStorageEmulatorHost := os.Getenv("STORAGE_EMULATOR_HOST")
testCases := []struct {
desc string
CustomEndpoint string
StorageEmulatorHost string
WantRawBasePath string
WantReadHost string
WantScheme string
}{
{
desc: "No endpoint or emulator host specified",
CustomEndpoint: "",
StorageEmulatorHost: "",
WantRawBasePath: "https://storage.googleapis.com/storage/v1/",
WantReadHost: "storage.googleapis.com",
WantScheme: "https",
},
{
desc: "With specified https endpoint, no specified emulator host",
CustomEndpoint: "https://fake.gcs.com:8080/storage/v1",
StorageEmulatorHost: "",
WantRawBasePath: "https://fake.gcs.com:8080/storage/v1",
WantReadHost: "fake.gcs.com:8080",
WantScheme: "https",
},
{
desc: "With specified http endpoint, no specified emulator host",
CustomEndpoint: "http://fake.gcs.com:8080/storage/v1",
StorageEmulatorHost: "",
WantRawBasePath: "http://fake.gcs.com:8080/storage/v1",
WantReadHost: "fake.gcs.com:8080",
WantScheme: "http",
},
{
desc: "Emulator host specified, no specified endpoint",
CustomEndpoint: "",
StorageEmulatorHost: "http://emu.com",
WantRawBasePath: "http://emu.com",
WantReadHost: "emu.com",
WantScheme: "http",
},
{
desc: "Endpoint overrides emulator host when both are specified - https",
CustomEndpoint: "https://fake.gcs.com:8080/storage/v1",
StorageEmulatorHost: "http://emu.com",
WantRawBasePath: "https://fake.gcs.com:8080/storage/v1",
WantReadHost: "fake.gcs.com:8080",
WantScheme: "https",
},
{
desc: "Endpoint overrides emulator host when both are specified - http",
CustomEndpoint: "http://fake.gcs.com:8080/storage/v1",
StorageEmulatorHost: "https://emu.com",
WantRawBasePath: "http://fake.gcs.com:8080/storage/v1",
WantReadHost: "fake.gcs.com:8080",
WantScheme: "http",
},
}
Expand All @@ -1299,13 +1320,13 @@ func TestWithEndpoint(t *testing.T) {
}

if c.raw.BasePath != tc.WantRawBasePath {
t.Errorf("raw.BasePath not set correctly: got %v, want %v", c.raw.BasePath, tc.WantRawBasePath)
t.Errorf("%s: raw.BasePath not set correctly\n\tgot %v, want %v", tc.desc, c.raw.BasePath, tc.WantRawBasePath)
}
if c.readHost != tc.WantReadHost {
t.Errorf("readHost not set correctly: got %v, want %v", c.readHost, tc.WantReadHost)
t.Errorf("%s: readHost not set correctly\n\tgot %v, want %v", tc.desc, c.readHost, tc.WantReadHost)
}
if c.scheme != tc.WantScheme {
t.Errorf("scheme not set correctly: got %v, want %v", c.scheme, tc.WantScheme)
t.Errorf("%s: scheme not set correctly\n\tgot %v, want %v", tc.desc, c.scheme, tc.WantScheme)
}
}
os.Setenv("STORAGE_EMULATOR_HOST", originalStorageEmulatorHost)
Expand Down
3 changes: 0 additions & 3 deletions storage/writer.go
Expand Up @@ -125,9 +125,6 @@ func (w *Writer) open() error {
if w.MD5 != nil {
rawObj.Md5Hash = base64.StdEncoding.EncodeToString(w.MD5)
}
if w.o.c.envHost != "" {
w.o.c.raw.BasePath = fmt.Sprintf("%s://%s", w.o.c.scheme, w.o.c.envHost)
}
call := w.o.c.raw.Objects.Insert(w.o.bucket, rawObj).
Media(pr, mediaOpts...).
Projection("full").
Expand Down

0 comments on commit 0bf53a7

Please sign in to comment.