Skip to content

Latest commit

 

History

History
1043 lines (795 loc) · 36.2 KB

api.md

File metadata and controls

1043 lines (795 loc) · 36.2 KB

Contents

Modules

faktory

creates faktory singletons

Classes

Client

A client connection handle for interacting with the faktory server. Holds a pool of 1 or more underlying connections. Safe for concurrent use and tolerant of unexpected connection terminations. Use this object for all interactions with the factory server.

Job

A class wrapping a JobPayload

Creating and pushing a job is typically accomplished by using a faktory client, which implements .job and automatically sets the client for the job when calling .push on the job later.

You do not need to use this class directly.`

Mutation

A wrapper for the Mutate API

A low-level data management API to script certain repairs or migrations.

!!! Please be warned: MUTATE commands can be slow and/or resource intensive. They should not be used as part of your application logic.

Worker

Representation of a worker process with many concurrent job processors. Works at the concurrency set in options during construction. Will hold at most concurrency jobs in-memory while processing at any one time. Listens for signals to quiet or shutdown. Should not be started more than once per-process, nor should more than one worker be started per-process.

Typedefs

HI : object

An after-connect initial message from the server to handshake the connection

JobFunction : function

A function that executes work

Jobtype : string

Discriminator used by a worker to decide how to execute a job. This will be the name you used during register.

JobFunction : function

A function that executes work

Registry : Object.<Jobtype, JobFunction>

A lookup table holding the jobtype constants mapped to their job functions

ContextProvider : function

A function returned by a job function that will be called with the job context as its only argument and awaited. This exists to allow you to define simple job functions that only accept their job args, but in many cases you might need the job's custom properties or stateful connections (like a database connection) in your job and want to attach a connection for your job function to use without having to create it itself.

Context : object

A context object passed through middleware and to a job thunk

ContextProvider : function

A function returned by a job function that will be called with the job context as its only argument and awaited. This exists to allow you to define simple job functions that only accept their job args, but in many cases you might need the job's custom properties or stateful connections (like a database connection) in your job and want to attach a connection for your job function to use without having to create it itself.

HELLO : object

The client's response to the server's HI to initiate a connection

JobPayload : Object

A work unit that can be scheduled by the faktory work server and executed by clients

RejectedJobFromPushBulk : Object

A lookup table holding the jobtype constants mapped to their job functions

RejectedJobsFromPushBulk : Array

A lookup table holding the jobtype constants mapped to their job functions

RFC3339_DateTime : string

An RFC3339-format datetime string

API

faktory

creates faktory singletons

faktory.use(fn) ⇒ FaktoryControl

Adds a middleware function to the stack

Kind: instance method of faktory
Returns: FaktoryControl - this
See: koa middleware

Param Type Description
fn function koa-compose-style middleware function

Example

faktory.use(async (ctx, next) => {
  // a pool you created to hold database connections
  pool.use(async (conn) => {
    ctx.db = conn;
    await next();
  });
});

faktory.register(name, fn) ⇒ FaktoryControl

Adds a JobFunction to the Registry

Kind: instance method of faktory
Returns: FaktoryControl - this

Param Type Description
name Jobtype string descriptor for the jobtype
fn JobFunction

Example

faktory.register('MyJob', (...args) => {
  // some work
});

faktory.connect(...args) ⇒ Client

Creates a new Client

Kind: instance method of faktory

Param Type Description
...args * args forwarded to Client

Example

const client = await faktory.connect();

await client.push(job);

faktory.work(options) ⇒ Promise

Starts a worker. Resolves after the worker is started. Only call this once per-process.

Kind: instance method of faktory
Returns: Promise - the Worker.work promise

Param Type Description
options object options to Worker

Example

// this keeps the process open and can be `await`ed
faktory.work();

faktory.stop() ⇒ promise

Stops the worker previously started.

Kind: instance method of faktory
Returns: promise - promise returned by Worker.stop
Example

// previously
faktory.work();

faktory.stop();

Client

A client connection handle for interacting with the faktory server. Holds a pool of 1 or more underlying connections. Safe for concurrent use and tolerant of unexpected connection terminations. Use this object for all interactions with the factory server.

Kind: global class

new Client([options])

Creates a Client with a connection pool

Param Type Default Description
[options] object
[options.url] string "tcp://127.0.0.1:7419" connection string for the faktory server (checks for FAKTORY_PROVIDER and FAKTORY_URL)
[options.host] string "127.0.0.1" host string to connect to
[options.port] number | string 7419 port to connect to faktory server on
[options.password] string faktory server password to use during HELLO
[options.wid] string optional wid that should be provided to the server (only necessary for a worker process consuming jobs)
[options.labels] Array.<string> [] optional labels to provide the faktory server for this client
[options.poolSize] number 10 the maxmimum size of the connection pool

Example

const client = new Client();

const job = await client.fetch('default');

client.connect() ⇒ Promise.<Client>

Explicitly opens a connection and then closes it to test connectivity. Under normal circumstances you don't need to call this method as all of the communication methods will check out a connection before executing. If a connection is not available, one will be created. This method exists to ensure connection is possible if you need to do so. You can think of this like sqlx#MustConnect

Kind: instance method of Client
Returns: Promise.<Client> - resolves when a connection is opened

client.close() ⇒ Promise.<undefined>

Closes the connection to the server

Kind: instance method of Client

client.job(jobtype, ...args) ⇒ Job

Creates a new Job object to build a job payload

Kind: instance method of Client
Returns: Job - a job builder with attached Client for PUSHing
See: Job

Param Type Description
jobtype String name of the job function
...args * arguments to the job function

client.send(...args)

Borrows a connection from the connection pool, forwards all arguments to Connection.send, and checks the connection back into the pool when the promise returned by the wrapped function is resolved or rejected.

Kind: instance method of Client
See: Connection.send

Param Type Description
...args * arguments to Connection.send

client.fetch(...queues) ⇒ Promise.<(object|null)>

Fetches a job payload from the server from one of ...queues

Kind: instance method of Client
Returns: Promise.<(object|null)> - a job payload if one is available, otherwise null

Param Type Description
...queues String list of queues to pull a job from

client.beat() ⇒ Promise.<string>

Sends a heartbeat for this.wid to the server

Kind: instance method of Client
Returns: Promise.<string> - string 'OK' when the heartbeat is accepted, otherwise may return a state string when the server has a signal to send this client (quiet, terminate)

client.push(job) ⇒ Promise.<string>

Pushes a job payload to the server

Kind: instance method of Client
Returns: Promise.<string> - the jid for the pushed job

Param Type Description
job Job | Object job payload to push

client.pushBulk(jobs) ⇒ Promise.<RejectedJobsFromPushBulk>

Pushes multiple jobs to the server and return map containing failed job submissions if any

Kind: instance method of Client
Returns: Promise.<RejectedJobsFromPushBulk> - response from the faktory server

Param Type Description
jobs Array.<Job> | Array.<Object> jobs payload to push

client.flush() ⇒ Promise.<string>

Sends a FLUSH to the server

Kind: instance method of Client
Returns: Promise.<string> - resolves with the server's response text

client.info() ⇒ Promise.<object>

Sends an INFO command to the server

Kind: instance method of Client
Returns: Promise.<object> - the server's INFO response object

client.ack(jid) ⇒ Promise.<string>

Sends an ACK to the server for a particular job ID

Kind: instance method of Client
Returns: Promise.<string> - the server's response text

Param Type Description
jid String the jid of the job to acknowledge

client.fail(jid, e) ⇒ Promise.<string>

Sends a FAIL command to the server for a particular job ID with error information

Kind: instance method of Client
Returns: Promise.<string> - the server's response text

Param Type Description
jid String the jid of the job to FAIL
e Error an error object that caused the job to fail

Job

A class wrapping a JobPayload

Creating and pushing a job is typically accomplished by using a faktory client, which implements .job and automatically sets the client for the job when calling .push on the job later.

You do not need to use this class directly.`

