A Workflow is the core building block of Fibonacci. It’s a directed graph of nodes that execute in order based on their dependencies.
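Execution order falls out of that dependency graph via a topological sort; a minimal sketch of the idea using Python's standard library (the node IDs below are illustrative, not SDK API):

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on.
dependencies = {
    "fetch": set(),            # no dependencies: runs first
    "process": {"fetch"},      # waits for "fetch"
    "summarize": {"process"},  # waits for "process"
}

# static_order() yields nodes so every dependency comes before its dependents.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```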
## Anatomy of a Workflow

```python
from fibonacci import Workflow

wf = Workflow(
    name="My Workflow",          # Required: human-readable name
    description="What it does",  # Optional: description
    version=1,                   # Optional: version number
    is_active=True,              # Optional: whether the workflow is active
    tags=["tag1", "tag2"],       # Optional: tags for organization
)
```
## Creating a Workflow

### From Python Code

```python
from fibonacci import Workflow, LLMNode, ToolNode

wf = Workflow(name="Data Pipeline")

# Add nodes
node1 = ToolNode(id="fetch", name="Fetch Data", tool="api_call", params={})
node2 = LLMNode(id="process", name="Process", instruction="...",
                dependencies=["fetch"])

wf.add_nodes([node1, node2])
```
### From YAML

```python
from fibonacci import Workflow

wf = Workflow.from_yaml("workflow.yaml")
```
YAML format:

```yaml
name: Data Pipeline
description: Fetches and processes data
version: 1
tags:
  - data
  - etl
nodes:
  - id: fetch
    type: tool
    name: Fetch Data
    tool_name: api_call
    tool_params: {}
  - id: process
    type: llm
    name: Process
    instruction: "Process this data: {{fetch}}"
    dependencies:
      - fetch
```
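Conceptually, `from_yaml` maps each `nodes` entry to the matching node class via its `type` field. A rough sketch of that dispatch, with a plain list standing in for the parsed YAML (the builder below is hypothetical, not the SDK's actual code):

```python
# Hypothetical type-to-class dispatch table.
NODE_TYPES = {"tool": "ToolNode", "llm": "LLMNode"}

def build_nodes(entries):
    """Turn parsed YAML node entries into node descriptions."""
    nodes = []
    for entry in entries:
        kind = entry["type"]
        if kind not in NODE_TYPES:
            raise ValueError(f"unknown node type: {kind}")
        nodes.append({
            "class": NODE_TYPES[kind],
            "id": entry["id"],
            "dependencies": entry.get("dependencies", []),
        })
    return nodes

parsed = [
    {"id": "fetch", "type": "tool", "name": "Fetch Data",
     "tool_name": "api_call", "tool_params": {}},
    {"id": "process", "type": "llm", "name": "Process",
     "instruction": "Process this data: {{fetch}}",
     "dependencies": ["fetch"]},
]
nodes = build_nodes(parsed)
```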
## Workflow Lifecycle

1. **Create**: define your workflow and nodes in Python or YAML.
2. **Validate**: check for errors such as circular dependencies or missing nodes.
3. **Deploy**: upload to the Fibonacci platform with `workflow_id = wf.deploy(api_key="...")`.
4. **Execute**: run with input data via `result = wf.run(input_data={"key": "value"})`.
5. **Monitor**: check status and retrieve results with `status = wf.get_status(run_id)` and `stats = wf.get_stats()`.
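The circular-dependency check in the validate step can be pictured with the standard library's `graphlib`, which raises `CycleError` when the graph has a cycle (a sketch of the idea, not the SDK's validator):

```python
from graphlib import CycleError, TopologicalSorter

def has_cycle(dependencies):
    """Return True if the dependency graph contains a cycle."""
    try:
        list(TopologicalSorter(dependencies).static_order())
        return False
    except CycleError:
        return True

circular = has_cycle({"a": {"b"}, "b": {"a"}})  # a and b depend on each other
valid = has_cycle({"a": set(), "b": {"a"}})     # a simple valid chain
```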
## Workflow Properties

| Property | Type | Description |
|---|---|---|
| `name` | `str` | Human-readable workflow name |
| `description` | `str` | Description of what the workflow does |
| `version` | `int` | Version number (default: 1) |
| `is_active` | `bool` | Whether the workflow can be executed |
| `tags` | `list[str]` | Tags for organization and filtering |
| `workflow_id` | `str` | Assigned after deployment |
| `nodes` | `list[Node]` | List of nodes in the workflow |
| `node_count` | `int` | Number of nodes |
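The defaults in the table can be mirrored with a small dataclass; the class below only illustrates those documented defaults, it is not the SDK's `Workflow` class:

```python
from dataclasses import dataclass, field

@dataclass
class WorkflowProps:
    """Illustrative mirror of the documented defaults."""
    name: str
    description: str = ""
    version: int = 1          # default version
    is_active: bool = True    # active unless deactivated
    tags: list[str] = field(default_factory=list)

wf_props = WorkflowProps(name="Weekly Report")
```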
## Managing Workflows

### List All Workflows

```python
from fibonacci import FibonacciClient
from fibonacci.config import Config

config = Config.from_env()

async with FibonacciClient(config) as client:
    workflows = await client.list_workflows()
    for wf in workflows:
        print(f"{wf['name']} ({wf['id']})")
```
Or use the CLI:
### Update a Workflow

```python
# Update name and description
wf.update(
    name="New Name",
    description="New description",
)

# Deactivate
wf.deactivate()

# Reactivate
wf.activate()
```
### Delete a Workflow

```python
wf.delete()

# Or by ID
wf.delete(workflow_id="wf_abc123")
```
### Get Workflow Statistics

```python
stats = wf.get_stats()

print(f"Total runs: {stats.total_runs}")
print(f"Success rate: {stats.success_rate}%")
print(f"Average duration: {stats.avg_duration_seconds}s")
print(f"Total cost: ${stats.total_cost:.2f}")
```
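Statistics like these are aggregations over individual run records; a sketch of how such numbers are derived, assuming each run records a status, duration, and cost (the field names here are illustrative):

```python
def aggregate_stats(runs):
    """Aggregate raw run records into summary statistics."""
    total = len(runs)
    successes = sum(1 for r in runs if r["status"] == "completed")
    return {
        "total_runs": total,
        "success_rate": 100.0 * successes / total,
        "avg_duration_seconds": sum(r["duration"] for r in runs) / total,
        "total_cost": sum(r["cost"] for r in runs),
    }

runs = [
    {"status": "completed", "duration": 2.0, "cost": 0.01},
    {"status": "completed", "duration": 4.0, "cost": 0.03},
    {"status": "failed", "duration": 1.0, "cost": 0.00},
]
stats = aggregate_stats(runs)
```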
## Execution

### Synchronous Execution

Wait for the workflow to complete:

```python
result = wf.run(
    input_data={"text": "Hello world"},
    wait=True,      # Wait for completion (default)
    timeout=300.0,  # Timeout in seconds
)

print(result.status)            # "completed" or "failed"
print(result.output_data)       # Node outputs
print(result.duration_seconds)
print(result.total_cost)
```
### Asynchronous Execution

Start execution and check back later:

```python
# Start without waiting
result = wf.run(
    input_data={"text": "Hello"},
    wait=False,
)
run_id = result.id
print(f"Started run: {run_id}")

# Check status later
status = wf.get_status(run_id)
print(f"Status: {status.status}")

# Or block until it finishes (inside an async context, with an active client)
final = await client.wait_for_completion(run_id)
```
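The check-later pattern is, at bottom, a polling loop; a generic sketch that works with any `get_status` callable (this helper and its `interval`/`timeout` parameters are illustrative, not SDK API):

```python
import time

def poll_until_done(get_status, interval=2.0, timeout=300.0,
                    terminal=("completed", "failed")):
    """Poll get_status() until it returns a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in terminal:
            return status
        time.sleep(interval)
    raise TimeoutError("run did not finish before the timeout")

# Simulated status source: "running" twice, then "completed".
states = iter(["running", "running", "completed"])
final = poll_until_done(lambda: next(states), interval=0.01)
```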
### Async/Await Pattern

For high-performance applications:

```python
import asyncio

from fibonacci import Workflow

async def main():
    wf = Workflow(name="Async Example")
    # ... add nodes ...

    # Deploy asynchronously
    workflow_id = await wf.deploy_async(api_key="...")

    # Run asynchronously
    result = await wf.run_async(input_data={"key": "value"})
    print(result.output_data)

asyncio.run(main())
```
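The async pattern pays off when several workflows run concurrently; a sketch with `asyncio.gather`, using a stand-in coroutine in place of the real `run_async` call:

```python
import asyncio

async def fake_run_async(name, delay):
    # Stand-in for wf.run_async(...): pretend to work, then return a result.
    await asyncio.sleep(delay)
    return {"workflow": name, "status": "completed"}

async def main():
    # All three "workflows" run concurrently rather than back to back.
    return await asyncio.gather(
        fake_run_async("a", 0.01),
        fake_run_async("b", 0.01),
        fake_run_async("c", 0.01),
    )

results = asyncio.run(main())
```

`asyncio.gather` preserves argument order in its result list, so results can be matched back to the workflows that produced them.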
## Input Data

Pass data to your workflow at runtime:

```python
result = wf.run(input_data={
    "customer_id": "cust_123",
    "report_type": "monthly",
    "include_charts": True,
    "filters": {
        "start_date": "2024-01-01",
        "end_date": "2024-01-31",
    },
})
```
Access it in nodes with `{{input.field_name}}`:

```python
LLMNode(
    id="generate",
    instruction="""
    Generate a {{input.report_type}} report for customer {{input.customer_id}}.
    Date range: {{input.filters.start_date}} to {{input.filters.end_date}}
    """,
)
```
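The `{{input.field_name}}` placeholders resolve dotted paths into the input dict; a minimal sketch of that kind of resolution (an illustration of the templating idea, not the SDK's actual renderer):

```python
import re

def render(template, input_data):
    """Replace {{input.a.b}} placeholders with values from a nested dict."""
    def lookup(match):
        value = input_data
        for key in match.group(1).split("."):
            value = value[key]  # walk one level per dotted segment
        return str(value)
    return re.sub(r"\{\{\s*input\.([\w.]+)\s*\}\}", lookup, template)

out = render(
    "Report {{input.report_type}} from {{input.filters.start_date}}",
    {"report_type": "monthly", "filters": {"start_date": "2024-01-01"}},
)
```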
## Output Data

Each node's output is stored under its ID:

```python
result = wf.run(input_data={...})

# Access individual node outputs
raw_data = result.output_data["fetch_data"]
analysis = result.output_data["analyze"]
summary = result.output_data["summarize"]
```
## Best Practices

Use descriptive names:

```python
# ❌ Bad
wf = Workflow(name="wf1")

# ✅ Good
wf = Workflow(name="Weekly Sales Report Generator")
```

Version your workflows:

```python
# In code
wf = Workflow(name="Pipeline", version=2)

# Or use YAML files under git version control
wf = Workflow.from_yaml("pipelines/v2/main.yaml")
```
Validate before deploying:

```python
try:
    wf.validate()
    wf.deploy(api_key=api_key)
except ValidationError as e:
    print(f"Fix these errors: {e.errors}")
```
## Next Steps

- **Node Types**: learn about the different types of nodes
- **YAML Workflows**: define workflows declaratively