import time

import requests


def upload_to_cortex(data_sources: list, source_type: str) -> int:
    """Upload extracted data to Cortex using the app sources endpoint."""
    # Map each extracted item onto the Cortex app-sources payload format.
    cortex_data = []
    for item in data_sources:
        cortex_item = {
            "id": item["id"],
            "title": item["title"],
            "type": source_type,
            "description": f"Content from {item['type']}",
            "url": item.get("url", ""),
            "timestamp": item.get("timestamp", ""),
            "content": {
                "text": item["content"],
                "html_base64": "",
                "csv_base64": "",
                "markdown": "",
                "files": [],
                "layout": []
            },
            "cortex_metadata": {
                "source_app": item["type"],
                "extracted_at": item.get("timestamp", "")
            },
            "meta": item.get("metadata", {}),
            "attachments": []
        }
        cortex_data.append(cortex_item)

    # Upload in batches of 20, as recommended by Cortex.
    batch_size = 20
    uploaded_count = 0
    for i in range(0, len(cortex_data), batch_size):
        batch = cortex_data[i:i + batch_size]
        try:
            response = requests.post(
                f"{CORTEX_BASE_URL}/upload/upload_app_sources",
                params={"tenant_id": TENANT_ID},
                headers=headers,
                json=batch,
            )
            if response.status_code == 200:
                uploaded_count += len(batch)
                print(f"Uploaded batch {i // batch_size + 1}: {len(batch)} items")
            else:
                print(f"Failed to upload batch {i // batch_size + 1}: {response.text}")
        except requests.RequestException as e:
            print(f"Error uploading batch {i // batch_size + 1}: {e}")

        # Wait 1 second between batches, as recommended.
        time.sleep(1)

    return uploaded_count
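
# The try/except above only logs a failed batch and moves on. For larger syncs,
# transient 429/5xx responses are worth retrying with backoff instead. A minimal
# sketch using requests' standard retry adapter (nothing Cortex-specific; the
# retry counts and status codes here are assumptions, not Cortex recommendations):
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def make_retrying_session() -> requests.Session:
    """Build a session that retries transient failures with exponential backoff."""
    retry = Retry(
        total=3,                                     # up to 3 retries per request
        backoff_factor=2,                            # exponential backoff between attempts
        status_forcelist=[429, 500, 502, 503, 504],  # retry rate limits and server errors
        allowed_methods=["POST"],                    # POST is not retried by default
    )
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session

# Usage: create one session up front and replace requests.post(...) inside
# upload_to_cortex with session.post(...); everything else stays the same.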
# Upload all extracted data from the three sources.
total_uploaded = 0
total_uploaded += upload_to_cortex(google_drive_data, "google_drive")
total_uploaded += upload_to_cortex(slack_data, "slack")
total_uploaded += upload_to_cortex(notion_data, "notion")
print(f"Successfully uploaded {total_uploaded} items to Cortex!")