Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
petemoore committed Jun 2, 2023
1 parent e2153f3 commit 831242e
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 14 deletions.
8 changes: 8 additions & 0 deletions changelog/issue-6276.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
audience: users
level: patch
reference: issue 6276
---
Generic Worker no longer uses locally cached Indexed Artifact content without first checking whether the Index has been updated since the local content was cached.
Previously, if a task mounted indexed artifact content from a namespace/artifact name that had previously been downloaded, it would reuse this content without
first checking whether the artifact was still current. Now generic-worker will check what the taskID is under the given index namespace, and if it has changed
since the previous download, it will download the new version of the artifact from the current taskId that is stored in the index under the given namespace.
26 changes: 23 additions & 3 deletions internal/mocktc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,38 @@ import (
)

type Index struct {
t *testing.T
t *testing.T
entries map[string]*tcindex.InsertTaskRequest
}

func NewIndex(t *testing.T) *Index {
t.Helper()
return &Index{
t: t,
t: t,
entries: make(map[string]*tcindex.InsertTaskRequest),
}
}

/////////////////////////////////////////////////

func (index *Index) FindTask(indexPath string) (*tcindex.IndexedTaskResponse, error) {
return &tcindex.IndexedTaskResponse{}, nil
payload := index.entries[indexPath]
return &tcindex.IndexedTaskResponse{
Data: payload.Data,
Expires: payload.Expires,
Namespace: indexPath,
Rank: payload.Rank,
TaskID: payload.TaskID,
}, nil
}

func (index *Index) InsertTask(namespace string, payload *tcindex.InsertTaskRequest) (*tcindex.IndexedTaskResponse, error) {
index.entries[namespace] = payload
return &tcindex.IndexedTaskResponse{
Data: payload.Data,
Expires: payload.Expires,
Namespace: namespace,
Rank: payload.Rank,
TaskID: payload.TaskID,
}, nil
}
14 changes: 13 additions & 1 deletion internal/mocktc/indexprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"

"github.com/gorilla/mux"
"github.com/taskcluster/taskcluster/v52/clients/client-go/tcindex"
"github.com/taskcluster/taskcluster/v52/internal/mocktc/tc"
)

