# Architecture Overview
Engram is a semantic search system for AI conversations and documents, built with a modular architecture that separates concerns across distinct layers.
## System Flow

```
Source Data → Adapters → Chunker → Embedder → PostgreSQL/pgvector
                                                      ↓
                                  Search / Enrichment / Reorganization
```
## Core Components

### Data Layer

**PostgreSQL + pgvector** - Primary data store with vector similarity search capabilities.
Key tables:
- `documents` - Source documents with metadata
- `memories` - Chunks with embeddings
- `clusters` - Document groupings
- `cluster_members` - Cluster membership
- `document_relations` - Cross-document links
- `archived_external_ids` - Archived document tracking
- `knowledge_artifacts` - Generated knowledge maps
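To make the `memories` layout concrete, here is a minimal, hypothetical DDL sketch. The extra columns and the 1536-dimension vector (a common OpenAI-style embedding size) are assumptions, not Engram's actual migrations:

```python
import psycopg

# Hypothetical setup sketch, not Engram's real schema: column details and
# the vector(1536) dimension are assumptions for illustration only.
async def create_memories_table(conn: psycopg.AsyncConnection) -> None:
    await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")
    await conn.execute(
        """
        CREATE TABLE IF NOT EXISTS memories (
            id          BIGSERIAL PRIMARY KEY,
            document_id BIGINT REFERENCES documents(id),
            content     TEXT NOT NULL,
            embedding   vector(1536),
            metadata    JSONB DEFAULT '{}'::jsonb
        )
        """
    )
```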
### Processing Layer

| Module | Purpose |
|---|---|
| `adapters/` | Parse Claude and ChatGPT exports and the Craft MCP API into a unified `Document` model |
| `processing/chunker.py` | Token-based splitting (tiktoken, 500 tokens, 50 overlap) |
| `processing/ingest.py` | Orchestrates: hash check → chunk → embed → upsert (sketched below) |
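A condensed sketch of that ingest flow, assuming hypothetical helpers rather than Engram's actual internals (`already_ingested` is invented here; `store_memory` is the function shown under "Embedding and Storage" below):

```python
import hashlib

# Illustrative orchestration only; `already_ingested` is a hypothetical
# helper, and the chunker/client interfaces mirror the examples below.
async def ingest(doc, chunker, client, conn) -> None:
    content_hash = hashlib.sha256(doc.content.encode()).hexdigest()
    if await already_ingested(conn, doc.id, content_hash):   # hash check
        return
    for chunk in chunker.chunk(doc.content):                 # chunk
        embedding = await client.embed(chunk)                # embed
        await store_memory(conn, chunk, embedding, {"document_id": doc.id})  # upsert
```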
### Search Layer

| Module | Purpose |
|---|---|
| `search/semantic.py` | pgvector similarity search using `<=>` cosine distance |
| `search/hybrid.py` | Combined semantic + keyword search (scoring illustrated below) |
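The exact blend lives in `search/hybrid.py`; as a rough illustration of the idea (the 0.7/0.3 weights and the `english` full-text configuration are assumptions, not Engram's formula), one SQL statement can combine both signals:

```python
# Illustrative hybrid query: cosine similarity from pgvector blended with
# Postgres full-text rank. Weights and text-search config are assumptions.
HYBRID_SQL = """
SELECT id, content,
       0.7 * (1 - (embedding <=> %s))
     + 0.3 * ts_rank(to_tsvector('english', content),
                     plainto_tsquery('english', %s)) AS score
FROM memories
ORDER BY score DESC
LIMIT %s
"""
```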
### Enrichment Layer

| Module | Purpose |
|---|---|
| `enrichment/` | GPT-4o summarization, tagging, cluster labeling, knowledge maps |
| `openai/batch_client.py` | OpenAI Batch API for 50% cheaper enrichment (example below) |
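A sketch of the Batch API round-trip that `batch_client.py` presumably wraps; the JSONL file of pre-built chat requests (`enrichment_requests.jsonl`) is a hypothetical artifact:

```python
from openai import OpenAI

client = OpenAI()

# Upload a JSONL file with one chat-completion request per line (hypothetical file).
batch_file = client.files.create(
    file=open("enrichment_requests.jsonl", "rb"),
    purpose="batch",
)
batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",  # results arrive within 24h at half the token cost
)
# Poll later with client.batches.retrieve(batch.id), then download the output file.
```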
### Azure Integration

| Module | Purpose |
|---|---|
| `azure/client.py` | `FoundryClient` with `embed()`, `chat()`, `chat_json()` |
| `azure/cost_tracker.py` | Persists costs to `~/.engram_costs.json` (sketch below) |
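A minimal sketch of what per-model cost persistence might look like; the file layout and function name are assumptions, not `cost_tracker.py`'s actual API:

```python
import json
from pathlib import Path

COSTS_PATH = Path.home() / ".engram_costs.json"

# Hypothetical accumulator: reads the JSON file, adds the new spend, writes it back.
def record_cost(model: str, usd: float) -> None:
    costs = json.loads(COSTS_PATH.read_text()) if COSTS_PATH.exists() else {}
    costs[model] = costs.get(model, 0.0) + usd
    COSTS_PATH.write_text(json.dumps(costs, indent=2))
```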
## Key Patterns

### Async Everywhere

All database and Azure calls are async. The CLI bridges into them with `asyncio.run()`:
```python
import asyncio

from engram.azure.client import FoundryClient
from engram.search.semantic import SemanticSearch

async def search(query: str) -> list[Memory]:
    client = FoundryClient()
    embedding = await client.embed(query)
    searcher = SemanticSearch()
    return await searcher.similarity_search(embedding, limit=10)

# CLI entry point
def main():
    results = asyncio.run(search("my query"))
    for result in results:
        print(f"{result.score:.3f}: {result.content[:100]}...")
```
### Adapters Pattern
Each data source has a dedicated adapter that converts to a unified Document model:
```python
from engram.adapters.claude import ClaudeAdapter
from engram.adapters.chatgpt import ChatGPTAdapter

# Parse a Claude export
adapter = ClaudeAdapter()
documents = adapter.parse("conversations.json")

# Each document has a consistent structure
for doc in documents:
    print(f"ID: {doc.id}")
    print(f"Source: {doc.source}")  # 'claude' or 'chatgpt'
    print(f"Created: {doc.created_at}")
    print(f"Content: {doc.content[:100]}...")
```
### Chunking Strategy
Documents are split into overlapping chunks for better semantic retrieval:
```python
from engram.processing.chunker import Chunker

chunker = Chunker(
    max_tokens=500,     # maximum tokens per chunk
    overlap_tokens=50,  # overlap between consecutive chunks
)

chunks = chunker.chunk(document.content)
# Returns a list of text chunks with position metadata
```
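Under the hood this amounts to a sliding token window that advances by `max_tokens - overlap_tokens` (450) tokens per step. A standalone sketch with tiktoken, assuming the `cl100k_base` encoding (Engram's `Chunker` also attaches position metadata, omitted here):

```python
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")  # encoding choice is an assumption

def split(text: str, max_tokens: int = 500, overlap: int = 50) -> list[str]:
    tokens = enc.encode(text)
    step = max_tokens - overlap  # each window advances by 450 tokens
    return [enc.decode(tokens[i:i + max_tokens]) for i in range(0, len(tokens), step)]
```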
### Embedding and Storage

Embeddings are converted to numpy arrays before being written to the database:
```python
import numpy as np
from psycopg.types.json import Json

async def store_memory(conn, content: str, embedding: list[float], metadata: dict):
    """Store a memory chunk with its embedding."""
    # The connection must have pgvector's psycopg type adapter registered
    # so that numpy arrays map to the vector column.
    await conn.execute(
        """
        INSERT INTO memories (content, embedding, metadata)
        VALUES (%s, %s, %s)
        """,
        (content, np.array(embedding), Json(metadata)),
    )
```
### Vector Search
pgvector enables efficient similarity search:
```python
import numpy as np

async def similarity_search(conn, query_embedding: list[float], limit: int = 10):
    """Find similar memories using cosine distance."""
    result = await conn.execute(
        """
        SELECT id, content, 1 - (embedding <=> %s) AS similarity
        FROM memories
        ORDER BY embedding <=> %s
        LIMIT %s
        """,
        (np.array(query_embedding), np.array(query_embedding), limit),
    )
    return await result.fetchall()
```
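Staying efficient at scale usually means an approximate-nearest-neighbor index. A plausible setup, assuming pgvector >= 0.5 (the index choice is not confirmed for Engram), is an HNSW index on cosine distance so the `ORDER BY embedding <=> ...` plan stays fast as `memories` grows:

```python
import psycopg

# Assumed index setup, matching the <=> cosine operator used above.
async def ensure_vector_index(conn: psycopg.AsyncConnection) -> None:
    await conn.execute(
        "CREATE INDEX IF NOT EXISTS memories_embedding_hnsw "
        "ON memories USING hnsw (embedding vector_cosine_ops)"
    )
```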
## Database Conventions

- **psycopg3 placeholders** - Use `%s`, not `$1`, for SQL parameters
- **pgvector requires numpy** - Convert embeddings with `np.array()` before queries
- **JSONB fields** - Wrap dicts with `psycopg.types.json.Json()`
- **Async connections** - Use `async with` for connection management
```python
import psycopg
from psycopg.rows import dict_row

async def get_connection():
    return await psycopg.AsyncConnection.connect(
        DATABASE_URL,
        row_factory=dict_row,
    )

async def query_documents():
    async with await get_connection() as conn:
        result = await conn.execute("SELECT * FROM documents LIMIT 10")
        return await result.fetchall()
```
## API Layer
The FastAPI backend exposes REST endpoints for the frontend:
- `/api/v1/conversations` - Conversation management
- `/api/v1/search` - Semantic search
- `/api/v1/memories` - Memory access
- `/api/v1/chat` - Chat completions
See the API Reference for complete endpoint documentation.
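As a quick orientation, a hypothetical client call against the search endpoint; the base URL and the request/response shapes are inferred from the list above, not from the actual API schema:

```python
import httpx

# Hypothetical usage sketch; field names ("query", "limit") are assumptions.
async def search_api(query: str) -> list[dict]:
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        resp = await client.post("/api/v1/search", json={"query": query, "limit": 10})
        resp.raise_for_status()
        return resp.json()
```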
## Frontend Architecture
The Next.js 14 frontend uses:
- TanStack Query for data fetching
- Vercel AI SDK for streaming
- IndexedDB for offline queue
- Workbox for PWA support