The Workflow class is the foundation of Fibonacci. It orchestrates node execution, manages dependencies, and handles the full workflow lifecycle from building to deployment and execution.
Constructor
```python
from fibonacci import Workflow

wf = Workflow(
    name="customer-support-bot",
    description="Routes and responds to customer support tickets",
    version=1
)
```
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | Required | Workflow name |
| description | str | "" | Human-readable description |
| version | int | 1 | Workflow version number |
| is_active | bool | True | Whether the deployed workflow is active |
| is_template | bool | False | Save as a reusable template |
| config | Config | None | SDK configuration object (uses default config if omitted) |
Class Methods
from_yaml
Load a workflow from a YAML file.
```python
Workflow.from_yaml(path: str) -> Workflow
```
Parameters:
path (str): Path to the YAML workflow file
Returns: Workflow instance
Example:
```python
wf = Workflow.from_yaml("workflows/analyzer.yaml")
```
Instance Methods
add_node
Add a single node to the workflow.
```python
wf.add_node(node: Node) -> Workflow
```
Returns self for chaining:
```python
wf.add_node(node_a).add_node(node_b).add_node(node_c)
```
add_nodes
Add multiple nodes at once.
```python
wf.add_nodes(nodes: list[Node]) -> Workflow
```
Example:
```python
wf.add_nodes([classifier, router, handler_a, handler_b])
```
get_node
Look up a node by ID.
```python
wf.get_node(node_id: str) -> Node | None
```
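Because get_node returns None for unknown IDs, callers that expect the node to exist may prefer to fail fast. A minimal sketch (require_node is a hypothetical helper, not part of the SDK):

```python
def require_node(wf, node_id):
    """Look up a node by ID, raising instead of returning None for a miss."""
    node = wf.get_node(node_id)
    if node is None:
        raise KeyError(f"no node with id {node_id!r} in workflow")
    return node
```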
remove_node
Remove a node from the workflow.
```python
wf.remove_node(node_id: str) -> Workflow
```
validate
Validate the workflow DAG (checks for cycles, missing dependencies, etc.).
Returns: True if valid, raises ValidationError otherwise.
```python
if wf.validate():
    wf.deploy()
```
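Since validate() raises ValidationError on failure rather than returning False, a deploy guard reads more naturally as try/except. A sketch (deploy_if_valid is a hypothetical helper; the exception class is passed in here rather than imported, to keep the sketch self-contained):

```python
def deploy_if_valid(wf, validation_error):
    """Deploy only if validation passes; return the workflow_id or None."""
    try:
        wf.validate()
    except validation_error as exc:
        print(f"workflow invalid: {exc}")
        return None
    # Already validated above, so skip re-validation inside deploy().
    return wf.deploy(validate=False)
```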
deploy
Deploy the workflow to the Fibonacci platform.
```python
wf.deploy(
    api_key: str | None = None,
    validate: bool = True
) -> str
```
Parameters:
api_key (str): API key to use for deployment. Falls back to the configured key if omitted.
validate (bool): Validate the workflow before deploying (default True).
Returns: workflow_id string — save this to run the workflow later.
Raises: AuthenticationError, ValidationError, DeploymentError
Example:
```python
workflow_id = wf.deploy(api_key="fib_live_abc123")
print(f"Deployed: {workflow_id}")
```
deploy_async
Async version of deploy().
```python
workflow_id = await wf.deploy_async(api_key="fib_live_abc123")
```
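deploy_async pairs naturally with run_async. A minimal sketch, assuming run_async accepts the same parameters as run() (deploy_and_run is a hypothetical helper):

```python
import asyncio

async def deploy_and_run(wf, input_data):
    """Deploy the workflow, then run against the fresh deployment."""
    workflow_id = await wf.deploy_async()
    return await wf.run_async(input_data=input_data, workflow_id=workflow_id)

# asyncio.run(deploy_and_run(wf, {"text": "Hello world"}))
```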
run
Execute the workflow synchronously.
```python
wf.run(
    input_data: dict,
    workflow_id: str | None = None,
    wait: bool = True,
    timeout: float = 300.0,
    secrets: dict | None = None
) -> WorkflowRunStatus
```
Parameters:
input_data (dict): Input data for the workflow.
workflow_id (str): Workflow ID to run. Uses the last deployed workflow if omitted.
wait (bool): Wait for the run to complete before returning (default True).
timeout (float): Maximum seconds to wait for completion (default 300.0).
secrets (dict): Runtime secrets injected into the workflow (e.g. {"SLACK_TOKEN": "xoxb-..."}).
Returns: WorkflowRunStatus with fields:
.status — "completed", "failed", "running", etc.
.output_data — dict of node outputs keyed by node ID
.error_message — error details if status is "failed"
.total_cost — total cost of the run in USD
.nodes_executed — number of nodes that ran
Raises: AuthenticationError, ExecutionError, RateLimitError
Example:
```python
result = wf.run(
    input_data={"text": "Hello world"},
    secrets={"GOOGLE_TOKEN": "my-token"}
)
print(result.output_data["analyzer"])
```
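Because run() can raise RateLimitError, callers issuing many runs may want a retry with backoff. A hedged sketch (run_with_retry is a hypothetical helper; the SDK's exception class is passed in as retry_on):

```python
import time

def run_with_retry(run_fn, retry_on, attempts=3, backoff=1.0):
    """Call run_fn(), retrying with exponential backoff on retry_on errors."""
    for attempt in range(attempts):
        try:
            return run_fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error
            time.sleep(backoff * (2 ** attempt))

# Usage (RateLimitError imported from the SDK):
# result = run_with_retry(
#     lambda: wf.run(input_data={"text": "Hello"}),
#     retry_on=RateLimitError,
# )
```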
run_async
Async version of run().
```python
import asyncio

async def main():
    result = await wf.run_async(
        input_data={"text": "Hello world"}
    )
    print(result.output_data)

asyncio.run(main())
```
get_status
Get the status of a specific run.
```python
wf.get_status(run_id: str | None = None) -> WorkflowRunStatus
```
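Combined with run(wait=False), get_status supports polling a run yourself instead of blocking inside run(). A sketch (wait_for_run is a hypothetical helper; the poll interval and timeout are illustrative):

```python
import time

def wait_for_run(wf, run_id=None, poll=2.0, timeout=300.0):
    """Poll wf.get_status() until the run leaves the 'running' state."""
    deadline = time.monotonic() + timeout
    while True:
        status = wf.get_status(run_id)
        if status.status != "running":
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still running after {timeout}s")
        time.sleep(poll)
```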
update
Update workflow metadata on the platform.
```python
wf.update(
    name: str | None = None,
    description: str | None = None,
    is_active: bool | None = None
) -> WorkflowResponse
```
delete
Delete the deployed workflow.
activate / deactivate
Toggle a deployed workflow on or off without deleting it.
```python
wf.activate()    # -> WorkflowResponse
wf.deactivate()  # -> WorkflowResponse
```
get_stats
Get aggregated statistics for the deployed workflow.
```python
wf.get_stats() -> WorkflowStats
```
Returns: WorkflowStats with .total_runs, .success_rate, .avg_duration_seconds, .avg_cost.
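The documented WorkflowStats fields fold neatly into a one-line report. A sketch (stats_line is a hypothetical helper):

```python
def stats_line(stats):
    """Format the documented WorkflowStats fields into one report line."""
    return (
        f"{stats.total_runs} runs | "
        f"{stats.success_rate:.0%} success | "
        f"{stats.avg_duration_seconds:.1f}s avg | "
        f"${stats.avg_cost:.4f} avg cost"
    )

# print(stats_line(wf.get_stats()))
```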
to_yaml
Export the workflow to a YAML string (and optionally write to a file).
```python
yaml_str = wf.to_yaml()      # return string
wf.to_yaml("exported.yaml")  # write to file
```
to_yaml_string
Export the workflow to a YAML string (always returns a string, never writes).
```python
yaml_str = wf.to_yaml_string()
```
to_dict
Export the workflow as a Python dictionary.
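One use of to_dict is persisting the definition in formats other than YAML. A sketch (save_workflow_json is a hypothetical helper; only the documented fact that to_dict returns a plain dict is assumed):

```python
import json

def save_workflow_json(wf, path):
    """Write the to_dict() export of a workflow to a JSON file."""
    with open(path, "w") as f:
        json.dump(wf.to_dict(), f, indent=2)
```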
Properties
workflow_id
The ID assigned after deployment. None if the workflow has not been deployed yet.
```python
print(wf.workflow_id)  # e.g. "wf_abc123"
```
nodes
List of nodes currently in the workflow.
```python
for node in wf.nodes:
    print(node.id)
```
node_count
Number of nodes in the workflow.
```python
print(wf.node_count)  # e.g. 4
```
YAML Workflows
Load a workflow from a YAML definition file:
```python
wf = Workflow.from_yaml("workflow.yaml")
wf.deploy()
result = wf.run(input_data={"text": "Hello!"})
```
Export an existing workflow to YAML:
```python
wf.to_yaml("exported.yaml")
```
See the YAML Workflows guide for the full YAML schema.
Complete Example
```python
from fibonacci import Workflow, LLMNode, ToolNode, ConditionalNode, Config

# Optional: explicit config
config = Config(api_key="fib_live_abc123")

wf = Workflow(
    name="feedback-pipeline",
    description="Classifies and routes customer feedback",
    version=1,
    config=config
)

# Classify
classify = LLMNode(
    id="classify",
    name="Classify Feedback",
    instruction="Classify as positive, negative, or neutral. One word only.\n\n{{input.feedback}}"
)

# Route
router = ConditionalNode(
    id="router",
    name="Route by Sentiment",
    left_value="{{classify}}",
    operator="contains",
    right_value="negative",
    true_branch=["escalate"],
    false_branch=["log_ok"],
    dependencies=["classify"]
)

# Escalate negative
escalate = ToolNode(
    id="escalate",
    name="Alert Support Team",
    tool="slack_send_message",
    params={
        "channel": "#support",
        "message": "Negative feedback: {{input.feedback}}"
    },
    dependencies=["router"]
)

# Log positive/neutral
log_ok = ToolNode(
    id="log_ok",
    name="Log Feedback",
    tool="google_sheets_append",
    params={
        "spreadsheet_id": "{{input.log_sheet}}",
        "range": "Log!A:C",
        "values": [["{{input.feedback}}", "{{classify}}"]]
    },
    dependencies=["router"]
)

wf.add_nodes([classify, router, escalate, log_ok])

# Validate before deploying
wf.validate()

# Deploy
workflow_id = wf.deploy()
print(f"Deployed: {workflow_id}")

# Run
result = wf.run(
    input_data={"feedback": "Great product!", "log_sheet": "abc123"}
)
print(result.output_data)
print(f"Cost: ${result.total_cost:.4f}")

# Lifecycle management
stats = wf.get_stats()
print(f"Total runs: {stats.total_runs}, success rate: {stats.success_rate:.0%}")

# Deactivate when not needed
wf.deactivate()
```