Skip to content

Commit

Permalink
Merge pull request #1131 from Cyb3r-Jak3/queues
Browse files Browse the repository at this point in the history
Add support for Queue API
  • Loading branch information
jacobbednarz committed Nov 21, 2022
2 parents a4266f9 + 21a9f46 commit 9ec8f5d
Show file tree
Hide file tree
Showing 3 changed files with 869 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/1131.txt
@@ -0,0 +1,3 @@
```release-note:enhancement
queue: add support queue API
```
381 changes: 381 additions & 0 deletions queue.go
@@ -0,0 +1,381 @@
package cloudflare

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
)

var (
ErrMissingQueueName = errors.New("required queue name is missing")
ErrMissingQueueConsumerName = errors.New("required queue consumer name is missing")
)

type Queue struct {
ID string `json:"queue_id,omitempty"`
Name string `json:"queue_name,omitempty"`
CreatedOn *time.Time `json:"created_on,omitempty"`
ModifiedOn *time.Time `json:"modified_on,omitempty"`
ProducersTotalCount int `json:"producers_total_count,omitempty"`
Producers []QueueProducer `json:"producers,omitempty"`
ConsumersTotalCount int `json:"consumers_total_count,omitempty"`
Consumers []QueueConsumer `json:"consumers,omitempty"`
}

type QueueProducer struct {
Service string `json:"service,omitempty"`
Environment string `json:"environment,omitempty"`
}

type QueueConsumer struct {
Name string `json:"-"`
Service string `json:"service,omitempty"`
ScriptName string `json:"script_name,omitempty"`
Environment string `json:"environment,omitempty"`
Settings QueueConsumerSettings `json:"settings,omitempty"`
QueueName string `json:"queue_name,omitempty"`
CreatedOn *time.Time `json:"created_on,omitempty"`
DeadLetterQueue string `json:"dead_letter_queue,omitempty"`
}

type QueueConsumerSettings struct {
BatchSize int `json:"batch_size,omitempty"`
MaxRetires int `json:"max_retries,omitempty"`
MaxWaitTime int `json:"max_wait_time_ms,omitempty"`
}

type QueueListResponse struct {
Response
ResultInfo `json:"result_info"`
Result []Queue `json:"result"`
}

type CreateQueueParams struct {
Name string `json:"queue_name"`
}

type QueueResponse struct {
Response
Result Queue `json:"result"`
}

type ListQueueConsumersResponse struct {
Response
ResultInfo `json:"result_info"`
Result []QueueConsumer `json:"result"`
}

type ListQueuesParams struct {
ResultInfo
}

type QueueConsumerResponse struct {
Response
Result QueueConsumer `json:"result"`
}

type UpdateQueueParams struct {
ID string `json:"queue_id,omitempty"`
Name string `json:"queue_name,omitempty"`
CreatedOn *time.Time `json:"created_on,omitempty"`
ModifiedOn *time.Time `json:"modified_on,omitempty"`
ProducersTotalCount int `json:"producers_total_count,omitempty"`
Producers []QueueProducer `json:"producers,omitempty"`
ConsumersTotalCount int `json:"consumers_total_count,omitempty"`
Consumers []QueueConsumer `json:"consumers,omitempty"`
}

type ListQueueConsumersParams struct {
QueueName string `url:"-"`
ResultInfo
}

type CreateQueueConsumerParams struct {
QueueName string `json:"-"`
Consumer QueueConsumer
}

type UpdateQueueConsumerParams struct {
QueueName string `json:"-"`
Consumer QueueConsumer
}

type DeleteQueueConsumerParams struct {
QueueName, ConsumerName string
}

// ListQueues returns the queues owned by an account.
//
// API reference: https://api.cloudflare.com/#queue-list-queues
func (api *API) ListQueues(ctx context.Context, rc *ResourceContainer, params ListQueuesParams) ([]Queue, *ResultInfo, error) {
if rc.Identifier == "" {
return []Queue{}, &ResultInfo{}, ErrMissingAccountID
}

autoPaginate := true
if params.PerPage >= 1 || params.Page >= 1 {
autoPaginate = false
}
if params.PerPage < 1 {
params.PerPage = 50
}
if params.Page < 1 {
params.Page = 1
}

var queues []Queue
var qResponse QueueListResponse
for {
uri := buildURI(fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier), params)

res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil)
if err != nil {
return []Queue{}, &ResultInfo{}, err
}

err = json.Unmarshal(res, &qResponse)
if err != nil {
return []Queue{}, &ResultInfo{}, fmt.Errorf("failed to unmarshal filters JSON data: %w", err)
}

queues = append(queues, qResponse.Result...)
params.ResultInfo = qResponse.ResultInfo.Next()

if params.ResultInfo.Done() || !autoPaginate {
break
}
}

return queues, &qResponse.ResultInfo, nil
}

// CreateQueue creates a new queue.
//
// API reference: https://api.cloudflare.com/#queue-create-queue
func (api *API) CreateQueue(ctx context.Context, rc *ResourceContainer, queue CreateQueueParams) (Queue, error) {
if rc.Identifier == "" {
return Queue{}, ErrMissingAccountID
}

if queue.Name == "" {
return Queue{}, ErrMissingQueueName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues", rc.Identifier)
res, err := api.makeRequestContext(ctx, http.MethodPost, uri, queue)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
}

var r QueueResponse
err = json.Unmarshal(res, &r)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
}
return r.Result, nil
}

// DeleteQueue deletes a queue.
//
// API reference: https://api.cloudflare.com/#queue-delete-queue
func (api *API) DeleteQueue(ctx context.Context, rc *ResourceContainer, queueName string) error {
if rc.Identifier == "" {
return ErrMissingAccountID
}
if queueName == "" {
return ErrMissingQueueName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName)
_, err := api.makeRequestContext(ctx, http.MethodDelete, uri, nil)
if err != nil {
return fmt.Errorf("%s: %w", errMakeRequestError, err)
}
return nil
}

