sparknlp.functions
Contains helper functions to assist in transforming Annotation results.
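All of these helpers operate on the output of a Spark NLP pipeline. As a quick orientation, the following is a minimal sketch of the setup shared by the Examples below, assuming a running Spark session bound to the name spark:

>>> from sparknlp.pretrained import PretrainedPipeline
>>> from sparknlp.functions import *
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)

The resulting DataFrame result holds Annotation columns (such as "pos" and "ner") that the functions below can map, filter, or explode.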
Module Contents

Functions

- map_annotations(f, output_type): Creates a Spark UDF to map over an Annotator’s results.
- map_annotations_array(f, output_type): Creates a Spark UDF to map over an Annotator’s array results.
- map_annotations_strict(f): Creates a Spark UDF to map over an Annotator’s results, for which the return type is explicitly defined as an Annotation.dataType().
- map_annotations_col(dataframe, f, column, output_column, annotatyon_type, output_type): Creates a Spark UDF to map over a column of Annotation results.
- map_annotations_cols(dataframe, f, columns, output_column, annotatyon_type, output_type): Creates a Spark UDF to map over multiple columns of Annotation results.
- filter_by_annotations_col(dataframe, f, column): Applies a filter over a column of Annotations.
- explode_annotations_col(dataframe, column, output_column): Explodes an Annotation column, putting each result onto a separate row.
- map_annotations(f, output_type: pyspark.sql.types.DataType)
Creates a Spark UDF to map over an Annotator’s results.
- Parameters:
- f : function
The function to be applied over the results
- output_type : pyspark.sql.types.DataType
Output type of the data
- Returns:
pyspark.sql.functions.udf()
Spark UserDefinedFunction (udf)
Examples
>>> from sparknlp.pretrained import PretrainedPipeline
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
The array type must be provided in order to tell Spark the expected output type of our column. We are using an Annotation array here.
>>> from sparknlp.functions import *
>>> def nnp_tokens(annotations: List[Row]):
...     return list(
...         filter(lambda annotation: annotation.result == 'NNP', annotations)
...     )
>>> result.select(
...     map_annotations(nnp_tokens, Annotation.arrayType())('pos').alias("nnp")
... ).selectExpr("explode(nnp) as nnp").show(truncate=False)
+-----------------------------------------+
|nnp                                      |
+-----------------------------------------+
|[pos, 0, 2, NNP, [word -> U.N], []]      |
|[pos, 14, 18, NNP, [word -> Epeus], []]  |
|[pos, 30, 36, NNP, [word -> Baghdad], []]|
+-----------------------------------------+
- map_annotations_array(f, output_type: pyspark.sql.types.DataType)
Creates a Spark UDF to map over an Annotator’s array results.
- Parameters:
- f : function
The function to be applied over the results
- output_type : pyspark.sql.types.DataType
Output type of the data
- Returns:
pyspark.sql.functions.udf()
Spark UserDefinedFunction (udf)
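No example is given for this function in the original docstring. The following is a minimal, untested sketch: it assumes the returned UDF is applied to an array of Annotation columns (built here with pyspark.sql.functions.array, so that f is mapped over each annotator’s results in turn), reusing the result DataFrame from the setup above; the helper first_only is hypothetical.

>>> from pyspark.sql.functions import array
>>> from sparknlp.functions import *
>>> def first_only(annotations):
...     # Illustrative (hypothetical) mapping: keep only the first
...     # annotation from each annotator's results.
...     return annotations[:1]
>>> result.select(
...     map_annotations_array(first_only, Annotation.arrayType())(
...         array("token", "pos")
...     ).alias("firsts")
... ).show(truncate=False)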
- map_annotations_strict(f)
Creates a Spark UDF to map over an Annotator’s results, for which the return type is explicitly defined as an Annotation.dataType().
- Parameters:
- f : function
The function to be applied over the results
- Returns:
pyspark.sql.functions.udf()
Spark UserDefinedFunction (udf)
Examples
>>> from sparknlp.pretrained import PretrainedPipeline
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
>>> def nnp_tokens(annotations):
...     return list(
...         filter(lambda annotation: annotation.result == 'NNP', annotations)
...     )
>>> result.select(
...     map_annotations_strict(nnp_tokens)('pos').alias("nnp")
... ).selectExpr("explode(nnp) as nnp").show(truncate=False)
+-----------------------------------------+
|nnp                                      |
+-----------------------------------------+
|[pos, 0, 2, NNP, [word -> U.N], []]      |
|[pos, 14, 18, NNP, [word -> Epeus], []]  |
|[pos, 30, 36, NNP, [word -> Baghdad], []]|
+-----------------------------------------+
- map_annotations_col(dataframe: pyspark.sql.DataFrame, f, column: str, output_column: str, annotatyon_type: str, output_type: pyspark.sql.types.DataType = Annotation.arrayType())
Creates a Spark UDF to map over a column of Annotation results.
- Parameters:
- dataframe : DataFrame
Input DataFrame
- f : function
Function to apply to the column
- column : str
Name of the input column
- output_column : str
Name of the output column
- annotatyon_type : str
Annotator type
- output_type : DataType, optional
Output type, by default Annotation.arrayType()
- Returns:
pyspark.sql.DataFrame
Transformed DataFrame
Examples
>>> from sparknlp.pretrained import PretrainedPipeline
>>> from sparknlp.functions import *
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
>>> chunks_df = map_annotations_col(
...     result,
...     lambda x: [
...         Annotation("chunk", a.begin, a.end, a.result, a.metadata, a.embeddings)
...         for a in x
...     ],
...     "pos",
...     "pos_chunk",
...     "chunk",
... )
>>> chunks_df.selectExpr("explode(pos_chunk)").show()
+--------------------+
|                 col|
+--------------------+
|[chunk, 0, 2, NNP...|
|[chunk, 3, 3, ., ...|
|[chunk, 5, 12, JJ...|
|[chunk, 14, 18, N...|
|[chunk, 20, 24, V...|
|[chunk, 26, 28, I...|
|[chunk, 30, 36, N...|
|[chunk, 37, 37, ....|
+--------------------+
- map_annotations_cols(dataframe: pyspark.sql.DataFrame, f, columns: list, output_column: str, annotatyon_type: str, output_type: pyspark.sql.types.DataType = Annotation.arrayType())
Creates a Spark UDF to map over multiple columns of Annotation results.
- Parameters:
- dataframe : DataFrame
Input DataFrame
- f : function
Function to apply to the columns
- columns : list
Names of the input columns
- output_column : str
Name of the output column
- annotatyon_type : str
Annotator type
- output_type : DataType, optional
Output type, by default Annotation.arrayType()
- Returns:
pyspark.sql.DataFrame
Transformed DataFrame
Examples
>>> from sparknlp.pretrained import PretrainedPipeline
>>> from sparknlp.functions import *
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
>>> chunks_df = map_annotations_cols(
...     result,
...     lambda x: [
...         Annotation("tag", a.begin, a.end, a.result, a.metadata, a.embeddings)
...         for a in x
...     ],
...     ["pos", "ner"],
...     "tags",
...     "chunk"
... )
>>> chunks_df.selectExpr("explode(tags)").show(truncate=False)
+-------------------------------------------+
|col                                        |
+-------------------------------------------+
|[tag, 0, 2, NNP, [word -> U.N], []]        |
|[tag, 3, 3, ., [word -> .], []]            |
|[tag, 5, 12, JJ, [word -> official], []]   |
|[tag, 14, 18, NNP, [word -> Epeus], []]    |
|[tag, 20, 24, VBZ, [word -> heads], []]    |
|[tag, 26, 28, IN, [word -> for], []]       |
|[tag, 30, 36, NNP, [word -> Baghdad], []]  |
|[tag, 37, 37, ., [word -> .], []]          |
|[tag, 0, 2, B-ORG, [word -> U.N], []]      |
|[tag, 3, 3, O, [word -> .], []]            |
|[tag, 5, 12, O, [word -> official], []]    |
|[tag, 14, 18, B-PER, [word -> Ekeus], []]  |
|[tag, 20, 24, O, [word -> heads], []]      |
|[tag, 26, 28, O, [word -> for], []]        |
|[tag, 30, 36, B-LOC, [word -> Baghdad], []]|
|[tag, 37, 37, O, [word -> .], []]          |
+-------------------------------------------+
- filter_by_annotations_col(dataframe, f, column)
Applies a filter over a column of Annotations.
- Parameters:
- dataframe : DataFrame
Input DataFrame
- f : function
Filter function
- column : str
Name of the column
- Returns:
pyspark.sql.DataFrame
Filtered DataFrame
Examples
>>> from sparknlp.pretrained import PretrainedPipeline
>>> from sparknlp.functions import *
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
>>> def filter_pos(annotation: Annotation):
...     return annotation.result == "NNP"
>>> filter_by_annotations_col(
...     explode_annotations_col(result, "pos", "pos"), filter_pos, "pos"
... ).select("pos").show(truncate=False)
+-----------------------------------------+
|pos                                      |
+-----------------------------------------+
|[pos, 0, 2, NNP, [word -> U.N], []]      |
|[pos, 14, 18, NNP, [word -> Epeus], []]  |
|[pos, 30, 36, NNP, [word -> Baghdad], []]|
+-----------------------------------------+
- explode_annotations_col(dataframe: pyspark.sql.DataFrame, column, output_column)
Explodes an Annotation column, putting each result onto a separate row.
- Parameters:
- dataframe : DataFrame
The Spark DataFrame containing output Annotations
- column : str
Name of the column
- output_column : str
Name of the output column
- Returns:
pyspark.sql.DataFrame
Transformed DataFrame
Examples
>>> from sparknlp.pretrained import PretrainedPipeline
>>> from sparknlp.functions import *
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
>>> result.select("pos.result").show(truncate=False)
+----------------------------------+
|result                            |
+----------------------------------+
|[NNP, ., JJ, NNP, VBZ, IN, NNP, .]|
+----------------------------------+
>>> explode_annotations_col(result, "pos", "pos").select("pos.result").show()
+------+
|result|
+------+
|   NNP|
|     .|
|    JJ|
|   NNP|
|   VBZ|
|    IN|
|   NNP|
|     .|
+------+