
Celery Canvas: Workflow Primitives

Celery Canvas is the workflow composition layer — it lets you build complex task pipelines from simple primitives. All primitives are lazy: they describe work without executing it until you call .delay() or .apply_async().


Primitives Overview

Primitive     | Purpose                                  | Execution
signature     | Reusable, lazy task call                 | Single task
chain         | Sequential; each result feeds the next   | Series
group         | Parallel; run N tasks at once            | Parallel
chord         | group + callback after all finish        | Parallel + join
chunks        | Split one big iterable into batches      | Parallel (one task per batch)
map / starmap | Apply a task across a list               | Single task (sequential loop)

Setup (shared for all examples)

# tasks.py  (start a worker with: celery -A tasks worker --loglevel=INFO)
from celery import Celery

app = Celery("demo", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task
def add(x, y):
    return x + y

@app.task
def double(x):
    return x * 2

@app.task
def summarize(results):
    return {"total": sum(results), "count": len(results)}

1. signature — lazy task descriptor

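A signature wraps a single task invocation (the task, its arguments, and execution options) without running it, so it can be passed around, partially applied, or composed. add.s(3, 4) is shorthand for add.signature((3, 4)).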

from celery import signature
from tasks import add

# equivalent ways to create a signature
sig = add.s(3, 4)          # shorthand
sig = add.signature((3, 4)) # explicit
sig = signature("tasks.add", args=(3, 4))  # by name

result = sig.delay()       # actually send it
print(result.get())        # 7

Partial application: freeze some args and let the caller (or the previous task in a chain) supply the rest. Arguments supplied later are prepended to the frozen ones:

# 10 is frozen as the last positional arg; 5 is prepended → add(5, 10)
partial = add.s(10)
partial.delay(5).get()     # 15
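The prepend rule is easy to get backwards with a commutative task like add. The minimal plain-Python model below (not Celery's actual Signature class) uses a non-commutative sub function so the argument order becomes visible; ToySignature and sub are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass
class ToySignature:
    """Toy stand-in for a Celery signature: a function plus frozen args."""
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    immutable: bool = False  # models .si(): args supplied later are ignored

    def call(self, *extra: Any) -> Any:
        # Later args are PREPENDED to the frozen ones, unless the
        # signature is immutable, in which case they are dropped.
        merged = self.args if self.immutable else tuple(extra) + self.args
        return self.func(*merged)


def sub(x, y):
    # hypothetical non-commutative task body, used only to expose arg order
    return x - y


partial = ToySignature(sub, (10,))
print(partial.call(5))  # sub(5, 10) -> -5

frozen = ToySignature(sub, (10, 3), immutable=True)
print(frozen.call(5))   # the 5 is ignored: sub(10, 3) -> 7
```

The same rule explains chains: the previous task's return value is simply the prepended argument.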

2. chain — sequential pipeline

Each task receives the return value of the previous task as its first argument.

from celery import chain
from tasks import add, double

# add(2, 2) → 4 → double(4) → 8 → double(8) → 16
result = chain(add.s(2, 2), double.s(), double.s()).delay()
print(result.get())  # 16

Using the | pipe operator (syntactic sugar):

pipeline = add.s(2, 2) | double.s() | double.s()
result = pipeline.delay()
print(result.get())  # 16

Key rule: every task in the chain (except the first) must accept the previous result as its first positional arg. Use add.si(x, y) (immutable signature) to ignore the incoming value.

from tasks import add

# si() = immutable: ignores the value passed in from the previous task
pipeline = add.s(1, 1) | add.si(10, 20)   # result of first is discarded
result = pipeline.delay()
print(result.get())  # 30
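Conceptually, a chain is a left fold: the first signature runs with its own args, and each later one is called with the previous return value prepended (immutable ones ignore it). The sketch below models only that data flow in plain Python; run_chain and the (function, args, immutable) tuple encoding are hypothetical, and real chains send each step as a separate task message to a worker.

```python
from typing import Any, Callable, Sequence, Tuple

# Each step: (function, frozen_args, immutable_flag)
Step = Tuple[Callable[..., Any], Tuple[Any, ...], bool]


def run_chain(steps: Sequence[Step]) -> Any:
    """Model of chain data flow: previous result is prepended to the next step's args."""
    first_func, first_args, _ = steps[0]
    result = first_func(*first_args)
    for func, args, immutable in steps[1:]:
        # immutable steps (.si) ignore the incoming value entirely
        result = func(*args) if immutable else func(result, *args)
    return result


def add(x, y):
    return x + y


def double(x):
    return x * 2


# add.s(2, 2) | double.s() | double.s()
print(run_chain([(add, (2, 2), False), (double, (), False), (double, (), False)]))  # 16
# add.s(1, 1) | add.si(10, 20)
print(run_chain([(add, (1, 1), False), (add, (10, 20), True)]))  # 30
```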

3. group — parallel fan-out

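A group sends N tasks at once so that available workers can run them in parallel. Calling .get() on the returned GroupResult gives a list of results in the same order as the tasks, regardless of which finished first.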

from celery import group
from tasks import double

# runs double(1), double(2), double(3) in parallel
job = group(double.s(i) for i in range(1, 4))
result = job.delay()
print(result.get())  # [2, 4, 6]

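result.get() blocks until every task in the group has finished. To check progress without blocking, query the GroupResult:

from tasks import add

result = group(add.s(i, i) for i in range(5)).delay()
# non-blocking checks; right after .delay() these usually report "not done yet"
print(result.ready())            # True only once every task has finished
print(result.completed_count())  # number of tasks that have succeeded so far
print(result.get(timeout=10))    # blocks until done: [0, 2, 4, 6, 8]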
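The ordering guarantee is the same one the standard library's Executor.map gives: results come back in submission order even when later tasks finish first. The analogy below uses threads from concurrent.futures instead of Celery workers; the sleep durations are arbitrary and chosen only so that completion order tends to differ from submission order.

```python
import time
from concurrent.futures import ThreadPoolExecutor

finished = []  # records completion order, for comparison


def double(x):
    # later inputs sleep less, so they tend to finish first
    time.sleep(0.05 * (4 - x))
    return x * 2


def tracked_double(x):
    value = double(x)
    finished.append(x)  # list.append is safe to call from several threads
    return value


with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(tracked_double, [1, 2, 3]))

print(results)   # [2, 4, 6]: submission order, like GroupResult.get()
print(finished)  # completion order, typically [3, 2, 1]
```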

4. chord — parallel + join callback

A chord pairs a header (a group of parallel tasks) with a body (a callback that receives the list of header results once all of them have finished).

from celery import chord, group
from tasks import double, summarize

header = group(double.s(i) for i in range(1, 6))   # [2, 4, 6, 8, 10]
callback = summarize.s()                             # receives [2, 4, 6, 8, 10]

result = chord(header)(callback)
print(result.get())  # {"total": 30, "count": 5}

Shorter form using |:

result = (group(double.s(i) for i in range(1, 6)) | summarize.s()).delay()
print(result.get())  # {"total": 30, "count": 5}

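Chords require a result backend (e.g. Redis or a database backend); without one, Celery has no way to know when every header task has finished. For the same reason, tasks used in a chord must not ignore their results.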
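Why the backend matters: the body can only fire once something has recorded that every header task finished. A minimal single-process model of that join (a shared counter plus one result slot per header task) is sketched below. ChordJoin is a hypothetical name, threads stand in for workers, and this is not how any particular Celery backend actually stores chord state.

```python
import threading
from typing import Any, Callable, List, Optional


class ChordJoin:
    """Toy chord barrier: fires the body once all header results are recorded."""

    def __init__(self, size: int, body: Callable[[List[Any]], Any]):
        self.size = size
        self.body = body
        self.results: List[Any] = [None] * size  # one slot per header task, in order
        self.done = 0
        self.body_result: Optional[Any] = None
        self._lock = threading.Lock()

    def on_header_done(self, index: int, value: Any) -> None:
        # Called by each header task when it finishes, in any order.
        with self._lock:
            self.results[index] = value
            self.done += 1
            fire = self.done == self.size
        if fire:
            # Exactly one caller observes done == size, so the body runs once.
            self.body_result = self.body(list(self.results))


def summarize(results):
    return {"total": sum(results), "count": len(results)}


join = ChordJoin(5, summarize)
header_values = [2, 4, 6, 8, 10]
threads = [
    threading.Thread(target=join.on_header_done, args=(i, v))
    for i, v in enumerate(header_values)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(join.body_result)  # {'total': 30, 'count': 5}
```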


5. chunks — batch an iterable

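Splits an iterable of argument tuples into chunks of n items. Each chunk becomes one task message that calls the task once per item (unpacking each tuple as *args), trading per-task overhead for coarser parallelism.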

from tasks import add

# 100 add() calls in chunks of 10 → 10 worker tasks, each running 10 add() calls
result = add.chunks(zip(range(100), range(100)), 10).group().apply_async()
print(result.get())  # [[0, 2, 4, ...], ...]  (list of 10 sub-lists)
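The partitioning itself can be modelled in plain Python with itertools.islice; split_chunks and run_chunk are hypothetical helpers that mirror the behaviour described above, not Celery's implementation. Note that the last chunk may be shorter than n.

```python
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, List, Tuple


def split_chunks(items: Iterable[Tuple[Any, ...]], n: int) -> Iterator[List[Tuple[Any, ...]]]:
    """Yield successive lists of at most n argument tuples."""
    it = iter(items)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk


def run_chunk(func: Callable[..., Any], chunk: List[Tuple[Any, ...]]) -> List[Any]:
    # One chunk = one task message; inside it, each tuple is unpacked as *args.
    return [func(*args) for args in chunk]


def add(x, y):
    return x + y


chunks = list(split_chunks(zip(range(100), range(100)), 10))
results = [run_chunk(add, c) for c in chunks]
print(len(chunks))   # 10 task messages
print(results[0])    # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```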

6. map and starmap — apply over a list

map passes each item as a single argument:

result = double.map([1, 2, 3, 4]).delay()
print(result.get())   # [2, 4, 6, 8]

starmap unpacks each item as *args:

result = add.starmap([(1, 2), (3, 4), (5, 6)]).delay()
print(result.get())   # [3, 7, 11]

map/starmap run as a single task on one worker (sequential internally). Use group if you need actual parallelism.
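In plain-Python terms, map and starmap are one call that loops over the items; the model below shows the difference between passing each item whole and unpacking it. run_map and run_starmap are hypothetical names; under this model a failure on any single item fails the whole loop, which is one more reason to prefer group when items are independent.

```python
from typing import Any, Callable, Iterable, List


def run_map(func: Callable[[Any], Any], items: Iterable[Any]) -> List[Any]:
    # map: each item is passed as ONE argument
    return [func(item) for item in items]


def run_starmap(func: Callable[..., Any], items: Iterable[tuple]) -> List[Any]:
    # starmap: each item is unpacked into positional arguments
    return [func(*item) for item in items]


def double(x):
    return x * 2


def add(x, y):
    return x + y


print(run_map(double, [1, 2, 3, 4]))               # [2, 4, 6, 8]
print(run_starmap(add, [(1, 2), (3, 4), (5, 6)]))  # [3, 7, 11]
# run_map(add, [(1, 2)]) would raise TypeError: add() gets one tuple, not two args
```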


Composing Primitives

Primitives can be nested freely.

Fan-out → reduce

from celery import group, chord
from tasks import add, summarize

# parallel add(i, 2 * i) → [0, 3, 6, 9, 12], then summarize the list
pipeline = chord(
    group(add.s(i, i * 2) for i in range(5)),
    summarize.s()
)
print(pipeline().get())  # {"total": 30, "count": 5}
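Before dispatching, it can help to compute what a small canvas should return using the same logic in plain Python. Celery task objects can also be called directly (e.g. add(1, 2)) to run the body in the current process, but the replica below uses plain functions so it needs no Celery install at all. It checks only the arithmetic of the chord above, not scheduling.

```python
def add(x, y):
    return x + y


def summarize(results):
    return {"total": sum(results), "count": len(results)}


header_results = [add(i, i * 2) for i in range(5)]  # 3 * i for i in 0..4
print(header_results)             # [0, 3, 6, 9, 12]
print(summarize(header_results))  # {'total': 30, 'count': 5}
```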

Chain of groups (staged pipeline)

from celery import chain, group
from tasks import add, double, summarize

# stage 1: add(1, 1) → 2
# stage 2: a group inside a chain receives the previous result as each member's
#          first arg: double(2) → 4 and add(2, 10) → 12
# stage 3: a task after a group is upgraded to a chord, so summarize gets [4, 12]
pipeline = chain(add.s(1, 1), group(double.s(), add.s(10)), summarize.s())
print(pipeline.delay().get())  # {"total": 16, "count": 2}
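In Celery, when a group follows a task in a chain, each group member receives that task's result as its first argument, and a task that follows a group is upgraded to a chord so it receives the list of group results. A plain-Python model of that data flow for add.s(1, 1) | group(double.s(), add.s(10)) | summarize.s() is below; run_stages is a hypothetical helper, a Python list stands in for a group, and nothing is dispatched to workers.

```python
from typing import Any, Callable, List, Sequence, Tuple, Union

# A step is (func, extra_args); a list of steps stands in for a group.
Step = Tuple[Callable[..., Any], Tuple[Any, ...]]
Stage = Union[Step, List[Step]]


def run_stages(first: Step, stages: Sequence[Stage]) -> Any:
    func, args = first
    result = func(*args)
    for stage in stages:
        if isinstance(stage, list):
            # group: every member gets the previous result as its first arg
            result = [f(result, *a) for f, a in stage]
        else:
            # plain step: after a group it receives the list (chord behaviour)
            f, a = stage
            result = f(result, *a)
    return result


def add(x, y):
    return x + y


def double(x):
    return x * 2


def summarize(results):
    return {"total": sum(results), "count": len(results)}


out = run_stages((add, (1, 1)), [[(double, ()), (add, (10,))], (summarize, ())])
print(out)  # {'total': 16, 'count': 2}
```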

Real multi-stage pattern

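# tasks.py (define these next to the setup tasks so the worker registers them)
@app.task
def fetch(url):
    return url   # placeholder: imagine an HTTP call returning the page body

@app.task
def parse(html):
    return len(html)

@app.task
def aggregate(sizes):
    return sum(sizes)

# caller
from celery import chain, chord, group
from tasks import fetch, parse, aggregate

urls = ["http://a.com", "http://b.com", "http://c.com"]

# per URL: fetch → parse (a chain); all URLs run in parallel (the chord header);
# aggregate receives the list of parsed sizes
pipeline = chord(
    group(chain(fetch.s(u), parse.s()) for u in urls),
    aggregate.s()
)
result = pipeline.delay()
print(result.get())  # 36 (each placeholder "page" is a 12-character URL)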

Error Handling

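from celery import chain
from tasks import add, double

result = chain(add.s(1, 2), double.s()).apply_async()

try:
    # propagate=True (the default) re-raises the failing task's exception here;
    # timeout raises celery.exceptions.TimeoutError if no result arrives in time
    value = result.get(timeout=10, propagate=True)
except Exception as e:
    print(f"Pipeline failed: {e}")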

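Use .on_error() to attach an errback. The errback is itself a task, so define it in tasks.py; it runs on a worker and receives the failed task's request, the exception, and the traceback:

@app.task
def on_failure(request, exc, traceback):
    print(f"Task {request.id} failed: {exc}")   # appears in the worker's log

add.s(1, 2).on_error(on_failure.s()).delay()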
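When a step in a chain raises, the steps after it are never run, and the caller's result.get() re-raises the error. The plain-Python model below shows only that stop-at-first-failure flow plus an errback hook; run_chain_with_errback and boom are hypothetical names, and in Celery the errback would run on a worker rather than in the caller.

```python
from typing import Any, Callable, List, Sequence


def run_chain_with_errback(
    steps: Sequence[Callable[..., Any]],
    first_args: tuple,
    errback: Callable[[int, BaseException], None],
) -> Any:
    """Run steps in order; on the first exception, call errback and re-raise."""
    result = None
    for index, step in enumerate(steps):
        try:
            result = step(*first_args) if index == 0 else step(result)
        except Exception as exc:
            errback(index, exc)  # roughly models .on_error(): notify, then stop
            raise
    return result


calls: List[str] = []


def add(x, y):
    calls.append("add")
    return x + y


def boom(x):
    calls.append("boom")
    raise ValueError(f"cannot handle {x}")


def double(x):
    calls.append("double")
    return x * 2


errors: List[str] = []
try:
    run_chain_with_errback([add, boom, double], (1, 2), lambda i, e: errors.append(f"step {i}: {e}"))
except ValueError as e:
    print(f"Pipeline failed: {e}")  # Pipeline failed: cannot handle 3

print(calls)   # ['add', 'boom']  (double never ran)
print(errors)  # ['step 1: cannot handle 3']
```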

Quick Reference

# Signature
add.s(1, 2)                      # lazy descriptor
add.si(1, 2)                     # immutable — ignores incoming chain value
add.s(1)                         # partial: later args are prepended → add(x, 1)

# Chain (sequential)
chain(t1, t2, t3)
t1 | t2 | t3

# Group (parallel)
group(t.s(i) for i in items)

# Chord (parallel + callback)
chord(group(...))(callback.s())
(group(...) | callback.s()).delay()

# Chunks
task.chunks(iterable, size)

# Map / Starmap
task.map(list)
task.starmap(list_of_tuples)

When to use what

  • Need output of A before B starts → chain
  • A, B, C are independent → group
  • Need all parallel results merged → chord
  • Splitting a large dataset → chunks
  • Applying one task over a list (single worker) → map / starmap
  • Applying one task over a list (parallel workers) → group