Architecture Overview

Engram is a semantic search system for AI conversations and documents, built with a modular architecture that separates concerns across distinct layers.

System Flow

Source Data → Adapters → Chunker → Embedder → PostgreSQL/pgvector

The stored chunks then feed the search, enrichment, and reorganization layers.

Core Components

Data Layer

PostgreSQL + pgvector - Primary data store with vector similarity search capabilities.

Key tables:

  • documents - Source documents with metadata
  • memories - Chunks with embeddings
  • clusters - Document groupings
  • cluster_members - Cluster membership
  • document_relations - Cross-document links
  • archived_external_ids - Archived document tracking
  • knowledge_artifacts - Generated knowledge maps
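
For orientation, here is a minimal sketch of the memories schema. The exact columns and the 1536-dimension vector are assumptions for illustration, not the project's actual migration:

import psycopg

# Illustrative DDL only -- the real schema lives in the project's migrations.
# VECTOR(1536) assumes a 1536-dimension embedding model.
CREATE_MEMORIES = """
CREATE TABLE IF NOT EXISTS memories (
    id          BIGSERIAL PRIMARY KEY,
    document_id BIGINT REFERENCES documents(id),
    content     TEXT NOT NULL,
    embedding   VECTOR(1536),
    metadata    JSONB DEFAULT '{}'::jsonb
);
"""

async def create_schema(conn: psycopg.AsyncConnection) -> None:
    await conn.execute(CREATE_MEMORIES)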

Processing Layer

Module                  Purpose
adapters/               Parse Claude and ChatGPT exports, plus the Craft MCP API, into a unified Document model
processing/chunker.py   Token-based splitting (tiktoken, 500 tokens, 50-token overlap)
processing/ingest.py    Orchestrates: hash check → chunk → embed → upsert
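
A hedged sketch of that orchestration. It treats chunks as plain strings and assumes a content_hash column for the dedup check; both are illustrative, not the module's actual API:

import hashlib

import numpy as np
from psycopg.types.json import Json

from engram.azure.client import FoundryClient
from engram.processing.chunker import Chunker

async def ingest(conn, document) -> None:
    """Hash check → chunk → embed → upsert, mirroring processing/ingest.py."""
    # 1. Hash check: skip content that was already ingested
    #    (the content_hash column name is an assumption).
    content_hash = hashlib.sha256(document.content.encode()).hexdigest()
    cur = await conn.execute(
        "SELECT 1 FROM documents WHERE content_hash = %s", (content_hash,)
    )
    if await cur.fetchone():
        return

    # 2. Chunk, then 3. embed each chunk.
    chunks = Chunker(max_tokens=500, overlap_tokens=50).chunk(document.content)
    client = FoundryClient()
    for chunk in chunks:
        embedding = await client.embed(chunk)
        # 4. Upsert the chunk (see "Embedding and Storage" below).
        await conn.execute(
            "INSERT INTO memories (content, embedding, metadata) VALUES (%s, %s, %s)",
            (chunk, np.array(embedding), Json({"document_id": document.id})),
        )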

Search Layer

Module              Purpose
search/semantic.py  pgvector similarity search using the <=> cosine-distance operator
search/hybrid.py    Combined semantic + keyword search
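
One common way to combine the two signals is a weighted blend of cosine similarity and Postgres full-text rank. This sketch uses illustrative 0.7/0.3 weights; the module's actual scoring may differ:

import numpy as np

async def hybrid_search(conn, query: str, query_embedding: list[float], limit: int = 10):
    """Blend vector similarity with full-text rank (weights are illustrative)."""
    result = await conn.execute(
        """
        SELECT id, content,
               0.7 * (1 - (embedding <=> %s))
             + 0.3 * ts_rank(to_tsvector('english', content),
                             plainto_tsquery('english', %s)) AS score
        FROM memories
        ORDER BY score DESC
        LIMIT %s
        """,
        (np.array(query_embedding), query, limit),
    )
    return await result.fetchall()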

Enrichment Layer

Module                  Purpose
enrichment/             GPT-4o summarization, tagging, cluster labeling, knowledge maps
openai/batch_client.py  OpenAI Batch API for 50% cheaper enrichment
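
A sketch of what an enrichment call might look like via chat_json() (see Azure Integration below). The prompt and response shape are illustrative, and chat_json() is assumed to accept a prompt string and return parsed JSON:

from engram.azure.client import FoundryClient

async def enrich(content: str) -> dict:
    # Prompt and JSON shape are illustrative assumptions.
    client = FoundryClient()
    return await client.chat_json(
        "Summarize the text below and suggest up to five topical tags. "
        'Respond as JSON: {"summary": "...", "tags": ["..."]}.\n\n' + content
    )

For bulk enrichment, the same requests can be routed through openai/batch_client.py to take advantage of Batch API pricing.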

Azure Integration

Module                 Purpose
azure/client.py        FoundryClient with embed(), chat(), chat_json()
azure/cost_tracker.py  Persists costs to ~/.engram_costs.json
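
A minimal sketch of that persistence, assuming the file holds a JSON list of usage records; the record fields are illustrative:

import json
from pathlib import Path

COSTS_PATH = Path.home() / ".engram_costs.json"

def record_cost(model: str, tokens: int, usd: float) -> None:
    """Append a usage record to ~/.engram_costs.json (record shape is illustrative)."""
    records = json.loads(COSTS_PATH.read_text()) if COSTS_PATH.exists() else []
    records.append({"model": model, "tokens": tokens, "usd": usd})
    COSTS_PATH.write_text(json.dumps(records, indent=2))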

Key Patterns

Async Everywhere

All database and Azure calls are async. The CLI bridges with asyncio.run().

import asyncio

from engram.azure.client import FoundryClient
from engram.search.semantic import SemanticSearch

# Assumed import path for the Memory model; adjust to the project layout.
from engram.models import Memory

async def search(query: str) -> list[Memory]:
    client = FoundryClient()
    embedding = await client.embed(query)
    searcher = SemanticSearch()
    return await searcher.similarity_search(embedding, limit=10)

# CLI entry point
def main():
    results = asyncio.run(search("my query"))
    for result in results:
        print(f"{result.score:.3f}: {result.content[:100]}...")

Adapters Pattern

Each data source has a dedicated adapter that converts its input into the unified Document model:

from engram.adapters.claude import ClaudeAdapter
from engram.adapters.chatgpt import ChatGPTAdapter

# Parse a Claude export
adapter = ClaudeAdapter()
documents = adapter.parse("conversations.json")

# Each document has a consistent structure
for doc in documents:
    print(f"ID: {doc.id}")
    print(f"Source: {doc.source}")  # 'claude' or 'chatgpt'
    print(f"Created: {doc.created_at}")
    print(f"Content: {doc.content[:100]}...")

Chunking Strategy

Documents are split into overlapping chunks for better semantic retrieval:

from engram.processing.chunker import Chunker

chunker = Chunker(
    max_tokens=500,     # maximum tokens per chunk
    overlap_tokens=50,  # overlap between consecutive chunks
)

chunks = chunker.chunk(document.content)
# Returns a list of text chunks with position metadata

Embedding and Storage

Embeddings are converted to numpy arrays before being written to pgvector columns:

import numpy as np
from psycopg.types.json import Json

async def store_memory(conn, content: str, embedding: list[float], metadata: dict):
    """Store a memory chunk with its embedding."""
    await conn.execute(
        """
        INSERT INTO memories (content, embedding, metadata)
        VALUES (%s, %s, %s)
        """,
        (content, np.array(embedding), Json(metadata)),
    )

pgvector enables efficient similarity search:

async def similarity_search(conn, query_embedding: list[float], limit: int = 10):
    """Find similar memories using cosine distance."""
    result = await conn.execute(
        """
        SELECT id, content, 1 - (embedding <=> %s) AS similarity
        FROM memories
        ORDER BY embedding <=> %s
        LIMIT %s
        """,
        (np.array(query_embedding), np.array(query_embedding), limit),
    )
    return await result.fetchall()

Database Conventions

  • psycopg3 placeholders - Use %s not $1 for SQL parameters
  • pgvector requires numpy - Convert embeddings to np.array() before queries
  • JSONB fields - Wrap dicts with psycopg.types.json.Json()
  • Async connections - Use async with for connection management

import psycopg
from psycopg.rows import dict_row

async def get_connection():
    return await psycopg.AsyncConnection.connect(
        DATABASE_URL,
        row_factory=dict_row,
    )

async def query_documents():
    async with await get_connection() as conn:
        result = await conn.execute("SELECT * FROM documents LIMIT 10")
        return await result.fetchall()

API Layer

The FastAPI backend exposes REST endpoints for the frontend:

  • /api/v1/conversations - Conversation management
  • /api/v1/search - Semantic search
  • /api/v1/memories - Memory access
  • /api/v1/chat - Chat completions
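
As a quick sketch of calling the search endpoint from Python, using httpx; the base URL and request body shape are assumptions:

import httpx

async def search_api(query: str) -> list[dict]:
    # Request body fields are illustrative; consult the API Reference.
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        resp = await client.post("/api/v1/search", json={"query": query, "limit": 10})
        resp.raise_for_status()
        return resp.json()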

See the API Reference for complete endpoint documentation.

Frontend Architecture

The Next.js 14 frontend uses:

  • TanStack Query for data fetching
  • Vercel AI SDK for streaming
  • IndexedDB for offline queue
  • Workbox for PWA support