Python
Examples using the Team6 API with Python.
Setup
import os
import time
import requests
TEAM6_API = 'https://api.team6.ai'
TOKEN = os.environ['TEAM6_TOKEN']
headers = {
'Authorization': f'Bearer {TOKEN}',
'Content-Type': 'application/json',
}
Create a Project
def create_project(name: str, description: str):
response = requests.post(
f'{TEAM6_API}/projects',
headers=headers,
json={'name': name, 'description': description}
)
response.raise_for_status()
return response.json()
project = create_project('My App', 'Main development project')
print(f"Project created: {project['id']}")
Connect a Repository
def connect_repository(project_id: str, name: str, url: str, is_primary=True):
response = requests.post(
f'{TEAM6_API}/repositories',
headers=headers,
json={
'projectId': project_id,
'name': name,
'url': url,
'defaultBranch': 'main',
'isPrimary': is_primary,
}
)
response.raise_for_status()
return response.json()
repo = connect_repository(
project['id'],
'backend-api',
'https://github.com/myorg/backend-api'
)
Create and Start a Task
def create_task(project_id: str, title: str, description: str, agent_id: str):
response = requests.post(
f'{TEAM6_API}/projects/{project_id}/tasks',
headers=headers,
json={
'title': title,
'description': description,
'owner': agent_id,
}
)
response.raise_for_status()
return response.json()
def start_task(task_id: str):
response = requests.post(
f'{TEAM6_API}/tasks/{task_id}/start',
headers=headers
)
response.raise_for_status()
return response.json()
task = create_task(
project['id'],
'Add user authentication',
'Implement JWT authentication with login endpoint',
'agent_backend'
)
start_task(task['id'])
print(f"Task started: {task['id']}")
Poll for Task Completion
def get_task(task_id: str):
response = requests.get(
f'{TEAM6_API}/tasks/{task_id}',
headers=headers
)
response.raise_for_status()
return response.json()
def wait_for_task(task_id: str, timeout=600):
start = time.time()
while time.time() - start < timeout:
task = get_task(task_id)
if task['status'] == 'done':
print(f"Task completed! PR: {task.get('prUrl')}")
return task
if task['status'] == 'in_review':
print(f"PR ready for review: {task.get('prUrl')}")
return task
print(f"Status: {task['status']}")
time.sleep(10)
raise TimeoutError('Task timed out')
completed_task = wait_for_task(task['id'])
Chat with Agent
def chat_with_agent(task_id: str, message: str):
response = requests.post(
f'{TEAM6_API}/tasks/{task_id}/chat',
headers=headers,
json={'message': message}
)
response.raise_for_status()
return response.json()
response = chat_with_agent(task['id'], 'Please add input validation')
print(f"Agent response: {response['response']}")
List Tasks
def list_tasks(project_id: str, status=None):
params = {}
if status:
params['status'] = status
response = requests.get(
f'{TEAM6_API}/projects/{project_id}/tasks',
headers=headers,
params=params
)
response.raise_for_status()
return response.json()
in_progress = list_tasks(project['id'], 'in_progress')
print(f"Tasks in progress: {len(in_progress)}")
Get Task Diff
def get_task_diff(task_id: str):
response = requests.get(
f'{TEAM6_API}/tasks/{task_id}/diff',
headers=headers
)
response.raise_for_status()
return response.json()
diff = get_task_diff(task['id'])
print("Files changed:")
for file in diff['files']:
print(f" {file['path']}: +{file['additions']} -{file['deletions']}")
Merge PR
def merge_task(task_id: str):
response = requests.post(
f'{TEAM6_API}/tasks/{task_id}/merge',
headers=headers
)
response.raise_for_status()
return response.json()
merge_task(task['id'])
print("PR merged!")
Full Example
def automate_feature():
# Create project
project = create_project('My App', 'Main project')
# Set GitHub token
requests.post(
f'{TEAM6_API}/projects/{project["id"]}/github-token',
headers=headers,
json={'token': os.environ['GITHUB_TOKEN']}
)
# Connect repository
connect_repository(
project['id'],
'backend',
'https://github.com/myorg/backend'
)
# Create and start task
task = create_task(
project['id'],
'Add health check endpoint',
'Create GET /health endpoint that returns { status: "ok" }',
'agent_backend'
)
start_task(task['id'])
# Wait for completion
completed = wait_for_task(task['id'])
print(f"Done! PR: {completed.get('prUrl')}")
if __name__ == '__main__':
automate_feature()
Async Version
import asyncio
import aiohttp
async def create_task_async(session, project_id, title, description, agent_id):
async with session.post(
f'{TEAM6_API}/projects/{project_id}/tasks',
json={
'title': title,
'description': description,
'owner': agent_id,
}
) as response:
return await response.json()
async def main():
async with aiohttp.ClientSession(headers=headers) as session:
tasks = await asyncio.gather(
create_task_async(session, project_id, 'Task 1', 'Description 1', 'agent_backend'),
create_task_async(session, project_id, 'Task 2', 'Description 2', 'agent_frontend'),
create_task_async(session, project_id, 'Task 3', 'Description 3', 'agent_backend'),
)
print(f"Created {len(tasks)} tasks")
asyncio.run(main())