Web Content Extraction: scrape_executor

For any requirement involving extracting the main text or metadata from a website (such as analyzing, summarizing, or evaluating website content), you MUST use the scrape_executor.

Do NOT use rest_executor or command_executor for web content extraction.

scrape_executor is designed to fetch and extract the main content and metadata from a given URL, making it suitable for analytics, machine learning, or information retrieval tasks.

Example: Scrape and Analyze a Website

[
    {
        "id": "task_1",
        "name": "Scrape Website Content",
        "schemas": {"method": "scrape_executor"},
        "inputs": {
            "url": "https://example.com",
            "max_chars": 5000,
            "extract_metadata": true
        }
    },
    {
        "id": "task_2",
        "name": "Analyze Scraped Content",
        "schemas": {"method": "llm_executor"},
        "parent_id": "task_1",
        "dependencies": [{"id": "task_1", "required": true}],
        "inputs": {
            "model": "gpt-4",
            "messages": [
                {"role": "user", "content": "Analyze the content and provide an evaluation."}
            ]
        }
    }
]

Tips:

- Use scrape_executor for any workflow that needs to extract readable content or metadata from a website for downstream analysis.
- Only use rest_executor for raw HTTP APIs, and command_executor for unrelated shell commands.

Custom Tasks Guide

For contributors: See the Extension Registry Design for advanced extension patterns and framework internals. This guide is focused on user-level custom executor development.

Learn how to create your own custom executors (tasks) in apflow. This guide will walk you through everything from simple tasks to advanced patterns.

What You'll Learn

  • ✅ How to create custom executors
  • ✅ How to register and use them
  • ✅ Input/output validation with Pydantic schemas
  • ✅ Error handling best practices
  • ✅ Common patterns and examples
  • ✅ Testing your custom tasks

Table of Contents

  1. Quick Start
  2. Understanding Executors
  3. Creating Your First Executor
  4. Required Components
  5. Input Schema
  6. Error Handling
  7. Common Patterns
  8. Advanced Features
  9. Best Practices
  10. Testing

Quick Start

The fastest way to create a custom executor:

from apflow import BaseTask, executor_register
from typing import ClassVar, Dict, Any
from pydantic import BaseModel, Field

# Define input schema as a Pydantic model
class MyFirstInputSchema(BaseModel):
    """Input schema for my first executor"""
    data: str = Field(description="Input data")

@executor_register()
class MyFirstExecutor(BaseTask):
    """A simple custom executor"""

    id = "my_first_executor"
    name = "My First Executor"
    description = "Does something useful"

    inputs_schema: ClassVar[type[BaseModel]] = MyFirstInputSchema

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the task"""
        result = f"Processed: {inputs.get('data', 'no data')}"
        return {"status": "completed", "result": result}

That's it! Just import it and use it:

# Import to register
from my_module import MyFirstExecutor

# Use it
task = await task_manager.task_repository.create_task(
    name="my_first_executor",  # Must match id
    user_id="user123",
    inputs={"data": "Hello!"}
)
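
Creating the task only records it; to run it, wrap it in a TaskTreeNode and dispatch it through the TaskManager, exactly as the full example in the next section does. A minimal sketch, assuming db and task_manager are set up as shown there:

from apflow import TaskTreeNode

# Execute the task tree
task_tree = TaskTreeNode(task)
await task_manager.distribute_task_tree(task_tree)

# Read back the result
result = await task_manager.task_repository.get_task_by_id(task.id)
print(result.result)  # e.g. {"status": "completed", "result": "Processed: Hello!"}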

Understanding Executors

What is an Executor?

An executor is a piece of code that performs a specific task. Think of it as a function that:

- Takes inputs (parameters)
- Does some work
- Returns a result

Examples:

- An executor that fetches data from an API
- An executor that processes files
- An executor that sends emails
- An executor that runs AI models

Executor vs Task

- Executor: The code that does the work (reusable)
- Task: An instance of work to be done (specific execution)

Analogy:

- Executor = A recipe (reusable template)
- Task = A specific meal made from the recipe (one-time execution)
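
To make the distinction concrete, here is a brief sketch using the create_task API shown later in this guide: one registered executor (the recipe) backs two independent task instances (two meals). ResizeImage is a made-up illustrative executor, and task_manager is assumed to be set up as in the usage examples below.

from apflow import BaseTask, executor_register

@executor_register()
class ResizeImage(BaseTask):
    """One executor: reusable logic, registered once"""
    id = "resize_image"
    name = "Resize Image"
    description = "Resizes an image to a target width"

    async def execute(self, inputs):
        return {"status": "completed", "width": inputs.get("width", 800)}

# Two tasks: two independent executions of the same executor
task_a = await task_manager.task_repository.create_task(
    name="resize_image", user_id="user123", inputs={"width": 640}
)
task_b = await task_manager.task_repository.create_task(
    name="resize_image", user_id="user123", inputs={"width": 1280}
)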

BaseTask vs ExecutableTask

BaseTask: Recommended base class (simpler, includes registration)

from apflow import BaseTask, executor_register

@executor_register()
class MyTask(BaseTask):
    id = "my_task"
    # ...

ExecutableTask: Lower-level interface (more control)

from apflow import ExecutableTask

class MyTask(ExecutableTask):
    @property
    def id(self) -> str:
        return "my_task"
    # ...

Recommendation: Use BaseTask with @executor_register() - it's simpler!

Creating Your First Executor

Let's create a complete, working example step by step.

Step 1: Create the Executor Class

Create a file greeting_executor.py:

from apflow import BaseTask, executor_register
from typing import ClassVar, Dict, Any, Literal
from pydantic import BaseModel, Field

# Define input schema
class GreetingInputSchema(BaseModel):
    """Input schema for greeting executor"""
    name: str = Field(description="Name of the person to greet")
    language: Literal["en", "es", "fr", "zh"] = Field(
        default="en", description="Language for the greeting"
    )

@executor_register()
class GreetingExecutor(BaseTask):
    """Creates personalized greetings"""

    id = "greeting_executor"
    name = "Greeting Executor"
    description = "Creates personalized greeting messages"

    inputs_schema: ClassVar[type[BaseModel]] = GreetingInputSchema

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Execute greeting creation"""
        name = inputs.get("name", "Guest")
        language = inputs.get("language", "en")

        greetings = {
            "en": f"Hello, {name}!",
            "es": f"¡Hola, {name}!",
            "fr": f"Bonjour, {name}!",
            "zh": f"你好，{name}！"
        }

        return {
            "greeting": greetings.get(language, greetings["en"]),
            "name": name,
            "language": language
        }

Step 2: Use Your Executor

Create a file use_greeting.py:

import asyncio
from apflow import TaskManager, TaskTreeNode, create_session
# Import to register the executor
from greeting_executor import GreetingExecutor

async def main():
    db = create_session()
    task_manager = TaskManager(db)

    # Create task using your executor
    task = await task_manager.task_repository.create_task(
        name="greeting_executor",  # Must match executor id
        user_id="user123",
        inputs={
            "name": "Alice",
            "language": "en"
        }
    )

    # Execute
    task_tree = TaskTreeNode(task)
    await task_manager.distribute_task_tree(task_tree)

    # Get result
    result = await task_manager.task_repository.get_task_by_id(task.id)
    print(f"Greeting: {result.result['greeting']}")

if __name__ == "__main__":
    asyncio.run(main())

Step 3: Run It

python use_greeting.py

Expected Output:

Greeting: Hello, Alice!

Congratulations! You just created and used your first custom executor! 🎉

Required Components

Every executor must have these components:

1. Unique ID

Purpose: Identifies the executor (used when creating tasks)

id = "my_executor_id"  # Must be unique across all executors

Best Practices:

- Use lowercase with underscores
- Be descriptive: fetch_user_data, not task1
- Keep it consistent: don't change it after deployment

2. Display Name

Purpose: Human-readable name

name = "My Executor"  # What users see

3. Description

Purpose: Explains what the executor does

description = "Fetches user data from the API"

4. Execute Method

Purpose: The actual work happens here

async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execute the task

    Args:
        inputs: Input parameters (from task.inputs)

    Returns:
        Execution result dictionary
    """
    # Your logic here
    return {"status": "completed", "result": "..."}

Key Points:

- Must be async
- Receives an inputs dictionary
- Returns a dictionary
- Can raise exceptions (will be caught by TaskManager)

5. Input Schema

Purpose: Defines what inputs are expected (for validation and documentation)

Define input schemas as Pydantic BaseModel classes and assign them as ClassVar on the executor. BaseTask automatically implements get_input_schema() by converting the model to JSON Schema:

from pydantic import BaseModel, Field
from typing import ClassVar

class MyInputSchema(BaseModel):
    """Input schema for my executor"""
    param1: str = Field(description="Parameter description")

class MyExecutor(BaseTask):
    inputs_schema: ClassVar[type[BaseModel]] = MyInputSchema
    # get_input_schema() is automatically provided by BaseTask

Input Schema

Input schemas are defined using Pydantic BaseModel classes. BaseTask converts them to JSON Schema automatically via get_input_schema().

Basic Schema

from pydantic import BaseModel, Field

class MyInputSchema(BaseModel):
    """Input schema for my executor"""
    name: str = Field(description="Person's name")
    age: int = Field(default=0, description="Person's age")
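
Because the schema is an ordinary Pydantic model, you can inspect the JSON Schema it produces (this is what BaseTask exposes through get_input_schema(), as described above). A quick sketch, assuming Pydantic v2:

import json

# Print the JSON Schema derived from the Pydantic model
print(json.dumps(MyInputSchema.model_json_schema(), indent=2))
# Produces roughly:
# {
#   "description": "Input schema for my executor",
#   "properties": {
#     "name": {"description": "Person's name", "title": "Name", "type": "string"},
#     "age": {"default": 0, "description": "Person's age", "title": "Age", "type": "integer"}
#   },
#   "required": ["name"],
#   ...
# }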

Common Field Types

String

name: str = Field(description="Person's name", min_length=1, max_length=100)

Integer

age: int = Field(description="Person's age", ge=0, le=150)

Boolean

enabled: bool = Field(default=False, description="Whether feature is enabled")

Array

items: list[str] = Field(description="List of items", min_length=1)

Object

config: Dict[str, str] = Field(description="Configuration object")

Enum (Limited Choices)

from typing import Literal

status: Literal["pending", "active", "completed"] = Field(
    default="pending", description="Task status"
)

Default Values

Provide defaults for optional parameters:

timeout: int = Field(default=30, description="Timeout in seconds")

Required Fields

Fields without a default value are required:

class MyInputSchema(BaseModel):
    """Input schema"""
    name: str = Field(description="Name")          # Required (no default)
    email: str = Field(description="Email")         # Required (no default)
    age: int = Field(default=0, description="Age")  # Optional (has default)

Complete Schema Example

from pydantic import BaseModel, Field
from typing import ClassVar, Dict, Any, Literal, Optional

class APICallInputSchema(BaseModel):
    """Input schema for API call executor"""
    url: str = Field(description="API endpoint URL")
    method: Literal["GET", "POST", "PUT", "DELETE"] = Field(
        default="GET", description="HTTP method"
    )
    headers: Optional[Dict[str, str]] = Field(
        default=None, description="HTTP headers"
    )
    timeout: int = Field(
        default=30, description="Timeout in seconds", ge=1, le=300
    )
    retry: bool = Field(default=False, description="Whether to retry on failure")

@executor_register()
class APICallExecutor(BaseTask):
    id = "api_call_executor"
    name = "API Call Executor"
    description = "Calls an external HTTP API"

    inputs_schema: ClassVar[type[BaseModel]] = APICallInputSchema
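
At runtime you can validate incoming inputs against this schema with check_input_schema(), the same helper used by the file and database patterns later in this guide. A minimal sketch of an execute method for the class above:

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # Validate inputs against APICallInputSchema before doing any work
        try:
            self.check_input_schema(inputs)
        except ValueError as e:
            return {"status": "failed", "error": str(e), "error_type": "validation_error"}

        # ... perform the API call here ...
        return {"status": "completed", "url": inputs["url"], "method": inputs.get("method", "GET")}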

Error Handling

Return error information in the result:

async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    try:
        result = perform_operation(inputs)
        return {"status": "completed", "result": result}
    except ValueError as e:
        return {
            "status": "failed",
            "error": str(e),
            "error_type": "validation_error"
        }
    except Exception as e:
        return {
            "status": "failed",
            "error": str(e),
            "error_type": "execution_error"
        }

Benefits:

- More control over error format
- Can include additional context
- Task status will be "failed"

Raising Exceptions

You can also raise exceptions (TaskManager will catch them):

async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    if not inputs.get("required_param"):
        raise ValueError("required_param is required")

    # Continue with execution
    return {"status": "completed", "result": "..."}

Note: TaskManager will catch exceptions and mark the task as "failed".

Best Practices

  1. Validate early: Check inputs at the start
  2. Return meaningful errors: Include error type and message
  3. Handle specific exceptions: Catch specific errors, not just Exception
  4. Include context: Add relevant information to error messages

Example:

async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    url = inputs.get("url")
    if not url:
        return {
            "status": "failed",
            "error": "URL is required",
            "error_type": "validation_error",
            "field": "url"
        }

    if not isinstance(url, str):
        return {
            "status": "failed",
            "error": "URL must be a string",
            "error_type": "type_error",
            "field": "url",
            "received_type": type(url).__name__
        }

    # Continue with execution
    try:
        result = await fetch_url(url)
        return {"status": "completed", "result": result}
    except TimeoutError:
        return {
            "status": "failed",
            "error": f"Request to {url} timed out",
            "error_type": "timeout_error"
        }

Common Patterns

Pattern 1: HTTP API Call

import aiohttp
from apflow import BaseTask, executor_register
from typing import ClassVar, Dict, Any, Literal, Optional
from pydantic import BaseModel, Field

class APICallInputSchema(BaseModel):
    """Input schema for API call executor"""
    url: str = Field(description="API URL")
    method: Literal["GET", "POST"] = Field(default="GET", description="HTTP method")
    headers: Optional[Dict[str, str]] = Field(default=None, description="HTTP headers")
    timeout: int = Field(default=30, description="Timeout in seconds")

@executor_register()
class APICallExecutor(BaseTask):
    """Calls an external HTTP API"""

    id = "api_call_executor"
    name = "API Call Executor"
    description = "Calls an external HTTP API"

    inputs_schema: ClassVar[type[BaseModel]] = APICallInputSchema

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        url = inputs.get("url")
        method = inputs.get("method", "GET")
        headers = inputs.get("headers", {})
        timeout = inputs.get("timeout", 30)

        try:
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    method,
                    url,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as response:
                    if response.content_type == "application/json":
                        data = await response.json()
                    else:
                        data = await response.text()

                    return {
                        "status": "completed",
                        "status_code": response.status,
                        "data": data
                    }
        except Exception as e:
            return {
                "status": "failed",
                "error": str(e),
                "error_type": type(e).__name__
            }

Pattern 2: Data Processing

from apflow import BaseTask, executor_register
from typing import ClassVar, Dict, Any, Literal
from pydantic import BaseModel, Field

class DataProcessorInputSchema(BaseModel):
    """Input schema for data processor"""
    data: list[float] = Field(description="Array of numbers")
    operation: Literal["sum", "average", "max", "min"] = Field(
        default="sum", description="Operation to perform"
    )

@executor_register()
class DataProcessor(BaseTask):
    """Processes data"""

    id = "data_processor"
    name = "Data Processor"
    description = "Processes data with various operations"

    inputs_schema: ClassVar[type[BaseModel]] = DataProcessorInputSchema

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        data = inputs.get("data", [])
        operation = inputs.get("operation", "sum")

        if operation == "sum":
            result = sum(data)
        elif operation == "average":
            result = sum(data) / len(data) if data else 0
        elif operation == "max":
            result = max(data) if data else None
        elif operation == "min":
            result = min(data) if data else None
        else:
            return {
                "status": "failed",
                "error": f"Unknown operation: {operation}",
                "error_type": "validation_error"
            }

        return {
            "status": "completed",
            "operation": operation,
            "result": result,
            "input_count": len(data)
        }

Pattern 3: File Operations

import aiofiles
from apflow import BaseTask, executor_register
from typing import ClassVar, Dict, Any
from pydantic import BaseModel, Field

class FileReaderInputSchema(BaseModel):
    """Input schema for file reader"""
    file_path: str = Field(description="Path to file")

@executor_register()
class FileReader(BaseTask):
    """Reads files"""

    id = "file_reader"
    name = "File Reader"
    description = "Reads content from files"

    inputs_schema: ClassVar[type[BaseModel]] = FileReaderInputSchema

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        file_path = inputs.get("file_path")

        try:
            self.check_input_schema(inputs)
        except ValueError as e:
            return {"status": "failed", "error": str(e), "error_type": "validation_error"}

        try:
            async with aiofiles.open(file_path, 'r') as f:
                content = await f.read()

            return {
                "status": "completed",
                "file_path": file_path,
                "content": content,
                "size": len(content)
            }
        except FileNotFoundError:
            return {
                "status": "failed",
                "error": f"File not found: {file_path}",
                "error_type": "file_not_found"
            }
        except Exception as e:
            return {
                "status": "failed",
                "error": str(e),
                "error_type": type(e).__name__
            }

Pattern 4: Database Query

from apflow import BaseTask, executor_register
from typing import ClassVar, Dict, Any, Optional
from pydantic import BaseModel, Field

class DBQueryInputSchema(BaseModel):
    """Input schema for database query"""
    query: str = Field(description="SQL query")
    params: Optional[Dict[str, Any]] = Field(default=None, description="Query parameters")

@executor_register()
class DatabaseQuery(BaseTask):
    """Executes database queries"""

    id = "db_query"
    name = "Database Query"
    description = "Executes database queries"

    inputs_schema: ClassVar[type[BaseModel]] = DBQueryInputSchema

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        query = inputs.get("query")
        params = inputs.get("params", {})

        try:
            self.check_input_schema(inputs)
        except ValueError as e:
            return {"status": "failed", "error": str(e), "error_type": "validation_error"}

        try:
            # Execute query
            # result = await self.db.fetch(query, params)
            result = []  # Placeholder

            return {
                "status": "completed",
                "rows": result,
                "count": len(result)
            }
        except Exception as e:
            return {
                "status": "failed",
                "error": str(e),
                "error_type": "database_error"
            }

Advanced Features

Cancellation Support

Implement cancellation for long-running tasks:

class CancellableTask(BaseTask):
    cancelable: bool = True  # Mark as cancellable

    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        self._cancelled = False

        for i in range(100):
            # Check for cancellation
            if self._cancelled:
                return {
                    "status": "cancelled",
                    "message": "Task was cancelled",
                    "progress": i
                }

            # Do work
            await asyncio.sleep(0.1)

        return {"status": "completed", "result": "done"}

    async def cancel(self) -> Dict[str, Any]:
        """Cancel task execution"""
        self._cancelled = True
        return {
            "status": "cancelled",
            "message": "Cancellation requested"
        }

Note: Not all executors need cancellation. Only implement if your task can be safely cancelled.

Accessing Task Context

Access task information through the executor:

class ContextAwareTask(BaseTask):
    async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # Task context is available through TaskManager
        # You can access it via hooks or by storing task reference

        # Example: Access task ID (if available)
        # task_id = getattr(self, '_task_id', None)

        return {"status": "completed"}

Best Practices

1. Keep Tasks Focused

Good:

class FetchUserData(BaseTask):
    """Only fetches user data"""
    ...

Bad:

class DoEverything(BaseTask):
    """Fetches, processes, saves, sends notifications, etc."""
    ...

2. Validate Inputs Early

async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    # Validate at the start
    url = inputs.get("url")
    if not url:
        return {"status": "failed", "error": "URL is required"}

    if not isinstance(url, str):
        return {"status": "failed", "error": "URL must be a string"}

    # Continue with execution

3. Use Async Properly

Good:

async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    url = inputs.get("url")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            data = await response.json()
    return {"status": "completed", "data": data}

Bad:

async def execute(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
    import requests
    response = requests.get(inputs.get("url"))  # Blocking! Stalls the event loop
    return {"status": "completed", "data": response.json()}

4. Document Your Tasks

class MyCustomTask(BaseTask):
    """
    Custom task that performs specific operations.

    This task processes input data and returns processed results.
    It supports various processing modes and configurations.

    Example:
        task = create_task(
            name="my_custom_task",
            inputs={"data": [1, 2, 3], "mode": "sum"}
        )
    """

5. Return Consistent Results

# Good: Consistent format
return {
    "status": "completed",
    "result": result,
    "metadata": {...}
}

# Bad: Inconsistent format
return result  # Sometimes just the result
return {"data": result}  # Sometimes wrapped

Testing

Unit Testing

Test your executor in isolation:

import pytest
from my_executors import GreetingExecutor

@pytest.mark.asyncio
async def test_greeting_executor():
    executor = GreetingExecutor()

    # Test with valid inputs
    result = await executor.execute({
        "name": "Alice",
        "language": "en"
    })

    assert result["status"] == "completed"
    assert "Hello, Alice!" in result["greeting"]

    # Test with default language
    result = await executor.execute({"name": "Bob"})
    assert result["language"] == "en"

    # Test with invalid language
    result = await executor.execute({
        "name": "Charlie",
        "language": "invalid"
    })
    # Should handle gracefully

Integration Testing

Test with TaskManager:

import pytest
from apflow import TaskManager, TaskTreeNode, create_session
from my_executors import GreetingExecutor  # Import to register the executor

@pytest.mark.asyncio
async def test_executor_integration():
    db = create_session()
    task_manager = TaskManager(db)

    # Create and execute task
    task = await task_manager.task_repository.create_task(
        name="greeting_executor",
        user_id="test_user",
        inputs={"name": "Test User", "language": "en"}
    )

    task_tree = TaskTreeNode(task)
    await task_manager.distribute_task_tree(task_tree)

    # Verify result
    result = await task_manager.task_repository.get_task_by_id(task.id)
    assert result.status == "completed"
    assert "Test User" in result.result["greeting"]

Built-in Executors

apflow provides several built-in executors for common use cases. These executors are automatically registered and can be used directly in your tasks.

HTTP/REST API Executor

Execute HTTP requests to external APIs, webhooks, and HTTP-based services.

Installation:

# httpx is included in a2a extra
pip install apflow[a2a]

Usage:

{
    "schemas": {
        "method": "rest_executor"
    },
    "inputs": {
        "url": "https://api.example.com/users",
        "method": "GET",
        "headers": {"Authorization": "Bearer token"},
        "timeout": 30.0
    }
}

Features:

- Supports GET, POST, PUT, DELETE, PATCH methods
- Authentication: Bearer token, Basic auth, API key
- Custom headers and query parameters
- JSON and form data support
- SSL verification control

SSH Remote Executor

Execute commands on remote servers via SSH.

Installation:

pip install apflow[ssh]

Usage:

{
    "schemas": {
        "method": "ssh_executor"
    },
    "inputs": {
        "host": "example.com",
        "username": "user",
        "key_file": "/path/to/key",
        "command": "ls -la",
        "timeout": 30
    }
}

Features:

- Password and key-based authentication
- Environment variable support
- Automatic key file permission validation
- Command timeout handling

Docker Container Executor

Execute commands in isolated Docker containers.

Installation:

pip install apflow[docker]

Usage:

{
    "schemas": {
        "method": "docker_executor"
    },
    "inputs": {
        "image": "python:3.11",
        "command": "python -c 'print(\"Hello\")'",
        "env": {"KEY": "value"},
        "volumes": {"/host/path": "/container/path"},
        "resources": {"cpu": "1.0", "memory": "512m"}
    }
}

Features:

- Custom Docker images
- Environment variables
- Volume mounts
- Resource limits (CPU, memory)
- Automatic container cleanup

gRPC Executor

Call gRPC services and microservices.

Installation:

pip install apflow[grpc]

Usage:

{
    "schemas": {
        "method": "grpc_executor"
    },
    "inputs": {
        "server": "localhost:50051",
        "service": "Greeter",
        "method": "SayHello",
        "request": {"name": "World"},
        "timeout": 30.0
    }
}

Features:

- Dynamic proto loading support
- Custom metadata
- Timeout handling
- Error handling

WebSocket Executor

Bidirectional WebSocket communication.

Installation:

# websockets is included in a2a extra
pip install apflow[a2a]

Usage:

{
    "schemas": {
        "method": "websocket_executor"
    },
    "inputs": {
        "url": "ws://example.com/ws",
        "message": "Hello",
        "wait_response": true,
        "timeout": 30.0
    }
}

Features:

- Send and receive messages
- JSON message support
- Configurable response waiting
- Connection timeout handling

apflow API Executor

Call other apflow API instances for distributed execution.

Installation:

# httpx is included in a2a extra
pip install apflow[a2a]

Usage:

{
    "schemas": {
        "method": "apflow_api_executor"
    },
    "inputs": {
        "base_url": "http://remote-instance:8000",
        "method": "tasks.execute",
        "params": {"task_id": "task-123"},
        "auth_token": "eyJ...",
        "wait_for_completion": true
    }
}

Features:

- All task management methods (tasks.execute, tasks.create, tasks.get, etc.)
- JWT authentication support
- Task completion polling
- Streaming support
- Distributed execution scenarios

Use Cases:

- Distributed task execution across multiple instances
- Service orchestration
- Load balancing
- Cross-environment task execution

MCP (Model Context Protocol) Executor

Interact with MCP servers to access external tools and data sources through the standardized MCP protocol.

Installation:

# MCP executor uses standard library for stdio mode
# For HTTP mode, httpx is included in a2a extra
pip install apflow[a2a]  # For HTTP transport
# Or just use stdio mode (no additional dependencies)

Transport Modes:

  1. stdio - Local process communication (no dependencies)
  2. http - Remote server communication (requires httpx from [a2a] extra)

Operations:

- list_tools: List available MCP tools
- call_tool: Call a tool with arguments
- list_resources: List available resources
- read_resource: Read a resource by URI

Usage Examples:

stdio Transport - List Tools:

{
    "schemas": {
        "method": "mcp_executor"
    },
    "inputs": {
        "transport": "stdio",
        "command": ["python", "-m", "mcp_server"],
        "operation": "list_tools"
    }
}

stdio Transport - Call Tool:

{
    "schemas": {
        "method": "mcp_executor"
    },
    "inputs": {
        "transport": "stdio",
        "command": ["python", "-m", "mcp_server"],
        "operation": "call_tool",
        "tool_name": "search_web",
        "arguments": {
            "query": "Python async programming"
        }
    }
}

stdio Transport - Read Resource:

{
    "schemas": {
        "method": "mcp_executor"
    },
    "inputs": {
        "transport": "stdio",
        "command": ["python", "-m", "mcp_server"],
        "operation": "read_resource",
        "resource_uri": "file:///path/to/file.txt"
    }
}

HTTP Transport - List Tools:

{
    "schemas": {
        "method": "mcp_executor"
    },
    "inputs": {
        "transport": "http",
        "url": "http://localhost:8000/mcp",
        "operation": "list_tools",
        "headers": {
            "Authorization": "Bearer token"
        }
    }
}

HTTP Transport - Call Tool:

{
    "schemas": {
        "method": "mcp_executor"
    },
    "inputs": {
        "transport": "http",
        "url": "http://localhost:8000/mcp",
        "operation": "call_tool",
        "tool_name": "search_web",
        "arguments": {
            "query": "Python async"
        },
        "timeout": 30.0
    }
}

Configuration Options:

- transport: "stdio" or "http" (required)
- operation: "list_tools", "call_tool", "list_resources", or "read_resource" (required)
- For stdio:
  - command: List of strings for the MCP server command (required)
  - env: Optional environment variables dict
  - cwd: Optional working directory
- For http:
  - url: MCP server URL (required)
  - headers: Optional HTTP headers dict
- For call_tool:
  - tool_name: Tool name (required)
  - arguments: Tool arguments dict (required)
- For read_resource:
  - resource_uri: Resource URI (required)
- timeout: Operation timeout in seconds (default: 30.0)

Features:

- Support for stdio and HTTP transport modes
- JSON-RPC 2.0 protocol compliance
- Tool and resource access
- Environment variable injection (stdio)
- Custom headers (HTTP)
- Timeout and cancellation support
- Comprehensive error handling

Use Cases:

- Access external tools via MCP servers
- Read data from MCP resources
- Integrate with MCP-compatible services
- Local and remote MCP server communication

Email Executor

Send emails via Resend API or SMTP protocol.

Installation:

pip install apflow[email]

Usage (Resend):

{
    "schemas": {
        "method": "send_email_executor"
    },
    "inputs": {
        "provider": "resend",
        "api_key": "re_xxx",
        "to": ["[email protected]"],
        "from_email": "[email protected]",
        "subject": "Order Confirmation",
        "html": "<h1>Thank you for your order!</h1>"
    }
}

Usage (SMTP):

{
    "schemas": {
        "method": "send_email_executor"
    },
    "inputs": {
        "provider": "smtp",
        "smtp_host": "smtp.example.com",
        "smtp_port": 587,
        "smtp_username": "user",
        "smtp_password": "pass",
        "to": ["[email protected]"],
        "from_email": "[email protected]",
        "subject": "Hello",
        "body": "Hello World"
    }
}

Features:

- Two providers: Resend (HTTP API) and SMTP
- Plain text and HTML email support
- CC, BCC, and reply-to recipients
- Configurable timeout
- SMTP STARTTLS support (enabled by default)

LLM Executor (llm_executor)

Direct LLM interaction via LiteLLM, supporting 100+ providers including OpenAI, Anthropic, Google Gemini, and many others.

Installation:

pip install apflow[llm]

Features:

- Unified Model Access: Use a single interface to interact with any LLM provider.
- Streaming Support: Built-in support for real-time streaming results (SSE).
- Auto-Config: Automatically handles API keys from environment variables or project-level configuration.
- LiteLLM Power: Leverages LiteLLM for robust, production-ready LLM interactions.

Usage:

# Create task using LLM executor
task = await task_manager.task_repository.create_task(
    name="llm_executor",
    user_id="user123",
    inputs={
        "model": "gpt-4o",
        "messages": [
            {"role": "user", "content": "Explain quantum entanglement in one sentence."}
        ]
    }
)

Input Schema:

- model: (required) Model name (provider-prefixed if needed, e.g., gpt-4o, claude-3-5-sonnet-20240620)
- messages: (required) Array of message objects (role and content)
- stream: (optional) Enable streaming (default: false)
- temperature: (optional) Controls randomness (default: 1.0)
- max_tokens: (optional) Maximum generation length
- api_base: (optional) Custom API base URL
- api_key: (optional) Override API key (can be passed via the X-LLM-API-KEY header in the API)
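
For example, streaming can be enabled by setting stream in the inputs while keeping everything else the same as the usage example above (a brief sketch; how you consume the stream depends on your deployment):

# Same create_task call as above, with streaming enabled
task = await task_manager.task_repository.create_task(
    name="llm_executor",
    user_id="user123",
    inputs={
        "model": "gpt-4o",
        "messages": [
            {"role": "user", "content": "Summarize this page in two sentences."}
        ],
        "stream": True,       # enable streaming (default: false)
        "temperature": 0.2,   # optional: lower randomness
        "max_tokens": 200     # optional: cap generation length
    }
)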

Task Tree Generator Executor

Generate valid task tree JSON arrays from natural language requirements using LLM.

Installation:

# Install LLM provider package (choose one)
pip install openai
# or
pip install anthropic

Usage:

{
    "schemas": {
        "method": "generate_executor"
    },
    "inputs": {
        "requirement": "Fetch data from API, process it, and save to database",
        "user_id": "user123",
        "llm_provider": "openai",  # Optional: "openai" or "anthropic"
        "model": "gpt-4",  # Optional: model name
        "temperature": 0.7,  # Optional: LLM temperature
        "max_tokens": 4000  # Optional: maximum tokens
    }
}

Features:

- Automatically collects available executors and their schemas
- Loads framework documentation as LLM context
- Generates valid task trees compatible with TaskCreator.create_task_tree_from_array()
- Comprehensive validation ensures generated tasks meet all requirements
- Supports OpenAI and Anthropic LLM providers
- Configurable via environment variables or input parameters

Configuration:

- OPENAI_API_KEY or ANTHROPIC_API_KEY: LLM API key (environment variable)
- OPENAI_MODEL (for OpenAI): Model name (default: "gpt-4o")
- ANTHROPIC_MODEL (for Anthropic): Model name (default: "claude-3-5-sonnet-20241022")
- APFLOW_LLM_PROVIDER: Provider selection (default: "openai")

Output: Returns a JSON array of task objects that can be used with TaskCreator.create_task_tree_from_array():

{
    "status": "completed",
    "tasks": [
        {
            "id": "task_1",
            "name": "rest_executor",
            "inputs": {"url": "https://api.example.com/data", "method": "GET"},
            "priority": 1
        },
        {
            "id": "task_2",
            "name": "command_executor",
            "parent_id": "task_1",
            "dependencies": [{"id": "task_1", "required": true}],
            "inputs": {"command": "python process_data.py"},
            "priority": 2
        }
    ],
    "count": 2
}
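
The returned tasks array can then be handed to TaskCreator.create_task_tree_from_array() to build an executable task tree. A minimal sketch; task_creator stands in for a TaskCreator instance, and the exact signature of create_task_tree_from_array may differ in your apflow version:

# "result" is the output dict from the generate_executor task above
tasks = result["tasks"]

# Hypothetical usage: build and dispatch the generated task tree
task_tree = task_creator.create_task_tree_from_array(tasks)
await task_manager.distribute_task_tree(task_tree)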

Use Cases:

- Automatically generate task trees from natural language requirements
- Rapid prototyping of workflows
- Converting business requirements into executable task structures
- Learning tool for understanding task tree patterns

CLI Usage Examples:

The following examples demonstrate the intelligent task tree generation capabilities:

# 1. Parallel Workflow - Fetch from multiple APIs in parallel
apflow generate task-tree "Fetch data from two different APIs in parallel, then merge the results and save to database"

# 2. ETL Pipeline - Extract, Transform, Load workflow
apflow generate task-tree "Extract data from REST API, transform it by filtering and aggregating, then load it into database"

# 3. Multi-Source Data Collection - Parallel system monitoring
apflow generate task-tree "Collect system information about CPU and memory in parallel, analyze the data, and aggregate results"

# 4. Complex Processing Flow - Multi-step data processing
apflow generate task-tree "Call REST API to get user data, process response with Python script, validate processed data, and save to file"

# 5. Fan-Out Fan-In Pattern - Parallel processing with convergence
apflow generate task-tree "Fetch data from API, process it in two different ways in parallel (filter and aggregate), merge both results, and save to database"

# 6. Complete Business Scenario - Real-world monitoring workflow
apflow generate task-tree "Monitor system resources (CPU, memory, disk) in parallel, analyze metrics, generate report, and send notification if threshold exceeded"

# 7. Data Pipeline - Multi-source processing
apflow generate task-tree "Download data from multiple sources simultaneously, transform each dataset independently, then combine all results into single output file"

# 8. Hierarchical Processing - Category-based organization
apflow generate task-tree "Fetch data from API, organize into categories, process each category in parallel, then aggregate all category results"

# 9. Complex Workflow - Complete business process
apflow generate task-tree "Fetch customer data from API, validate information, process orders in parallel for each customer, aggregate results, calculate totals, and generate final report"

# 10. With custom LLM parameters
apflow generate task-tree "Create a workflow" --temperature 0.9 --max-tokens 6000 --provider openai --model gpt-4o

# 11. Save to database
apflow generate task-tree "My requirement" --save --user-id user123

Tips for Better Results:

- Be specific: More detailed requirements lead to better task trees
- Mention patterns: Use words like "parallel", "sequential", "merge", "aggregate" to guide generation
- Specify executors: Mention specific operations (API, database, file, command) for better executor selection
- Describe flow: Explain the data flow and execution order in your requirement
- Save to file: Use --output tasks.json to save generated tasks for later use

Summary

All built-in executors follow the same pattern:

1. Inherit from BaseTask
2. Are registered with @executor_register()
3. Support cancellation (where applicable)
4. Provide input schema validation
5. Return a consistent result format

You can use these executors directly in your task schemas or extend them for custom behavior.

Next Steps


Need help? Check the FAQ or Quick Start Guide