Kind: global class

new Job(jobtype, [client])

Creates a job

Param Type Description
jobtype string Jobtype string
[client] Client a client to use for communicating to the server (if calling push)

Example (with a faktory client)

// with a client
const client = await faktory.connect();
const job = client.job('SendWelcomeEmail', id);

job.jid

sets the jid

Kind: instance property of Job
See: JobPayload

Param Type Description
value string the >8 length jid

job.queue

sets the queue

Kind: instance property of Job
See: JobPayload

Param Type Description
value string queue name

job.args

sets the args

Kind: instance property of Job
See: JobPayload

Param Type Description
value Array array of positional arguments

job.priority

sets the priority of this job

Kind: instance property of Job
See: JobPayload

Param Type Description
value number 0-9

job.retry

sets the retry count

Kind: instance property of Job
See: JobPayload

Param Type Description
value number {@see JobPayload}

job.at

sets the scheduled time

Kind: instance property of Job
See: JobPayload

Param Type Description
value Date | string the date object or RFC3339 timestamp string

job.reserveFor

sets the reserveFor parameter

Kind: instance property of Job
See: JobPayload

Param Type
value number

job.custom

sets the custom object property

Kind: instance property of Job
See: JobPayload

Param Type Description
value object the custom data

job.toJSON() ⇒ object

Generates an object from this instance for transmission over the wire

Kind: instance method of Job
Returns: object - the job as a serializable javascript object
Link: JobPayload|JobPayload}
See: JobPayload

