Documentation Index
Fetch the complete documentation index at: https://cortex-ad5578da.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
This comprehensive guide shows you how to connect to popular workplace apps like Google Drive, Slack, and Notion using Composio, extract their data, and store it in Cortex as your vector database for AI-powered search and retrieval.
Overview
By combining Composio’s app integration capabilities with Cortex’s vector storage, you can:
- Unify data sources: Connect to 100+ apps including Google Workspace, Slack, Notion, GitHub, and more
- Intelligent processing: Extract and process different content types (documents, messages, pages)
- Semantic search: Query across all your data sources using natural language
- Real-time sync: Keep your knowledge base updated with the latest information
Prerequisites
Before you begin, ensure you have:
- Python 3.8+ installed
- A Composio account and API key (get it here)
- A Cortex account and API key (contact us)
- Access to the apps you want to integrate (Google Drive, Slack, Notion, etc.)
Step 1: Install Required SDKs
Install both Composio and the necessary HTTP client for Cortex:
pip install composio-core requests
Step 2: Set Up Authentication
Composio Authentication
First, set up your Composio API key:
import os
from composio import Composio
# Set your API key
# NOTE(review): prefer exporting COMPOSIO_API_KEY from your shell; assigning
# it in source here is for demonstration only — don't commit real keys.
os.environ["COMPOSIO_API_KEY"] = "your_composio_api_key"
# Initialize Composio client
# The client picks up COMPOSIO_API_KEY from the environment set above.
composio = Composio()
Cortex Configuration
Configure your Cortex API settings:
import requests
# Cortex connection settings: every request below sends this bearer token
# and JSON content type, scoped to your tenant via TENANT_ID.
CORTEX_API_KEY = "your_cortex_api_key"
CORTEX_BASE_URL = "https://api.usecortex.ai"
TENANT_ID = "your_tenant_id"
# Shared headers reused by all Cortex HTTP calls in this guide.
headers = {
    "Authorization": f"Bearer {CORTEX_API_KEY}",
    "Content-Type": "application/json"
}
Step 3: Connect to Your Apps
Authorize App Connections
Connect to your desired applications using Composio’s authorization flow:
def connect_to_apps(user_id: str):
    """Run the Composio authorization flow for Google Drive, Slack, and Notion.

    Returns a dict mapping each successfully connected app name to its
    active connection request; apps that fail to connect are omitted.
    """
    targets = (
        ("googledrive", "Google Drive"),
        ("slack", "Slack"),
        ("notion", "Notion"),
    )
    active_connections = {}
    for slug, label in targets:
        try:
            # Start an OAuth-style connection request for this app.
            request = composio.client.get_connection_request(
                user_uuid=user_id,
                app_name=slug,
            )
            print(f"Authorize {label} by visiting: {request.redirect_url}")
            # Block until the user finishes authorizing (up to 5 minutes).
            request.wait_until_active(timeout=300)
            active_connections[slug] = request
            print(f"{label} connected successfully!")
        except Exception as exc:
            print(f"Failed to connect to {label}: {str(exc)}")
    return active_connections
# Connect apps for a user
# `connections` maps app name -> connection request for each app authorized.
user_id = "user@yourcompany.com"
connections = connect_to_apps(user_id)
Retrieve the tools for your connected apps:
def get_available_tools(user_id: str, app_names: list):
    """Fetch the Composio tool definitions for each connected app.

    Returns a mapping of app name -> tool list. Apps whose lookup fails
    are omitted from the result (the error is printed instead).
    """
    catalog = {}
    for name in app_names:
        try:
            found = composio.tools.get(
                user_id=user_id,
                apps=[name],
            )
            catalog[name] = found
            print(f"Retrieved {len(found)} tools for {name}")
        except Exception as exc:
            print(f"Error getting tools for {name}: {str(exc)}")
    return catalog
