diff --git a/.changelog/1131.txt b/.changelog/1131.txt new file mode 100644 index 000000000..1434c51d6 --- /dev/null +++ b/.changelog/1131.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +queue: add support queue API +``` \ No newline at end of file diff --git a/queue.go b/queue.go new file mode 100644 index 000000000..0bd5a4dfa --- /dev/null +++ b/queue.go @@ -0,0 +1,381 @@ +package cloudflare + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "time" +) + +var ( + ErrMissingQueueName = errors.New("required queue name is missing") + ErrMissingQueueConsumerName = errors.New("required queue consumer name is missing") +) + +type Queue struct { + ID string `json:"queue_id,omitempty"` + Name string `json:"queue_name,omitempty"` + CreatedOn *time.Time `json:"created_on,omitempty"` + ModifiedOn *time.Time `json:"modified_on,omitempty"` + ProducersTotalCount int `json:"producers_total_count,omitempty"` + Producers []QueueProducer `json:"producers,omitempty"` + ConsumersTotalCount int `json:"consumers_total_count,omitempty"` + Consumers []QueueConsumer `json:"consumers,omitempty"` +} + +type QueueProducer struct { + Service string `json:"service,omitempty"` + Environment string `json:"environment,omitempty"` +} + +type QueueConsumer struct { + Name string `json:"-"` + Service string `json:"service,omitempty"` + ScriptName string `json:"script_name,omitempty"` + Environment string `json:"environment,omitempty"` + Settings QueueConsumerSettings `json:"settings,omitempty"` + QueueName string `json:"queue_name,omitempty"` + CreatedOn *time.Time `json:"created_on,omitempty"` + DeadLetterQueue string `json:"dead_letter_queue,omitempty"` +} + +type QueueConsumerSettings struct { + BatchSize int `json:"batch_size,omitempty"` + MaxRetires int `json:"max_retries,omitempty"` + MaxWaitTime int `json:"max_wait_time_ms,omitempty"` +} + +type QueueListResponse struct { + Response + ResultInfo `json:"result_info"` + Result []Queue `json:"result"` +} + +type CreateQueueParams struct { + Name string `json:"queue_name"` +} + +type QueueResponse struct { + Response + Result Queue `json:"result"` +} + +type ListQueueConsumersResponse struct { + Response + ResultInfo `json:"result_info"` + Result []QueueConsumer `json:"result"` +} + +type ListQueuesParams struct { + ResultInfo +} + +type QueueConsumerResponse struct { + Response + Result QueueConsumer `json:"result"` +} + +type UpdateQueueParams struct { + ID string `json:"queue_id,omitempty"` + Name string `json:"queue_name,omitempty"` + CreatedOn *time.Time `json:"created_on,omitempty"` + ModifiedOn *time.Time `json:"modified_on,omitempty"` + ProducersTotalCount int `json:"producers_total_count,omitempty"` + Producers []QueueProducer `json:"producers,omitempty"` + ConsumersTotalCount int `json:"consumers_total_count,omitempty"` + Consumers []QueueConsumer `json:"consumers,omitempty"` +} + +type ListQueueConsumersParams struct { + QueueName string `url:"-"` + ResultInfo +} + +type CreateQueueConsumerParams struct { + QueueName string `json:"-"` + Consumer QueueConsumer +} + +type UpdateQueueConsumerParams struct { + QueueName string `json:"-"` + Consumer QueueConsumer +} + +type DeleteQueueConsumerParams struct { + QueueName, ConsumerName string +} + +// ListQueues returns the queues owned by an account. +// +// API reference: https://api.cloudflare.com/#queue-list-queues +func (api *API) ListQueues(ctx context.Context, rc *ResourceContainer, params ListQueuesParams) ([]Queue, *ResultInfo, error) { + if rc.Identifier == "" { + return []Queue{}, &ResultInfo{}, ErrMissingAccountID + } + + autoPaginate := true + if params.PerPage >= 1 || params.Page >= 1 { + autoPaginate = false + } + if params.PerPage < 1 { + params.PerPage = 50 + } + if params.Page < 1 { + params.Page = 1 + } + + var queues []Queue + var qResponse QueueListResponse + for { + uri := buildURI(fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier), params) + + res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) + if err != nil { + return []Queue{}, &ResultInfo{}, err + } + + err = json.Unmarshal(res, &qResponse) + if err != nil { + return []Queue{}, &ResultInfo{}, fmt.Errorf("failed to unmarshal filters JSON data: %w", err) + } + + queues = append(queues, qResponse.Result...) + params.ResultInfo = qResponse.ResultInfo.Next() + + if params.ResultInfo.Done() || !autoPaginate { + break + } + } + + return queues, &qResponse.ResultInfo, nil +} + +// CreateQueue creates a new queue. +// +// API reference: https://api.cloudflare.com/#queue-create-queue +func (api *API) CreateQueue(ctx context.Context, rc *ResourceContainer, queue CreateQueueParams) (Queue, error) { + if rc.Identifier == "" { + return Queue{}, ErrMissingAccountID + } + + if queue.Name == "" { + return Queue{}, ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier) + res, err := api.makeRequestContext(ctx, http.MethodPost, uri, queue) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueResponse + err = json.Unmarshal(res, &r) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} + +// DeleteQueue deletes a queue. +// +// API reference: https://api.cloudflare.com/#queue-delete-queue +func (api *API) DeleteQueue(ctx context.Context, rc *ResourceContainer, queueName string) error { + if rc.Identifier == "" { + return ErrMissingAccountID + } + if queueName == "" { + return ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName) + _, err := api.makeRequestContext(ctx, http.MethodDelete, uri, nil) + if err != nil { + return fmt.Errorf("%s: %w", errMakeRequestError, err) + } + return nil +} + +// GetQueue returns a single queue based on the name. +// +// API reference: https://api.cloudflare.com/#queue-get-queue +func (api *API) GetQueue(ctx context.Context, rc *ResourceContainer, queueName string) (Queue, error) { + if rc.Identifier == "" { + return Queue{}, ErrMissingAccountID + } + + if queueName == "" { + return Queue{}, ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName) + res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueResponse + err = json.Unmarshal(res, &r) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + + return r.Result, nil +} + +// UpdateQueue updates a queue. +// +// API reference: https://api.cloudflare.com/#queue-update-queue +func (api *API) UpdateQueue(ctx context.Context, rc *ResourceContainer, params UpdateQueueParams) (Queue, error) { + if rc.Identifier == "" { + return Queue{}, ErrMissingAccountID + } + + if params.Name == "" { + return Queue{}, ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, params.Name) + res, err := api.makeRequestContext(ctx, http.MethodPut, uri, nil) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueResponse + err = json.Unmarshal(res, &r) + if err != nil { + return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} + +// ListQueueConsumers returns the consumers of a queue. +// +// API reference: https://api.cloudflare.com/#queue-list-queue-consumers +func (api *API) ListQueueConsumers(ctx context.Context, rc *ResourceContainer, params ListQueueConsumersParams) ([]QueueConsumer, *ResultInfo, error) { + if rc.Identifier == "" { + return []QueueConsumer{}, &ResultInfo{}, ErrMissingAccountID + } + + if params.QueueName == "" { + return []QueueConsumer{}, &ResultInfo{}, ErrMissingQueueName + } + + autoPaginate := true + if params.PerPage >= 1 || params.Page >= 1 { + autoPaginate = false + } + if params.PerPage < 1 { + params.PerPage = 50 + } + if params.Page < 1 { + params.Page = 1 + } + + var queuesConsumers []QueueConsumer + var qResponse ListQueueConsumersResponse + for { + uri := buildURI(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName), params) + + res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil) + if err != nil { + return []QueueConsumer{}, &ResultInfo{}, err + } + + err = json.Unmarshal(res, &qResponse) + if err != nil { + return []QueueConsumer{}, &ResultInfo{}, fmt.Errorf("failed to unmarshal filters JSON data: %w", err) + } + + queuesConsumers = append(queuesConsumers, qResponse.Result...) + params.ResultInfo = qResponse.ResultInfo.Next() + + if params.ResultInfo.Done() || !autoPaginate { + break + } + } + + return queuesConsumers, &qResponse.ResultInfo, nil +} + +// CreateQueueConsumer creates a new consumer for a queue. +// +// API reference: https://api.cloudflare.com/#queue-create-queue-consumer +func (api *API) CreateQueueConsumer(ctx context.Context, rc *ResourceContainer, params CreateQueueConsumerParams) (QueueConsumer, error) { + if rc.Identifier == "" { + return QueueConsumer{}, ErrMissingAccountID + } + + if params.QueueName == "" { + return QueueConsumer{}, ErrMissingQueueName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName) + res, err := api.makeRequestContext(ctx, http.MethodPost, uri, params.Consumer) + if err != nil { + return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueConsumerResponse + err = json.Unmarshal(res, &r) + if err != nil { + return QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} + +// DeleteQueueConsumer deletes the consumer for a queue.. +// +// API reference: https://api.cloudflare.com/#queue-delete-queue-consumer +func (api *API) DeleteQueueConsumer(ctx context.Context, rc *ResourceContainer, params DeleteQueueConsumerParams) error { + if rc.Identifier == "" { + return ErrMissingAccountID + } + + if params.QueueName == "" { + return ErrMissingQueueName + } + + if params.ConsumerName == "" { + return ErrMissingQueueConsumerName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, params.QueueName, params.ConsumerName) + _, err := api.makeRequestContext(ctx, http.MethodDelete, uri, nil) + if err != nil { + return fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + return nil +} + +// UpdateQueueConsumer updates the consumer for a queue, or creates one if it does not exist.. +// +// API reference: https://api.cloudflare.com/#queue-update-queue-consumer +func (api *API) UpdateQueueConsumer(ctx context.Context, rc *ResourceContainer, params UpdateQueueConsumerParams) (QueueConsumer, error) { + if rc.Identifier == "" { + return QueueConsumer{}, ErrMissingAccountID + } + + if params.QueueName == "" { + return QueueConsumer{}, ErrMissingQueueName + } + + if params.Consumer.Name == "" { + return QueueConsumer{}, ErrMissingQueueConsumerName + } + + uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, params.QueueName, params.Consumer.Name) + res, err := api.makeRequestContext(ctx, http.MethodPut, uri, params.Consumer) + if err != nil { + return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err) + } + + var r QueueConsumerResponse + err = json.Unmarshal(res, &r) + if err != nil { + return QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err) + } + return r.Result, nil +} diff --git a/queue_test.go b/queue_test.go new file mode 100644 index 000000000..5485b354c --- /dev/null +++ b/queue_test.go @@ -0,0 +1,485 @@ +package cloudflare + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const ( + testQueueID = "6b7efc370ea34ded8327fa20698dfe3a" + testQueueName = "example-queue" + testQueueConsumerName = "example-consumer" +) + +func testQueue() Queue { + CreatedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + ModifiedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + return Queue{ + ID: testQueueID, + Name: testQueueName, + CreatedOn: &CreatedOn, + ModifiedOn: &ModifiedOn, + ProducersTotalCount: 1, + Producers: []QueueProducer{ + { + Service: "example-producer", + Environment: "production", + }, + }, + ConsumersTotalCount: 1, + Consumers: []QueueConsumer{ + { + Service: "example-consumer", + Environment: "production", + Settings: QueueConsumerSettings{ + BatchSize: 10, + MaxRetires: 3, + MaxWaitTime: 5000, + }, + }, + }, + } +} + +func testQueueConsumer() QueueConsumer { + CreatedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + return QueueConsumer{ + Service: "example-consumer", + Environment: "production", + Settings: QueueConsumerSettings{ + BatchSize: 10, + MaxRetires: 3, + MaxWaitTime: 5000, + }, + QueueName: testQueueName, + CreatedOn: &CreatedOn, + } +} + +func TestQueue_List(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues", testAccountID), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method, "Expected method 'GET', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": null, + "messages": null, + "result": [ + { + "queue_id": "6b7efc370ea34ded8327fa20698dfe3a", + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z", + "modified_on": "2023-01-01T00:00:00Z", + "producers_total_count": 1, + "producers": [ + { + "service": "example-producer", + "environment": "production" + } + ], + "consumers_total_count": 1, + "consumers": [ + { + "service": "example-consumer", + "environment": "production", + "settings": { + "batch_size": 10, + "max_retries": 3, + "max_wait_time_ms": 5000 + } + } + ] + } + ], + "result_info": { + "page": 1, + "per_page": 100, + "count": 1, + "total_count": 1, + "total_pages": 1 + } +}`) + }) + + _, _, err := client.ListQueues(context.Background(), AccountIdentifier(""), ListQueuesParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + result, _, err := client.ListQueues(context.Background(), AccountIdentifier(testAccountID), ListQueuesParams{}) + if assert.NoError(t, err) { + assert.Equal(t, 1, len(result)) + assert.Equal(t, testQueue(), result[0]) + } +} + +func TestQueue_Create(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues", testAccountID), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPost, r.Method, "Expected method 'POST', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": null, + "messages": null, + "result": { + "queue_id": "6b7efc370ea34ded8327fa20698dfe3a", + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z", + "modified_on": "2023-01-01T00:00:00Z" + } + }`) + }) + _, err := client.CreateQueue(context.Background(), AccountIdentifier(""), CreateQueueParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.CreateQueue(context.Background(), AccountIdentifier(testAccountID), CreateQueueParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + results, err := client.CreateQueue(context.Background(), AccountIdentifier(testAccountID), CreateQueueParams{Name: "example-queue"}) + if assert.NoError(t, err) { + CreatedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + ModifiedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + createdQueue := Queue{ + ID: testQueueID, + Name: testQueueName, + CreatedOn: &CreatedOn, + ModifiedOn: &ModifiedOn, + } + + assert.Equal(t, createdQueue, results) + } +} + +func TestQueue_Delete(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s", testAccountID, testQueueName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodDelete, r.Method, "Expected method 'DELETE', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": [], + "messages": [], + "result": null + }`) + }) + err := client.DeleteQueue(context.Background(), AccountIdentifier(""), "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + err = client.DeleteQueue(context.Background(), AccountIdentifier(testAccountID), "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + err = client.DeleteQueue(context.Background(), AccountIdentifier(testAccountID), testQueueName) + assert.NoError(t, err) +} + +func TestQueue_Get(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s", testAccountID, testQueueID), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method, "Expected method 'GET', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, ` + { + "success": true, + "errors": [], + "messages": [], + "result": { + "queue_id": "6b7efc370ea34ded8327fa20698dfe3a", + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z", + "modified_on": "2023-01-01T00:00:00Z", + "producers_total_count": 1, + "producers": [ + { + "service": "example-producer", + "environment": "production" + } + ], + "consumers_total_count": 1, + "consumers": [ + { + "service": "example-consumer", + "environment": "production", + "settings": { + "batch_size": 10, + "max_retries": 3, + "max_wait_time_ms": 5000 + } + } + ] + } + }`) + }) + + _, err := client.GetQueue(context.Background(), AccountIdentifier(""), "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.GetQueue(context.Background(), AccountIdentifier(testAccountID), "") + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + result, err := client.GetQueue(context.Background(), AccountIdentifier(testAccountID), testQueueID) + if assert.NoError(t, err) { + assert.Equal(t, testQueue(), result) + } +} + +func TestQueue_Update(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s", testAccountID, testQueueName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPut, r.Method, "Expected method 'PUT', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": null, + "messages": null, + "result": { + "queue_id": "6b7efc370ea34ded8327fa20698dfe3a", + "queue_name": "renamed-example-queue", + "created_on": "2023-01-01T00:00:00Z", + "modified_on": "2023-01-01T00:00:00Z" + } + }`) + }) + _, err := client.UpdateQueue(context.Background(), AccountIdentifier(""), UpdateQueueParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.UpdateQueue(context.Background(), AccountIdentifier(testAccountID), UpdateQueueParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + results, err := client.UpdateQueue(context.Background(), AccountIdentifier(testAccountID), UpdateQueueParams{Name: "example-queue"}) + if assert.NoError(t, err) { + CreatedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + ModifiedOn, _ := time.Parse(time.RFC3339, "2023-01-01T00:00:00Z") + createdQueue := Queue{ + ID: testQueueID, + Name: "renamed-example-queue", + CreatedOn: &CreatedOn, + ModifiedOn: &ModifiedOn, + } + + assert.Equal(t, createdQueue, results) + } +} + +func TestQueue_ListConsumers(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", testAccountID, testQueueName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodGet, r.Method, "Expected method 'GET', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, ` + { + "success": true, + "errors": null, + "messages": null, + "result": [ + { + "service": "example-consumer", + "environment": "production", + "settings": { + "batch_size": 10, + "max_retries": 3, + "max_wait_time_ms": 5000 + }, + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z" + } + ], + "result_info": { + "page": 1, + "per_page": 100, + "count": 1, + "total_count": 1, + "total_pages": 1 + } + }`) + }) + + _, _, err := client.ListQueueConsumers(context.Background(), AccountIdentifier(""), ListQueueConsumersParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, _, err = client.ListQueueConsumers(context.Background(), AccountIdentifier(testAccountID), ListQueueConsumersParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + result, _, err := client.ListQueueConsumers(context.Background(), AccountIdentifier(testAccountID), ListQueueConsumersParams{QueueName: testQueueName}) + if assert.NoError(t, err) { + assert.Equal(t, 1, len(result)) + assert.Equal(t, testQueueConsumer(), result[0]) + } +} + +func TestQueue_CreateConsumer(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", testAccountID, testQueueName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPost, r.Method, "Expected method 'POST', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": [], + "messages": [], + "result": { + "service": "example-consumer", + "environment": "production", + "settings": { + "batch_size": 10, + "max_retries": 3, + "max_wait_time_ms": 5000 + }, + "dead_letter_queue": "example-dlq", + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z" + } + }`) + }) + + _, err := client.CreateQueueConsumer(context.Background(), AccountIdentifier(""), CreateQueueConsumerParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.CreateQueueConsumer(context.Background(), AccountIdentifier(testAccountID), CreateQueueConsumerParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + result, err := client.CreateQueueConsumer(context.Background(), AccountIdentifier(testAccountID), CreateQueueConsumerParams{QueueName: testQueueName, Consumer: QueueConsumer{ + Service: "example-consumer", + Environment: "production", + }}) + if assert.NoError(t, err) { + expectedQueueConsumer := testQueueConsumer() + expectedQueueConsumer.DeadLetterQueue = "example-dlq" + assert.Equal(t, expectedQueueConsumer, result) + } +} + +func TestQueue_DeleteConsumer(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", testAccountID, testQueueName, testQueueConsumerName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodDelete, r.Method, "Expected method 'DELETE', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": [], + "messages": [], + "result": null + }`) + }) + + err := client.DeleteQueueConsumer(context.Background(), AccountIdentifier(""), DeleteQueueConsumerParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + err = client.DeleteQueueConsumer(context.Background(), AccountIdentifier(testAccountID), DeleteQueueConsumerParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + err = client.DeleteQueueConsumer(context.Background(), AccountIdentifier(testAccountID), DeleteQueueConsumerParams{QueueName: testQueueName}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueConsumerName, err) + } + + err = client.DeleteQueueConsumer(context.Background(), AccountIdentifier(testAccountID), DeleteQueueConsumerParams{QueueName: testQueueName, ConsumerName: testQueueConsumerName}) + assert.NoError(t, err) +} + +func TestQueue_UpdateConsumer(t *testing.T) { + setup() + defer teardown() + + mux.HandleFunc(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", testAccountID, testQueueName, testQueueConsumerName), func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPut, r.Method, "Expected method 'PUT', got %s", r.Method) + + w.Header().Set("content-type", "application/json") + fmt.Fprintf(w, `{ + "success": true, + "errors": [], + "messages": [], + "result": { + "service": "example-consumer", + "environment": "production", + "settings": { + "batch_size": 10, + "max_retries": 3, + "max_wait_time_ms": 5000 + }, + "queue_name": "example-queue", + "created_on": "2023-01-01T00:00:00Z" + } + }`) + }) + + _, err := client.UpdateQueueConsumer(context.Background(), AccountIdentifier(""), UpdateQueueConsumerParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingAccountID, err) + } + + _, err = client.UpdateQueueConsumer(context.Background(), AccountIdentifier(testAccountID), UpdateQueueConsumerParams{}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueName, err) + } + + _, err = client.UpdateQueueConsumer(context.Background(), AccountIdentifier(testAccountID), UpdateQueueConsumerParams{QueueName: testQueueName}) + if assert.Error(t, err) { + assert.Equal(t, ErrMissingQueueConsumerName, err) + } + + result, err := client.UpdateQueueConsumer(context.Background(), AccountIdentifier(testAccountID), UpdateQueueConsumerParams{QueueName: testQueueName, Consumer: QueueConsumer{ + Name: testQueueConsumerName, + Service: "example-consumer", + Environment: "production", + }}) + if assert.NoError(t, err) { + assert.Equal(t, testQueueConsumer(), result) + } +}