job.push() ⇒ string

Pushes this job to the faktory server. Modifications after this point are not persistable to the server

Kind: instance method of Job
Returns: string - return of client.push(job)

Job.jid() ⇒ string

generates a uuid

Kind: static method of Job
Returns: string - a uuid/v4 string

Mutation

A wrapper for the Mutate API

A low-level data management API to script certain repairs or migrations.

!!! Please be warned: MUTATE commands can be slow and/or resource intensive. They should not be used as part of your application logic.

Kind: global class

new Mutation(client)

Param Type
client Client

mutation.ofType(type)

Filters the affected jobs by a jobtype string. Use this to ensure you're only affecting a single jobtype if applicable. Can be chained.

Note: jobtype and other filters do not apply for the clear command.

Kind: instance method of Mutation

Param Type Description
type string jobtype fiter for operation

Example

client.dead.ofType('SendEmail').discard();

mutation.withJids(...jids)

Filters the affected jobs by one or more job ids. This is much more efficient when only one jid is provided. Can be chained.

Note: jobtype and other filters do not apply for the clear command.

Kind: instance method of Mutation

Param Type Description
...jids string job ids to target for the operation

Example

await client.retries.withJids('1234').requeue();

mutation.matching(pattern)

Filters the MUTATE selection to jobs matching a Redis SCAN pattern. Can be chained.

Note the regexp filter scans the entire job payload and can be tricky to get right, for instance you'll probably need * on both sides. The regexp filter option is passed to Redis's SCAN command directly, read the SCAN documentation for further details. https://redis.io/commands/scan

Kind: instance method of Mutation

Param Type Description
pattern string redis SCAN pattern to target jobs for the operation

Example

await client.retries.matching("*uid:12345*").kill();

mutation.clear()

Executes a clear mutation. This clears the set entirely and any filtering added does not apply.

Kind: instance method of Mutation

mutation.kill()

Executes a kill mutation. Jobs that are killed are sent to the dead set.

Kind: instance method of Mutation

mutation.discard()

Executes a discard mutation. Jobs that are discarded are permanently deleted.

Kind: instance method of Mutation

mutation.requeue()

Executes a requeue mutation. Jobs that are requeued are sent back to their original queue for processing.

Kind: instance method of Mutation

Worker

Representation of a worker process with many concurrent job processors. Works at the concurrency set in options during construction. Will hold at most concurrency jobs in-memory while processing at any one time. Listens for signals to quiet or shutdown. Should not be started more than once per-process, nor should more than one worker be started per-process.

Kind: global class

new Worker([options])

Param Type Default Description
[options] object
[options.wid] String uuid().slice(0, 8) the wid the worker will use
[options.concurrency] Number 20 how many jobs this worker can process at once
[options.shutdownTimeout] Number 8 the amount of time in seconds that the worker may take to finish a job before exiting ungracefully
[options.beatInterval] Number 15 the amount of time in seconds between each heartbeat
[options.queues] Array.<string> ['default'] the queues this worker will fetch jobs from
[options.middleware] Array.<function()> [] a set of middleware to run before performing each job in koa.js-style middleware execution signature
[options.registry] Registry Registry the job registry to use when working
[options.poolSize] Number concurrency+2 the client connection pool size for this worker

Example

const worker = new Worker({
  queues: ['critical', 'default', 'low'],
});

worker.work();

worker.work() ⇒

starts the worker fetch loop and job processing

Kind: instance method of Worker
Returns: self, when working has been stopped by a signal or concurrent call to stop or quiet
See

  • Worker.quiet
  • Worker.stop

worker.quiet()

Signals to the worker to discontinue fetching new jobs and allows the worker to continue processing any currently-running jobs

Kind: instance method of Worker

worker.stop() ⇒ promise

stops the worker

Kind: instance method of Worker
Returns: promise - resolved when worker stops

worker.beat()

Sends a heartbeat for this server and interprets the response state (if present) to quiet or terminate the worker

Kind: instance method of Worker

worker.use(fn) ⇒ FaktoryControl

Adds a middleware function to the stack

Kind: instance method of Worker
Returns: FaktoryControl - this
See: koa middleware

Param Type Description
fn function koa-compose-style middleware function

Example

faktory.use(async (ctx, next) => {
  // a pool you created to hold database connections
  pool.use(async (conn) => {
    ctx.db = conn;
    await next();
  });
});

worker.register(name, fn) ⇒ FaktoryControl

Adds a JobFunction to the Registry

Kind: instance method of Worker
Returns: FaktoryControl - this

Param Type Description
name Jobtype string descriptor for the jobtype
fn JobFunction

Example

faktory.register('MyJob', (...args) => {
  // some work
});