# Get tools for connected apps
# `available_tools` maps each app name to its Composio tool list.
app_names = ["googledrive", "slack", "notion"]
available_tools = get_available_tools(user_id, app_names)
Step 4: Extract Data from Your Apps
Google Drive Integration
def extract_google_drive_data(user_id: str):
    """Pull document-like files (and their text content) out of Google Drive.

    Returns a list of normalized records ready for upload_to_cortex, or []
    if any step of the extraction fails.
    """
    records = []
    try:
        # Restrict the listing to PDFs, documents, and plain-text files.
        listing = composio.tools.execute_action(
            action="googledrive_list_files",
            user_id=user_id,
            params={
                "q": "mimeType='application/pdf' or mimeType contains 'document' or mimeType contains 'text'",
                "maxResults": 50
            }
        )
        for entry in listing.get("files", []):
            # Fetch the body of each file individually.
            body = composio.tools.execute_action(
                action="googledrive_get_file_content",
                user_id=user_id,
                params={"fileId": entry["id"]}
            )
            owners = entry.get("owners", [{}])
            records.append({
                "id": entry["id"],
                "title": entry["name"],
                "type": "googledrive",
                "content": body.get("content", ""),
                "url": entry.get("webViewLink", ""),
                "timestamp": entry.get("modifiedTime", ""),
                "metadata": {
                    "mimeType": entry.get("mimeType"),
                    "size": entry.get("size"),
                    "owner": owners[0].get("displayName", "")
                }
            })
        return records
    except Exception as exc:
        print(f"Error extracting Google Drive data: {str(exc)}")
        return []
# Extract Drive files for the user connected above.
google_drive_data = extract_google_drive_data(user_id)
print(f"Extracted {len(google_drive_data)} files from Google Drive")
Slack Integration
def extract_slack_data(user_id: str):
    """Collect recent text messages from every visible Slack channel.

    Returns a list of normalized records ready for upload_to_cortex, or []
    if any step of the extraction fails.
    """
    try:
        # Enumerate both public and private channels.
        channel_listing = composio.tools.execute_action(
            action="slack_list_channels",
            user_id=user_id,
            params={"types": "public_channel,private_channel"}
        )
        collected = []
        for chan in channel_listing.get("channels", []):
            # Pull the most recent 100 messages per channel.
            history = composio.tools.execute_action(
                action="slack_get_channel_history",
                user_id=user_id,
                params={
                    "channel": chan["id"],
                    "count": 100,
                    "inclusive": True
                }
            )
            for msg in history.get("messages", []):
                text = msg.get("text")
                if not text:
                    continue  # skip attachment-only / system messages
                ts = msg["ts"]
                collected.append({
                    "id": f"slack_{chan['id']}_{ts}",
                    "title": f"Message in #{chan['name']}",
                    "type": "slack",
                    "content": text,
                    "url": f"https://yourworkspace.slack.com/archives/{chan['id']}/p{ts.replace('.', '')}",
                    "timestamp": ts,
                    "metadata": {
                        "channel": chan["name"],
                        "user": msg.get("user", ""),
                        "thread_ts": msg.get("thread_ts")
                    }
                })
        return collected
    except Exception as exc:
        print(f"Error extracting Slack data: {str(exc)}")
        return []
