sparknlp.annotator.vector_db.vector_db_connector#

Contains classes for VectorDBConnector.

Module Contents#

Classes#

VectorDBConnector

Connector for storing and retrieving embeddings from vector databases.

class VectorDBConnector(classname='com.johnsnowlabs.ml.ai.VectorDBConnector', java_model=None)[source]#

Connector for storing and retrieving embeddings from vector databases.

This annotator takes embeddings from previous annotators (such as BertEmbeddings, SentenceEmbeddings, or OpenAIEmbeddings) and stores them in a vector database for similarity search and retrieval. Currently supports Pinecone, with more providers planned.

Parameters:
provider

Vector database provider. Currently supported: ‘pinecone’

indexName

Name of the index/collection in the vector database

namespace

Namespace/partition within the index (optional)

idColumn

Column name to use as vector ID (if not set, generates UUID)

metadataColumns

Column names to include as metadata with vectors

batchSize

Number of vectors to upsert in a single batch

Examples

>>> import sparknlp
>>> from sparknlp.base import *
>>> from sparknlp.annotator import *
>>> from pyspark.ml import Pipeline
>>> documentAssembler = DocumentAssembler() \
...     .setInputCol("text") \
...     .setOutputCol("document")
>>> embeddings = BertSentenceEmbeddings.pretrained() \
...     .setInputCols(["document"]) \
...     .setOutputCol("sentence_embeddings")
>>> vectorDB = VectorDBConnector() \
...     .setInputCols(["document", "sentence_embeddings"]) \
...     .setOutputCol("vectordb_result") \
...     .setProvider("pinecone") \
...     .setIndexName("my-index") \
...     .setNamespace("production") \
...     .setIdColumn("id") \
...     .setMetadataColumns(["text", "category"]) \
...     .setBatchSize(100)
>>> pipeline = Pipeline().setStages([
...     documentAssembler,
...     embeddings,
...     vectorDB
... ])
>>> data = spark.createDataFrame([
...     ("1", "Spark NLP is great", "tech"),
...     ("2", "Vector databases enable semantic search", "tech")
... ]).toDF("id", "text", "category")
>>> result = pipeline.fit(data).transform(data)
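Conceptually, each row's embedding is sent to the database as an (id, vector, metadata) record assembled from the configured columns. A minimal plain-Python sketch of that assembly step (the function and field names here are illustrative, not the connector's internals, which run on the JVM side):

```python
import uuid

def build_records(rows, id_column=None, metadata_columns=()):
    """Turn row dicts into (id, vector, metadata) records.

    Illustrative sketch only: mirrors the parameters idColumn and
    metadataColumns described above.
    """
    records = []
    for row in rows:
        # Use the configured ID column, or generate a UUID when unset.
        vec_id = str(row[id_column]) if id_column else str(uuid.uuid4())
        metadata = {col: row[col] for col in metadata_columns if col in row}
        records.append((vec_id, row["embedding"], metadata))
    return records

rows = [
    {"id": "1", "text": "Spark NLP is great", "category": "tech",
     "embedding": [0.1, 0.2, 0.3]},
    {"id": "2", "text": "Vector databases enable semantic search",
     "category": "tech", "embedding": [0.4, 0.5, 0.6]},
]
records = build_records(rows, id_column="id",
                        metadata_columns=("text", "category"))
```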
name = 'VectorDBConnector'[source]#
inputAnnotatorTypes[source]#
outputAnnotatorType = 'document'[source]#
provider[source]#
indexName[source]#
namespace[source]#
idColumn[source]#
metadataColumns[source]#
batchSize[source]#
setProvider(value)[source]#

Sets the vector database provider.

Parameters:
value : str

Vector database provider. Currently supported: ‘pinecone’

setIndexName(value)[source]#

Sets the name of the index/collection in the vector database.

Parameters:
value : str

Name of the index/collection

setNamespace(value)[source]#

Sets the namespace/partition within the index.

Parameters:
value : str

Namespace/partition name (optional)

setIdColumn(value)[source]#

Sets the column name to use as vector ID.

Parameters:
value : str

Column name for vector ID. If not set, UUIDs will be generated.
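When no ID column is configured, a fresh UUID is generated per vector. In Python terms (an illustrative equivalent, not the connector's own code):

```python
import uuid

# A random version-4 UUID in canonical 36-character form,
# e.g. 'f47ac10b-58cc-4372-a567-0e02b2c3d479'.
vec_id = str(uuid.uuid4())
```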

setMetadataColumns(value)[source]#

Sets the column names to include as metadata with vectors.

Parameters:
value : list[str]

List of column names to include as metadata

setBatchSize(value)[source]#

Sets the number of vectors to upsert in a single batch.

Parameters:
value : int

Batch size for upsert operations (max 1000)
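With batches capped at 1000 vectors, uploading n vectors takes ceil(n / batchSize) upsert calls. A hypothetical sketch of the chunking involved (helper name is illustrative):

```python
def batched(items, batch_size):
    """Yield successive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

# 2500 vectors at the maximum batch size of 1000 -> 3 upsert calls.
vectors = list(range(2500))
batches = list(batched(vectors, 1000))
```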