Source code for sparknlp

#  Copyright 2017-2022 John Snow Labs
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import sys
import subprocess
import threading
from pyspark.sql import SparkSession
from sparknlp import annotator
# These must be declared here one by one, or else PretrainedPipeline will fail with an AttributeError
from sparknlp.base import DocumentAssembler, MultiDocumentAssembler, Finisher, EmbeddingsFinisher, TokenAssembler, \
    Doc2Chunk, AudioAssembler, GraphFinisher, ImageAssembler, TableAssembler
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.java_gateway import launch_gateway

sys.modules['com.johnsnowlabs.nlp.annotators'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.tokenizer'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.tokenizer.wordpiece'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.ner'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.ner.regex'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.ner.crf'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.ner.dl'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.pos'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.pos.perceptron'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.sbd'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.sbd.pragmatic'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.sbd.deep'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.sda'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.sda.pragmatic'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.sda.vivekn'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.spell'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.spell.norvig'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.spell.symmetric'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.parser'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.parser.dep'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.parser.typdep'] = annotator
sys.modules['com.johnsnowlabs.nlp.embeddings'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.classifier'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.classifier.dl'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.spell.context'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.ld'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.ld.dl'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.sentence_detector_dl'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.seq2seq'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.ws'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.er'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.coref'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.cv'] = annotator
sys.modules['com.johnsnowlabs.nlp.annotators.audio'] = annotator
sys.modules['com.johnsnowlabs.ml.ai'] = annotator

annotators = annotator
embeddings = annotator
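
# The sys.modules entries above map Java-style module paths onto the Python
# `annotator` module, presumably so that saved pipelines whose metadata
# references those paths can still be unpickled: pickle resolves module names
# through sys.modules. A minimal illustrative sketch of the effect (not part
# of the library API):
#
#     >>> import importlib
#     >>> importlib.import_module('com.johnsnowlabs.nlp.annotators.ner.dl') is annotator
#     True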


def start(gpu=False,
          apple_silicon=False,
          aarch64=False,
          memory="16G",
          cache_folder="",
          log_folder="",
          cluster_tmp_dir="",
          params=None,
          real_time_output=False,
          output_level=1):
    """Starts a PySpark instance with default parameters for Spark NLP.

    The default parameters would result in the equivalent of:

    .. code-block:: python

        SparkSession.builder \\
            .appName("Spark NLP") \\
            .master("local[*]") \\
            .config("spark.driver.memory", "16G") \\
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \\
            .config("spark.kryoserializer.buffer.max", "2000M") \\
            .config("spark.driver.maxResultSize", "0") \\
            .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:|release|") \\
            .getOrCreate()

    Parameters
    ----------
    gpu : bool, optional
        Whether to enable GPU acceleration (must be set up correctly), by default False
    apple_silicon : bool, optional
        Whether to enable Apple Silicon support for macOS
    aarch64 : bool, optional
        Whether to enable Linux Aarch64 support
    memory : str, optional
        How much memory to allocate for the Spark driver, by default "16G"
    cache_folder : str, optional
        The location to download and extract pretrained Models and Pipelines. If not
        set, it will be in the user's home directory under `cache_pretrained`.
    log_folder : str, optional
        The location to use on a cluster for temporary files such as unpacking indexes
        for WordEmbeddings. By default, this location is the value of `hadoop.tmp.dir`
        set via Hadoop configuration for Apache Spark. NOTE: `S3` is not supported and
        it must be local, HDFS, or DBFS.
    cluster_tmp_dir : str, optional
        The location to save logs from annotators during training. If not set, it will
        be in the user's home directory under `annotator_logs`.
    params : dict, optional
        Custom parameters to set for the Spark configuration, by default None.
    real_time_output : bool, optional
        Whether to read and print JVM output in real time, by default False
    output_level : int, optional
        Output level for logs, by default 1

    Notes
    -----
    Since Spark version 3.2, Python 3.6 is deprecated. If you are using this
    Python version, consider sticking to lower versions of Spark.

    Returns
    -------
    :class:`SparkSession`
        The initiated Spark session.
    """
    current_version = "5.5.1"

    if params is None:
        params = {}
    else:
        if not isinstance(params, dict):
            raise TypeError('params must be a dictionary like {"spark.executor.memory": "8G"}')

    if '_instantiatedSession' in dir(SparkSession) and SparkSession._instantiatedSession is not None:
        print('Warning::Spark Session already created, some configs may not take.')

    # Use a custom core count for local mode if "spark.driver.cores" is set in params
    driver_cores = "*"
    for key, value in params.items():
        if key == "spark.driver.cores":
            driver_cores = f"{value}"

    class SparkNLPConfig:

        def __init__(self):
            self.master, self.app_name = "local[{}]".format(driver_cores), "Spark NLP"
            self.serializer, self.serializer_max_buffer = "org.apache.spark.serializer.KryoSerializer", "2000M"
            self.driver_max_result_size = "0"
            # Spark NLP on CPU or GPU
            self.maven_spark3 = "com.johnsnowlabs.nlp:spark-nlp_2.12:{}".format(current_version)
            self.maven_gpu_spark3 = "com.johnsnowlabs.nlp:spark-nlp-gpu_2.12:{}".format(current_version)
            # Spark NLP on Apple Silicon
            self.maven_silicon = "com.johnsnowlabs.nlp:spark-nlp-silicon_2.12:{}".format(current_version)
            # Spark NLP on Linux Aarch64
            self.maven_aarch64 = "com.johnsnowlabs.nlp:spark-nlp-aarch64_2.12:{}".format(current_version)

    def start_without_realtime_output():
        builder = SparkSession.builder \
            .appName(spark_nlp_config.app_name) \
            .master(spark_nlp_config.master) \
            .config("spark.driver.memory", memory) \
            .config("spark.serializer", spark_nlp_config.serializer) \
            .config("spark.kryoserializer.buffer.max", spark_nlp_config.serializer_max_buffer) \
            .config("spark.driver.maxResultSize", spark_nlp_config.driver_max_result_size)

        if apple_silicon:
            spark_jars_packages = spark_nlp_config.maven_silicon
        elif aarch64:
            spark_jars_packages = spark_nlp_config.maven_aarch64
        elif gpu:
            spark_jars_packages = spark_nlp_config.maven_gpu_spark3
        else:
            spark_jars_packages = spark_nlp_config.maven_spark3

        if cache_folder != '':
            builder.config("spark.jsl.settings.pretrained.cache_folder", cache_folder)
        if log_folder != '':
            builder.config("spark.jsl.settings.annotator.log_folder", log_folder)
        if cluster_tmp_dir != '':
            builder.config("spark.jsl.settings.storage.cluster_tmp_dir", cluster_tmp_dir)

        if params.get("spark.jars.packages") is None:
            builder.config("spark.jars.packages", spark_jars_packages)

        # Apply user params; a custom "spark.jars.packages" is merged with the
        # Spark NLP package instead of replacing it
        for key, value in params.items():
            if key == "spark.jars.packages":
                packages = spark_jars_packages + "," + value
                builder.config(key, packages)
            else:
                builder.config(key, value)

        return builder.getOrCreate()

    def start_with_realtime_output():

        class SparkWithCustomGateway:

            def __init__(self):
                spark_conf = SparkConf()
                spark_conf.setAppName(spark_nlp_config.app_name)
                spark_conf.setMaster(spark_nlp_config.master)
                spark_conf.set("spark.driver.memory", memory)
                spark_conf.set("spark.serializer", spark_nlp_config.serializer)
                spark_conf.set("spark.kryoserializer.buffer.max", spark_nlp_config.serializer_max_buffer)
                spark_conf.set("spark.driver.maxResultSize", spark_nlp_config.driver_max_result_size)

                if apple_silicon:
                    spark_jars_packages = spark_nlp_config.maven_silicon
                elif aarch64:
                    spark_jars_packages = spark_nlp_config.maven_aarch64
                elif gpu:
                    spark_jars_packages = spark_nlp_config.maven_gpu_spark3
                else:
                    spark_jars_packages = spark_nlp_config.maven_spark3

                if cache_folder != '':
                    spark_conf.set("spark.jsl.settings.pretrained.cache_folder", cache_folder)
                if log_folder != '':
                    spark_conf.set("spark.jsl.settings.annotator.log_folder", log_folder)
                if cluster_tmp_dir != '':
                    spark_conf.set("spark.jsl.settings.storage.cluster_tmp_dir", cluster_tmp_dir)

                if params.get("spark.jars.packages") is None:
                    spark_conf.set("spark.jars.packages", spark_jars_packages)

                for key, value in params.items():
                    if key == "spark.jars.packages":
                        packages = spark_jars_packages + "," + value
                        spark_conf.set(key, packages)
                    else:
                        spark_conf.set(key, value)

                # Make the py4j JVM stdout and stderr available without buffering
                popen_kwargs = {
                    'stdout': subprocess.PIPE,
                    'stderr': subprocess.PIPE,
                    'bufsize': 0
                }

                # Launch the gateway with our custom settings
                self.gateway = launch_gateway(conf=spark_conf, popen_kwargs=popen_kwargs)
                self.process = self.gateway.proc
                # Use the gateway we launched
                spark_context = SparkContext(gateway=self.gateway)
                self.spark_session = SparkSession(spark_context)

                self.out_thread = threading.Thread(target=self.output_reader)
                self.error_thread = threading.Thread(target=self.error_reader)
                self.std_background_listeners()

            def std_background_listeners(self):
                self.out_thread.start()
                self.error_thread.start()

            def output_reader(self):
                for line in iter(self.process.stdout.readline, b''):
                    print('{0}'.format(line.decode('utf-8')), end='')

            def error_reader(self):
                RED = '\033[91m'
                RESET = '\033[0m'
                for line in iter(self.process.stderr.readline, b''):
                    if output_level == 0:
                        print(RED + '{0}'.format(line.decode('utf-8')) + RESET, end='')
                    else:
                        # output just info
                        pass

            def shutdown(self):
                self.spark_session.stop()
                self.gateway.shutdown()
                self.process.communicate()
                self.out_thread.join()
                self.error_thread.join()

        return SparkWithCustomGateway()

    spark_nlp_config = SparkNLPConfig()

    if real_time_output:
        # Available from Spark 3.0.x
        class SparkRealTimeOutput:

            def __init__(self):
                self.__spark_with_custom_gateway = start_with_realtime_output()
                self.spark_session = self.__spark_with_custom_gateway.spark_session

            def shutdown(self):
                self.__spark_with_custom_gateway.shutdown()

        return SparkRealTimeOutput().spark_session
    else:
        spark_session = start_without_realtime_output()
        return spark_session
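
# Example usage (illustrative): extra Spark configs are passed through
# `params`, and a custom "spark.jars.packages" value is merged with the
# Spark NLP package rather than replacing it:
#
#     >>> import sparknlp
#     >>> spark = sparknlp.start(memory="8G",
#     ...                        params={"spark.executor.cores": "4"})
#     >>> spark.sparkContext.appName
#     'Spark NLP'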
def version():
    """Returns the current Spark NLP version.

    Returns
    -------
    str
        The current Spark NLP version.
    """
    return '5.5.1'
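
# Example (illustrative):
#
#     >>> import sparknlp
#     >>> sparknlp.version()
#     '5.5.1'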