# Extract Slack messages for the user connected above.
slack_data = extract_slack_data(user_id)
print(f"Extracted {len(slack_data)} messages from Slack")
Notion Integration
def extract_notion_data(user_id: str):
    """Collect pages (with their content) from the connected Notion workspace.

    Returns a list of normalized records ready for upload_to_cortex, or []
    if any step of the extraction fails.
    """
    try:
        # Find up to 50 page objects in the workspace.
        matches = composio.tools.execute_action(
            action="notion_search_pages",
            user_id=user_id,
            params={
                "filter": {"property": "object", "value": "page"},
                "page_size": 50
            }
        )
        pages = []
        for page in matches.get("results", []):
            body = composio.tools.execute_action(
                action="notion_get_page_content",
                user_id=user_id,
                params={"page_id": page["id"]}
            )
            # Notion stores titles as a rich-text array under the "title"
            # property; fall back to "Untitled" when it is missing or empty.
            title_prop = page.get("properties", {}).get("title") or {}
            rich_text = title_prop.get("title") or []
            title = rich_text[0].get("plain_text", "Untitled") if rich_text else "Untitled"
            pages.append({
                "id": page["id"],
                "title": title,
                "type": "notion",
                "content": body.get("content", ""),
                "url": page.get("url", ""),
                "timestamp": page.get("last_edited_time", ""),
                "metadata": {
                    "created_time": page.get("created_time"),
                    "created_by": page.get("created_by", {}).get("id"),
                    "parent": page.get("parent", {})
                }
            })
        return pages
    except Exception as exc:
        print(f"Error extracting Notion data: {str(exc)}")
        return []
# Extract Notion pages for the user connected above.
notion_data = extract_notion_data(user_id)
print(f"Extracted {len(notion_data)} pages from Notion")
Step 5: Store Data in Cortex
Batch Upload to Cortex
Use Cortex’s app sources upload endpoint to efficiently store all your extracted data:
def upload_to_cortex(data_sources: list, source_type: str):
    """Upload extracted app data to Cortex via the app-sources endpoint.

    Args:
        data_sources: Items produced by the extract_* helpers; each needs
            "id", "title", "content", and "type" keys (others optional).
        source_type: Label stored as the Cortex item "type".

    Returns:
        Number of items accepted by Cortex (failed batches are skipped,
        not retried).
    """
    # Local import keeps this snippet runnable on its own: the original
    # tutorial only performed `import time` further down the page, after
    # this function was already defined.
    import time

    # Reshape each item into Cortex's app-sources schema.
    cortex_data = []
    for item in data_sources:
        cortex_data.append({
            "id": item["id"],
            "title": item["title"],
            "type": source_type,
            "description": f"Content from {item['type']}",
            "url": item.get("url", ""),
            "timestamp": item.get("timestamp", ""),
            "content": {
                "text": item["content"],
                "html_base64": "",
                "csv_base64": "",
                "markdown": "",
                "files": [],
                "layout": []
            },
            "cortex_metadata": {
                "source_app": item["type"],
                "extracted_at": item.get("timestamp", "")
            },
            "meta": item.get("metadata", {}),
            "attachments": []
        })

    # Upload in batches of 20 (recommended by Cortex).
    batch_size = 20
    uploaded_count = 0
    for i in range(0, len(cortex_data), batch_size):
        batch = cortex_data[i:i + batch_size]
        try:
            response = requests.post(
                f"{CORTEX_BASE_URL}/upload/upload_app_sources",
                params={"tenant_id": TENANT_ID},
                headers=headers,
                json=batch
            )
            if response.status_code == 200:
                uploaded_count += len(batch)
                print(f"Uploaded batch {i//batch_size + 1}: {len(batch)} items")
            else:
                print(f"Failed to upload batch {i//batch_size + 1}: {response.text}")
        except Exception as e:
            print(f"Error uploading batch: {str(e)}")
        # Rate-limit: pause 1 second BETWEEN batches only — the original
        # also slept after the final batch, wasting a second per call.
        if i + batch_size < len(cortex_data):
            time.sleep(1)
    return uploaded_count
# Upload all extracted data
import time
# Push each app's items to Cortex and tally the grand total.
total_uploaded = 0
total_uploaded += upload_to_cortex(google_drive_data, "google_drive")
total_uploaded += upload_to_cortex(slack_data, "slack")
total_uploaded += upload_to_cortex(notion_data, "notion")
print(f"Successfully uploaded {total_uploaded} items to Cortex!")
Verify Processing
Check if your uploaded data is ready for querying:
def verify_processing():
    """Confirm the uploaded sources are visible in the Cortex tenant.

    Prints a short preview of the knowledge base and returns True when the
    listing endpoint responds successfully, False otherwise.
    """
    try:
        response = requests.get(
            f"{CORTEX_BASE_URL}/list/sources",
            params={"tenant_id": TENANT_ID},
            headers=headers
        )
        if response.status_code != 200:
            print(f"Error checking sources: {response.text}")
            return False
        sources = response.json()
        print(f"Found {len(sources)} sources in your knowledge base:")
        for source in sources[:10]:  # preview at most ten entries
            print(f" - {source.get('title', 'Untitled')} ({source.get('type', 'unknown')})")
        return True
    except Exception as exc:
        print(f"Error verifying processing: {str(exc)}")
        return False