HI : object

An after-connect initial message from the server to handshake the connection

Kind: global typedef
See: HELLO
Properties

Name Type Description
v number faktory server protocol version number
i number only present when password is required. number of password hash iterations. see HELLO.
s string only present when password is required. salt for password hashing. see HELLO.

JobFunction : function

A function that executes work

Kind: global typedef

Param Type Description
...args * arguments from the job payload

Example

function(...args) {
  // does something meaningful
}

Jobtype : string

Discriminator used by a worker to decide how to execute a job. This will be the name you used during register.

Kind: global typedef
See: https://github.com/contribsys/faktory/wiki/The-Job-Payload
Example

// where `MyFunction` is the jobtype

faktory.register('MyFunction', () => {})

JobFunction : function

A function that executes work

Kind: global typedef

Param Type Description
...args * arguments from the job payload

Example

function(...args) {
  // does something meaningful
}

Registry : Object.<Jobtype, JobFunction>

A lookup table holding the jobtype constants mapped to their job functions

Kind: global typedef
See

  • Jobtype
  • JobFunction

Example

{
  SendWelcomeUser: (id) => {
    // job fn
  },
  GenerateThumbnail: (id, size) => {
    // job fn
  }
}

ContextProvider : function

A function returned by a job function that will be called with the job context as its only argument and awaited. This exists to allow you to define simple job functions that only accept their job args, but in many cases you might need the job's custom properties or stateful connections (like a database connection) in your job and want to attach a connection for your job function to use without having to create it itself.

Kind: global typedef
See: Context

Param Type Description
ctx object context object containing the job and any other data attached via userland-middleware

Example

// assumes you have middleware that attaches `db` to `ctx`

faktory.register('UserWelcomer', (...args) => async (ctx) => {
  const [ id ] = args;
  const user = await ctx.db.users.find(id);
  const email = new WelcomeEmail(user);
  await email.deliver();
});

Context : object

A context object passed through middleware and to a job thunk

Kind: global typedef
Properties

Name Type Description
Context.job object the job payload
Context.fn function a reference to the job function

ContextProvider : function

A function returned by a job function that will be called with the job context as its only argument and awaited. This exists to allow you to define simple job functions that only accept their job args, but in many cases you might need the job's custom properties or stateful connections (like a database connection) in your job and want to attach a connection for your job function to use without having to create it itself.

Kind: global typedef
See: Context

Param Type Description
ctx object context object containing the job and any other data attached via userland-middleware

Example

// assumes you have middleware that attaches `db` to `ctx`

faktory.register('UserWelcomer', (...args) => async (ctx) => {
  const [ id ] = args;
  const user = await ctx.db.users.find(id);
  const email = new WelcomeEmail(user);
  await email.deliver();
});

HELLO : object

The client's response to the server's HI to initiate a connection

Kind: global typedef
See

Properties

Name Type Description
v string the faktory client protocol version
hostname string name of the host that is running this worker
wid string globally unique identifier for this worker
pid number local process identifier for this worker on its host
labels Array.<string> labels that apply to this worker, to allow producers to target work units to worker types.
pwdhash string This field should be the hexadecimal representation of the ith SHA256 hash of the client password concatenated with the value in s.

JobPayload : Object

A work unit that can be scheduled by the faktory work server and executed by clients

Kind: global typedef
See

Properties

Name Type Default Description
[jid] string "uuid()" globally unique ID for the job.
jobtype Jobtype
[queue] string "default" which job queue to push this job onto.
[args] array [] parameters the worker should use when executing the job.
[priority] number 5 higher priority jobs are dequeued before lower priority jobs.
[retry] number 25 number of times to retry this job if it fails. 0 discards the failed job, -1 saves the failed job to the dead set.
[at] RFC3339_DateTime run the job at approximately this time; immediately if blank
[reserve_for] number 1800 number of seconds a job may be held by a worker before it is considered failed.
custom object provides additional context to the worker executing the job.

RejectedJobFromPushBulk : Object

A lookup table holding the jobtype constants mapped to their job functions

Kind: global typedef
See: JobPayload
Properties

Name Type Description
reason string server-provided reason for the job failing to enqueue.
jobPayload JobPayload the job payload that failed to enqueue.

RejectedJobsFromPushBulk : Array

A lookup table holding the jobtype constants mapped to their job functions

Kind: global typedef
See

  • RejectedJobFromPushBulk
  • JobPayload

RFC3339_DateTime : string

An RFC3339-format datetime string

Kind: global typedef
Example

"2002-10-02T10:00:00-05:00"
"2002-10-02T15:00:00Z"
"2002-10-02T15:00:00.05Z"

new Date().toISOString();
// => '2019-02-11T15:59:15.593Z'