RAGmatic v0.0.5
What is RAGmatic?
RAGmatic is a library for creating and continuously synchronizing embeddings for your data in PostgreSQL.
Features
- Pragmatic: continuous, robust, flexible and runs on PostgreSQL
- Continuous: Automatically create and continuously synchronize embeddings for your data in PostgreSQL
- Robust: Event-driven triggers create embedding jobs with ACID guarantees, and queue-based workers process them in the background
- Flexible: Use your own embedding pipeline with any model provider. Use all your columns, chunk as you want, enrich your embeddings with metadata, call out to LLMs, you name it, it's all possible
- Runs on PostgreSQL: Seamless vector and hybrid search with pgvector
and more:
- Built-in de-duplication to avoid expensive re-embeddings of existing chunks
- Run multiple embedding pipelines per table to compare them and create your own evals
- Support for JSONB, images, blob data and other complex data types
How does RAGmatic work?
- RAGmatic works by tracking changes to your chosen table via database triggers in a new PostgreSQL schema: `ragmatic_<pipeline_name>`.
- Once tracking is set up via `RAGmatic.create()`, you can continue to use your database as normal.
- Any changes to your table are detected and processed by RAGmatic's workers. Chunking and embedding generation are fully configurable, and chunks are de-duplicated to avoid expensive and unnecessary re-embeddings.
- Processed embeddings are stored in the `ragmatic_<pipeline_name>.chunks` table as pgvector's `vector` data type. You can search these vectors with pgvector's `vector_similarity_ops` functions in SQL and even join them with your existing tables to filter results.
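In practice, a plain SQL write is all it takes to kick off embedding work. A minimal sketch, assuming the `blog_posts` table and pipeline from the Getting Started example below:

```ts
import pg from "pg";

const client = new pg.Client({ connectionString: process.env.DATABASE_URL! });
await client.connect();

// an ordinary insert: the ragmatic_<pipeline_name> triggers record the change,
// and a background worker will chunk and embed the new row
await client.query(`INSERT INTO blog_posts (title, content) VALUES ($1, $2)`, [
  "Hello pgvector",
  "pgvector adds a vector type and similarity search to PostgreSQL.",
]);
await client.end();
```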
🚀 Getting Started
- Install the library:
```sh
npm install ragmatic
```
- Set up tracking for your table. This will create the necessary tables in your database under a `ragmatic_<pipeline_name>` schema.
```ts
import RAGmatic, { type ChunkData } from "ragmatic";
import { chunk } from "llm-chunk";
import { OpenAI } from "openai";

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

interface BlogPost {
  id: number;
  title: string;
  content: string;
}

const blogPostsToEmbeddings = await RAGmatic.create<BlogPost>({
  connectionString: process.env.DATABASE_URL!,
  name: "blog_posts_openai",
  tableToWatch: "blog_posts",
  embeddingDimension: 1536,
  recordToChunksFunction: async (post: BlogPost) => {
    // split the post body into sentence-based chunks, carrying the title along
    return chunk(post.content, {
      minLength: 100,
      maxLength: 1000,
      overlap: 20,
      splitter: "sentence",
    }).map((text) => ({
      text,
      title: post.title,
    }));
  },
  chunkToEmbeddingFunction: async (chunk: ChunkData) => {
    const embedding = await openai.embeddings.create({
      model: "text-embedding-3-small",
      input: `title: ${chunk.title} content: ${chunk.text}`,
    });
    return {
      embedding: embedding.data[0].embedding,
      text: `title: ${chunk.title} content: ${chunk.text}`,
    };
  },
});
```
- Start the embedding pipeline. This will continuously embed your data and store the embeddings in the `ragmatic_<pipeline_name>.chunks` table.
```ts
await blogPostsToEmbeddings.start();
```
- Search your data:
```ts
import pg from "pg";
import { OpenAI } from "openai";

const client = new pg.Client({
  connectionString: process.env.DATABASE_URL!,
});
await client.connect();

// embed the query with the same model the pipeline uses
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const generateEmbedding = async (input: string): Promise<number[]> => {
  const res = await openai.embeddings.create({
    model: "text-embedding-3-small",
    input,
  });
  return res.data[0].embedding;
};

// find blog post content similar to the query
const query = "pgvector is a vector extension for PostgreSQL";
const queryEmbedding = await generateEmbedding(query);
const threshold = 0.5;
const topK = 4;

// join the chunks table with the blog_posts table to get the title
const result = await client.query(
  `WITH similarity_scores AS (
    SELECT
      c.text AS chunk_text,
      c.docId,
      b.title,
      1 - cosine_distance(c.embedding, $1::vector) AS similarity
    FROM ragmatic_blog_posts_openai.chunks c
    LEFT JOIN blog_posts b ON c.docId = b.id
  )
  SELECT similarity, chunk_text, docId, title
  FROM similarity_scores
  WHERE similarity > $2
  ORDER BY similarity DESC
  LIMIT $3;`,
  // serialize the embedding so it can be cast to pgvector's vector type
  [JSON.stringify(queryEmbedding), threshold, topK],
);
console.log(result.rows);
```
See the examples for more.
💡 Examples
🧐 FAQ
What is the difference between RAGmatic and pgvector?
pgvector is a vector extension for PostgreSQL: it lets you store and search vectors. RAGmatic is an orchestration library built on top of pgvector that keeps your embeddings up to date as your data changes.
What is the difference between RAGmatic and pgai?
Both are tools for keeping your embeddings in sync with your data in PostgreSQL; however, pgai comes with a few drawbacks: it is a database extension, processing happens inside the database, and you are limited to its pre-built embedding pipelines.
We made RAGmatic as a more flexible and powerful alternative to pgai. Your embedding pipeline is defined in TypeScript, so you can use any LLM, chunking algorithm, and metadata-generation step to build your own state-of-the-art RAG system.
My table has a lot of columns, how can I track them all?
When setting up your tracker, you don't need to specify which columns to track: RAGmatic tracks all columns. It's up to your worker to decide which columns to use for embedding generation, as in the sketch below.
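A minimal sketch of that choice, assuming a hypothetical table with `title`, `summary`, and `content` columns:

```ts
interface Row {
  title: string;
  summary?: string;
  content: string;
}

// the chunker receives the whole row, so it can combine any subset of columns
const recordToChunks = async (row: Row) => [
  { text: [row.title, row.summary, row.content].filter(Boolean).join("\n") },
];
```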
What index is used for vector search? How can I configure it?
By default, RAGmatic creates a pgvector HNSW index for cosine distance on the `ragmatic_<pipeline_name>.chunks` table. You can disable this by setting the `skipEmbeddingIndexSetup` option to `true` when creating the pipeline, and then set up the index manually on the `ragmatic_<pipeline_name>.chunks` table.
We will add more guidelines and examples on this soon.
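In the meantime, here is a minimal sketch of a manual setup. The pipeline name and the `m`/`ef_construction` tuning values are illustrative; the `embedding` column name follows the search example above:

```ts
import pg from "pg";

const client = new pg.Client({ connectionString: process.env.DATABASE_URL! });
await client.connect();

// create pgvector's HNSW index for cosine distance by hand
// (m and ef_construction are pgvector's build-time tuning knobs)
await client.query(`
  CREATE INDEX IF NOT EXISTS chunks_embedding_idx
  ON ragmatic_blog_posts_openai.chunks
  USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);
`);
await client.end();
```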
How does the de-duplication work?
De-duplication works by calculating an MD5 hash of every chunk and storing it at embedding time. When an update to a row is detected, the worker checks whether each chunk has already been embedded and, if so, skips the embedding step.
You can override the default hash function by providing your own implementation to the worker.
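The core idea, as a minimal sketch (not RAGmatic's internal implementation):

```ts
import { createHash } from "node:crypto";

// hash the chunk text the same way on every embedding run
const hashChunk = (text: string): string =>
  createHash("md5").update(text).digest("hex");

// only re-embed when the content actually changed
const needsReembedding = (chunkText: string, storedHash?: string): boolean =>
  hashChunk(chunkText) !== storedHash;
```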
How can I remove RAGmatic from my database?
Call `pipeline.destroy()` to drop the `ragmatic_<pipeline_name>` schema.
This will remove all the tables and objects created by RAGmatic.
How can I monitor worker processing?
You can check on the job queue by querying the ragmatic_<pipeline_name>.work_queue
table or calling pipeline.countRemainingDocuments()
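For example, against the Getting Started pipeline (a sketch; the queue table's exact columns may vary, so this only counts rows):

```ts
import pg from "pg";

const client = new pg.Client({ connectionString: process.env.DATABASE_URL! });
await client.connect();

// how many jobs are still waiting?
const { rows } = await client.query(
  `SELECT count(*)::int AS pending FROM ragmatic_blog_posts_openai.work_queue`,
);
console.log(`${rows[0].pending} job(s) in the queue`);
// equivalently: await blogPostsToEmbeddings.countRemainingDocuments()
await client.end();
```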
I just updated my worker's code, how can I migrate to it?
Call `pipeline.reprocessAll()` to mark all your existing rows for re-embedding, then start your worker with the new code.
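Using the pipeline handle from the Getting Started example, the migration is two calls (assuming the new worker code is already deployed):

```ts
// mark every tracked row dirty so it flows through the pipeline again
await blogPostsToEmbeddings.reprocessAll();

// the restarted worker re-chunks and re-embeds with the new logic
await blogPostsToEmbeddings.start();
```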
What are some useful techniques for improving retrieval?
Please see the examples, dive into the Awesome Generative Information Retrieval repo, or hit us up at https://barnacle.ai; we'd love to help you out.