A Workflow is the core building block of Fibonacci. It’s a directed graph of nodes that execute in order based on their dependencies.
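To make "execute in order based on their dependencies" concrete, here is a minimal sketch of dependency ordering using only the Python standard library. It is not Fibonacci's scheduler; the `dependencies` map and the `execution_order` helper are hypothetical names chosen for illustration.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: node ID -> IDs of the nodes it depends on.
dependencies = {
    "fetch": [],
    "process": ["fetch"],
    "summarize": ["process"],
}

def execution_order(deps):
    """Return node IDs in an order where every node follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())

order = execution_order(dependencies)
print(order)
```

For this linear chain the only valid order is fetch, then process, then summarize. Graphs with independent branches can have several valid orders.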

Anatomy of a Workflow

from fibonacci import Workflow

wf = Workflow(
    name="My Workflow",           # Required: Human-readable name
    description="What it does",   # Optional: Description
    version=1,                    # Optional: Version number
    is_active=True,               # Optional: Whether workflow is active
    tags=["tag1", "tag2"]         # Optional: Tags for organization
)

Creating a Workflow

From Python Code

from fibonacci import Workflow, LLMNode, ToolNode

wf = Workflow(name="Data Pipeline")

# Add nodes
node1 = ToolNode(id="fetch", name="Fetch Data", tool="api_call", params={})
node2 = LLMNode(id="process", name="Process", instruction="...", dependencies=["fetch"])

wf.add_nodes([node1, node2])

From YAML

from fibonacci import Workflow

wf = Workflow.from_yaml("workflow.yaml")
YAML format (workflow.yaml):
name: Data Pipeline
description: Fetches and processes data
version: 1
tags:
  - data
  - etl

nodes:
  - id: fetch
    type: tool
    name: Fetch Data
    tool_name: api_call
    tool_params: {}

  - id: process
    type: llm
    name: Process
    instruction: "Process this data: {{fetch}}"
    dependencies:
      - fetch

Workflow Lifecycle

1. Create
   Define your workflow and nodes in Python or YAML.

2. Validate
   Check for errors such as circular dependencies or missing nodes.
   wf.validate()

3. Deploy
   Upload the workflow to the Fibonacci platform.
   workflow_id = wf.deploy(api_key="...")

4. Execute
   Run the workflow with input data.
   result = wf.run(input_data={"key": "value"})

5. Monitor
   Check status and retrieve results.
   status = wf.get_status(run_id)
   stats = wf.get_stats()
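The Validate step mentions circular dependencies and missing nodes. The sketch below shows one way such checks could be written with the standard library. It is an illustration under assumptions, not the implementation behind `wf.validate()`; `find_problems` and its message strings are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

def find_problems(deps):
    """Return human-readable problems found in a node -> dependencies map."""
    problems = []

    # Missing nodes: a dependency that names a node ID that was never defined.
    for node_id, upstream in deps.items():
        for dep in upstream:
            if dep not in deps:
                problems.append(f"{node_id} depends on missing node {dep}")

    # Circular dependencies: check only edges between known nodes.
    known = {n: [d for d in up if d in deps] for n, up in deps.items()}
    try:
        TopologicalSorter(known).prepare()
    except CycleError as exc:
        # exc.args[1] lists the nodes that form the detected cycle.
        problems.append("circular dependency: " + " -> ".join(exc.args[1]))

    return problems

print(find_problems({"fetch": [], "process": ["fetch"]}))  # no problems
print(find_problems({"process": ["ghost"]}))               # missing node
print(find_problems({"a": ["b"], "b": ["a"]}))             # cycle
```

Collecting every problem before reporting lets a caller fix them in one pass instead of one failure at a time.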

Workflow Properties

| Property | Type | Description |
| --- | --- | --- |
| name | str | Human-readable workflow name |
| description | str | Description of what the workflow does |
| version | int | Version number (default: 1) |
| is_active | bool | Whether workflow can be executed |
| tags | list[str] | Tags for organization and filtering |
| workflow_id | str | Assigned after deployment |
| nodes | list[Node] | List of nodes in the workflow |
| node_count | int | Number of nodes |

Managing Workflows

List All Workflows

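import asyncio

from fibonacci import FibonacciClient
from fibonacci.config import Config

async def main():
    # Load configuration from environment variables
    config = Config.from_env()

    # "async with" and "await" are only valid inside a coroutine
    async with FibonacciClient(config) as client:
        workflows = await client.list_workflows()
        for wf in workflows:
            print(f"{wf['name']} ({wf['id']})")

asyncio.run(main())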
Or use the CLI:
fibonacci list

Update a Workflow

# Update name and description
wf.update(
    name="New Name",
    description="New description"
)

# Deactivate
wf.deactivate()

# Reactivate
wf.activate()

Delete a Workflow

wf.delete()

# Or by ID
wf.delete(workflow_id="wf_abc123")

Get Workflow Statistics

stats = wf.get_stats()

print(f"Total runs: {stats.total_runs}")
print(f"Success rate: {stats.success_rate}%")
print(f"Average duration: {stats.avg_duration_seconds}s")
print(f"Total cost: ${stats.total_cost:.2f}")
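To show how figures like these relate to individual runs, here is a small sketch that derives them from a list of run records. The record fields and the `summarize_runs` helper are hypothetical, and it assumes `success_rate` is a percentage, as the `%` in the print statement above suggests. How the platform actually computes its statistics is not stated in this page.

```python
def summarize_runs(runs):
    """Aggregate run records into totals, a success percentage, and averages."""
    total = len(runs)
    completed = sum(1 for r in runs if r["status"] == "completed")
    return {
        "total_runs": total,
        "success_rate": 100.0 * completed / total if total else 0.0,
        "avg_duration_seconds": (
            sum(r["duration_seconds"] for r in runs) / total if total else 0.0
        ),
        "total_cost": sum(r["cost"] for r in runs),
    }

# Hypothetical run history: three completed runs and one failed run.
runs = [
    {"status": "completed", "duration_seconds": 2.0, "cost": 0.25},
    {"status": "completed", "duration_seconds": 4.0, "cost": 0.5},
    {"status": "failed", "duration_seconds": 6.0, "cost": 0.25},
    {"status": "completed", "duration_seconds": 8.0, "cost": 0.0},
]

summary = summarize_runs(runs)
print(summary)
```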

Execution

Synchronous Execution

Wait for the workflow to complete:
result = wf.run(
    input_data={"text": "Hello world"},
    wait=True,      # Wait for completion (default)
    timeout=300.0   # Timeout in seconds
)

print(result.status)       # "completed" or "failed"
print(result.output_data)  # Node outputs
print(result.duration_seconds)
print(result.total_cost)

Asynchronous Execution

Start execution and check later:
# Start without waiting
result = wf.run(
    input_data={"text": "Hello"},
    wait=False
)

run_id = result.id
print(f"Started run: {run_id}")

# Check status later
status = wf.get_status(run_id)
print(f"Status: {status.status}")

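# Wait for completion. This call must run inside a coroutine, and `client`
# is a FibonacciClient instance (see "List All Workflows").
final = await client.wait_for_completion(run_id)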
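If you prefer to poll for status yourself, a loop along the following lines is one option. This is a sketch under assumptions: `wait_until_done` is a hypothetical helper, the status callable returns a plain string (whereas `wf.get_status` above returns an object with a `.status` attribute), and the terminal states are taken to be "completed" and "failed". A fake status function stands in for the real call so the example runs on its own.

```python
import time

def wait_until_done(get_status, run_id, timeout=300.0, interval=2.0):
    """Poll get_status(run_id) until it reports a terminal state or times out."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status(run_id)
        if status in ("completed", "failed"):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} still {status!r} after {timeout}s")
        time.sleep(interval)

# Stand-in for a real status call: reports "running" twice, then "completed".
_fake_states = iter(["running", "running", "completed"])

def fake_get_status(run_id):
    return next(_fake_states)

final_status = wait_until_done(fake_get_status, "run_123", interval=0.0)
print(final_status)
```

Using `time.monotonic()` for the deadline keeps the timeout correct even if the system clock is adjusted while waiting.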

Async/Await Pattern

For high-performance applications:
import asyncio
from fibonacci import Workflow

async def main():
    wf = Workflow(name="Async Example")
    # ... add nodes ...
    
    # Deploy async
    workflow_id = await wf.deploy_async(api_key="...")
    
    # Run async
    result = await wf.run_async(input_data={"key": "value"})
    
    print(result.output_data)

asyncio.run(main())
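One reason to use the async variants is to have several runs in flight at once. The sketch below shows the general `asyncio.gather` pattern with a stand-in coroutine. `fake_run` and `run_many` are hypothetical, and whether `run_async` can safely be awaited concurrently on the same workflow is an assumption this page does not confirm.

```python
import asyncio

async def fake_run(input_data):
    """Stand-in for an awaitable workflow run; echoes part of its input."""
    await asyncio.sleep(0.01)  # simulate network latency
    return {"echo": input_data["key"]}

async def run_many(inputs):
    # gather() starts all coroutines concurrently and returns their results
    # in the same order as the inputs.
    return await asyncio.gather(*(fake_run(item) for item in inputs))

results = asyncio.run(run_many([{"key": "a"}, {"key": "b"}]))
print(results)
```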

Input Data

Pass data to your workflow at runtime:
result = wf.run(input_data={
    "customer_id": "cust_123",
    "report_type": "monthly",
    "include_charts": True,
    "filters": {
        "start_date": "2024-01-01",
        "end_date": "2024-01-31"
    }
})
Reference these values inside a node with {{input.field_name}}; nested fields use dot notation:
LLMNode(
    id="generate",
    instruction="""
    Generate a {{input.report_type}} report for customer {{input.customer_id}}.
    Date range: {{input.filters.start_date}} to {{input.filters.end_date}}
    """
)
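To illustrate how dotted placeholders such as {{input.filters.start_date}} map onto nested input data, here is a minimal sketch of placeholder substitution. It is not Fibonacci's template engine; the `render` function and the regular expression are hypothetical, and the real engine may handle missing keys or non-string values differently.

```python
import re

# Matches {{ dotted.path }} with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")

def render(template, context):
    """Replace each {{a.b.c}} placeholder with context["a"]["b"]["c"]."""
    def lookup(match):
        value = context
        for key in match.group(1).split("."):
            value = value[key]  # raises KeyError if the path does not exist
        return str(value)
    return PLACEHOLDER.sub(lookup, template)

context = {
    "input": {
        "report_type": "monthly",
        "filters": {"start_date": "2024-01-01"},
    }
}

text = render(
    "A {{input.report_type}} report from {{input.filters.start_date}}",
    context,
)
print(text)
```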

Output Data

Each node’s output is stored in result.output_data under the node’s ID:
result = wf.run(input_data={...})

# Access individual node outputs
raw_data = result.output_data["fetch_data"]
analysis = result.output_data["analyze"]
summary = result.output_data["summarize"]

Best Practices

Use descriptive names

# ❌ Bad
wf = Workflow(name="wf1")

# ✅ Good
wf = Workflow(name="Weekly Sales Report Generator")

Tag workflows for organization

wf = Workflow(
    name="Customer Onboarding",
    tags=["customers", "onboarding", "production", "v2"]
)

Version your workflows

# In code
wf = Workflow(name="Pipeline", version=2)

# Or use YAML files with git version control
wf = Workflow.from_yaml("pipelines/v2/main.yaml")

Validate before deploying

# ValidationError must be imported first; its import path is not shown on this page
try:
    wf.validate()
    wf.deploy(api_key=api_key)
except ValidationError as e:
    print(f"Fix these errors: {e.errors}")

Next Steps