The Workflow class is the foundation of Fibonacci. It orchestrates node execution, manages dependencies between nodes, and handles the workflow lifecycle.

Constructor

from fibonacci import Workflow

workflow = Workflow(
    name="my-workflow",
    description="Processes customer inquiries",
    version="1.0.0",
    timeout=300
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | Required | Unique workflow identifier |
| description | str | None | Human-readable description |
| version | str | None | Semantic version (e.g., "1.0.0") |
| timeout | int | None | Maximum execution time in seconds |
| memory_config | MemoryConfig | None | Memory backend configuration |
| default_retry | RetryConfig | None | Default retry settings for nodes |
| input_schema | dict | None | JSON Schema for input validation |
| output_schema | dict | None | JSON Schema for output validation |
| error_handler | Callable | None | Custom error handler function |
| hooks | dict | None | Lifecycle hook functions |
| allow_partial_results | bool | False | Return results from successful nodes on failure |
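
The default_retry parameter takes a RetryConfig, which the Complete Example on this page constructs with max_attempts and delay. As a rough illustration of what such settings usually mean, the sketch below retries a callable with a fixed pause between attempts. call_with_retry is a hypothetical helper, not part of Fibonacci, and the fixed-delay behavior is an assumption; the library may back off differently.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(func: Callable[[], T], max_attempts: int = 3, delay: float = 1.0) -> T:
    """Call func until it succeeds or max_attempts is used up (hypothetical helper)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            # Re-raise the last failure once no attempts remain.
            if attempt == max_attempts:
                raise
            # Assumption: a fixed pause between attempts.
            time.sleep(delay)
    raise AssertionError("unreachable")
```

With max_attempts=3, a function that fails twice and then succeeds is called three times in total.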

Class Methods

from_yaml

Load a workflow from a YAML file.
Workflow.from_yaml(path: str) -> Workflow
Parameters:
  • path (str): Path to YAML workflow file
Returns: Workflow instance
Example:
workflow = Workflow.from_yaml("workflows/analyzer.yaml")

from_yaml_string

Load a workflow from a YAML string.
Workflow.from_yaml_string(yaml_content: str) -> Workflow
Parameters:
  • yaml_content (str): YAML content as string
Returns: Workflow instance
Example:
# Named yaml_content so the variable does not shadow a yaml module import
yaml_content = """
name: quick-workflow
nodes:
  - id: analyze
    type: llm
    model: claude-sonnet-4-5-20250929
    prompt: "Analyze: {{input.text}}"
"""
workflow = Workflow.from_yaml_string(yaml_content)

from_yaml_url

Load a workflow from a remote URL.
Workflow.from_yaml_url(url: str) -> Workflow
Parameters:
  • url (str): URL to YAML workflow file
Returns: Workflow instance
Example:
workflow = Workflow.from_yaml_url(
    "https://example.com/workflows/analyzer.yaml"
)

Instance Methods

add_node

Add a node to the workflow.
workflow.add_node(node: Node) -> Workflow
Parameters:
  • node (Node): Node instance (LLMNode, ToolNode, etc.)
Returns: Workflow (for chaining)
Example:
from fibonacci import LLMNode

node = LLMNode(
    id="analyzer",
    model="claude-sonnet-4-5-20250929",
    prompt="Analyze: {{input.text}}"
)
workflow.add_node(node)
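
The prompt above uses a {{input.text}} placeholder, and the Complete Example also references upstream nodes by ID (for example {{sentiment}}). The sketch below shows one way such placeholders could be resolved. render_prompt is a hypothetical stand-in, not Fibonacci's template engine, and it assumes IDs made of letters, digits, and underscores.

```python
import re

# Matches placeholders such as {{input.text}} or {{sentiment}}.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render_prompt(template: str, inputs: dict, outputs: dict) -> str:
    """Fill placeholders from workflow inputs and upstream node outputs."""

    def resolve(match: re.Match) -> str:
        key = match.group(1)
        if key.startswith("input."):
            # {{input.<name>}} reads from the inputs passed to the workflow.
            return str(inputs[key[len("input."):]])
        # Anything else is treated as the ID of an upstream node.
        return str(outputs[key])

    return PLACEHOLDER.sub(resolve, template)


print(render_prompt("Analyze: {{input.text}}", {"text": "Hello world"}, {}))
# Analyze: Hello world
```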

remove_node

Remove a node from the workflow.
workflow.remove_node(node_id: str) -> Workflow
Parameters:
  • node_id (str): ID of node to remove
Returns: Workflow (for chaining)
Raises: WorkflowError if the node is not found or has dependents
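
A node "has dependents" when other nodes list it in their dependencies. The sketch below illustrates that check on a plain dependency map, using the node IDs from the Complete Example on this page. find_dependents is a hypothetical helper for illustration, not a Fibonacci function.

```python
def find_dependents(dependencies: dict[str, list[str]], node_id: str) -> list[str]:
    """Return IDs of nodes that list node_id among their dependencies."""
    return sorted(nid for nid, deps in dependencies.items() if node_id in deps)


# Maps each node ID to the IDs it depends on.
deps = {
    "sentiment": [],
    "categorize": ["sentiment"],
    "notify": ["sentiment", "categorize"],
}
print(find_dependents(deps, "sentiment"))  # ['categorize', 'notify']
print(find_dependents(deps, "notify"))     # []
```

Under this reading, removing "notify" would succeed, while removing "sentiment" would raise until its dependents are removed first.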

get_node

Get a node by ID.
workflow.get_node(node_id: str) -> Node | None
Parameters:
  • node_id (str): Node identifier
Returns: Node instance, or None if no node has that ID

validate

Validate workflow configuration.
workflow.validate() -> ValidationResult
Returns: ValidationResult with valid, errors, and warnings
Example:
result = workflow.validate()
if not result.valid:
    print(f"Errors: {result.errors}")
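
This page does not list the individual checks that validate() runs. Two that any dependency-based workflow plausibly needs are references to unknown nodes and dependency cycles. The sketch below shows both on a plain dependency map with the standard-library graphlib module. check_dependencies is a hypothetical helper, not the library's implementation.

```python
from graphlib import CycleError, TopologicalSorter


def check_dependencies(dependencies: dict[str, list[str]]) -> list[str]:
    """Return human-readable problems; an empty list means the graph looks sound."""
    errors = []
    for node_id, deps in dependencies.items():
        for dep in deps:
            if dep not in dependencies:
                errors.append(f"{node_id}: unknown dependency '{dep}'")
    if errors:
        return errors
    try:
        # static_order() raises CycleError when the graph contains a cycle.
        list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # The second element of args holds the nodes that form the cycle.
        errors.append("dependency cycle: " + " -> ".join(exc.args[1]))
    return errors
```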

execute

Execute the workflow synchronously.
workflow.execute(
    inputs: dict,
    timeout: int | None = None,
    user_id: str | None = None,
    metadata: dict | None = None
) -> dict
Parameters:
  • inputs (dict): Input data for the workflow
  • timeout (int | None): Execution timeout in seconds (overrides the workflow default)
  • user_id (str | None): User identifier for memory scoping
  • metadata (dict | None): Additional execution metadata
Returns: dict with node outputs keyed by node ID
Raises: WorkflowError, NodeExecutionError, TimeoutError, ValidationError
Example:
result = workflow.execute(
    inputs={"text": "Hello world"},
    timeout=60,
    user_id="user-123"
)
print(result["analyzer"])
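
To make the "outputs keyed by node ID" result concrete, the sketch below runs stand-in nodes in dependency order and collects each output under its node ID. It uses the standard-library graphlib module. run_in_dependency_order and the tuple-based node format are assumptions for illustration, and Fibonacci's scheduler may work differently (for example, by running independent nodes in parallel).

```python
from graphlib import TopologicalSorter
from typing import Callable

# Each stand-in node is (dependencies, function of inputs and upstream outputs).
NodeSpec = tuple[list[str], Callable[[dict, dict], object]]


def run_in_dependency_order(nodes: dict[str, NodeSpec], inputs: dict) -> dict:
    """Run stand-in nodes so that every node sees its dependencies' outputs."""
    graph = {node_id: spec[0] for node_id, spec in nodes.items()}
    outputs: dict = {}
    # static_order() yields each node only after all of its dependencies.
    for node_id in TopologicalSorter(graph).static_order():
        _, func = nodes[node_id]
        outputs[node_id] = func(inputs, outputs)
    return outputs


nodes = {
    "shout": (["clean"], lambda inputs, out: out["clean"].upper()),
    "clean": ([], lambda inputs, out: inputs["text"].strip()),
}
result = run_in_dependency_order(nodes, {"text": "  Hello world "})
print(result["shout"])  # HELLO WORLD
```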

execute_async

Execute the workflow asynchronously.
await workflow.execute_async(
    inputs: dict,
    timeout: int | None = None,
    user_id: str | None = None,
    metadata: dict | None = None
) -> dict
Parameters: Same as execute()
Returns: dict with node outputs
Example:
import asyncio

async def main():
    result = await workflow.execute_async(
        inputs={"text": "Hello world"}
    )
    print(result)

asyncio.run(main())
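
A common reason to prefer execute_async is running several executions concurrently. The sketch below does this with asyncio.gather and a semaphore that caps concurrency. execute_batch and its limit parameter are hypothetical, and the sketch assumes a single Workflow instance can safely serve overlapping executions, which this page does not state.

```python
import asyncio


async def execute_batch(workflow, batch: list[dict], limit: int = 5) -> list[dict]:
    """Run one execution per inputs dict, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(inputs: dict) -> dict:
        async with semaphore:
            return await workflow.execute_async(inputs=inputs)

    # gather() returns results in the same order as the batch.
    return await asyncio.gather(*(run_one(inputs) for inputs in batch))
```

Usage would look like `results = asyncio.run(execute_batch(workflow, [{"text": "a"}, {"text": "b"}]))`.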

deploy

Deploy the workflow to Fibonacci Cloud.
workflow.deploy(
    environment: str = "production",
    version: str | None = None,
    message: str | None = None
) -> DeploymentResult
Parameters:
  • environment (str): Target environment
  • version (str | None): Version tag
  • message (str | None): Deployment message
Returns: DeploymentResult with deployment details
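
Since deploy() and validate() are both documented above, a natural pattern is to validate before deploying. The sketch below wraps the two calls. deploy_if_valid is a hypothetical helper, and raising ValueError on failed validation is a choice made here, not library behavior.

```python
from typing import Optional


def deploy_if_valid(workflow, environment: str = "production", message: Optional[str] = None):
    """Deploy only when validate() reports no errors (hypothetical helper)."""
    result = workflow.validate()
    if not result.valid:
        raise ValueError(f"Refusing to deploy, validation failed: {result.errors}")
    return workflow.deploy(environment=environment, message=message)
```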

to_yaml

Export workflow to YAML.
workflow.to_yaml(path: str | None = None) -> str
Parameters:
  • path (str | None): Optional file path to write the YAML to
Returns: YAML string

to_dict

Export workflow to dictionary.
workflow.to_dict() -> dict
Returns: Dictionary representation of workflow
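
One use for to_dict() is comparing two workflow definitions, for instance before and after an edit. The sketch below serializes the dictionary deterministically so the results can be diffed as text. workflow_snapshot is a hypothetical helper, and it assumes the dictionary contains only JSON-serializable values, which this page does not guarantee.

```python
import json


def workflow_snapshot(workflow) -> str:
    """Serialize to_dict() with sorted keys so two definitions compare cleanly."""
    return json.dumps(workflow.to_dict(), sort_keys=True, indent=2)
```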

Properties

nodes

List of nodes in the workflow.
workflow.nodes -> list[Node]

memory

Access workflow memory.
workflow.memory -> MemoryManager
Example:
workflow.memory.set("key", "value", scope="user")
value = workflow.memory.get("key", scope="user")

execution_history

Get execution history.
workflow.execution_history -> list[ExecutionRecord]

Hooks

Register lifecycle hooks for workflow events:
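import logging

def log_error(error):
    # Application-side helper used by the on_error hook below
    logging.error("Workflow error: %s", error)

workflow = Workflow(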
    name="hooked-workflow",
    hooks={
        "on_start": lambda ctx: print(f"Starting: {ctx.workflow_name}"),
        "on_node_start": lambda ctx: print(f"Node: {ctx.node_id}"),
        "on_node_complete": lambda ctx: print(f"Done: {ctx.node_id}"),
        "on_error": lambda ctx: log_error(ctx.error),
        "on_complete": lambda ctx: print(f"Finished in {ctx.duration}s")
    }
)

Available Hooks

| Hook | Trigger | Context |
| --- | --- | --- |
| on_start | Workflow execution starts | workflow_name, inputs |
| on_node_start | Node execution starts | node_id, inputs |
| on_node_complete | Node execution completes | node_id, output, duration |
| on_error | Error occurs | error, node_id, partial_results |
| on_complete | Workflow completes | outputs, duration |
| on_retry | Retry attempt | node_id, attempt, error |
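
Each hook receives a context object whose attributes match the Context column. The sketch below imitates that contract with a stand-in dispatcher so hook functions can be tried without running a workflow. fire_hook and the use of SimpleNamespace are assumptions for illustration; the real context type is not described on this page.

```python
from types import SimpleNamespace
from typing import Callable


def fire_hook(hooks: dict[str, Callable], name: str, **context) -> None:
    """Call the hook registered under name, if any, with a context object."""
    hook = hooks.get(name)
    if hook is not None:
        # Context fields become attributes, e.g. ctx.node_id.
        hook(SimpleNamespace(**context))


events = []
hooks = {
    "on_node_start": lambda ctx: events.append(f"start:{ctx.node_id}"),
    "on_node_complete": lambda ctx: events.append(f"done:{ctx.node_id} in {ctx.duration}s"),
}

fire_hook(hooks, "on_node_start", node_id="analyzer", inputs={"text": "Hello world"})
fire_hook(hooks, "on_node_complete", node_id="analyzer", output="...", duration=0.4)
# No on_retry hook is registered, so this call does nothing.
fire_hook(hooks, "on_retry", node_id="analyzer", attempt=2, error=None)
print(events)  # ['start:analyzer', 'done:analyzer in 0.4s']
```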

Complete Example

from fibonacci import Workflow, LLMNode, ToolNode, RetryConfig

# Create workflow with full configuration
workflow = Workflow(
    name="customer-analyzer",
    description="Analyzes customer feedback",
    version="2.0.0",
    timeout=300,
    default_retry=RetryConfig(max_attempts=3, delay=1.0),
    input_schema={
        "type": "object",
        "required": ["feedback"],
        "properties": {
            "feedback": {"type": "string", "minLength": 1}
        }
    },
    hooks={
        "on_error": lambda ctx: print(f"Error: {ctx.error}")
    }
)

# Add nodes
sentiment = LLMNode(
    id="sentiment",
    model="claude-sonnet-4-5-20250929",
    prompt="Analyze sentiment: {{input.feedback}}"
)

categorize = LLMNode(
    id="categorize",
    model="claude-sonnet-4-5-20250929",
    prompt="Categorize: {{input.feedback}}\nSentiment: {{sentiment}}",
    dependencies=["sentiment"]
)

notify = ToolNode(
    id="notify",
    tool="slack.post_message",
    inputs={
        "channel": "#feedback",
        "message": "New {{categorize}} feedback ({{sentiment}})"
    },
    dependencies=["sentiment", "categorize"]
)

workflow.add_node(sentiment)
workflow.add_node(categorize)
workflow.add_node(notify)

# Validate
validation = workflow.validate()
if validation.valid:
    # Execute
    result = workflow.execute(
        inputs={"feedback": "Great product!"},
        user_id="customer-456"
    )
    print(result)
else:
    # Report why the workflow was not executed
    print(f"Errors: {validation.errors}")