The Local SDK provides a headless (asynchronous) mode for running AI coding tasks in the background without blocking your application.

Headless Methods

Starting a Headless Task

def code_headless(
    prompt: str,
    editable_files: List[str],
    readonly_files: Optional[List[str]] = None,
    task_id: Optional[str] = None
) -> Dict[str, Any]

Starts an asynchronous coding task that runs in the background.

Parameters:

  • prompt (str): Natural language instruction for the AI
  • editable_files (List[str]): Paths to files the AI may modify
  • readonly_files (List[str], optional): Paths to files the AI may read but not change
  • task_id (str, optional): Custom identifier for the task

Returns:

{
  "task_id": "a1b2c3d4-e5f6-...",
  "status": "pending",
  "architect_mode": false
}

Checking Task Status

def get_headless_task_status(task_id: str) -> Dict[str, Any]

Queries the status and result of a previously started headless task.

Parameters:

  • task_id (str): The task identifier returned by code_headless()

Returns a dictionary with different fields depending on the status:

For pending tasks:

{
  "status": "pending"
}

For completed tasks:

{
  "status": "completed",
  "result": {
    "success": true,
    "diff": "...",
    "cost": { ... },
    "marked_up_cost": { ... },
    "credits_remaining": 45.50
  }
}

For failed tasks:

{
  "status": "failed",
  "error": "Error message describing what went wrong"
}

If the task is not found:

{
  "status": "not_found"
}

Quick Headless Example

Run the real headless example script included in the examples:

python "cloud-coding/examples/headless_example copy.py"

This script will:

  • Create src/calculator.py
  • Start an asynchronous coding task with code_headless()
  • Poll for status and print the diff when completed

from cloudcode import Local
import os, time

sdk = Local(
    working_dir=os.getcwd(),
    api_key=os.getenv("CLOUDCODE_API_KEY")
)

# Create a test file
sdk.create_file(
    "src/calculator.py",
    """def add(a, b):
    return a + b"""
)

# Start headless task
task = sdk.code_headless(
    prompt="Add multiply and divide functions to calculator.py.",
    editable_files=["src/calculator.py"]
)
task_id = task["task_id"]
print(f"Task started with ID: {task_id}")

# Poll until the task reaches a terminal state
while True:
    status = sdk.get_headless_task_status(task_id)
    state = status["status"]
    if state == "completed":
        print("Diff:", status["result"]["diff"])
        print("Updated content:", sdk.read_file("src/calculator.py"))
        break
    if state in ("failed", "not_found"):
        # Stop here as well; otherwise a failed or unknown task would
        # keep this loop polling forever.
        print(f"Task ended with status {state!r}:", status.get("error", "no details"))
        break
    time.sleep(2)

Advanced Usage

Tracking Multiple Tasks

from cloudcode import Local
import os
import time
import threading

# Client bound to the project directory; the API key is read from the environment
sdk = Local(
    api_key=os.getenv("CLOUDCODE_API_KEY"),
    working_dir="/path/to/your/project",
)

# One entry per file: which file to edit and what to ask for
files_to_improve = [
    {"name": "auth.py", "prompt": "Add input validation"},
    {"name": "models.py", "prompt": "Add data validation"},
    {"name": "views.py", "prompt": "Add error handling"},
]

# Launch one headless task per file and keep a tracking record for each
tasks = []
for spec in files_to_improve:
    target = spec["name"]
    instruction = spec["prompt"]
    started = sdk.code_headless(
        prompt=f"{instruction} to {target}",
        editable_files=[target],
    )
    record = {
        "task_id": started["task_id"],
        "file": target,
        "prompt": instruction,
        "completed": False,
    }
    tasks.append(record)

# Function to monitor a specific task
def monitor_task(task_info):
    """Poll one task until it finishes and record the outcome on task_info.

    Args:
        task_info: Tracking dict with at least "task_id", "file" and
            "completed" keys. It is updated in place: "completed" becomes
            True, and either "result" (on success) or "error" (on failure
            or when the task is unknown) is added.
    """
    while not task_info["completed"]:
        status = sdk.get_headless_task_status(task_info["task_id"])
        state = status["status"]

        if state == "completed":
            print(f"✅ Task for {task_info['file']} completed!")
            task_info["result"] = status["result"]
            task_info["completed"] = True
        elif state in ("failed", "not_found"):
            # "not_found" responses carry no error text, so supply one;
            # without this branch an unknown task would be polled forever.
            fallback = "Task not found" if state == "not_found" else None
            error = status.get("error", fallback)
            print(f"❌ Task for {task_info['file']} failed: {error}")
            task_info["error"] = error
            task_info["completed"] = True
        else:
            # Still running: wait before asking again (no wait once finished)
            time.sleep(2)

# Launch one monitoring thread per task
threads = []
for entry in tasks:
    worker = threading.Thread(target=monitor_task, args=(entry,))
    worker.start()
    threads.append(worker)

# Block until every monitor has finished
for worker in threads:
    worker.join()

