Skip to main content
The FibonacciClient class provides methods for interacting with Fibonacci Cloud, including workflow deployment, execution, and management.

Constructor

from fibonacci import FibonacciClient

client = FibonacciClient(
    api_key="your-api-key",
    base_url="https://api.fibonacci.dev",
    timeout=60
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| api_key | str | Required | Fibonacci API key |
| base_url | str | "https://api.fibonacci.dev" | API base URL |
| timeout | int | 60 | Default request timeout |
| max_retries | int | 3 | Max retry attempts |
| organization_id | str | None | Organization identifier |

Authentication

From Environment

import os
from fibonacci import FibonacciClient

# Uses FIBONACCI_API_KEY environment variable
client = FibonacciClient.from_env()

From Keychain

from fibonacci import FibonacciClient

# Uses system keychain
client = FibonacciClient.from_keychain()

Workflow Methods

deploy

Deploy a workflow to Fibonacci Cloud.
client.deploy(
    workflow: Workflow | str,
    environment: str = "production",
    version: str | None = None
) -> DeploymentResult
Parameters:
  • workflow: Workflow instance or path to YAML file
  • environment: Target environment (development/staging/production)
  • version: Version tag
Returns: DeploymentResult
Example:
result = client.deploy(
    workflow="workflows/analyzer.yaml",
    environment="production",
    version="1.2.0"
)
print(f"Deployed: {result.deployment_id}")

execute

Execute a deployed workflow.
client.execute(
    workflow_name: str,
    inputs: dict,
    environment: str = "production",
    timeout: int | None = None,
    async_execution: bool = False
) -> ExecutionResult | ExecutionHandle
Parameters:
  • workflow_name: Name of deployed workflow
  • inputs: Input data
  • environment: Target environment
  • timeout: Execution timeout
  • async_execution: Return immediately with handle
Returns: ExecutionResult, or ExecutionHandle if async_execution=True
Example:
# Synchronous
result = client.execute(
    workflow_name="customer-analyzer",
    inputs={"text": "Hello world"}
)
print(result.outputs)

# Asynchronous
handle = client.execute(
    workflow_name="long-running",
    inputs={"data": large_data},
    async_execution=True
)
# Check status later
status = client.get_execution(handle.execution_id)

get_execution

Get execution status and results.
client.get_execution(execution_id: str) -> ExecutionResult
Parameters:
  • execution_id: Execution identifier
Returns: ExecutionResult

list_executions

List workflow executions.
client.list_executions(
    workflow_name: str | None = None,
    environment: str | None = None,
    status: str | None = None,
    limit: int = 100,
    offset: int = 0
) -> list[ExecutionSummary]
Parameters:
  • workflow_name: Filter by workflow
  • environment: Filter by environment
  • status: Filter by status (running/success/failed)
  • limit: Max results
  • offset: Pagination offset
Returns: List of ExecutionSummary

cancel_execution

Cancel a running execution.
client.cancel_execution(execution_id: str) -> bool
Parameters:
  • execution_id: Execution identifier
Returns: True if cancelled

Workflow Management

list_workflows

List deployed workflows.
client.list_workflows(
    environment: str | None = None
) -> list[WorkflowSummary]

get_workflow

Get workflow details.
client.get_workflow(
    workflow_name: str,
    environment: str = "production"
) -> WorkflowDetails

delete_workflow

Delete a deployed workflow.
client.delete_workflow(
    workflow_name: str,
    environment: str = "production"
) -> bool

Tool Methods

list_tools

List available tools.
client.list_tools(
    category: str | None = None
) -> list[ToolInfo]
Example:
tools = client.list_tools(category="google")
for tool in tools:
    print(f"{tool.name}: {tool.description}")

get_tool

Get tool details.
client.get_tool(tool_name: str) -> ToolDetails

connect_tool

Connect an external tool.
client.connect_tool(
    tool_name: str,
    credentials: dict | None = None
) -> ConnectionResult
Example:
# OAuth-based connection (opens browser)
result = client.connect_tool("google")

# Credential-based connection
result = client.connect_tool("slack", credentials={
    "bot_token": "xoxb-..."
})

Memory Methods

get_memory

Retrieve a memory value.
client.get_memory(
    key: str,
    scope: str = "workflow",
    user_id: str | None = None
) -> Any

set_memory

Store a memory value.
client.set_memory(
    key: str,
    value: Any,
    scope: str = "workflow",
    user_id: str | None = None,
    ttl: int | None = None
) -> bool

delete_memory

Delete a memory value.
client.delete_memory(
    key: str,
    scope: str = "workflow",
    user_id: str | None = None
) -> bool

Async Client

For async applications, use AsyncFibonacciClient:
from fibonacci import AsyncFibonacciClient
import asyncio

async def main():
    """Run the "analyzer" workflow through the async client and print its outputs."""
    async_client = AsyncFibonacciClient(api_key="...")

    # Same call shape as the synchronous client, but execute() must be awaited.
    payload = {"text": "Hello"}
    execution = await async_client.execute(workflow_name="analyzer", inputs=payload)
    print(execution.outputs)

asyncio.run(main())

Response Types

DeploymentResult

@dataclass
class DeploymentResult:
    """Result returned by ``client.deploy()``."""

    deployment_id: str  # identifier of this deployment
    workflow_name: str  # name of the workflow
    version: str  # version tag (see the ``version`` argument of deploy)
    environment: str  # target environment: development, staging or production
    status: str  # deployment status; NOTE(review): possible values are not listed on this page
    url: str  # URL of the deployment
    created_at: datetime  # presumably when the deployment was created; confirm timezone handling

ExecutionResult

@dataclass
class ExecutionResult:
    """Status and results of a workflow execution.

    Returned by ``client.execute()`` (synchronous mode) and
    ``client.get_execution()``.
    """

    execution_id: str  # identifier accepted by get_execution / cancel_execution
    workflow_name: str  # name of the executed workflow
    status: str  # running, success, failed
    outputs: dict | None  # workflow outputs; presumably None when there are none yet (TODO confirm)
    error: str | None  # error description; presumably None unless the execution failed (TODO confirm)
    duration_ms: int  # execution duration, in milliseconds judging by the name
    started_at: datetime  # start time of the execution
    completed_at: datetime | None  # presumably None while the execution is still running (TODO confirm)

WorkflowSummary

@dataclass
class WorkflowSummary:
    """Summary of one deployed workflow, as listed by ``client.list_workflows()``."""

    name: str  # workflow name
    version: str  # version of the workflow
    environment: str  # environment the workflow is deployed to
    status: str  # workflow status; NOTE(review): possible values are not listed on this page
    last_deployed: datetime  # time of the most recent deployment
    execution_count: int  # number of executions; NOTE(review): counting window is not stated

Error Handling

from fibonacci import FibonacciClient
from fibonacci.exceptions import (
    AuthenticationError,
    RateLimitError,
    WorkflowNotFoundError
)

client = FibonacciClient(api_key="...")

try:
    result = client.execute(
        workflow_name="my-workflow",
        inputs={"text": "Hello"}
    )
except AuthenticationError:
    print("Invalid API key")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
except WorkflowNotFoundError:
    print("Workflow not deployed")

Complete Example

from fibonacci import FibonacciClient, Workflow

# Initialize client
client = FibonacciClient.from_env()

# Deploy workflow
workflow = Workflow.from_yaml("workflows/analyzer.yaml")
deployment = client.deploy(
    workflow=workflow,
    environment="production",
    version="1.0.0"
)
print(f"Deployed: {deployment.url}")

# Execute workflow
result = client.execute(
    workflow_name="analyzer",
    inputs={"text": "Analyze this text"},
    timeout=60
)

if result.status == "success":
    print(f"Result: {result.outputs}")
else:
    print(f"Error: {result.error}")

# List recent executions
executions = client.list_executions(
    workflow_name="analyzer",
    status="failed",
    limit=10
)
for ex in executions:
    print(f"{ex.execution_id}: {ex.status}")