capillary
- Status: Alpha (API feedback is welcome; the API may still break compatibility)
- License: BSD 3-Clause
- Authors: Domen Kožar and Aaron McMillin
Introduction
capillary is a small integration package for the Celery distributed task queue, aimed at designing workflows (Canvas) in a declarative manner using Python decorators. The main reason capillary exists is to do away with manually tracking how Celery tasks depend on each other.
capillary executes in two phases:

- Scans for all Celery tasks in the given Python packages
- Executes tasks based on metadata passed to @pipeline decorators
capillary uses:

- venusian to discover Celery tasks using deferred decorators
- networkx to handle the graph operations for tracking dependencies between tasks in the workflow
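"Deferred decorators" means the decorator only records metadata; nothing is registered until a scanner visits the module. Below is a minimal sketch of that pattern (not capillary's actual implementation; the registry dict and callback are invented for illustration):

import venusian

def pipeline(**metadata):
    def decorator(wrapped):
        def callback(scanner, name, ob):
            # Runs only when Scanner.scan() visits the module,
            # so merely importing tasks.py has no side effects.
            scanner.registry[name] = metadata
        venusian.attach(wrapped, callback)
        return wrapped  # the decorated function itself is untouched
    return decorator

registry = {}
scanner = venusian.Scanner(registry=registry)
# scanner.scan(some_module) would now collect every decorated function.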
Below, the First Steps with Celery tutorial is expanded with capillary integration.

The tasks.py module contains the steps of the pipeline. Steps are marked with the @pipeline decorator, which takes optional parameters indicating that a step must be executed after some other step, or that the step carries certain tags so that groups of tasks can be executed together.
from capillary import pipeline

@pipeline()
def foo(celery_task):
    return 'simple task flow'

@pipeline(after='foo')
def bar(celery_task, l):
    return l.upper()
The myapp.py module then creates a PipelineConfigurator instance, which will assemble the declared steps:
from celery import Celery
from capillary import PipelineConfigurator

import tasks

app = Celery('tasks', broker='redis://', backend='redis://')

pc = PipelineConfigurator(app)
pc.scan(tasks)
Start the worker with
$ celery worker -A myapp -D
and execute the pipeline in a Python shell:
>>> from myapp import pc
>>> asyncresult = pc.run()
>>> asyncresult.get()
SIMPLE TASK FLOW
This example will be used throughout the user guide as a base.
Note
This example assumes the Redis broker is running with default settings, but any Celery broker will do.
The backend is defined only for retrieving the result using .get(); it is otherwise not required.
Celery uses a concept called partials (sometimes also known as currying) to create function signatures. capillary reuses these concepts to execute tasks: the value returned from task foo is passed into task bar.
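As a plain-Celery illustration (independent of capillary; the task names match what prettyprint shows later in this guide), the example pipeline is equivalent to chaining two signatures by hand:

from celery import signature

# Chaining with | turns each task into a partial: bar receives
# foo's return value as its argument when the chain runs.
canvas = signature('tasks.foo') | signature('tasks.bar')
result = canvas.apply_async()
result.get()  # 'SIMPLE TASK FLOW'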
It is possible to pass extra parameters to specific tasks, as described in extra-parameters.
By default PipelineConfigurator.run will execute all scanned tasks without tags, in topological order. If tags=['foobar'] is passed to @pipeline, the task will only run when tagged_as=['foobar'] is passed to PipelineConfigurator.run.
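For example, a tagged step in tasks.py (the step and tag names here are arbitrary):

@pipeline(tags=['foobar'])
def tagged_step(celery_task):
    return 'tagged flow'

would only be executed by:

>>> pc.run(tagged_as=['foobar'])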
See predefined_defaults for information on how to reduce boilerplate and group pipelines per tag.
If a step needs to stop the current pipeline (meaning no further tasks in the pipeline are processed), just raise capillary.AbortPipeline anywhere in your pipeline tasks.
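For instance, a hypothetical guard step (assuming AbortPipeline accepts an optional message, as exceptions generally do):

from capillary import AbortPipeline, pipeline

@pipeline(after='foo')
def guard(celery_task, value):
    if not value:
        # Stops the pipeline: no further tasks are processed.
        raise AbortPipeline('nothing to process')
    return value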
Some @pipeline elements might require extra arguments that are only known when PipelineConfigurator.run is called:
>>> @pipeline(
...     required_kwarg_names=['param'],
... )
... def foobar(celery_task, param=None):
...     print(param)
...     return 'simple task flow'
When PipelineConfigurator.run is called, param will need to be passed inside required_kwargs; otherwise MissingArgument is raised.
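A minimal sketch, assuming run accepts the required_kwargs mapping described above:

>>> pc.run(required_kwargs={'param': 'a value'})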
The most typical case where two @pipeline decorators are useful is when you'd like to reuse one function in two differently tagged pipelines.
@pipeline(
    after=['first', 'second'],
    tags=['some_pipeline'],
)
@pipeline(
    after=['third'],
    tags=['other_pipeline'],
)
def foobar(celery_task):
    return 'simple task flow'
Executing PipelineConfigurator.run(tagged_as=['some_pipeline']) would run the foobar function as a task after the first and second tasks are done, while executing PipelineConfigurator.run(tagged_as=['other_pipeline']) would run it after the third task is done.
Note
If both tags are used (e.g. PipelineConfigurator.run(tagged_as=['some_pipeline', 'other_pipeline'])), the order in which the tags are specified matters and the latter will override the former. If you specify a different name parameter for each decorator, both will be executed.
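For example (the name values here are invented for illustration):

@pipeline(
    name='foobar_some',
    after=['first', 'second'],
    tags=['some_pipeline'],
)
@pipeline(
    name='foobar_other',
    after=['third'],
    tags=['other_pipeline'],
)
def foobar(celery_task):
    return 'simple task flow'

With distinct names, running with both tags executes both variants instead of one overriding the other.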
Often @pipeline definitions will repeat the same arguments throughout your application. make_pipeline_from_defaults allows you to create customized predefined defaults for a pipeline. This example makes a foobar_pipeline decorator that will apply the same tag to each step:
>>> from capillary import make_pipeline_from_defaults
>>> foobar_pipeline = make_pipeline_from_defaults(
...     tags=["foobar"],
... )
Then use @foobar_pipeline just as one would use @pipeline, and all your definitions will have foobar as a tag.
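For example, a hypothetical step declared with the customized decorator:

>>> @foobar_pipeline(after='foo')
... def baz(celery_task, value):
...     return value + '!'

This is equivalent to decorating baz with @pipeline(after='foo', tags=["foobar"]).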
Note
Passing tags to @foobar_pipeline will override the ["foobar"] value.
To actually see what kind of canvas will be executed, call PipelineConfigurator.prettyprint with the same arguments as PipelineConfigurator.run:
>>> pc.prettyprint(args=[], kwargs={})
tasks.foo() | tasks.bar()
Using the constant capillary.ALL, it is possible to declare a task as the last one in the pipeline:
>>> from capillary import ALL, pipeline
>>> @pipeline(
...     after=ALL,
... )
... def last(celery_task, obj):
...     print('ALL DONE!')
...     return obj
Note
Multiple tasks with after=ALL will be run in a celery.group as the last part of the pipeline.
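Conceptually, with two such tasks the resulting canvas has this shape (task names hypothetical):

from celery import chain, group, signature

canvas = chain(
    signature('tasks.foo'),
    signature('tasks.bar'),
    group(signature('tasks.last_a'), signature('tasks.last_b')),
)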
The following is a quick summary of what happens inside PipelineConfigurator.run():

- the task tree is generated using dependency information
- Celery signatures are created
- the task tree is reduced into a chain using a topological sort
- the chain is executed using celery.app.Task.apply_async
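For intuition, here is a minimal networkx sketch of the reduction step (not capillary's actual internals):

import networkx as nx

# Dependency graph for the simple example: bar runs after foo.
g = nx.DiGraph()
g.add_edge('foo', 'bar')

# A topological sort yields a valid linear execution order.
print(list(nx.topological_sort(g)))  # ['foo', 'bar']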
Note
Currently the task tree is reduced into a linear chained list of tasks, but in the future different "runners" could be implemented.
Functions marked as @pipeline elements are still just plain, untouched functions until PipelineConfigurator.scan() is called. If the function code doesn't depend on the first celery_task argument, just pass None as its value.
To unit test our two pipeline elements from simple-example:
import unittest

from tasks import bar, foo

class PipelineTestCase(unittest.TestCase):
    def test_bar(self):
        self.assertEqual(bar(None, 'test'), 'TEST')

    def test_foo(self):
        self.assertEqual(foo(None), 'simple task flow')
To run the tests, install py.test and run it:
$ py.test tests/
- Using many tasks with large objects passed as arguments can be quite storage intensive. One alternative would be to generate signatures on the fly, if Celery permits that.