Expand All @@ -20,8 +21,19 @@ func NewIndexProvider(index tc.Index) *IndexProvider {
func (ip *IndexProvider) RegisterService(r *mux.Router) {
s := r.PathPrefix("/api/index/v1").Subrouter()
s.HandleFunc("/task/{indexPath}", ip.FindTask).Methods("GET")
s.HandleFunc("/task/{indexPath}", ip.InsertTask).Methods("PUT")
}

func (ip *IndexProvider) FindTask(w http.ResponseWriter, r *http.Request) {
// Not used by any tests currently
vars := Vars(r)
out, err := ip.index.FindTask(vars["indexPath"])
JSON(w, out, err)
}

func (ip *IndexProvider) InsertTask(w http.ResponseWriter, r *http.Request) {
vars := Vars(r)
var payload tcindex.InsertTaskRequest
Marshal(r, &payload)
out, err := ip.index.InsertTask(vars["indexPath"], &payload)
JSON(w, out, err)
}
1 change: 1 addition & 0 deletions internal/mocktc/tc/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Auth interface {

type Index interface {
FindTask(indexPath string) (*tcindex.IndexedTaskResponse, error)
InsertTask(namespace string, payload *tcindex.InsertTaskRequest) (*tcindex.IndexedTaskResponse, error)
}

type WorkerManager interface {
Expand Down
104 changes: 94 additions & 10 deletions workers/generic-worker/mounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package main

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"testing"
"time"

"github.com/mcuadros/go-defaults"
"github.com/taskcluster/slugid-go/slugid"

"github.com/taskcluster/taskcluster/v52/clients/client-go/tcindex"
"github.com/taskcluster/taskcluster/v52/workers/generic-worker/gwconfig"
)

Expand Down Expand Up @@ -218,6 +222,7 @@ type MountsLoggingTestCase struct {
// This is an extremely strict test helper, that requires you to specify
// extracts from every log line that the mounts feature writes to the log
func LogTest(m *MountsLoggingTestCase) {
m.Test.Helper()
payload := m.Payload
if payload == nil {
payload = &GenericWorkerPayload{
Expand Down Expand Up @@ -315,8 +320,9 @@ func TestValidSHA256(t *testing.T) {
setup(t)
taskID := CreateArtifactFromFile(t, "unknown_issuer_app_1.zip", "public/build/unknown_issuer_app_1.zip")

// whether permission is granted to task user depends if running under windows or not
// and is independent of whether running as current user or not
// Whether permission is granted to task user depends if running multiuser
// engine or simple engine but is independent of whether running as current
// user or not.
granting, _ := grantingDenying(t, "directory", "unknown_issuer_app_1")

// Required text from first task with no cached value
Expand Down Expand Up @@ -370,8 +376,9 @@ func TestFileMountNoSHA256(t *testing.T) {
setup(t)
taskID := CreateArtifactFromFile(t, "unknown_issuer_app_1.zip", "public/build/unknown_issuer_app_1.zip")

// whether permission is granted to task user depends if running under windows or not
// and is independent of whether running as current user or not
// Whether permission is granted to task user depends if running multiuser
// engine or simple engine but is independent of whether running as current
// user or not.
granting, _ := grantingDenying(t, "file", t.Name())

// No cache on first pass
Expand Down Expand Up @@ -472,8 +479,9 @@ func TestWritableDirectoryCacheNoSHA256(t *testing.T) {
setup(t)
taskID := CreateArtifactFromFile(t, "unknown_issuer_app_1.zip", "public/build/unknown_issuer_app_1.zip")

// whether permission is granted to task user depends if running under windows or not
// and is independent of whether running as current user or not
// Whether permission is granted to task user depends if running multiuser
// engine or simple engine but is independent of whether running as current
// user or not.
granting, denying := grantingDenying(t, "directory", t.Name())

// No cache on first pass
Expand Down Expand Up @@ -793,8 +801,9 @@ func TestCacheMoved(t *testing.T) {
setup(t)
taskID := CreateArtifactFromFile(t, "unknown_issuer_app_1.zip", "public/build/unknown_issuer_app_1.zip")

// whether permission is granted to task user depends if running under windows or not
// and is independent of whether running as current user or not
// Whether permission is granted to task user depends if running multiuser
// engine or simple engine but is independent of whether running as current
// user or not.
granting, _ := grantingDenying(t, "directory", t.Name())

// No cache on first pass
Expand Down Expand Up @@ -877,8 +886,9 @@ func TestMountFileAndDirSameLocation(t *testing.T) {
setup(t)
taskID := CreateArtifactFromFile(t, "unknown_issuer_app_1.zip", "public/build/unknown_issuer_app_1.zip")

// whether permission is granted to task user depends if running under windows or not
// and is independent of whether running as current user or not
// Whether permission is granted to task user depends if running multiuser
// engine or simple engine but is independent of whether running as current
// user or not.
granting, _ := grantingDenying(t, "file", "file-located-here")

// No cache on first pass
Expand Down Expand Up @@ -951,6 +961,80 @@ func TestMountFileAndDirSameLocation(t *testing.T) {
)
}

func TestIndexedArtifact(t *testing.T) {
setup(t)

testIndexedArtifact := func(file string, namespace string, rank float64) {
// Create a task containing artifact
taskID1 := CreateArtifactFromFile(t, file, "public/indexed-artifact")

// Now index it under a unique namespace (so that test runs don't interfere with each other if using real taskcluster index service)
index := serviceFactory.Index(config.Credentials(), config.RootURL)

_, err := index.InsertTask(namespace, &tcindex.InsertTaskRequest{
Data: json.RawMessage([]byte("{}")),
Expires: inAnHour,
TaskID: taskID1,
Rank: rank,
})
if err != nil {
t.Fatal(err)
}

// Now create a task to mount the file, using not the taskID, but the namespace where it was inserted into the index
ic := &IndexedContent{
Artifact: "public/indexed-artifact",
Namespace: namespace,
}
rawMessageContent, err := json.Marshal(ic)
if err != nil {
t.Fatal(err)
}
fileMount := &FileMount{
File: file,
Content: rawMessageContent,
}
rawMessageMount, err := json.Marshal(fileMount)
if err != nil {
t.Fatal(err)
}
payload1 := GenericWorkerPayload{
Mounts: []json.RawMessage{
rawMessageMount,
},
// declare the mount also as an artifact, so that the file is mounted and immediately published as an artifact
Artifacts: []Artifact{
Artifact{
Name: "public/republished-artifact",
Path: file,
Type: "file",
},
},
Command: helloGoodbye(),
MaxRunTime: 30,
}
defaults.SetDefaults(&payload1)

td := testTask(t)

// taskID2 mounts and publishes the artifact from taskID1
taskID2 := submitAndAssert(t, td, payload1, "completed", "completed")

// now check that in the process of mounting the artifact via the index namespace, that the content of the artifact hasn't changed
data1 := getArtifactContent(t, taskID1, "public/indexed-artifact")
data2 := getArtifactContent(t, taskID2, "public/republished-artifact")

// simplest way to compare two byte slices is to convert to strings, which can be directly compared
if string(data1) != string(data2) {
t.Fatalf("Was expecting artifact from file %v in tasks %v and %v to be identical, but they weren't: %v, %v", file, taskID1, taskID2, len(data1), len(data2))
}
}

namespace := fmt.Sprintf("garbage.generic-worker.TestIndexedArtifact.%v", time.Now().UnixMilli())
testIndexedArtifact("unknown_issuer_app_1.zip", namespace, 119)
testIndexedArtifact("mozharness.zip", namespace, 239)
}

func TestInvalidSHADoesNotPreventMountedMountsFromBeingUnmounted(t *testing.T) {

setup(t)
Expand Down

0 comments on commit 831242e

Please sign in to comment.