sparknlp.functions#

Contains helper functions to assist in transforming Annotation results.

Module Contents#

Functions#

map_annotations(f, output_type)

Creates a Spark UDF to map over an Annotator's results.

map_annotations_array(f, output_type)

Creates a Spark UDF to map over an Annotator's array results.

map_annotations_strict(f)

Creates a Spark UDF to map over an Annotator's results, for which the return type is explicitly defined as Annotation.dataType().

map_annotations_col(dataframe, f, column, ...[, ...])

Creates a Spark UDF to map over a column of Annotation results.

map_annotations_cols(dataframe, f, columns, ...[, ...])

Creates a Spark UDF to map over multiple columns of Annotation results.

filter_by_annotations_col(dataframe, f, column)

Applies a filter over a column of Annotations.

explode_annotations_col(dataframe, column, output_column)

Explodes an Annotation column, putting each result onto a separate row.

map_annotations(f, output_type: pyspark.sql.types.DataType)[source]#

Creates a Spark UDF to map over an Annotator’s results.

Parameters:

f : function
    The function to be applied over the results

output_type : pyspark.sql.types.DataType
    Output type of the data

Returns:

pyspark.sql.functions.udf()
    Spark UserDefinedFunction (udf)

Examples

>>> from sparknlp.pretrained import PretrainedPipeline
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)

The array type must be provided to tell Spark the expected output type of the column. Here we use an Annotation array, Annotation.arrayType().

>>> from typing import List
>>> from pyspark.sql import Row
>>> from sparknlp.functions import *
>>> def nnp_tokens(annotations: List[Row]):
...     return list(
...         filter(lambda annotation: annotation.result == 'NNP', annotations)
...     )
>>> result.select(
...     map_annotations(nnp_tokens, Annotation.arrayType())('pos').alias("nnp")
... ).selectExpr("explode(nnp) as nnp").show(truncate=False)
+-----------------------------------------+
|nnp                                      |
+-----------------------------------------+
|[pos, 0, 2, NNP, [word -> U.N], []]      |
|[pos, 14, 18, NNP, [word -> Ekeus], []]  |
|[pos, 30, 36, NNP, [word -> Baghdad], []]|
+-----------------------------------------+
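In the output, each Annotation row lists the fields annotatorType, begin, end, result, metadata, and embeddings, in that order.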
map_annotations_array(f, output_type: pyspark.sql.types.DataType)[source]#

Creates a Spark UDF to map over an Annotator’s array results.

Parameters:

f : function
    The function to be applied over the results

output_type : pyspark.sql.types.DataType
    Output type of the data

Returns:

pyspark.sql.functions.udf()
    Spark UserDefinedFunction (udf)
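Examples

No example is shipped with this function, so the following is a minimal sketch by analogy with map_annotations. It assumes the returned UDF accepts an array of Annotation columns (built here with pyspark.sql.functions.array) and applies f to the annotations of each element; the nnp_tokens helper and the array("pos") input are illustrative choices, and the result DataFrame is reused from the map_annotations example above.

>>> from pyspark.sql.functions import array
>>> from sparknlp.functions import *
>>> def nnp_tokens(annotations):
...     # Assumption: f receives a list of annotations and returns the
...     # filtered list, as in the map_annotations example.
...     return [a for a in annotations if a.result == 'NNP']
>>> nnp_df = result.select(
...     map_annotations_array(nnp_tokens, Annotation.arrayType())(
...         array("pos")
...     ).alias("nnp")
... )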

map_annotations_strict(f)[source]#

Creates a Spark UDF to map over an Annotator’s results, for which the return type is explicitly defined as Annotation.dataType().

Parameters:

f : function
    The function to be applied over the results

Returns:

pyspark.sql.functions.udf()
    Spark UserDefinedFunction (udf)

Examples

>>> from sparknlp.pretrained import PretrainedPipeline
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
>>> def nnp_tokens(annotations):
...     return list(
...         filter(lambda annotation: annotation.result == 'NNP', annotations)
...     )
>>> result.select(
...     map_annotations_strict(nnp_tokens)('pos').alias("nnp")
... ).selectExpr("explode(nnp) as nnp").show(truncate=False)
+-----------------------------------------+
|nnp                                      |
+-----------------------------------------+
|[pos, 0, 2, NNP, [word -> U.N], []]      |
|[pos, 14, 18, NNP, [word -> Ekeus], []]  |
|[pos, 30, 36, NNP, [word -> Baghdad], []]|
+-----------------------------------------+
map_annotations_col(dataframe: pyspark.sql.DataFrame, f, column: str, output_column: str, annotatyon_type: str, output_type: pyspark.sql.types.DataType = Annotation.arrayType())[source]#

Creates a Spark UDF to map over a column of Annotation results.

Parameters:

dataframe : DataFrame
    Input DataFrame

f : function
    Function to apply to the column

column : str
    Name of the input column

output_column : str
    Name of the output column

annotatyon_type : str
    Annotator type

output_type : DataType, optional
    Output type, by default Annotation.arrayType()

Returns:

pyspark.sql.DataFrame
    Transformed DataFrame

Examples

>>> from sparknlp.pretrained import PretrainedPipeline
>>> from sparknlp.functions import *
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
>>> chunks_df = map_annotations_col(
...     result,
...     lambda x: [
...         Annotation("chunk", a.begin, a.end, a.result, a.metadata, a.embeddings)
...         for a in x
...     ],
...     "pos",
...     "pos_chunk",
...     "chunk",
... )
>>> chunks_df.selectExpr("explode(pos_chunk)").show()
+--------------------+
|                 col|
+--------------------+
|[chunk, 0, 2, NNP...|
|[chunk, 3, 3, ., ...|
|[chunk, 5, 12, JJ...|
|[chunk, 14, 18, N...|
|[chunk, 20, 24, V...|
|[chunk, 26, 28, I...|
|[chunk, 30, 36, N...|
|[chunk, 37, 37, ....|
+--------------------+
map_annotations_cols(dataframe: pyspark.sql.DataFrame, f, columns: list, output_column: str, annotatyon_type: str, output_type: pyspark.sql.types.DataType = Annotation.arrayType())[source]#

Creates a Spark UDF to map over multiple columns of Annotation results.

Parameters:

dataframe : DataFrame
    Input DataFrame

f : function
    Function to apply to the columns

columns : list
    Names of the input columns

output_column : str
    Name of the output column

annotatyon_type : str
    Annotator type

output_type : DataType, optional
    Output type, by default Annotation.arrayType()

Returns:

pyspark.sql.DataFrame
    Transformed DataFrame

Examples

>>> from sparknlp.pretrained import PretrainedPipeline
>>> from sparknlp.functions import *
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
>>> chunks_df = map_annotations_cols(
...     result,
...     lambda x: [
...         Annotation("tag", a.begin, a.end, a.result, a.metadata, a.embeddings)
...         for a in x
...     ],
...     ["pos", "ner"],
...     "tags",
...     "chunk"
... )
>>> chunks_df.selectExpr("explode(tags)").show(truncate=False)
+-------------------------------------------+
|col                                        |
+-------------------------------------------+
|[tag, 0, 2, NNP, [word -> U.N], []]        |
|[tag, 3, 3, ., [word -> .], []]            |
|[tag, 5, 12, JJ, [word -> official], []]   |
|[tag, 14, 18, NNP, [word -> Ekeus], []]    |
|[tag, 20, 24, VBZ, [word -> heads], []]    |
|[tag, 26, 28, IN, [word -> for], []]       |
|[tag, 30, 36, NNP, [word -> Baghdad], []]  |
|[tag, 37, 37, ., [word -> .], []]          |
|[tag, 0, 2, B-ORG, [word -> U.N], []]      |
|[tag, 3, 3, O, [word -> .], []]            |
|[tag, 5, 12, O, [word -> official], []]    |
|[tag, 14, 18, B-PER, [word -> Ekeus], []]  |
|[tag, 20, 24, O, [word -> heads], []]      |
|[tag, 26, 28, O, [word -> for], []]        |
|[tag, 30, 36, B-LOC, [word -> Baghdad], []]|
|[tag, 37, 37, O, [word -> .], []]          |
+-------------------------------------------+
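Since both input columns are mapped into the same output column, the exploded result lists the POS annotations first, followed by the NER annotations for the same tokens.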
filter_by_annotations_col(dataframe, f, column)[source]#

Applies a filter over a column of Annotations.

Parameters:

dataframe : DataFrame
    Input DataFrame

f : function
    Filter function

column : str
    Name of the column

Returns:

pyspark.sql.DataFrame
    Filtered DataFrame

Examples

>>> from sparknlp.pretrained import PretrainedPipeline
>>> from sparknlp.functions import *
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
>>> def filter_pos(annotation: Annotation):
...     return annotation.result == "NNP"
>>> filter_by_annotations_col(
...     explode_annotations_col(result, "pos", "pos"), filter_pos, "pos"
... ).select("pos").show(truncate=False)
+-----------------------------------------+
|pos                                      |
+-----------------------------------------+
|[pos, 0, 2, NNP, [word -> U.N], []]      |
|[pos, 14, 18, NNP, [word -> Ekeus], []]  |
|[pos, 30, 36, NNP, [word -> Baghdad], []]|
+-----------------------------------------+
explode_annotations_col(dataframe: pyspark.sql.DataFrame, column, output_column)[source]#

Explodes an Annotation column, putting each result onto a separate row.

Parameters:

dataframe : DataFrame
    The Spark DataFrame containing output Annotations

column : str
    Name of the column

output_column : str
    Name of the output column

Returns:

pyspark.sql.DataFrame
    Transformed DataFrame

Examples

>>> from sparknlp.pretrained import PretrainedPipeline
>>> from sparknlp.functions import *
>>> explain_document_pipeline = PretrainedPipeline("explain_document_dl")
>>> data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
>>> result = explain_document_pipeline.transform(data)
>>> result.select("pos.result").show(truncate=False)
+----------------------------------+
|result                            |
+----------------------------------+
|[NNP, ., JJ, NNP, VBZ, IN, NNP, .]|
+----------------------------------+
>>> explode_annotations_col(result, "pos", "pos").select("pos.result").show()
+------+
|result|
+------+
|   NNP|
|     .|
|    JJ|
|   NNP|
|   VBZ|
|    IN|
|   NNP|
|     .|
+------+
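Exploding an Annotation column is also a useful preprocessing step for row-wise operations, as in the filter_by_annotations_col example above, where the column is exploded before the filter is applied.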