// GetQueue returns a single queue based on the name.
//
// API reference: https://api.cloudflare.com/#queue-get-queue
func (api *API) GetQueue(ctx context.Context, rc *ResourceContainer, queueName string) (Queue, error) {
if rc.Identifier == "" {
return Queue{}, ErrMissingAccountID
}

if queueName == "" {
return Queue{}, ErrMissingQueueName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, queueName)
res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
}

var r QueueResponse
err = json.Unmarshal(res, &r)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
}

return r.Result, nil
}

// UpdateQueue updates a queue.
//
// API reference: https://api.cloudflare.com/#queue-update-queue
func (api *API) UpdateQueue(ctx context.Context, rc *ResourceContainer, params UpdateQueueParams) (Queue, error) {
if rc.Identifier == "" {
return Queue{}, ErrMissingAccountID
}

if params.Name == "" {
return Queue{}, ErrMissingQueueName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s", rc.Identifier, params.Name)
res, err := api.makeRequestContext(ctx, http.MethodPut, uri, nil)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
}

var r QueueResponse
err = json.Unmarshal(res, &r)
if err != nil {
return Queue{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
}
return r.Result, nil
}

// ListQueueConsumers returns the consumers of a queue.
//
// API reference: https://api.cloudflare.com/#queue-list-queue-consumers
func (api *API) ListQueueConsumers(ctx context.Context, rc *ResourceContainer, params ListQueueConsumersParams) ([]QueueConsumer, *ResultInfo, error) {
if rc.Identifier == "" {
return []QueueConsumer{}, &ResultInfo{}, ErrMissingAccountID
}

if params.QueueName == "" {
return []QueueConsumer{}, &ResultInfo{}, ErrMissingQueueName
}

autoPaginate := true
if params.PerPage >= 1 || params.Page >= 1 {
autoPaginate = false
}
if params.PerPage < 1 {
params.PerPage = 50
}
if params.Page < 1 {
params.Page = 1
}

var queuesConsumers []QueueConsumer
var qResponse ListQueueConsumersResponse
for {
uri := buildURI(fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName), params)

res, err := api.makeRequestContext(ctx, http.MethodGet, uri, nil)
if err != nil {
return []QueueConsumer{}, &ResultInfo{}, err
}

err = json.Unmarshal(res, &qResponse)
if err != nil {
return []QueueConsumer{}, &ResultInfo{}, fmt.Errorf("failed to unmarshal filters JSON data: %w", err)
}

queuesConsumers = append(queuesConsumers, qResponse.Result...)
params.ResultInfo = qResponse.ResultInfo.Next()

if params.ResultInfo.Done() || !autoPaginate {
break
}
}

return queuesConsumers, &qResponse.ResultInfo, nil
}

// CreateQueueConsumer creates a new consumer for a queue.
//
// API reference: https://api.cloudflare.com/#queue-create-queue-consumer
func (api *API) CreateQueueConsumer(ctx context.Context, rc *ResourceContainer, params CreateQueueConsumerParams) (QueueConsumer, error) {
if rc.Identifier == "" {
return QueueConsumer{}, ErrMissingAccountID
}

if params.QueueName == "" {
return QueueConsumer{}, ErrMissingQueueName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers", rc.Identifier, params.QueueName)
res, err := api.makeRequestContext(ctx, http.MethodPost, uri, params.Consumer)
if err != nil {
return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
}

var r QueueConsumerResponse
err = json.Unmarshal(res, &r)
if err != nil {
return QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
}
return r.Result, nil
}

// DeleteQueueConsumer deletes the consumer for a queue..
//
// API reference: https://api.cloudflare.com/#queue-delete-queue-consumer
func (api *API) DeleteQueueConsumer(ctx context.Context, rc *ResourceContainer, params DeleteQueueConsumerParams) error {
if rc.Identifier == "" {
return ErrMissingAccountID
}

if params.QueueName == "" {
return ErrMissingQueueName
}

if params.ConsumerName == "" {
return ErrMissingQueueConsumerName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, params.QueueName, params.ConsumerName)
_, err := api.makeRequestContext(ctx, http.MethodDelete, uri, nil)
if err != nil {
return fmt.Errorf("%s: %w", errMakeRequestError, err)
}

return nil
}

// UpdateQueueConsumer updates the consumer for a queue, or creates one if it does not exist..
//
// API reference: https://api.cloudflare.com/#queue-update-queue-consumer
func (api *API) UpdateQueueConsumer(ctx context.Context, rc *ResourceContainer, params UpdateQueueConsumerParams) (QueueConsumer, error) {
if rc.Identifier == "" {
return QueueConsumer{}, ErrMissingAccountID
}

if params.QueueName == "" {
return QueueConsumer{}, ErrMissingQueueName
}

if params.Consumer.Name == "" {
return QueueConsumer{}, ErrMissingQueueConsumerName
}

uri := fmt.Sprintf("/accounts/%s/workers/queues/%s/consumers/%s", rc.Identifier, params.QueueName, params.Consumer.Name)
res, err := api.makeRequestContext(ctx, http.MethodPut, uri, params.Consumer)
if err != nil {
return QueueConsumer{}, fmt.Errorf("%s: %w", errMakeRequestError, err)
}

var r QueueConsumerResponse
err = json.Unmarshal(res, &r)
if err != nil {
return QueueConsumer{}, fmt.Errorf("%s: %w", errUnmarshalError, err)
}
return r.Result, nil
}

0 comments on commit 9ec8f5d

Please sign in to comment.