Most RAG tutorials show you how to build a demo. This is not that.
This is how we built a production RAG system that handles real business queries — connecting to live databases, answering customer questions from actual documents, and running 24/7 across WhatsApp, voice calls, and web chat simultaneously.
Here is everything we learned building RagLeap — a self-hosted AI business platform — from scratch.
Why We Chose Django Over FastAPI
Everyone building AI systems defaults to FastAPI. We chose Django. Here is why.
When you are building a product — not a microservice — Django's batteries-included philosophy saves months:
Authentication and multi-tenancy out of the box
ORM that handles complex queries without raw SQL
Admin panel for internal tooling
Mature migration system
Signals for event-driven architecture
We built a custom TenantMiddleware that resolves the workspace from every request context. Every database query, every RAG retrieval, every AI response is automatically scoped to the correct workspace without the developer thinking about it.
pythonclass TenantMiddleware:
def init(self, get_response):
self.get_response = get_response
def __call__(self, request):
workspace_id = self.resolve_workspace(request)
request.workspace = Workspace.objects.get(id=workspace_id)
response = self.get_response(request)
return response
def resolve_workspace(self, request):
# Resolve from subdomain, token, or session
if hasattr(request, 'auth'):
return request.auth.workspace_id
return request.session.get('workspace_id')
Why Neo4j Instead of a Vector Database
This is the question we get asked most.
Standard RAG works like this:
Chunk documents
Embed chunks
Store in vector DB
Retrieve by similarity
It works fine for simple Q&A. It breaks down for complex business queries.
Example: "Which customers complained about delivery last month and what products did they order?"
A vector search returns chunks mentioning "complaints" and "delivery." But it cannot traverse the relationship between a complaint, the customer who made it, and the orders associated with that customer.
That is a graph traversal problem — exactly what Neo4j solves.
In RagLeap, we build a knowledge graph from uploaded documents:
pythonfrom neo4j import GraphDatabase
class KnowledgeGraphBuilder:
def init(self, uri, user, password):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
def create_entity(self, entity_type, properties, workspace_id):
with self.driver.session() as session:
session.run(
f"""
MERGE (e:{entity_type} {{id: $id, workspace_id: $workspace_id}})
SET e += $properties
RETURN e
""",
id=properties.get('id'),
workspace_id=workspace_id,
properties=properties
)
def create_relationship(self, from_id, to_id, relationship_type, workspace_id):
with self.driver.session() as session:
session.run(
"""
MATCH (a {id: $from_id, workspace_id: $workspace_id})
MATCH (b {id: $to_id, workspace_id: $workspace_id})
MERGE (a)-[r:RELATES_TO {type: $rel_type}]->(b)
RETURN r
""",
from_id=from_id,
to_id=to_id,
workspace_id=workspace_id,
rel_type=relationship_type
)
Entities (customers, products, orders, policies) become nodes. Relationships become edges. When a query comes in, we combine vector similarity search with graph traversal.
The result: Answers that make sense for business queries, not just keyword matches.
The Hybrid Retrieval Pipeline
Here is our actual retrieval pipeline that combines vector search with graph traversal:
pythonclass HybridRetriever:
def init(self, vector_store, graph_db, workspace_id):
self.vector_store = vector_store
self.graph_db = graph_db
self.workspace_id = workspace_id
def retrieve(self, query, top_k=5):
# Step 1: Vector similarity search
vector_results = self.vector_store.similarity_search(
query,
k=top_k,
filter={"workspace_id": self.workspace_id}
)
# Step 2: Extract entities from query
entities = self.extract_entities(query)
# Step 3: Graph traversal from extracted entities
graph_results = []
for entity in entities:
related = self.graph_db.traverse(
entity,
max_depth=2,
workspace_id=self.workspace_id
)
graph_results.extend(related)
# Step 4: Merge and deduplicate results
combined = self.merge_results(vector_results, graph_results)
return combined
def extract_entities(self, query):
# Simple NER — in production we use spaCy or LLM-based extraction
# Returns list of entity IDs found in query
pass
def merge_results(self, vector_results, graph_results):
# Score and deduplicate
seen = set()
merged = []
for result in vector_results + graph_results:
if result.id not in seen:
seen.add(result.id)
merged.append(result)
return merged[:10]
Celery for Async — The Backbone of Everything
Almost everything in RagLeap that is not a synchronous API response goes through Celery:
Document ingestion and knowledge graph building
Email monitoring and reply drafting
Voice call management
Lead follow-up automation
WhatsApp message processing
We separate Celery beat (scheduler) from Celery workers (executors). Both run as separate Supervisor processes.
python# tasks.py
from celery import shared_task
from .knowledge_graph import KnowledgeGraphBuilder
from .document_processor import DocumentProcessor
@shared_task(bind=True, max_retries=3)
def process_document(self, document_id, workspace_id):
try:
processor = DocumentProcessor()
doc = processor.load(document_id)
# Extract chunks
chunks = processor.chunk(doc, chunk_size=512, overlap=50)
# Embed and store in vector DB
processor.embed_and_store(chunks, workspace_id)
# Build knowledge graph
builder = KnowledgeGraphBuilder()
entities = processor.extract_entities(doc)
builder.build_graph(entities, workspace_id)
return {"status": "success", "chunks": len(chunks)}
except Exception as exc:
raise self.retry(exc=exc, countdown=60)
Supervisor configuration for separate worker processes:
ini# /etc/supervisor/conf.d/celery.conf
[program:celery_worker]
command=/path/to/venv/bin/celery -A myproject worker --loglevel=info --concurrency=2
directory=/path/to/project
environment=PATH="/usr/bin:/usr/local/bin"
autostart=true
autorestart=true
[program:celery_beat]
command=/path/to/venv/bin/celery -A myproject beat --loglevel=info
directory=/path/to/project
autostart=true
autorestart=true
Multi-Channel Architecture
One AI brain serving WhatsApp, Telegram, Voice calls, Email and web chat simultaneously.
The key insight: normalize everything into the same internal format before it hits the AI.
pythonclass MessageNormalizer:
def normalize(self, raw_message, channel):
return {
"content": self.extract_content(raw_message, channel),
"sender_id": self.extract_sender(raw_message, channel),
"channel": channel,
"timestamp": self.extract_timestamp(raw_message, channel),
"metadata": self.extract_metadata(raw_message, channel)
}
def extract_content(self, message, channel):
extractors = {
"whatsapp": lambda m: m.get("body", ""),
"telegram": lambda m: m.get("text", ""),
"voice": lambda m: m.get("transcript", ""),
"email": lambda m: m.get("text_body", ""),
"web": lambda m: m.get("message", "")
}
return extractors[channel](message)
Channel-specific formatting happens on the way out:
pythonclass ResponseFormatter:
def format(self, ai_response, channel):
formatters = {
"whatsapp": self.format_whatsapp,
"voice": self.format_voice,
"email": self.format_email,
"web": self.format_html
}
return formatterschannel
def format_voice(self, response):
# Remove markdown, make it conversational
clean = response.replace("**", "").replace("#", "")
return clean
def format_whatsapp(self, response):
# WhatsApp supports basic markdown
return response
Running Everything on a $20/Month VPS
The entire stack — Django, PostgreSQL, Neo4j, Redis, Celery, Nginx, Gunicorn — runs on a 4GB RAM VPS.
Key optimizations:
bash# PostgreSQL — /etc/postgresql/14/main/postgresql.conf
shared_buffers = 512MB
effective_cache_size = 1GB
work_mem = 16MB
Neo4j — /etc/neo4j/neo4j.conf
server.memory.heap.initial_size=256m
server.memory.heap.max_size=512m
server.memory.pagecache.size=256m
Gunicorn — 2-3 workers maximum
gunicorn myproject.wsgi:application \
--workers 2 \
--worker-class gthread \
--threads 4 \
--bind 0.0.0.0:8000
Redis memory limit:
bash# /etc/redis/redis.conf
maxmemory 256mb
maxmemory-policy allkeys-lru
Result: Full AI platform running on 4GB RAM with stable performance under production load.
Database AI — Connecting to Live Data
One of the hardest and most powerful features: letting AI query the user's actual database in natural language.
The challenge: you cannot just give an LLM a database connection and ask it to write SQL. That is a security nightmare.
Our approach:
pythonclass DatabaseAI:
def init(self, connection_string, workspace_id):
self.engine = create_engine(connection_string)
self.workspace_id = workspace_id
self.schema = self.introspect_schema()
def introspect_schema(self):
inspector = inspect(self.engine)
schema = {}
for table_name in inspector.get_table_names():
columns = inspector.get_columns(table_name)
schema[table_name] = {
col['name']: str(col['type'])
for col in columns
}
return schema
def natural_language_query(self, question):
# Generate safe parameterized query from schema + question
query_template = self.llm.generate_query(
question=question,
schema=self.schema,
instruction="Generate a safe READ-ONLY SELECT query. No INSERT, UPDATE, DELETE or DROP."
)
# Validate query is read-only
if self.is_safe_query(query_template):
result = self.execute_safe(query_template)
return self.format_result(result)
else:
return "I can only read data, not modify it."
def is_safe_query(self, query):
forbidden = ['INSERT', 'UPDATE', 'DELETE', 'DROP', 'TRUNCATE', 'ALTER']
query_upper = query.upper()
return not any(word in query_upper for word in forbidden)
This lets a business owner message their AI: "How many orders came from Chennai this week?" — and get a real answer from their actual database.
Memory System
RagLeap maintains persistent memory across conversations using a summarization approach:
pythonclass ConversationMemory:
def init(self, workspace_id, customer_id):
self.workspace_id = workspace_id
self.customer_id = customer_id
def get_relevant_context(self, current_query):
# Get recent messages
recent = ConversationHistory.objects.filter(
workspace_id=self.workspace_id,
customer_id=self.customer_id
).order_by('-created_at')[:10]
# Get relevant historical summaries from graph
historical = self.graph_db.get_customer_context(
customer_id=self.customer_id,
query=current_query,
workspace_id=self.workspace_id
)
return {
"recent": [msg.to_dict() for msg in recent],
"historical": historical
}
def summarize_and_store(self, conversation_chunk):
# Periodically compress old conversations
summary = self.llm.summarize(conversation_chunk)
self.graph_db.store_conversation_summary(
customer_id=self.customer_id,
summary=summary,
workspace_id=self.workspace_id
)
What We Would Do Differently
Separate voice processing earlier.
Voice has completely different latency requirements from text. Running it in the same Celery worker pool as document ingestion caused priority issues we had to solve later.Observability from day one.
Adding proper logging, metrics, and tracing after the fact is painful. Build it in from the beginning.Schema design for multi-tenancy from migration 1.
Adding workspace isolation to an existing schema costs weeks of refactoring.
Try RagLeap
If you are building AI systems for businesses or looking for a self-hosted AI platform that actually connects to your data, RagLeap has a free self-hosted tier — single workspace, web embed chatbot, and AI Manager included.
GitHub open source launch coming soon — follow along at ragleap.com.




Top comments (0)