Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Queue API #1131

Merged
merged 3 commits into from Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/1131.txt
@@ -0,0 +1,3 @@
```release-note:enhancement
queue: add support queue API
```
381 changes: 381 additions & 0 deletions queue.go
@@ -0,0 +1,381 @@
package cloudflare

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
)

var (
ErrMissingQueueName = errors.New("required queue name is missing")
ErrMissingQueueConsumerName = errors.New("required queue consumer name is missing")
)

type Queue struct {
ID string `json:"queue_id,omitempty"`
Name string `json:"queue_name,omitempty"`
CreatedOn *time.Time `json:"created_on,omitempty"`
ModifiedOn *time.Time `json:"modified_on,omitempty"`
ProducersTotalCount int `json:"producers_total_count,omitempty"`
Producers []QueueProducer `json:"producers,omitempty"`
ConsumersTotalCount int `json:"consumers_total_count,omitempty"`
Consumers []QueueConsumer `json:"consumers,omitempty"`
}

type QueueProducer struct {
Service string `json:"service,omitempty"`
Environment string `json:"environment,omitempty"`
}

type QueueConsumer struct {
Name string `json:"-"`
Service string `json:"service,omitempty"`
ScriptName string `json:"script_name,omitempty"`
Environment string `json:"environment,omitempty"`
Settings QueueConsumerSettings `json:"settings,omitempty"`
QueueName string `json:"queue_name,omitempty"`
CreatedOn *time.Time `json:"created_on,omitempty"`
DeadLetterQueue string `json:"dead_letter_queue,omitempty"`
}

type QueueConsumerSettings struct {
BatchSize int `json:"batch_size,omitempty"`
MaxRetires int `json:"max_retries,omitempty"`
MaxWaitTime int `json:"max_wait_time_ms,omitempty"`
}

type QueueListResponse struct {
Response
ResultInfo `json:"result_info"`
Result []Queue `json:"result"`
}

type CreateQueueParams struct {
Name string `json:"queue_name"`
}

type QueueResponse struct {
Response
Result Queue `json:"result"`
}

type ListQueueConsumersResponse struct {
Response
ResultInfo `json:"result_info"`
Result []QueueConsumer `json:"result"`
}

type ListQueuesParams struct {
ResultInfo
}

type QueueConsumerResponse struct {
Response
Result QueueConsumer `json:"result"`
}

type UpdateQueueParams struct {
ID string `json:"queue_id,omitempty"`
Name string `json:"queue_name,omitempty"`
CreatedOn *time.Time `json:"created_on,omitempty"`
ModifiedOn *time.Time `json:"modified_on,omitempty"`
ProducersTotalCount int `json:"producers_total_count,omitempty"`
Producers []QueueProducer `json:"producers,omitempty"`
ConsumersTotalCount int `json:"consumers_total_count,omitempty"`
Consumers []QueueConsumer `json:"consumers,omitempty"`
}

type ListQueueConsumersParams struct {
QueueName string `url:"-"`
ResultInfo
}

type CreateQueueConsumerParams struct {
QueueName string `json:"-"`
Consumer QueueConsumer
}

type UpdateQueueConsumerParams struct {
QueueName string `json:"-"`
Consumer QueueConsumer
}

type DeleteQueueConsumerParams struct {
QueueName, ConsumerName string
}

// ListQueues returns the queues owned by an account.
//
// API reference: https://api.cloudflare.com/#queue-list-queues
func (api *API) ListQueues(ctx context.Context, rc *ResourceContainer, params ListQueuesParams) ([]Queue, *ResultInfo, error) {
if rc.Identifier == "" {
return []Queue{}, &ResultInfo{}, ErrMissingAccountID
}

autoPaginate := true
if params.PerPage >= 1 || params.Page >= 1 {
autoPaginate = false
}
if params.PerPage < 1 {
params.PerPage = 50
}
if params.Page < 1 {
params.Page = 1
}

var queues []Queue
var qResponse QueueListResponse
for {
uri := buildURI(fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier), params)

res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil)
if err != nil {
return []Queue{}, &ResultInfo{}, err
}

err = json.Unmarshal(res, &qResponse)
if err != nil {
return []Queue{}, &ResultInfo{}, fmt.Errorf("failed to unmarshal filters JSON data: %w", err)
}

queues = append(queues, qResponse.Result...)
params.ResultInfo = qResponse.ResultInfo.Next()

if params.ResultInfo.Done() || !autoPaginate {
break
}
}

return queues, &qResponse.ResultInfo, nil
}

// CreateQueue creates a new queue.
//
// API reference: https://api.cloudflare.com/#queue-create-queue
func (api *API) CreateQueue(ctx context.Context, rc *ResourceContainer, queue CreateQueueParams) (Queue, error) {
if rc.Identifier == "" {
return Queue{}, ErrMissingAccountID
}

if queue.Name == "" {
return Queue{}, ErrMissingQueueName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier)
res, err := api.makeRequestContext(ctx, http.MethodPost, uri, queue)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
}

var r QueueResponse
err = json.Unmarshal(res, &r)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
}
return r.Result, nil
}

