The Local SDK provides a headless (asynchronous) mode for running AI coding tasks in the background without blocking your application.
Headless Methods
Starting a Headless Task
def code_headless(
prompt: str,
editable_files: List[str],
readonly_files: Optional[List[str]] = None,
task_id: Optional[str] = None
) -> Dict[str, Any]
Starts an asynchronous coding task that runs in the background.
Parameters:
- prompt (
str
): Natural language instruction for the AI
- editable_files (
List[str]
): Paths to files the AI may modify
- readonly_files (
List[str]
, optional): Paths to files the AI may read but not change
- task_id (
str
, optional): Custom identifier for the task
Returns:
{
"task_id": "a1b2c3d4-e5f6-...",
"status": "pending",
"architect_mode": false
}
Checking Task Status
def get_headless_task_status(task_id: str) -> Dict[str, Any]
Queries the status and result of a previously started headless task.
Parameters:
- task_id (
str
): The task identifier returned by code_headless()
Returns a dictionary with different fields depending on the status:
For pending tasks:
For completed tasks:
{
"status": "completed",
"result": {
"success": true,
"diff": "...",
"cost": { ... },
"marked_up_cost": { ... },
"credits_remaining": 45.50
}
}
For failed tasks:
{
"status": "failed",
"error": "Error message describing what went wrong"
}
If the task is not found:
{
"status": "not_found"
}
Quick Headless Example
Run the real headless example script included in the examples:
python cloud-coding/examples/headless_example copy.py
This script will:
- Create
src/calculator.py
- Start an asynchronous coding task with
code_headless()
- Poll for status and print the diff when completed
from cloudcode import Local
import os, time
sdk = Local(
working_dir=os.getcwd(),
api_key=os.getenv("CLOUD_CODE_API_KEY")
)
# Create a test file
sdk.create_file(
"src/calculator.py",
"""def add(a, b):
return a + b"""
)
# Start headless task
task = sdk.code_headless(
prompt="Add multiply and divide functions to calculator.py.",
editable_files=["src/calculator.py"]
)
task_id = task["task_id"]
print(f"Task started with ID: {task_id}")
# Poll until completed
while True:
status = sdk.get_headless_task_status(task_id)
if status["status"] == "completed":
print("Diff:", status["result"]["diff"])
print("Updated content:", sdk.read_file("src/calculator.py"))
break
time.sleep(2)
Advanced Usage
Tracking Multiple Tasks
from cloudcode import Local
import os
import time
import threading
sdk = Local(
working_dir="/path/to/your/project",
api_key=os.getenv("CLOUDCODE_API_KEY")
)
# Start multiple tasks
tasks = []
files_to_improve = [
{"name": "auth.py", "prompt": "Add input validation"},
{"name": "models.py", "prompt": "Add data validation"},
{"name": "views.py", "prompt": "Add error handling"}
]
for file_info in files_to_improve:
task = sdk.code_headless(
prompt=f"{file_info['prompt']} to {file_info['name']}",
editable_files=[file_info["name"]]
)
tasks.append({
"task_id": task["task_id"],
"file": file_info["name"],
"prompt": file_info["prompt"],
"completed": False
})
# Function to monitor a specific task
def monitor_task(task_info):
while not task_info["completed"]:
status = sdk.get_headless_task_status(task_info["task_id"])
if status["status"] == "completed":
print(f"✅ Task for {task_info['file']} completed!")
task_info["completed"] = True
task_info["result"] = status["result"]
elif status["status"] == "failed":
print(f"❌ Task for {task_info['file']} failed: {status.get('error')}")
task_info["completed"] = True
task_info["error"] = status.get("error")
time.sleep(2)
# Start a thread for each task
threads = []
for task_info in tasks:
thread = threading.Thread(target=monitor_task, args=(task_info,))
thread.start()
threads.append(thread)
# Wait for all tasks to complete
for thread in threads:
thread.join()
# Process the results
for task_info in tasks:
print(f"\nResults for {task_info['file']}:")
if "result" in task_info:
result = task_info["result"]
if result["success"]:
print("Changes made:")
print(result["diff"])
print(f"Cost: ${result['cost']['message_cost']:.4f}")
else:
print("No significant changes were made.")
elif "error" in task_info:
print(f"Error: {task_info['error']}")
Integration with Web Applications
Here’s how you might use headless mode in a web application:
# Example using Flask
from flask import Flask, request, jsonify
from cloudcode import Local
import os
import uuid
app = Flask(__name__)
# Store tasks in memory (in a real app, use a database)
tasks = {}
sdk = Local(
working_dir="/path/to/your/project",
api_key=os.getenv("CLOUDCODE_API_KEY")
)
@app.route('/api/code', methods=['POST'])
def start_code_task():
data = request.json
# Generate a unique ID for the client
client_task_id = str(uuid.uuid4())
# Start the headless task
headless_task = sdk.code_headless(
prompt=data["prompt"],
editable_files=data["editable_files"],
readonly_files=data.get("readonly_files"),
task_id=client_task_id # Use the client-generated ID
)
# Store task info
tasks[client_task_id] = {
"prompt": data["prompt"],
"files": data["editable_files"],
"status": "pending",
"created_at": datetime.now().isoformat()
}
return jsonify({
"task_id": client_task_id,
"status": "pending"
})
@app.route('/api/code/<task_id>', methods=['GET'])
def get_task_status(task_id):
if task_id not in tasks:
return jsonify({"status": "not_found"}), 404
# Get the latest status
status = sdk.get_headless_task_status(task_id)
# Update our stored task info
if status["status"] == "completed":
tasks[task_id]["status"] = "completed"
tasks[task_id]["result"] = status["result"]
elif status["status"] == "failed":
tasks[task_id]["status"] = "failed"
tasks[task_id]["error"] = status.get("error")
return jsonify(status)
if __name__ == '__main__':
app.run(debug=True)
Best Practices
Task Management
- Generate unique task IDs if not provided
- Store task information for later reference
- Implement timeouts for polling to avoid infinite loops
- Use a proper task queue system for production applications
Error Handling
def poll_until_complete(sdk, task_id, timeout=300):
"""
Poll a task until it completes or fails, with timeout.
Returns:
dict: The task result or error information
"""
start_time = time.time()
while time.time() - start_time < timeout:
status = sdk.get_headless_task_status(task_id)
if status["status"] == "completed":
return {
"status": "completed",
"result": status["result"]
}
elif status["status"] == "failed":
return {
"status": "failed",
"error": status.get("error", "Unknown error")
}
elif status["status"] == "not_found":
return {
"status": "not_found",
"error": "Task not found"
}
time.sleep(2)
return {
"status": "timeout",
"error": f"Task did not complete within {timeout} seconds"
}
- For CPU-bound applications, limit the number of concurrent tasks
- Consider using a task queue system like Celery for production use
- Implement rate limiting to avoid exceeding API rate limits
- Store task results in a database for persistence