Source code for sparknlp.functions

#  Copyright 2017-2022 John Snow Labs
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""Contains helper functions to assist in transforming Annotation results.
"""

from pyspark.sql.functions import udf, array
from pyspark.sql.types import *
from pyspark.sql import DataFrame
from sparknlp.annotation import Annotation


def map_annotations(f, output_type: DataType):
    """Creates a Spark UDF to map over an Annotator's results.

    Parameters
    ----------
    f : function
        The function to be applied over the results
    output_type : :class:`pyspark.sql.types.DataType`
        Output type of the data

    Returns
    -------
    :func:`pyspark.sql.functions.udf`
        Spark UserDefinedFunction (udf)

    Examples
    --------
    >>> from sparknlp.pretrained import PretrainedPipeline
    >>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
    >>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
    >>> result = explain_document_pipeline.transform(data)

    The array type must be provided in order to tell Spark the expected
    output type of our column. We are using an Annotation array here.

    >>> from sparknlp.functions import *
    >>> def nnp_tokens(annotations: List[Row]):
    ...     return list(
    ...         filter(lambda annotation: annotation.result == 'NNP', annotations)
    ...     )
    >>> result.select(
    ...     map_annotations(nnp_tokens, Annotation.arrayType())('pos').alias("nnp")
    ... ).selectExpr("explode(nnp) as nnp").show(truncate=False)
    +-----------------------------------------+
    |nnp                                      |
    +-----------------------------------------+
    |[pos, 0, 2, NNP, [word -> U.N], []]      |
    |[pos, 14, 18, NNP, [word -> Ekeus], []]  |
    |[pos, 30, 36, NNP, [word -> Baghdad], []]|
    +-----------------------------------------+
    """
    return udf(
        lambda content: [Annotation.toRow(a)
                         for a in f([Annotation.fromRow(r) for r in content])],
        output_type
    )

def map_annotations_array(f, output_type: DataType):
    """Creates a Spark UDF to map over an Annotator's array results.

    Parameters
    ----------
    f : function
        The function to be applied over the results
    output_type : :class:`pyspark.sql.types.DataType`
        Output type of the data

    Returns
    -------
    :func:`pyspark.sql.functions.udf`
        Spark UserDefinedFunction (udf)
    """
    return udf(
        lambda cols: [Annotation.toRow(item)
                      for item in f([Annotation.fromRow(r) for col in cols for r in col])],
        output_type
    )

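# A minimal usage sketch for map_annotations_array (hypothetical: ``result`` is
# assumed to be a DataFrame carrying "pos" and "ner" annotation columns, as in
# the docstring examples elsewhere in this module). The annotations of all
# input columns are flattened into a single list before ``f`` is applied:
#
# >>> from pyspark.sql.functions import array
# >>> result.select(
# ...     map_annotations_array(
# ...         lambda annotations: [a for a in annotations if a.result == "NNP"],
# ...         Annotation.arrayType()
# ...     )(array("pos", "ner")).alias("nnp")
# ... ).show(truncate=False)
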
def map_annotations_strict(f):
    """Creates a Spark UDF to map over an Annotator's results, for which the
    return type is explicitly set to an ``ArrayType(Annotation.dataType())``.

    Parameters
    ----------
    f : function
        The function to be applied over the results

    Returns
    -------
    :func:`pyspark.sql.functions.udf`
        Spark UserDefinedFunction (udf)

    Examples
    --------
    >>> from sparknlp.pretrained import PretrainedPipeline
    >>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
    >>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
    >>> result = explain_document_pipeline.transform(data)
    >>> def nnp_tokens(annotations):
    ...     return list(
    ...         filter(lambda annotation: annotation.result == 'NNP', annotations)
    ...     )
    >>> result.select(
    ...     map_annotations_strict(nnp_tokens)('pos').alias("nnp")
    ... ).selectExpr("explode(nnp) as nnp").show(truncate=False)
    +-----------------------------------------+
    |nnp                                      |
    +-----------------------------------------+
    |[pos, 0, 2, NNP, [word -> U.N], []]      |
    |[pos, 14, 18, NNP, [word -> Ekeus], []]  |
    |[pos, 30, 36, NNP, [word -> Baghdad], []]|
    +-----------------------------------------+
    """
    return udf(
        lambda content: [Annotation.toRow(a)
                         for a in f([Annotation.fromRow(r) for r in content])],
        ArrayType(Annotation.dataType())
    )

def map_annotations_col(dataframe: DataFrame, f, column: str, output_column: str,
                        annotatyon_type: str,
                        output_type: DataType = Annotation.arrayType()):
    """Maps a function over a column of Annotation results, storing the output
    in a new annotation column of the DataFrame.

    Parameters
    ----------
    dataframe : DataFrame
        Input DataFrame
    f : function
        Function to apply to the column
    column : str
        Name of the input column
    output_column : str
        Name of the output column
    annotatyon_type : str
        Annotator type to set in the output column's metadata
    output_type : DataType, optional
        Output type, by default Annotation.arrayType()

    Returns
    -------
    :class:`pyspark.sql.DataFrame`
        Transformed DataFrame

    Examples
    --------
    >>> from sparknlp.pretrained import PretrainedPipeline
    >>> from sparknlp.functions import *
    >>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
    >>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
    >>> result = explain_document_pipeline.transform(data)
    >>> chunks_df = map_annotations_col(
    ...     result,
    ...     lambda x: [
    ...         Annotation("chunk", a.begin, a.end, a.result, a.metadata, a.embeddings)
    ...         for a in x
    ...     ],
    ...     "pos",
    ...     "pos_chunk",
    ...     "chunk",
    ... )
    >>> chunks_df.selectExpr("explode(pos_chunk)").show()
    +--------------------+
    |                 col|
    +--------------------+
    |[chunk, 0, 2, NNP...|
    |[chunk, 3, 3, ., ...|
    |[chunk, 5, 12, JJ...|
    |[chunk, 14, 18, N...|
    |[chunk, 20, 24, V...|
    |[chunk, 26, 28, I...|
    |[chunk, 30, 36, N...|
    |[chunk, 37, 37, ....|
    +--------------------+
    """
    return dataframe.withColumn(
        output_column,
        map_annotations(f, output_type)(column)
        .alias(output_column, metadata={'annotatorType': annotatyon_type})
    )

def map_annotations_cols(dataframe: DataFrame, f, columns: list, output_column: str,
                         annotatyon_type: str,
                         output_type: DataType = Annotation.arrayType()):
    """Maps a function over multiple columns of Annotation results, storing the
    output in a new annotation column of the DataFrame.

    Parameters
    ----------
    dataframe : DataFrame
        Input DataFrame
    f : function
        Function to apply to the columns
    columns : list
        Names of the input columns
    output_column : str
        Name of the output column
    annotatyon_type : str
        Annotator type to set in the output column's metadata
    output_type : DataType, optional
        Output type, by default Annotation.arrayType()

    Returns
    -------
    :class:`pyspark.sql.DataFrame`
        Transformed DataFrame

    Examples
    --------
    >>> from sparknlp.pretrained import PretrainedPipeline
    >>> from sparknlp.functions import *
    >>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
    >>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
    >>> result = explain_document_pipeline.transform(data)
    >>> chunks_df = map_annotations_cols(
    ...     result,
    ...     lambda x: [
    ...         Annotation("tag", a.begin, a.end, a.result, a.metadata, a.embeddings)
    ...         for a in x
    ...     ],
    ...     ["pos", "ner"],
    ...     "tags",
    ...     "chunk"
    ... )
    >>> chunks_df.selectExpr("explode(tags)").show(truncate=False)
    +-------------------------------------------+
    |col                                        |
    +-------------------------------------------+
    |[tag, 0, 2, NNP, [word -> U.N], []]        |
    |[tag, 3, 3, ., [word -> .], []]            |
    |[tag, 5, 12, JJ, [word -> official], []]   |
    |[tag, 14, 18, NNP, [word -> Ekeus], []]    |
    |[tag, 20, 24, VBZ, [word -> heads], []]    |
    |[tag, 26, 28, IN, [word -> for], []]       |
    |[tag, 30, 36, NNP, [word -> Baghdad], []]  |
    |[tag, 37, 37, ., [word -> .], []]          |
    |[tag, 0, 2, B-ORG, [word -> U.N], []]      |
    |[tag, 3, 3, O, [word -> .], []]            |
    |[tag, 5, 12, O, [word -> official], []]    |
    |[tag, 14, 18, B-PER, [word -> Ekeus], []]  |
    |[tag, 20, 24, O, [word -> heads], []]      |
    |[tag, 26, 28, O, [word -> for], []]        |
    |[tag, 30, 36, B-LOC, [word -> Baghdad], []]|
    |[tag, 37, 37, O, [word -> .], []]          |
    +-------------------------------------------+
    """
    return dataframe.withColumn(
        output_column,
        map_annotations_array(f, output_type)(array(*columns))
        .alias(output_column, metadata={'annotatorType': annotatyon_type})
    )

def filter_by_annotations_col(dataframe, f, column):
    """Applies a filter over a column of Annotations.

    Parameters
    ----------
    dataframe : DataFrame
        Input DataFrame
    f : function
        Filter function, returning a boolean per Annotation
    column : str
        Name of the column

    Returns
    -------
    :class:`pyspark.sql.DataFrame`
        Filtered DataFrame

    Examples
    --------
    >>> from sparknlp.pretrained import PretrainedPipeline
    >>> from sparknlp.functions import *
    >>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
    >>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
    >>> result = explain_document_pipeline.transform(data)
    >>> def filter_pos(annotation: Annotation):
    ...     return annotation.result == "NNP"
    >>> filter_by_annotations_col(
    ...     explode_annotations_col(result, "pos", "pos"), filter_pos, "pos"
    ... ).select("pos").show(truncate=False)
    +-----------------------------------------+
    |pos                                      |
    +-----------------------------------------+
    |[pos, 0, 2, NNP, [word -> U.N], []]      |
    |[pos, 14, 18, NNP, [word -> Ekeus], []]  |
    |[pos, 30, 36, NNP, [word -> Baghdad], []]|
    +-----------------------------------------+
    """
    this_udf = udf(
        lambda content: f(content),
        BooleanType()
    )
    return dataframe.filter(this_udf(column))

def explode_annotations_col(dataframe: DataFrame, column, output_column):
    """Explodes an Annotation column, putting each result onto a separate row.

    Parameters
    ----------
    dataframe : DataFrame
        The Spark DataFrame containing output Annotations
    column : str
        Name of the column
    output_column : str
        Name of the output column

    Returns
    -------
    :class:`pyspark.sql.DataFrame`
        Transformed DataFrame

    Examples
    --------
    >>> from sparknlp.pretrained import PretrainedPipeline
    >>> from sparknlp.functions import *
    >>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
    >>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
    >>> result = explain_document_pipeline.transform(data)
    >>> result.select("pos.result").show(truncate=False)
    +----------------------------------+
    |result                            |
    +----------------------------------+
    |[NNP, ., JJ, NNP, VBZ, IN, NNP, .]|
    +----------------------------------+
    >>> explode_annotations_col(result, "pos", "pos").select("pos.result").show()
    +------+
    |result|
    +------+
    |   NNP|
    |     .|
    |    JJ|
    |   NNP|
    |   VBZ|
    |    IN|
    |   NNP|
    |     .|
    +------+
    """
    from pyspark.sql.functions import explode
    return dataframe.withColumn(output_column, explode(column))