// DeleteQueue deletes a queue.
//
// API reference: https://api.cloudflare.com/#queue-delete-queue
func (api *API) DeleteQueue(ctx context.Context, rc *ResourceContainer, queueName string) error {
if rc.Identifier == "" {
return ErrMissingAccountID
}
if queueName == "" {
return ErrMissingQueueName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName)
_, err := api.makeRequestContext(ctx, http.MethodDelete, uri, nil)
if err != nil {
return fmt.Errorf("%s: %w", errMakeRequestError, err)
}
return nil
}

// GetQueue returns a single queue based on the name.
//
// API reference: https://api.cloudflare.com/#queue-get-queue
func (api *API) GetQueue(ctx context.Context, rc *ResourceContainer, queueName string) (Queue, error) {
if rc.Identifier == "" {
return Queue{}, ErrMissingAccountID
}

if queueName == "" {
return Queue{}, ErrMissingQueueName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName)
res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
}

var r QueueResponse
err = json.Unmarshal(res, &r)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
}

return r.Result, nil
}

// UpdateQueue updates a queue.
//
// API reference: https://api.cloudflare.com/#queue-update-queue
func (api *API) UpdateQueue(ctx context.Context, rc *ResourceContainer, params UpdateQueueParams) (Queue, error) {
if rc.Identifier == "" {
return Queue{}, ErrMissingAccountID
}

if params.Name == "" {
return Queue{}, ErrMissingQueueName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, params.Name)
res, err := api.makeRequestContext(ctx, http.MethodPut, uri, nil)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
}

var r QueueResponse
err = json.Unmarshal(res, &r)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
}
return r.Result, nil
}

// ListQueueConsumers returns the consumers of a queue.
//
// API reference: https://api.cloudflare.com/#queue-list-queue-consumers
func (api *API) ListQueueConsumers(ctx context.Context, rc *ResourceContainer, params ListQueueConsumersParams) ([]QueueConsumer, *ResultInfo, error) {
if rc.Identifier == "" {
return []QueueConsumer{}, &ResultInfo{}, ErrMissingAccountID
}

if params.QueueName == "" {
return []QueueConsumer{}, &ResultInfo{}, ErrMissingQueueName
}

autoPaginate := true
if params.PerPage >= 1 || params.Page >= 1 {
autoPaginate = false
}
if params.PerPage < 1 {
params.PerPage = 50
}
if params.Page < 1 {
params.Page = 1
}

var queuesConsumers []QueueConsumer
var qResponse ListQueueConsumersResponse
for {
uri := buildURI(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName), params)

res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil)
if err != nil {
return []QueueConsumer{}, &ResultInfo{}, err
}

err = json.Unmarshal(res, &qResponse)
if err != nil {
return []QueueConsumer{}, &ResultInfo{}, fmt.Errorf("failed to unmarshal filters JSON data: %w", err)
}

queuesConsumers = append(queuesConsumers, qResponse.Result...)
params.ResultInfo = qResponse.ResultInfo.Next()

if params.ResultInfo.Done() || !autoPaginate {
break
}
}

return queuesConsumers, &qResponse.ResultInfo, nil
}

// CreateQueueConsumer creates a new consumer for a queue.
//
// API reference: https://api.cloudflare.com/#queue-create-queue-consumer
func (api *API) CreateQueueConsumer(ctx context.Context, rc *ResourceContainer, params CreateQueueConsumerParams) (QueueConsumer, error) {
if rc.Identifier == "" {
return QueueConsumer{}, ErrMissingAccountID
}

if params.QueueName == "" {
return QueueConsumer{}, ErrMissingQueueName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName)
res, err := api.makeRequestContext(ctx, http.MethodPost, uri, params.Consumer)
if err != nil {
return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
}

var r QueueConsumerResponse
err = json.Unmarshal(res, &r)
if err != nil {
return QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
}
return r.Result, nil
}

// DeleteQueueConsumer deletes the consumer for a queue..
//
// API reference: https://api.cloudflare.com/#queue-delete-queue-consumer
func (api *API) DeleteQueueConsumer(ctx context.Context, rc *ResourceContainer, params DeleteQueueConsumerParams) error {
if rc.Identifier == "" {
return ErrMissingAccountID
}

if params.QueueName == "" {
return ErrMissingQueueName
}

if params.ConsumerName == "" {
return ErrMissingQueueConsumerName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, params.QueueName, params.ConsumerName)
_, err := api.makeRequestContext(ctx, http.MethodDelete, uri, nil)
if err != nil {
return fmt.Errorf("%s: %w", errMakeRequestError, err)
}

return nil
}

// UpdateQueueConsumer updates the consumer for a queue, or creates one if it does not exist..
//
// API reference: https://api.cloudflare.com/#queue-update-queue-consumer
func (api *API) UpdateQueueConsumer(ctx context.Context, rc *ResourceContainer, params UpdateQueueConsumerParams) (QueueConsumer, error) {
if rc.Identifier == "" {
return QueueConsumer{}, ErrMissingAccountID
}

if params.QueueName == "" {
return QueueConsumer{}, ErrMissingQueueName
}

if params.Consumer.Name == "" {
return QueueConsumer{}, ErrMissingQueueConsumerName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, params.QueueName, params.Consumer.Name)
res, err := api.makeRequestContext(ctx, http.MethodPut, uri, params.Consumer)
if err != nil {
return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
}

var r QueueConsumerResponse
err = json.Unmarshal(res, &r)
if err != nil {
return QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
}
return r.Result, nil
}