# Copyright 2017-2023 John Snow Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains classes for the AutoGGUFEmbeddings."""
from typing import List
from sparknlp.common import *
class AutoGGUFEmbeddings(AnnotatorModel, HasBatchedAnnotate):
    """
    Annotator that uses the llama.cpp library to generate text embeddings with large language
    models.

    The type of embedding pooling can be set with the `setPoolingType` method. The default is
    `"MEAN"`. The available options are `"NONE"`, `"MEAN"`, `"CLS"`, and `"LAST"`.

    Pretrained models can be loaded with :meth:`.pretrained` of the companion
    object:

    >>> auto_gguf_model = AutoGGUFEmbeddings.pretrained() \\
    ...     .setInputCols(["document"]) \\
    ...     .setOutputCol("embeddings")

    The default model is ``"nomic-embed-text-v1.5.Q8_0.gguf"``, if no name is provided.

    For extended examples of usage, see the
    `AutoGGUFEmbeddingsTest <https://github.com/JohnSnowLabs/spark-nlp/tree/master/src/test/scala/com/johnsnowlabs/nlp/embeddings/AutoGGUFEmbeddingsTest.scala>`__
    and the
    `example notebook <https://github.com/JohnSnowLabs/spark-nlp/tree/master/examples/python/llama.cpp/llama.cpp_in_Spark_NLP_AutoGGUFEmbeddings.ipynb>`__.

    For available pretrained models please see the `Models Hub <https://sparknlp.org/models>`__.

    ====================== =======================
    Input Annotation types Output Annotation type
    ====================== =======================
    ``DOCUMENT``           ``SENTENCE_EMBEDDINGS``
    ====================== =======================

    Parameters
    ----------
    nThreads
        Set the number of threads to use during generation
    nThreadsBatch
        Set the number of threads to use during batch and prompt processing
    nCtx
        Set the size of the prompt context
    nBatch
        Set the logical batch size for prompt processing (must be >=32 to use BLAS)
    nUbatch
        Set the physical batch size for prompt processing (must be >=32 to use BLAS)
    nChunks
        Set the maximal number of chunks to process
    nSequences
        Set the number of sequences to decode
    nGpuLayers
        Set the number of layers to store in VRAM (-1 - use default)
    gpuSplitMode
        Set how to split the model across GPUs
    mainGpu
        Set the main GPU that is used for scratch and small tensors.
    tensorSplit
        Set how split tensors should be distributed across GPUs
    grpAttnN
        Set the group-attention factor
    grpAttnW
        Set the group-attention width
    ropeFreqBase
        Set the RoPE base frequency, used by NTK-aware scaling
    ropeFreqScale
        Set the RoPE frequency scaling factor, expands context by a factor of 1/N
    yarnExtFactor
        Set the YaRN extrapolation mix factor
    yarnAttnFactor
        Set the YaRN scale sqrt(t) or attention magnitude
    yarnBetaFast
        Set the YaRN low correction dim or beta
    yarnBetaSlow
        Set the YaRN high correction dim or alpha
    yarnOrigCtx
        Set the YaRN original context size of model
    defragmentationThreshold
        Set the KV cache defragmentation threshold
    numaStrategy
        Set optimization strategies that help on some NUMA systems (if available)
    ropeScalingType
        Set the RoPE frequency scaling method, defaults to linear unless specified by the model
    poolingType
        Set the pooling type for embeddings, use model default if unspecified
    flashAttention
        Whether to enable Flash Attention
    useMmap
        Whether to use memory-map model (faster load but may increase pageouts if not using mlock)
    useMlock
        Whether to force the system to keep model in RAM rather than swapping or compressing
    noKvOffload
        Whether to disable KV offload

    Notes
    -----
    To use GPU inference with this annotator, make sure to use the Spark NLP GPU package and set
    the number of GPU layers with the `setNGpuLayers` method.

    When using larger models, we recommend adjusting GPU usage with `setNCtx` and `setNGpuLayers`
    according to your hardware to avoid out-of-memory errors.

    Examples
    --------
    >>> import sparknlp
    >>> from sparknlp.base import *
    >>> from sparknlp.annotator import *
    >>> from pyspark.ml import Pipeline
    >>> document = DocumentAssembler() \\
    ...     .setInputCol("text") \\
    ...     .setOutputCol("document")
    >>> autoGGUFEmbeddings = AutoGGUFEmbeddings.pretrained() \\
    ...     .setInputCols(["document"]) \\
    ...     .setOutputCol("embeddings") \\
    ...     .setBatchSize(4) \\
    ...     .setNGpuLayers(99) \\
    ...     .setPoolingType("MEAN")
    >>> pipeline = Pipeline().setStages([document, autoGGUFEmbeddings])
    >>> data = spark.createDataFrame([["The moons of Jupiter are 77 in total, with 79 confirmed natural satellites and 2 man-made ones."]]).toDF("text")
    >>> result = pipeline.fit(data).transform(data)
    >>> result.select("embeddings.embeddings").show(truncate = False)
    +--------------------------------------------------------------------------------+
    |                                                                      embeddings|
    +--------------------------------------------------------------------------------+
    |[[-0.034486726, 0.07770534, -0.15982522, -0.017873349, 0.013914132, 0.0365736...|
    +--------------------------------------------------------------------------------+
    """

    name = "AutoGGUFEmbeddings"

    inputAnnotatorTypes = [AnnotatorType.DOCUMENT]

    # The annotator emits sentence-level embeddings (see the annotation table
    # in the class docstring), not plain documents.
    outputAnnotatorType = AnnotatorType.SENTENCE_EMBEDDINGS

    # -------- MODEL PARAMETERS --------
    nThreads = Param(
        Params._dummy(),
        "nThreads",
        "Set the number of threads to use during generation",
        typeConverter=TypeConverters.toInt,
    )
    nThreadsBatch = Param(
        Params._dummy(),
        "nThreadsBatch",
        "Set the number of threads to use during batch and prompt processing",
        typeConverter=TypeConverters.toInt,
    )
    nCtx = Param(
        Params._dummy(),
        "nCtx",
        "Set the size of the prompt context",
        typeConverter=TypeConverters.toInt,
    )
    nBatch = Param(
        Params._dummy(),
        "nBatch",
        "Set the logical batch size for prompt processing (must be >=32 to use BLAS)",
        typeConverter=TypeConverters.toInt,
    )
    nUbatch = Param(
        Params._dummy(),
        "nUbatch",
        "Set the physical batch size for prompt processing (must be >=32 to use BLAS)",
        typeConverter=TypeConverters.toInt,
    )
    nChunks = Param(
        Params._dummy(),
        "nChunks",
        "Set the maximal number of chunks to process",
        typeConverter=TypeConverters.toInt,
    )
    nSequences = Param(
        Params._dummy(),
        "nSequences",
        "Set the number of sequences to decode",
        typeConverter=TypeConverters.toInt,
    )
    nGpuLayers = Param(
        Params._dummy(),
        "nGpuLayers",
        "Set the number of layers to store in VRAM (-1 - use default)",
        typeConverter=TypeConverters.toInt,
    )
    # Set how to split the model across GPUs
    #
    # - NONE: No GPU split
    # - LAYER: Split the model across GPUs by layer
    # - ROW: Split the model across GPUs by rows
    gpuSplitMode = Param(
        Params._dummy(),
        "gpuSplitMode",
        "Set how to split the model across GPUs",
        typeConverter=TypeConverters.toString,
    )
    mainGpu = Param(
        Params._dummy(),
        "mainGpu",
        "Set the main GPU that is used for scratch and small tensors.",
        typeConverter=TypeConverters.toInt,
    )
    tensorSplit = Param(
        Params._dummy(),
        "tensorSplit",
        "Set how split tensors should be distributed across GPUs",
        typeConverter=TypeConverters.toListFloat,
    )
    grpAttnN = Param(
        Params._dummy(),
        "grpAttnN",
        "Set the group-attention factor",
        typeConverter=TypeConverters.toInt,
    )
    grpAttnW = Param(
        Params._dummy(),
        "grpAttnW",
        "Set the group-attention width",
        typeConverter=TypeConverters.toInt,
    )
    ropeFreqBase = Param(
        Params._dummy(),
        "ropeFreqBase",
        "Set the RoPE base frequency, used by NTK-aware scaling",
        typeConverter=TypeConverters.toFloat,
    )
    ropeFreqScale = Param(
        Params._dummy(),
        "ropeFreqScale",
        "Set the RoPE frequency scaling factor, expands context by a factor of 1/N",
        typeConverter=TypeConverters.toFloat,
    )
    yarnExtFactor = Param(
        Params._dummy(),
        "yarnExtFactor",
        "Set the YaRN extrapolation mix factor",
        typeConverter=TypeConverters.toFloat,
    )
    yarnAttnFactor = Param(
        Params._dummy(),
        "yarnAttnFactor",
        "Set the YaRN scale sqrt(t) or attention magnitude",
        typeConverter=TypeConverters.toFloat,
    )
    yarnBetaFast = Param(
        Params._dummy(),
        "yarnBetaFast",
        "Set the YaRN low correction dim or beta",
        typeConverter=TypeConverters.toFloat,
    )
    yarnBetaSlow = Param(
        Params._dummy(),
        "yarnBetaSlow",
        "Set the YaRN high correction dim or alpha",
        typeConverter=TypeConverters.toFloat,
    )
    yarnOrigCtx = Param(
        Params._dummy(),
        "yarnOrigCtx",
        "Set the YaRN original context size of model",
        typeConverter=TypeConverters.toInt,
    )
    defragmentationThreshold = Param(
        Params._dummy(),
        "defragmentationThreshold",
        "Set the KV cache defragmentation threshold",
        typeConverter=TypeConverters.toFloat,
    )
    # Set optimization strategies that help on some NUMA systems (if available)
    #
    # Available Strategies:
    #
    # - DISABLED: No NUMA optimizations
    # - DISTRIBUTE: Spread execution evenly over all
    # - ISOLATE: Only spawn threads on CPUs on the node that execution started on
    # - NUMA_CTL: Use the CPU map provided by numactl
    # - MIRROR: Mirrors the model across NUMA nodes
    numaStrategy = Param(
        Params._dummy(),
        "numaStrategy",
        "Set optimization strategies that help on some NUMA systems (if available)",
        typeConverter=TypeConverters.toString,
    )
    # Set the RoPE frequency scaling method, defaults to linear unless specified by the model.
    #
    # - UNSPECIFIED: Don't use any scaling
    # - LINEAR: Linear scaling
    # - YARN: YaRN RoPE scaling
    ropeScalingType = Param(
        Params._dummy(),
        "ropeScalingType",
        "Set the RoPE frequency scaling method, defaults to linear unless specified by the model",
        typeConverter=TypeConverters.toString,
    )
    # Set the pooling type for embeddings, use model default if unspecified.
    # The names below are the values accepted by `setPoolingType`:
    #
    # - NONE
    # - MEAN
    # - CLS
    # - LAST
    poolingType = Param(
        Params._dummy(),
        "poolingType",
        "Set the pooling type for embeddings, use model default if unspecified",
        typeConverter=TypeConverters.toString,
    )
    # No public setter is defined for this param in this class; it is only
    # given a default (True) in `__init__`.
    embedding = Param(
        Params._dummy(),
        "embedding",
        "Whether to load model with embedding support",
        typeConverter=TypeConverters.toBoolean,
    )
    flashAttention = Param(
        Params._dummy(),
        "flashAttention",
        "Whether to enable Flash Attention",
        typeConverter=TypeConverters.toBoolean,
    )
    useMmap = Param(
        Params._dummy(),
        "useMmap",
        "Whether to use memory-map model (faster load but may increase pageouts if not using mlock)",
        typeConverter=TypeConverters.toBoolean,
    )
    useMlock = Param(
        Params._dummy(),
        "useMlock",
        "Whether to force the system to keep model in RAM rather than swapping or compressing",
        typeConverter=TypeConverters.toBoolean,
    )
    noKvOffload = Param(
        Params._dummy(),
        "noKvOffload",
        "Whether to disable KV offload",
        typeConverter=TypeConverters.toBoolean,
    )

    # -------- MODEL SETTERS --------
    def setNThreads(self, nThreads: int):
        """Set the number of threads to use during generation.

        Parameters
        ----------
        nThreads : int
            Number of threads to use during generation
        """
        return self._set(nThreads=nThreads)

    def setNThreadsBatch(self, nThreadsBatch: int):
        """Set the number of threads to use during batch and prompt processing.

        Parameters
        ----------
        nThreadsBatch : int
            Number of threads to use during batch and prompt processing
        """
        return self._set(nThreadsBatch=nThreadsBatch)

    def setNCtx(self, nCtx: int):
        """Set the size of the prompt context.

        Parameters
        ----------
        nCtx : int
            Size of the prompt context
        """
        return self._set(nCtx=nCtx)

    def setNBatch(self, nBatch: int):
        """Set the logical batch size for prompt processing (must be >=32 to use BLAS).

        Parameters
        ----------
        nBatch : int
            Logical batch size for prompt processing
        """
        return self._set(nBatch=nBatch)

    def setNUbatch(self, nUbatch: int):
        """Set the physical batch size for prompt processing (must be >=32 to use BLAS).

        Parameters
        ----------
        nUbatch : int
            Physical batch size for prompt processing
        """
        return self._set(nUbatch=nUbatch)

    def setNChunks(self, nChunks: int):
        """Set the maximal number of chunks to process.

        Parameters
        ----------
        nChunks : int
            Maximal number of chunks to process
        """
        return self._set(nChunks=nChunks)

    def setNSequences(self, nSequences: int):
        """Set the number of sequences to decode.

        Parameters
        ----------
        nSequences : int
            Number of sequences to decode
        """
        return self._set(nSequences=nSequences)

    def setNGpuLayers(self, nGpuLayers: int):
        """Set the number of layers to store in VRAM (-1 - use default).

        Parameters
        ----------
        nGpuLayers : int
            Number of layers to store in VRAM
        """
        return self._set(nGpuLayers=nGpuLayers)

    def setGpuSplitMode(self, gpuSplitMode: str):
        """Set how to split the model across GPUs.

        Parameters
        ----------
        gpuSplitMode : str
            Split mode; the modes listed next to the param definition are
            ``NONE``, ``LAYER`` and ``ROW``. The value is not validated here.
        """
        return self._set(gpuSplitMode=gpuSplitMode)

    def setMainGpu(self, mainGpu: int):
        """Set the main GPU that is used for scratch and small tensors.

        Parameters
        ----------
        mainGpu : int
            Main GPU that is used for scratch and small tensors
        """
        return self._set(mainGpu=mainGpu)

    def setTensorSplit(self, tensorSplit: List[float]):
        """Set how split tensors should be distributed across GPUs.

        Parameters
        ----------
        tensorSplit : List[float]
            How split tensors should be distributed across GPUs
        """
        return self._set(tensorSplit=tensorSplit)

    def setGrpAttnN(self, grpAttnN: int):
        """Set the group-attention factor.

        Parameters
        ----------
        grpAttnN : int
            Group-attention factor
        """
        return self._set(grpAttnN=grpAttnN)

    def setGrpAttnW(self, grpAttnW: int):
        """Set the group-attention width.

        Parameters
        ----------
        grpAttnW : int
            Group-attention width
        """
        return self._set(grpAttnW=grpAttnW)

    def setRopeFreqBase(self, ropeFreqBase: float):
        """Set the RoPE base frequency, used by NTK-aware scaling.

        Parameters
        ----------
        ropeFreqBase : float
            RoPE base frequency
        """
        return self._set(ropeFreqBase=ropeFreqBase)

    def setRopeFreqScale(self, ropeFreqScale: float):
        """Set the RoPE frequency scaling factor, expands context by a factor of 1/N.

        Parameters
        ----------
        ropeFreqScale : float
            RoPE frequency scaling factor
        """
        return self._set(ropeFreqScale=ropeFreqScale)

    def setYarnExtFactor(self, yarnExtFactor: float):
        """Set the YaRN extrapolation mix factor.

        Parameters
        ----------
        yarnExtFactor : float
            YaRN extrapolation mix factor
        """
        return self._set(yarnExtFactor=yarnExtFactor)

    def setYarnAttnFactor(self, yarnAttnFactor: float):
        """Set the YaRN scale sqrt(t) or attention magnitude.

        Parameters
        ----------
        yarnAttnFactor : float
            YaRN scale sqrt(t) or attention magnitude
        """
        return self._set(yarnAttnFactor=yarnAttnFactor)

    def setYarnBetaFast(self, yarnBetaFast: float):
        """Set the YaRN low correction dim or beta.

        Parameters
        ----------
        yarnBetaFast : float
            YaRN low correction dim or beta
        """
        return self._set(yarnBetaFast=yarnBetaFast)

    def setYarnBetaSlow(self, yarnBetaSlow: float):
        """Set the YaRN high correction dim or alpha.

        Parameters
        ----------
        yarnBetaSlow : float
            YaRN high correction dim or alpha
        """
        return self._set(yarnBetaSlow=yarnBetaSlow)

    def setYarnOrigCtx(self, yarnOrigCtx: int):
        """Set the YaRN original context size of model.

        Parameters
        ----------
        yarnOrigCtx : int
            YaRN original context size of model
        """
        return self._set(yarnOrigCtx=yarnOrigCtx)

    def setDefragmentationThreshold(self, defragmentationThreshold: float):
        """Set the KV cache defragmentation threshold.

        Parameters
        ----------
        defragmentationThreshold : float
            KV cache defragmentation threshold
        """
        return self._set(defragmentationThreshold=defragmentationThreshold)

    def setNumaStrategy(self, numaStrategy: str):
        """Set optimization strategies that help on some NUMA systems (if available).

        The name is matched case-insensitively and stored in upper case.

        Parameters
        ----------
        numaStrategy : str
            One of ``DISABLED``, ``DISTRIBUTE``, ``ISOLATE``, ``NUMA_CTL``
            or ``MIRROR``

        Raises
        ------
        ValueError
            If ``numaStrategy`` is not one of the supported strategies
        """
        numaUpper = numaStrategy.upper()
        numaStrategies = ["DISABLED", "DISTRIBUTE", "ISOLATE", "NUMA_CTL", "MIRROR"]
        if numaUpper not in numaStrategies:
            raise ValueError(
                f"Invalid NUMA strategy: {numaUpper}. "
                + f"Valid values are: {numaStrategies}"
            )
        # Store the validated, normalised name so the stored value is always
        # one of the entries in `numaStrategies`.
        return self._set(numaStrategy=numaUpper)

    def setRopeScalingType(self, ropeScalingType: str):
        """Set the RoPE frequency scaling method, defaults to linear unless specified by the model.

        Parameters
        ----------
        ropeScalingType : str
            Scaling method; the methods listed next to the param definition
            are ``UNSPECIFIED``, ``LINEAR`` and ``YARN``. The value is not
            validated here.
        """
        return self._set(ropeScalingType=ropeScalingType)

    def setPoolingType(self, poolingType: str):
        """Set the pooling type for embeddings, use model default if unspecified.

        The name is matched case-insensitively and stored in upper case.

        Parameters
        ----------
        poolingType : str
            One of ``NONE``, ``MEAN``, ``CLS`` or ``LAST``

        Raises
        ------
        ValueError
            If ``poolingType`` is not one of the supported pooling types
        """
        poolingTypeUpper = poolingType.upper()
        poolingTypes = ["NONE", "MEAN", "CLS", "LAST"]
        if poolingTypeUpper not in poolingTypes:
            raise ValueError(
                f"Invalid pooling type: {poolingType}. "
                + f"Valid values are: {poolingTypes}"
            )
        # Store the validated, normalised name so the stored value is always
        # one of the entries in `poolingTypes`.
        return self._set(poolingType=poolingTypeUpper)

    def setFlashAttention(self, flashAttention: bool):
        """Whether to enable Flash Attention.

        Parameters
        ----------
        flashAttention : bool
            Whether to enable Flash Attention
        """
        return self._set(flashAttention=flashAttention)

    def setUseMmap(self, useMmap: bool):
        """Whether to use memory-map model (faster load but may increase pageouts if not using mlock).

        Parameters
        ----------
        useMmap : bool
            Whether to use memory-map model
        """
        return self._set(useMmap=useMmap)

    def setUseMlock(self, useMlock: bool):
        """Whether to force the system to keep model in RAM rather than swapping or compressing.

        Parameters
        ----------
        useMlock : bool
            Whether to force the system to keep model in RAM
        """
        return self._set(useMlock=useMlock)

    def setNoKvOffload(self, noKvOffload: bool):
        """Whether to disable KV offload.

        Parameters
        ----------
        noKvOffload : bool
            Whether to disable KV offload
        """
        return self._set(noKvOffload=noKvOffload)

    @keyword_only
    def __init__(
        self,
        classname="com.johnsnowlabs.nlp.embeddings.AutoGGUFEmbeddings",
        java_model=None,
    ):
        """Create the Python wrapper and register the default param values.

        Parameters
        ----------
        classname : str, optional
            Fully qualified name of the backing JVM class
        java_model : optional
            Existing Java model object to wrap, by default None
        """
        super(AutoGGUFEmbeddings, self).__init__(
            classname=classname, java_model=java_model
        )
        self._setDefault(
            embedding=True,
            nCtx=4096,
            nBatch=512,
            poolingType="MEAN",
        )

    @staticmethod
    def loadSavedModel(folder, spark_session):
        """Loads a locally saved model.

        Parameters
        ----------
        folder : str
            Folder of the saved model
        spark_session : pyspark.sql.SparkSession
            The current SparkSession

        Returns
        -------
        AutoGGUFEmbeddings
            The restored model
        """
        # Imported lazily, inside the method, as in the original code.
        from sparknlp.internal import _AutoGGUFEmbeddingsLoader

        jModel = _AutoGGUFEmbeddingsLoader(folder, spark_session._jsparkSession)._java_obj
        return AutoGGUFEmbeddings(java_model=jModel)

    @staticmethod
    def pretrained(name="nomic-embed-text-v1.5.Q8_0.gguf", lang="en", remote_loc=None):
        """Downloads and loads a pretrained model.

        Parameters
        ----------
        name : str, optional
            Name of the pretrained model, by default "nomic-embed-text-v1.5.Q8_0.gguf"
        lang : str, optional
            Language of the pretrained model, by default "en"
        remote_loc : str, optional
            Optional remote address of the resource, by default None. Will use
            Spark NLPs repositories otherwise.

        Returns
        -------
        AutoGGUFEmbeddings
            The restored model
        """
        from sparknlp.pretrained import ResourceDownloader

        return ResourceDownloader.downloadModel(
            AutoGGUFEmbeddings, name, lang, remote_loc
        )