# Verify processing
# Only announce readiness when the sources listing call succeeded.
if verify_processing():
    print("Data is ready for querying!")
Step 6: Query Your Integrated Data
Now you can query across all your connected apps using Cortex’s Q&A endpoint:
def query_integrated_data(question: str, session_id: str = "integration_session"):
    """Ask one question across every ingested app source via Cortex Q&A.

    Prints the generated answer plus up to three cited sources, and
    returns the raw response dict (or None on failure).
    """
    payload = {
        "question": question,
        "session_id": session_id,
        "tenant_id": TENANT_ID,
        "ai_generation": True,
        "highlight_chunks": True,
        "search_alpha": 0.7,  # balance semantic vs. keyword matching
        "recency_bias": 0.3   # lean slightly toward newer content
    }
    try:
        response = requests.post(
            f"{CORTEX_BASE_URL}/search/qna",
            headers=headers,
            json=payload
        )
        if response.status_code != 200:
            print(f"Query failed: {response.text}")
            return None
        result = response.json()
        print(f"Answer: {result.get('answer', 'No answer generated')}")
        print(f"\nSources:")
        # Show the top three supporting sources, with links when present.
        for i, source in enumerate(result.get('sources', [])[:3], 1):
            print(f" {i}. {source.get('title', 'Untitled')} (via {source.get('type', 'unknown')})")
            if source.get('url'):
                print(f" {source['url']}")
        return result
    except Exception as exc:
        print(f"Error querying data: {str(exc)}")
        return None
# Example queries
# A few cross-app questions to demonstrate unified retrieval.
example_queries = [
    "What recent project updates were discussed in Slack?",
    "Find documentation about our API from Google Drive",
    "Show me meeting notes from Notion about the Q4 planning",
    "What are the main topics discussed across all our platforms?"
]
# Run each sample question and print a divider between results.
for query in example_queries:
    print(f"\nQuery: {query}")
    query_integrated_data(query)
    print("-" * 80)
Advanced Configuration
Filtering by Source
You can query specific sources using metadata filters:
def query_specific_source(question: str, source_type: str):
    """Query only the documents that came from one app.

    Args:
        question: Natural-language question to ask.
        source_type: Value of the "source_app" metadata set at upload time
            (e.g. "slack", "google_drive", "notion").

    Returns:
        The parsed JSON response on success, None on any failure.
    """
    query_payload = {
        "question": question,
        "session_id": f"{source_type}_session",
        "tenant_id": TENANT_ID,
        "ai_generation": True,
        "tenant_metadata": {"source_app": source_type}  # Filter by source
    }
    try:
        response = requests.post(
            f"{CORTEX_BASE_URL}/search/qna",
            headers=headers,
            json=query_payload
        )
    except Exception as e:
        # Consistency fix: every other helper in this guide reports network
        # errors and returns a sentinel instead of raising — this one didn't,
        # so a transient failure would crash the whole script.
        print(f"Error querying {source_type} data: {str(e)}")
        return None
    return response.json() if response.status_code == 200 else None
