Embedding Pipeline
Batch document embedding pipeline with pgvector storage, incremental updates, and deduplication. Handles PDFs, web pages, and plain text with automatic chunking.
Python | AI Workflows | Built with OpenClaw
Stars: 1.6k | Installs: 6.3k | Deps: 4 | Comments: 2
Install
pip install freestack-embedding-pipeline
Code Preview
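import asyncio

import asyncpg
from pgvector.asyncpg import register_vector


class EmbeddingPipeline:
    def __init__(self, db_url: str, model: str = "text-embedding-3-small"):
        self.db_url = db_url
        self.model = model
        self.batch_size = 100

    async def process_documents(self, docs: list["Document"]) -> int:
        # Document, _chunk and _embed_batch are not shown in this preview.
        # register_vector expects a connection, not a pool, so it runs as the
        # pool's per-connection init hook. The pool is closed on exit.
        async with asyncpg.create_pool(self.db_url, init=self._init_connection) as pool:
            processed = 0
            for batch in self._chunk(docs, self.batch_size):
                embeddings = await self._embed_batch([d.content for d in batch])
                async with pool.acquire() as conn:
                    # metadata must already be in the form asyncpg expects for the
                    # column type (for json/jsonb, a JSON string unless a codec is set).
                    await conn.executemany(
                        "INSERT INTO embeddings (content, embedding, metadata) VALUES ($1, $2, $3)",
                        [(d.content, e, d.metadata) for d, e in zip(batch, embeddings)],
                    )
                processed += len(batch)
        return processed

    @staticmethod
    async def _init_connection(conn):
        # Register the pgvector type codec on every new pooled connection.
        await register_vector(conn)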
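The preview calls `self._chunk` and reads `content` and `metadata` from each document, but shows neither definition. The following is a minimal sketch of what they might look like, assuming `_chunk` simply slices the input into fixed-size batches. The `Document` dataclass and the standalone `chunk` function here are hypothetical stand-ins, not the package's actual code.

```python
from dataclasses import dataclass, field
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


@dataclass
class Document:
    """Hypothetical document type; the preview only uses .content and .metadata."""
    content: str
    metadata: dict = field(default_factory=dict)


def chunk(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` items (assumed behavior of _chunk)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


# 250 documents with the default batch size of 100 give three batches.
docs = [Document(content=f"doc {i}") for i in range(250)]
batch_sizes = [len(batch) for batch in chunk(docs, 100)]
print(batch_sizes)  # [100, 100, 50]
```

With this shape, the last batch is allowed to be smaller than `batch_size`, so `processed` in the preview ends up equal to the number of input documents.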
dataeng, 1 week ago
The incremental update logic is solid. Only re-embeds changed documents.
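The listing does not show how changed documents are detected. One common approach, sketched here as an assumption rather than as the module's actual method, is to store a content hash per document and re-embed only when the hash differs. The names `content_hash` and `select_changed` are hypothetical.

```python
import hashlib


def content_hash(text: str) -> str:
    """Stable fingerprint of a document's text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def select_changed(docs: dict[str, str], stored_hashes: dict[str, str]) -> list[str]:
    """Return IDs of documents that are new or whose content hash changed."""
    return [
        doc_id
        for doc_id, text in docs.items()
        if stored_hashes.get(doc_id) != content_hash(text)
    ]


# "a" is unchanged, "b" was edited, "c" is new.
stored = {"a": content_hash("alpha"), "b": content_hash("beta")}
current = {"a": "alpha", "b": "beta v2", "c": "gamma"}
print(select_changed(current, stored))  # ['b', 'c']
```

The same hash can double as a deduplication key: two documents with identical text produce the same digest, so only one of them needs to be embedded.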
pgfan, 4 days ago
Works seamlessly with Supabase's pgvector. Good async patterns throughout.
Related Modules
RAG Pipeline
Full retrieval-augmented generation pipeline with document chunking, embedding generation, vector storage, and context-aware retrieval. Supports PDF, Markdown, and HTML sources.
3.8k | Python
Agent Loop
Autonomous agent execution loop with tool calling, memory management, and graceful error recovery. Supports OpenAI and Anthropic function calling formats.
2.9k | TypeScript
AI Chat Widget
Drop-in AI chat interface with streaming responses, message history, typing indicators, and markdown rendering. Connects to any OpenAI-compatible API.
2.1k | React