# Source code for sparknlp.logging.comet

#  Copyright 2017-2022 John Snow Labs
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""Package that contains classes for integration with Comet."""

try:
    import comet_ml
except AttributeError:
    # Python 3.6
    comet_ml = None
except ModuleNotFoundError:
    # Python 3.7+
    comet_ml = None

import threading
import time
import os


[docs]class CometLogger: """Logger class for Comet integration `Comet <https://www.comet.ml/>`__ is a meta machine learning platform designed to help AI practitioners and teams build reliable machine learning models for real-world applications by streamlining the machine learning model lifecycle. By leveraging Comet, users can track, compare, explain and reproduce their machine learning experiments. To log a Spark NLP annotator, it will need an "outputLogPath" parameter, as the CometLogger reads the log file generated during the training process. For more examples see the `Examples <https://github.com/JohnSnowLabs/spark-nlp/blob/master/examples/python/logging/Comet_SparkNLP_Integration.ipynb>`__. Parameters ---------- workspace : str, optional Name of the workspace in Comet, by default None project_name : str, optional Name of the project in Comet, by default None comet_mode : str, optional Mode of logging, by default None. If set to "offline" then offline mode will be used, otherwise online. experiment_id : str, optional Id of the experiment, if it is reused, by default None tags : List[str], optional List of tags for the experiment, by default None Attributes ---------- experiment : comet_ml.Experiment Object representing the Comet experiment Raises ------ ImportError If the package comet-ml is not installed Examples -------- Metrics while training an annotator can be logged with for example: >>> import sparknlp >>> from sparknlp.base import * >>> from sparknlp.annotator import * >>> from sparknlp.logging.comet import CometLogger >>> spark = sparknlp.start() To run an online experiment, the logger is defined like so. >>> OUTPUT_LOG_PATH = "./run" >>> logger = CometLogger() Then the experiment can start like so >>> document = DocumentAssembler() \\ ... .setInputCol("text")\\ ... .setOutputCol("document") >>> embds = UniversalSentenceEncoder.pretrained() \\ ... .setInputCols("document") \\ ... 
.setOutputCol("sentence_embeddings") >>> multiClassifier = MultiClassifierDLApproach() \\ ... .setInputCols("sentence_embeddings") \\ ... .setOutputCol("category") \\ ... .setLabelColumn("labels") \\ ... .setBatchSize(128) \\ ... .setLr(1e-3) \\ ... .setThreshold(0.5) \\ ... .setShufflePerEpoch(False) \\ ... .setEnableOutputLogs(True) \\ ... .setOutputLogsPath(OUTPUT_LOG_PATH) \\ ... .setMaxEpochs(1) >>> logger.monitor(logdir=OUTPUT_LOG_PATH, model=multiClassifier) >>> trainDataset = spark.createDataFrame( ... [("Nice.", ["positive"]), ("That's bad.", ["negative"])], ... schema=["text", "labels"], ... ) >>> pipeline = Pipeline(stages=[document, embds, multiClassifier]) >>> pipeline.fit(trainDataset) >>> logger.end() If you are using a jupyter notebook, it is possible to display the live web interface with >>> logger.experiment.display(tab='charts') """ def __init__( self, workspace=None, project_name=None, comet_mode=None, experiment_id=None, tags=None, **experiment_kwargs, ): if comet_ml is None: raise ImportError( "`comet_ml` is not installed. Please install it with `pip install comet-ml`." 
) self.comet_mode = comet_mode self.workspace = workspace self.project_name = project_name self.experiment_id = experiment_id self.experiment_kwargs = experiment_kwargs self.experiment = self._get_experiment( self.comet_mode, self.workspace, self.project_name, self.experiment_id, **self.experiment_kwargs, ) self.experiment.log_other("Created from", "SparkNLP") if tags is not None: self.experiment.add_tags(tags) self._watch_file = False self._monitor_thread_timeout = 5 self.thread = None def _get_experiment( self, mode, workspace=None, project_name=None, experiment_id=None, **experiment_kwargs, ): if mode == "offline": if experiment_id is not None: return comet_ml.ExistingOfflineExperiment( previous_experiment=experiment_id, workspace=workspace, project_name=project_name, **experiment_kwargs, ) return comet_ml.OfflineExperiment( workspace=workspace, project_name=project_name, **experiment_kwargs, ) else: if experiment_id is not None: return comet_ml.ExistingExperiment( previous_experiment=experiment_id, workspace=workspace, project_name=project_name, **experiment_kwargs, ) return comet_ml.Experiment( workspace=workspace, project_name=project_name, **experiment_kwargs, )
[docs] def log_pipeline_parameters(self, pipeline, stages=None): """Iterates over the different stages in a pyspark PipelineModel object and logs the parameters to Comet. Parameters ---------- pipeline : pyspark.ml.PipelineModel PipelineModel object stages : List[str], optional Names of the stages of the pipeline to include, by default None (logs all) Examples -------- The pipeline model contains the annotators of Spark NLP, that were fitted to a dataframe. >>> logger.log_pipeline_parameters(pipeline_model) """ self.experiment.log_other("pipeline_uid", pipeline.uid) if stages is None: stages = [s.name for s in pipeline.stages] for stage in pipeline.stages: if stage.name not in stages: continue params = stage.extractParamMap() for param, param_value in params.items(): self.experiment.log_parameter(f"{stage.name}-{param.name}", param_value)
[docs] def log_visualization(self, html, name="viz.html"): """Uploads a NER visualization from Spark NLP Display to comet. Parameters ---------- html : str HTML of the spark NLP Display visualization name : str, optional Name for the visualization in comet, by default "viz.html" Examples -------- This example has NER chunks (NER extracted by e.g. :class:`.NerDLModel` and converted by a :class:`.NerConverter`) extracted in the colum "ner_chunk". >>> from sparknlp_display import NerVisualizer >>> logger = CometLogger() >>> for idx, result in enumerate(results.collect()): ... viz = NerVisualizer().display( ... result=result, ... label_col='ner_chunk', ... document_col='document', ... return_html=True ... ) ... logger.log_visualization(viz, name=f'viz-{idx}.html') """ self.log_asset_data(html, name)
[docs] def log_metrics(self, metrics, step=None, epoch=None, prefix=None): """Submits logs of an evaluation metrics. Parameters ---------- metrics : dict Dictionary with key value pairs corresponding to the measured metric and its value step : int, optional Used to associate a specific step, by default None epoch : int, optional Used to associate a specific epoch, by default None prefix : str, optional Name prefix for this metric, by default None. This can be used to identify for example different features by name. Examples -------- In this example, sklearn is used to retrieve the metrics. >>> from sklearn.preprocessing import MultiLabelBinarizer >>> from sklearn.metrics import classification_report >>> prediction = model.transform(testDataset) >>> preds_df = prediction.select('labels', 'category.result').toPandas() >>> mlb = MultiLabelBinarizer() >>> y_true = mlb.fit_transform(preds_df['labels']) >>> y_pred = mlb.fit_transform(preds_df['result']) >>> report = classification_report(y_true, y_pred, output_dict=True) Iterate over the report and log the metrics: >>> for key, value in report.items(): ... logger.log_metrics(value, prefix=key) >>> logger.end() If you are using Spark NLP in a notebook, then you can display the metrics directly with >>> logger.experiment.display(tab='metrics') """ self.experiment.log_metrics(metrics, step=step, epoch=epoch, prefix=prefix)
[docs] def log_parameters(self, parameters, step=None): """Logs a dictionary (or dictionary-like object) of multiple parameters. Parameters ---------- parameters : dict Parameters in a key : value form step : int, optional Used to associate a specific step, by default None, by default None """ self.experiment.log_parameters(parameters, step=step)
[docs] def log_completed_run(self, log_file_path): """Submit logs of training metrics after a run has completed. Parameters ---------- log_file_path : str Path to log file containing training metrics """ with open(log_file_path, "r") as f: stats = f.read().splitlines() self._parse_log_entry(stats) self.experiment.log_other("log_file_path", log_file_path)
[docs] def log_asset(self, asset_path, metadata=None, step=None): """Uploads an asset to comet. Parameters ---------- asset_path : str Path to the asset metadata : str, optional Some additional data to attach to the the audio asset. Must be a JSON-encodable dict, by default None step : int, optional Used to associate a specific step, by default None, by default None """ self.experiment.log_asset(asset_path, metadata=metadata, step=step)
[docs] def log_asset_data(self, asset, name, overwrite=False, metadata=None, step=None): """Uploads the data given to comet (str, binary, or JSON). Parameters ---------- asset : str or bytes or dict Data to be saved as asset name : str A custom file name to be displayed overwrite : bool, optional If True will overwrite all existing assets with the same name, by default False metadata : dict, optional Some additional data to attach to the the asset data. Must be a JSON-encodable dict, by default None step : int, optional Used to associate a specific step, by default None, by default None """ self.experiment.log_asset_data( asset, name, overwrite=overwrite, metadata=metadata, step=step
)
[docs] def monitor(self, logdir, model, interval=10): """Monitors the training of the model and submits logs to comet, given by an interval. To log a Spark NLP annotator, it will need an "outputLogPath" parameter, as the CometLogger reads the log file generated during the training process. If you are not able to monitor the live training, you can still log the training at the end with :meth:`.log_completed_run`. Parameters ---------- logdir : str Path to the output of the logs model : AnnotatorApproach Annotator to monitor interval : int, optional Interval for refreshing, by default 10 """ self._watch_file = True self.experiment.log_other("model_uid", model.uid) self.thread = threading.Thread( target=self._monitor_log_file, args=( os.path.join(logdir, f"{model.uid}.log"), interval, ), ) self.thread.start()
def _file_watcher(self, filename, interval): """Generator that yields lines from the model log file. Parameters ---------- filename : str Path to model log file interval : int Time (seconds) to wait in between checking for file updates Yields ------ str A single line from the file """ fp = open(filename) line = "" while self._watch_file: partial_line = fp.readline() if len(partial_line) != 0: line += partial_line if line.endswith("\n"): yield line line = "" else: time.sleep(interval) fp.close() def _monitor_log_file(self, filename, interval): # Wait for file to be created: while not os.path.exists(filename) and self._watch_file: time.sleep(interval) watcher = self._file_watcher(filename, interval) for line in watcher: lines = line.split("\n") self._parse_log_entry(lines) def _convert_log_entry_to_dict(self, log_entries): output_dict = {} for entry in log_entries: key, value = entry.strip(" ").split(":") output_dict[key] = float(value) return output_dict def _parse_run_metrics(self, parts): epoch_str, ratio = parts[0].split(" ", 1) epoch, total = ratio.split("/", 1) metrics = parts[2:] formatted_metrics = self._convert_log_entry_to_dict(metrics) return formatted_metrics, epoch def _parse_run_parameters(self, parts): parameters = parts[2:] formatted_parameters = self._convert_log_entry_to_dict(parameters) return formatted_parameters def _parse_log_entry(self, lines): for line in lines: parts = line.split("-") if line.startswith("Training started"): parameters = self._parse_run_parameters(parts) self.log_parameters(parameters) elif line.startswith("Epoch"): metrics, epoch = self._parse_run_metrics(parts) self.log_metrics(metrics, step=int(epoch), epoch=int(epoch))
[docs] def end(self): """Ends the experiment and the logger. Submits all outstanding logs to comet. """ self._watch_file = False self.experiment.end() if self.thread: self.thread.join(timeout=self._monitor_thread_timeout)