# Query only Slack data
# Restricts retrieval to items whose "source_app" metadata equals "slack".
slack_result = query_specific_source("What were the latest team updates?", "slack")
Setting Up Automated Sync
For keeping your data fresh, you can set up periodic sync:
import schedule
import time
def sync_all_apps():
    """Re-extract every connected app and push the results to Cortex."""
    print("Starting automated sync...")
    # Pull fresh copies from each source, in the same order as before.
    fresh = {
        "google_drive": extract_google_drive_data(user_id),
        "slack": extract_slack_data(user_id),
        "notion": extract_notion_data(user_id),
    }
    # Upload each app's items and accumulate the grand total.
    total_synced = sum(
        upload_to_cortex(items, source) for source, items in fresh.items()
    )
    print(f"Sync completed: {total_synced} items updated")
# Schedule sync every 6 hours
# NOTE: `schedule` is a third-party package (pip install schedule).
schedule.every(6).hours.do(sync_all_apps)
# Run scheduler (in production, use a proper job scheduler)
# while True:
#     schedule.run_pending()
#     time.sleep(60)
Best Practices
- Batch Processing: Always upload data in batches of 20 or less
- Rate Limiting: Wait 1 second between batch uploads
- Incremental Sync: Only sync new/changed content when possible
- Content Filtering: Pre-filter irrelevant content before uploading
Security Considerations
- API Key Management: Store API keys in environment variables
- Access Scopes: Request minimal necessary permissions from apps
- Data Retention: Implement proper data cleanup policies
- Encryption: Ensure data is encrypted in transit and at rest
Monitoring and Maintenance
def get_sync_status():
    """Report how many items each app contributed to the knowledge base.

    Returns a dict of source type -> item count, or {} when the Cortex
    listing call fails.
    """
    sources_response = requests.get(
        f"{CORTEX_BASE_URL}/list/sources",
        params={"tenant_id": TENANT_ID},
        headers=headers
    )
    if sources_response.status_code != 200:
        return {}
    # Tally items per source type.
    source_counts = {}
    for entry in sources_response.json():
        kind = entry.get('type', 'unknown')
        source_counts[kind] = source_counts.get(kind, 0) + 1
    print("Current Knowledge Base Status:")
    for kind, count in source_counts.items():
        print(f" - {kind}: {count} items")
    return source_counts
# Monitor your integration
# `sync_status` maps source type -> item count currently stored in Cortex.
sync_status = get_sync_status()
Troubleshooting
Common Issues
- Authorization Failures: Ensure app permissions are correctly granted
- Rate Limiting: Implement exponential backoff for API calls
- Content Processing: Handle different content types appropriately
- Memory Usage: Process large datasets in smaller chunks
Error Handling
def robust_extraction(extraction_func, app_name: str, max_retries: int = 3):
    """Call an extract_* helper, retrying with exponential backoff.

    Args:
        extraction_func: One of the extract_*_data helpers; it is invoked
            with the module-level ``user_id``.
        app_name: Human-readable app name used in log messages.
        max_retries: Number of attempts before giving up.

    Returns:
        The extractor's result, or [] when every attempt fails.
    """
    for attempt in range(max_retries):
        try:
            return extraction_func(user_id)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed for {app_name}: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s...
            else:
                print(f"Max retries reached for {app_name}")
    # Bug fix: with max_retries <= 0 the loop body never runs, and the
    # original implicitly returned None — callers doing len(result) would
    # crash. Always return a list.
    return []
# Use robust extraction
# Note: this overwrites the google_drive_data extracted earlier in the guide.
google_drive_data = robust_extraction(extract_google_drive_data, "Google Drive")
Conclusion
You now have a complete system for ingesting data from popular workplace apps using Composio and storing it in Cortex. This integration enables you to:
- Search across all platforms with natural language queries
- Get AI-powered answers that cite sources from multiple apps
- Maintain up-to-date knowledge with automated syncing
- Scale to additional apps using the same pattern
The combination of Composio’s extensive app ecosystem and Cortex’s powerful vector search capabilities provides a robust foundation for building AI applications that can access and understand your organization’s distributed knowledge.
For additional apps and advanced configurations, refer to the Composio documentation and explore Cortex’s API reference for more advanced features.