# Report the outcome of each task
for entry in tasks:
    print(f"\nResults for {entry['file']}:")

    if "result" not in entry:
        # No result recorded: show the error if the monitor stored one
        if "error" in entry:
            print(f"Error: {entry['error']}")
        continue

    outcome = entry["result"]
    if not outcome["success"]:
        print("No significant changes were made.")
        continue

    print("Changes made:")
    print(outcome["diff"])
    print(f"Cost: ${outcome['cost']['message_cost']:.4f}")

Integration with Web Applications

Here’s how you might use headless mode in a web application:

# Example using Flask
import os
import uuid
from datetime import datetime

from cloudcode import Local
from flask import Flask, request, jsonify

# Flask application that exposes the task endpoints
app = Flask(__name__)

# In-memory task registry keyed by task ID (in a real app, use a database)
tasks = dict()

# SDK client shared by all request handlers
sdk = Local(
    api_key=os.getenv("CLOUDCODE_API_KEY"),
    working_dir="/path/to/your/project",
)

@app.route('/api/code', methods=['POST'])
def start_code_task():
    """Start a headless coding task from a JSON request body.

    Expects a JSON object with "prompt" and "editable_files", plus an
    optional "readonly_files".

    Returns:
        A JSON response with the generated "task_id" and its "status", or
        an error message with HTTP 400 when the body is missing or
        incomplete.
    """
    # silent=True yields None instead of raising on a missing or invalid body
    data = request.get_json(silent=True)
    if (
        not isinstance(data, dict)
        or not data.get("prompt")
        or not data.get("editable_files")
    ):
        return jsonify({
            "error": "Request body must be JSON with 'prompt' and 'editable_files'"
        }), 400

    # Generate a unique ID for the client
    client_task_id = str(uuid.uuid4())

    # NOTE: the file paths come straight from the client. A real app should
    # restrict them to the project directory before passing them on.
    headless_task = sdk.code_headless(
        prompt=data["prompt"],
        editable_files=data["editable_files"],
        readonly_files=data.get("readonly_files"),
        task_id=client_task_id  # Use the client-generated ID
    )
    initial_status = headless_task.get("status", "pending")

    # Store task info
    tasks[client_task_id] = {
        "prompt": data["prompt"],
        "files": data["editable_files"],
        "status": initial_status,
        "created_at": datetime.now().isoformat()
    }

    return jsonify({
        "task_id": client_task_id,
        "status": initial_status
    })

@app.route('/api/code/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Return the latest status of a task started through this app.

    Args:
        task_id: Identifier handed out by the POST /api/code endpoint.

    Returns:
        The SDK status as JSON. Responds with HTTP 404 when the ID is
        unknown to this app or to the SDK.
    """
    if task_id not in tasks:
        return jsonify({"status": "not_found"}), 404

    # Get the latest status
    status = sdk.get_headless_task_status(task_id)
    state = status["status"]

    if state == "not_found":
        # Known to this app but not to the SDK: answer with the same 404
        # that is used for IDs this app has never seen.
        return jsonify(status), 404

    # Update our stored task info
    if state == "completed":
        tasks[task_id]["status"] = "completed"
        tasks[task_id]["result"] = status["result"]
    elif state == "failed":
        tasks[task_id]["status"] = "failed"
        tasks[task_id]["error"] = status.get("error")

    return jsonify(status)

if __name__ == '__main__':
    # Development server only. The interactive debugger can execute
    # arbitrary code, so it is enabled only when FLASK_DEBUG=1 is set.
    app.run(debug=os.getenv("FLASK_DEBUG") == "1")

Best Practices

Task Management

  • Generate unique task IDs if not provided
  • Store task information for later reference
  • Implement timeouts for polling to avoid infinite loops
  • Use a proper task queue system for production applications

Error Handling

def poll_until_complete(sdk, task_id, timeout=300, poll_interval=2):
    """
    Poll a task until it completes, fails, is reported missing, or times out.

    Args:
        sdk: Client object exposing get_headless_task_status(task_id).
        task_id: The task identifier returned by code_headless().
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to wait between status checks.

    Returns:
        dict: The task result or error information. The "status" key is one
        of "completed", "failed", "not_found" or "timeout".
    """
    # Monotonic clock: the deadline is unaffected by system clock changes
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        status = sdk.get_headless_task_status(task_id)
        # .get() so a response without "status" is treated as still pending
        state = status.get("status")

        if state == "completed":
            return {
                "status": "completed",
                "result": status["result"]
            }
        if state == "failed":
            return {
                "status": "failed",
                "error": status.get("error", "Unknown error")
            }
        if state == "not_found":
            return {
                "status": "not_found",
                "error": "Task not found"
            }

        # Never sleep past the deadline
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(poll_interval, remaining))

    return {
        "status": "timeout",
        "error": f"Task did not complete within {timeout} seconds"
    }

Performance Considerations

  • For CPU-bound applications, limit the number of concurrent tasks
  • Consider using a task queue system like Celery for production use
  • Implement rate limiting to avoid exceeding API rate limits
  • Store task results in a database for persistence