The Workflow class is the foundation of Fibonacci. It orchestrates node execution, manages dependencies, and handles the full workflow lifecycle from building to deployment and execution.

Constructor

from fibonacci import Workflow

wf = Workflow(
    name="customer-support-bot",
    description="Routes and responds to customer support tickets",
    version=1
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | Required | Workflow name |
| description | str | "" | Human-readable description |
| version | int | 1 | Workflow version number |
| is_active | bool | True | Whether the deployed workflow is active |
| is_template | bool | False | Save as a reusable template |
| config | Config | None | SDK configuration object (uses default config if omitted) |

Class Methods

from_yaml

Load a workflow from a YAML file.
Workflow.from_yaml(path: str) -> Workflow
Parameters:
  • path (str): Path to the YAML workflow file
Returns: Workflow instance
Example:
wf = Workflow.from_yaml("workflows/analyzer.yaml")

Instance Methods

add_node

Add a single node to the workflow.
wf.add_node(node: Node) -> Workflow
Returns self for chaining:
wf.add_node(node_a).add_node(node_b).add_node(node_c)
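Chaining works because each call returns the workflow object itself. The sketch below shows the pattern with a hypothetical stand-in class, MiniWorkflow, that stores plain dicts instead of Node objects. It is not the SDK's implementation, and ignoring an unknown ID in remove_node is an assumption made only for this illustration.

```python
from typing import Optional


class MiniWorkflow:
    """Hypothetical stand-in for Workflow, used only to show the chaining pattern."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._nodes = {}  # node ID -> node (a plain dict in this sketch)

    def add_node(self, node: dict) -> "MiniWorkflow":
        # Store the node under its ID, then return self so calls can be chained.
        self._nodes[node["id"]] = node
        return self

    def get_node(self, node_id: str) -> Optional[dict]:
        # Same contract as the documented get_node: None when the ID is unknown.
        return self._nodes.get(node_id)

    def remove_node(self, node_id: str) -> "MiniWorkflow":
        # Assumption for this sketch: removing an unknown ID is a no-op.
        self._nodes.pop(node_id, None)
        return self

    @property
    def node_count(self) -> int:
        return len(self._nodes)


mini = (
    MiniWorkflow("demo")
    .add_node({"id": "a"})
    .add_node({"id": "b"})
    .remove_node("a")
)
print(mini.node_count)
```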

add_nodes

Add multiple nodes at once.
wf.add_nodes(nodes: list[Node]) -> Workflow
Example:
wf.add_nodes([classifier, router, handler_a, handler_b])

get_node

Look up a node by ID.
wf.get_node(node_id: str) -> Node | None

remove_node

Remove a node from the workflow.
wf.remove_node(node_id: str) -> Workflow

validate

Validate the workflow DAG (checks for cycles, missing dependencies, etc.).
wf.validate() -> bool
Returns: True if the workflow is valid. Raises ValidationError otherwise.
if wf.validate():
    wf.deploy()
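To make the two named checks concrete, here is a minimal standalone sketch of missing-dependency and cycle detection over a plain mapping of node IDs to dependency IDs. The function validate_dag and its use of ValueError are hypothetical. The SDK's actual validation logic is not shown in this reference and may differ.

```python
from collections import deque


def validate_dag(dependencies: dict) -> bool:
    """Return True if `dependencies` describes a valid DAG, else raise ValueError.

    `dependencies` maps each node ID to the list of node IDs it depends on.
    """
    # Check 1: every dependency must name a node that exists in the graph.
    for node_id, deps in dependencies.items():
        for dep in deps:
            if dep not in dependencies:
                raise ValueError(f"{node_id!r} depends on unknown node {dep!r}")

    # Check 2: Kahn's algorithm. Count unresolved dependencies per node and
    # record which nodes are waiting on each dependency.
    remaining = {node_id: len(set(deps)) for node_id, deps in dependencies.items()}
    dependents = {node_id: [] for node_id in dependencies}
    for node_id, deps in dependencies.items():
        for dep in set(deps):
            dependents[dep].append(node_id)

    # Start from nodes with no dependencies and release their dependents.
    ready = deque(n for n, count in remaining.items() if count == 0)
    visited = 0
    while ready:
        current = ready.popleft()
        visited += 1
        for child in dependents[current]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    # Nodes that were never released are part of (or downstream of) a cycle.
    if visited != len(dependencies):
        raise ValueError("cycle detected in workflow graph")
    return True


graph = {
    "classify": [],
    "router": ["classify"],
    "escalate": ["router"],
    "log_ok": ["router"],
}
print(validate_dag(graph))
```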

deploy

Deploy the workflow to the Fibonacci platform.
wf.deploy(
    api_key: str | None = None,
    validate: bool = True
) -> str
Parameters:
  • api_key (str): API key to use for deployment. Falls back to the configured key if omitted.
  • validate (bool): Validate the workflow before deploying (default True).
Returns: workflow_id string. Save it to run the workflow later.
Raises: AuthenticationError, ValidationError, DeploymentError
Example:
workflow_id = wf.deploy(api_key="fib_live_abc123")
print(f"Deployed: {workflow_id}")
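Hardcoding a live key in source is easy to leak, so one option is to resolve it from the environment before calling deploy. The helper below is a hypothetical sketch. Both resolve_api_key and the variable name FIBONACCI_API_KEY are assumptions for illustration, not names defined by the SDK.

```python
import os
from typing import Optional


def resolve_api_key(explicit: Optional[str] = None,
                    env_var: str = "FIBONACCI_API_KEY") -> str:
    """Return an explicit key if given, otherwise read one from the environment.

    The default env_var name is an assumption made for this sketch.
    """
    if explicit:
        return explicit
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"No API key passed and {env_var} is not set")
    return key
```

The resolved value can then be passed along as wf.deploy(api_key=resolve_api_key()).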

deploy_async

Async version of deploy().
workflow_id = await wf.deploy_async(api_key="fib_live_abc123")

run

Execute the workflow synchronously.
wf.run(
    input_data: dict,
    workflow_id: str | None = None,
    wait: bool = True,
    timeout: float = 300.0,
    secrets: dict | None = None
) -> WorkflowRunStatus
Parameters:
  • input_data (dict): Input data for the workflow.
  • workflow_id (str): Workflow ID to run. Uses the last deployed workflow if omitted.
  • wait (bool): Wait for the run to complete before returning (default True).
  • timeout (float): Maximum seconds to wait for completion (default 300.0).
  • secrets (dict): Runtime secrets injected into the workflow (e.g. {"SLACK_TOKEN": "xoxb-..."}).
Returns: WorkflowRunStatus with fields:
  • .status: "completed", "failed", "running", etc.
  • .output_data: dict of node outputs keyed by node ID
  • .error_message: error details if status is "failed"
  • .total_cost: total cost of the run in USD
  • .nodes_executed: number of nodes that ran
Raises: AuthenticationError, ExecutionError, RateLimitError
Example:
result = wf.run(
    input_data={"text": "Hello world"},
    secrets={"GOOGLE_TOKEN": "my-token"}
)
print(result.output_data["analyzer"])
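Since a run can come back as "failed" without raising, it helps to branch on .status before reading .output_data. The sketch below uses a hypothetical RunResult dataclass that mirrors the documented fields so it runs without the SDK. The helper summarize_run is likewise illustrative.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class RunResult:
    """Hypothetical stand-in carrying the fields documented for WorkflowRunStatus."""
    status: str
    output_data: dict = field(default_factory=dict)
    error_message: Optional[str] = None
    total_cost: float = 0.0
    nodes_executed: int = 0


def summarize_run(result) -> str:
    """Build a one-line summary from any object exposing the documented fields."""
    if result.status == "completed":
        return f"ok: {result.nodes_executed} nodes, ${result.total_cost:.4f}"
    if result.status == "failed":
        return f"failed: {result.error_message}"
    # Any other status (for example "running") means the run has not finished.
    return f"pending: {result.status}"


print(summarize_run(RunResult(status="completed", nodes_executed=4, total_cost=0.0123)))
print(summarize_run(RunResult(status="failed", error_message="tool timed out")))
```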

run_async

Async version of run().
import asyncio

async def main():
    result = await wf.run_async(
        input_data={"text": "Hello world"}
    )
    print(result.output_data)

asyncio.run(main())
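An async entry point makes it possible to run several inputs concurrently. The sketch below shows one way to do that with asyncio.gather and a semaphore that caps concurrency. FakeWorkflow is a hypothetical stand-in so the example runs without the SDK, and whether a real Workflow instance supports concurrent run_async calls is an assumption that should be checked before relying on this pattern.

```python
import asyncio


class FakeWorkflow:
    """Hypothetical stand-in exposing an async run_async(input_data=...)."""

    async def run_async(self, input_data: dict) -> dict:
        await asyncio.sleep(0.01)  # simulate network latency
        return {"echo": input_data["text"].upper()}


async def run_batch(wf, inputs: list, max_concurrency: int = 3) -> list:
    """Run every input through wf.run_async, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: dict):
        async with semaphore:
            return await wf.run_async(input_data=item)

    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*(run_one(item) for item in inputs))


results = asyncio.run(run_batch(FakeWorkflow(), [{"text": "a"}, {"text": "b"}]))
print(results)
```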

get_status

Get the status of a specific run.
wf.get_status(run_id: str | None = None) -> WorkflowRunStatus

update

Update workflow metadata on the platform.
wf.update(
    name: str | None = None,
    description: str | None = None,
    is_active: bool | None = None
) -> WorkflowResponse

delete

Delete the deployed workflow.
wf.delete() -> bool

activate / deactivate

Toggle a deployed workflow on or off without deleting it.
wf.activate()    # -> WorkflowResponse
wf.deactivate()  # -> WorkflowResponse

get_stats

Get aggregated statistics for the deployed workflow.
wf.get_stats() -> WorkflowStats
Returns: WorkflowStats with .total_runs, .success_rate, .avg_duration_seconds, .avg_cost.

to_yaml

Export the workflow to a YAML string (and optionally write to a file).
yaml_str = wf.to_yaml()            # return string
wf.to_yaml("exported.yaml")        # write to file

to_yaml_string

Export the workflow to a YAML string (always returns a string, never writes).
yaml_str = wf.to_yaml_string()

to_dict

Export the workflow as a Python dictionary.
data = wf.to_dict()

Properties

workflow_id

The ID assigned after deployment. None if the workflow has not been deployed yet.
print(wf.workflow_id)   # e.g. "wf_abc123"

nodes

List of nodes currently in the workflow.
for node in wf.nodes:
    print(node.id)

node_count

Number of nodes in the workflow.
print(wf.node_count)   # e.g. 4

YAML Workflows

Load a workflow from a YAML definition file:
wf = Workflow.from_yaml("workflow.yaml")
wf.deploy()
result = wf.run(input_data={"text": "Hello!"})
Export an existing workflow to YAML:
wf.to_yaml("exported.yaml")
See the YAML Workflows guide for the full YAML schema.

Complete Example

from fibonacci import Workflow, LLMNode, ToolNode, ConditionalNode, Config

# Optional: explicit config
config = Config(api_key="fib_live_abc123")

wf = Workflow(
    name="feedback-pipeline",
    description="Classifies and routes customer feedback",
    version=1,
    config=config
)

# Classify
classify = LLMNode(
    id="classify",
    name="Classify Feedback",
    instruction="Classify as positive, negative, or neutral. One word only.\n\n{{input.feedback}}"
)

# Route
router = ConditionalNode(
    id="router",
    name="Route by Sentiment",
    left_value="{{classify}}",
    operator="contains",
    right_value="negative",
    true_branch=["escalate"],
    false_branch=["log_ok"],
    dependencies=["classify"]
)

# Escalate negative
escalate = ToolNode(
    id="escalate",
    name="Alert Support Team",
    tool="slack_send_message",
    params={
        "channel": "#support",
        "message": "Negative feedback: {{input.feedback}}"
    },
    dependencies=["router"]
)

# Log positive/neutral
log_ok = ToolNode(
    id="log_ok",
    name="Log Feedback",
    tool="google_sheets_append",
    params={
        "spreadsheet_id": "{{input.log_sheet}}",
        "range": "Log!A:C",
        "values": [["{{input.feedback}}", "{{classify}}"]]
    },
    dependencies=["router"]
)

wf.add_nodes([classify, router, escalate, log_ok])

# Validate before deploying
wf.validate()

# Deploy
workflow_id = wf.deploy()
print(f"Deployed: {workflow_id}")

# Run
result = wf.run(
    input_data={"feedback": "Great product!", "log_sheet": "abc123"}
)
print(result.output_data)
print(f"Cost: ${result.total_cost:.4f}")

# Lifecycle management
stats = wf.get_stats()
print(f"Total runs: {stats.total_runs}, success rate: {stats.success_rate:.0%}")

# Deactivate when not needed
wf.deactivate()