# ###########################################################################
#
# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
# (C) Cloudera, Inc. 2022
# All rights reserved.
#
# Applicable Open Source License: Apache 2.0
#
# NOTE: Cloudera open source products are modular software products
# made up of hundreds of individual components, each of which was
# individually copyrighted. Each Cloudera open source product is a
# collective work under U.S. Copyright Law. Your license to use the
# collective work is as provided in your written agreement with
# Cloudera. Used apart from the collective work, this file is
# licensed for your use pursuant to the open source license
# identified above.
#
# This code is provided to you pursuant to a written agreement with
# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
# this code. If you do not have a written agreement with Cloudera nor
# with an authorized and properly licensed third party, you do not
# have any rights to access nor to use this code.
#
# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
# contrary, (A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
# ARISING FROM OR RELATED TO THE CODE; AND (D) WITH RESPECT TO YOUR EXERCISE
# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
#
# ###########################################################################
from typing import List

import torch
import pandas as pd
from transformers import (
    AutoTokenizer,
    AutoModel,
    AutoModelForSequenceClassification,
)

# NOTE: the word attribution explainer referenced throughout this module is
# assumed to come from the transformers-interpret package, which wraps a
# HuggingFace model + tokenizer to produce integrated-gradients attributions.
from transformers_interpret import SequenceClassificationExplainer


class ContentPreservationScorer:
    """
    Utility for calculating a Content Preservation Score (CPS) between
    two pieces of text (i.e. the input and output of a TST model).

    This custom evaluation metric aims to quantify content preservation by
    first modifying text to remove all style-related tokens, leaving just
    content-related tokens behind. Style tokens are determined on a
    sentence-by-sentence basis by extracting salient token attributions
    from a trained style classifier (BERT), so contextual information is
    preserved in the attribution scores. Style tokens are then masked/removed
    from the text. We pass the style-less sentences through a pre-trained,
    but not fine-tuned, SentenceBERT model to compute sentence embeddings.
    Cosine similarity on the embeddings produces a score that should represent
    content preservation.

    PSEUDO-CODE: (a higher score means better preservation)
        1. mask out style tokens for the input and output text
        2. get SBERT embeddings for each
        3. calculate cosine similarity between the embedding pairs

    See the usage sketch at the bottom of this module for a minimal
    end-to-end example.

    Attributes:
        cls_model_identifier (str)
        sbert_model_identifier (str)
    """

    def __init__(self, cls_model_identifier: str, sbert_model_identifier: str):
        self.cls_model_identifier = cls_model_identifier
        self.sbert_model_identifier = sbert_model_identifier
        self.device = (
            torch.cuda.current_device() if torch.cuda.is_available() else "cpu"
        )
        self._initialize_hf_artifacts()

    def _initialize_hf_artifacts(self):
        """
        Initialize the HuggingFace artifacts (tokenizer and model) according
        to the provided identifiers for both SBERT and the classification model.
        Then initialize the word attribution explainer with the HF model+tokenizer.
        """
        # sbert
        self.sbert_tokenizer = AutoTokenizer.from_pretrained(
            self.sbert_model_identifier
        )
        self.sbert_model = AutoModel.from_pretrained(self.sbert_model_identifier)

        # classifier
        self.cls_tokenizer = AutoTokenizer.from_pretrained(self.cls_model_identifier)
        self.cls_model = AutoModelForSequenceClassification.from_pretrained(
            self.cls_model_identifier
        )
        self.cls_model.to(self.device)

        # word attribution explainer (assumed: transformers-interpret), built from
        # the classifier model + tokenizer as described in the docstring above
        self.explainer = SequenceClassificationExplainer(
            self.cls_model, self.cls_tokenizer
        )

    def compute_sentence_embeddings(self, input_text: List[str]) -> torch.Tensor:
        """
        Compute a sentence embedding for each sentence in a provided list of
        text strings.

        Args:
            input_text (List[str]) - list of input sentences to encode

        Returns:
            sentence_embeddings (torch.Tensor)
        """
        # tokenize sentences
        encoded_input = self.sbert_tokenizer(
            input_text,
            padding=True,
            truncation=True,
            max_length=256,
            return_tensors="pt",
        )

        # to device
        self.sbert_model.eval()
        self.sbert_model.to(self.device)
        encoded_input = {k: v.to(self.device) for k, v in encoded_input.items()}

        # compute token embeddings and mean-pool them into sentence embeddings
        with torch.no_grad():
            model_output = self.sbert_model(**encoded_input)

        return (
            self.mean_pooling(model_output, encoded_input["attention_mask"])
            .detach()
            .cpu()
        )

    def calculate_content_preservation_score(
        self,
        input_text: List[str],
        output_text: List[str],
        threshold: float = 0.3,
        mask_type: str = "pad",
        return_all: bool = False,
    ) -> List[float]:
        """
        Calculates the content preservation score (CPS) between two pieces of text.

        Args:
            input_text (list) - list of input texts with indices corresponding
                to counterparts in output_text
            output_text (list) - list of output texts with indices corresponding
                to counterparts in input_text
            threshold (float) - percentage of style attribution used as the
                cutoff for mask selection
            mask_type (str) - "pad", "remove", or "none"
            return_all (bool) - if True, return a dict containing the intermediate
                text with style masking applied, along with the scores

        Returns:
            A list of floats with corresponding content preservation scores.

        PSEUDO-CODE: (a higher score means better preservation)
            1. mask out style tokens for the input and output text
            2. get SBERT embeddings for each
            3. calculate cosine similarity between the embedding pairs
        """
        if len(input_text) != len(output_text):
            raise ValueError(
                "input_text and output_text must be of same length with corresponding items"
            )

        if mask_type != "none":
            # Mask out style tokens
            masked_input_text = [
                self.mask_style_tokens(text, mask_type=mask_type, threshold=threshold)
                for text in input_text
            ]
            masked_output_text = [
                self.mask_style_tokens(text, mask_type=mask_type, threshold=threshold)
                for text in output_text
            ]

            # Compute SBERT embeddings
            input_embeddings = self.compute_sentence_embeddings(masked_input_text)
            output_embeddings = self.compute_sentence_embeddings(masked_output_text)
        else:
            # Compute SBERT embeddings on unmasked text
            input_embeddings = self.compute_sentence_embeddings(input_text)
            output_embeddings = self.compute_sentence_embeddings(output_text)

        # Calculate cosine similarity between corresponding embedding pairs
        scores = self.cosine_similarity(input_embeddings, output_embeddings)

        if return_all:
            output = {
                "scores": scores,
                "masked_input_text": masked_input_text
                if mask_type != "none"
                else input_text,
                "masked_output_text": masked_output_text
                if mask_type != "none"
                else output_text,
            }
            return output
        else:
            return scores

    def calculate_feature_attribution_scores(
        self, text: str, class_index: int = 0, as_norm: bool = False
    ) -> List[tuple]:
        """
        Calculate feature attributions using integrated gradients by passing
        a string of text as input.

        Args:
            text (str) - text to get attributions for
            class_index (int) - optional output index to provide attributions for
            as_norm (bool) - if True, return the attributions formatted and sorted
                as a DataFrame (see format_feature_attribution_scores)
        """
        attributions = self.explainer(text, index=class_index)

        if as_norm:
            return self.format_feature_attribution_scores(attributions)

        return attributions

    def mask_style_tokens(
        self,
        text: str,
        threshold: float = 0.3,
        mask_type: str = "pad",
        class_index: int = 0,
    ) -> str:
        """
        Utility function to mask out style tokens from a given string of text.

        Style tokens are determined by first calculating feature importances (via
        word attributions from the trained style classifier) for each token in the
        input sentence. We then normalize the absolute values of the attribution
        scores to see how much each token contributes, as a percentage, to the
        overall style classification, and rank those in descending order. We then
        select the top N tokens that account for a cumulative _threshold_ amount (%)
        of the total style attribution. By using cumulative percentages, N is not a
        fixed number; we ultimately take however many tokens are needed to account
        for _threshold_ % of the overall style. A short worked example follows the
        docstring below.

        We can optionally return a string with these style tokens padded out or
        completely removed by toggling _mask_type_ between "pad" and "remove".

        Args:
            text (str)
            threshold (float) - percentage of style attribution as cutoff for masking selection
            mask_type (str) - "pad" or "remove", indicates how to handle style tokens
            class_index (int)

        Returns:
            text (str)
        """
        # get attributions and format as a sorted dataframe
        attributions = self.calculate_feature_attribution_scores(
            text, class_index=class_index, as_norm=False
        )
        attributions_df = self.format_feature_attribution_scores(attributions)

        # select tokens to mask
        token_idxs_to_mask = []

        # If the first token accounts for more than the set
        # threshold, take just that token to mask. Otherwise,
        # take all tokens up to the threshold
        if attributions_df.iloc[0]["cumulative"] > threshold:
            token_idxs_to_mask.append(attributions_df.index[0])
        else:
            token_idxs_to_mask.extend(
                attributions_df[
                    attributions_df["cumulative"] <= threshold
                ].index.to_list()
            )

        # Build the token sequence with style tokens masked out
        mask_map = {"pad": "[PAD]", "remove": ""}
        toks = [token for token, score in attributions]
        for idx in token_idxs_to_mask:
            toks[idx] = mask_map[mask_type]
        if mask_type == "remove":
            toks = [token for token in toks if token != ""]

        # Decode that sequence back into a string
        masked_text = self.explainer.tokenizer.decode(
            self.explainer.tokenizer.convert_tokens_to_ids(toks),
            skip_special_tokens=False,
        )

        # Remove special tokens other than [PAD]
        for special_token in self.explainer.tokenizer.all_special_tokens:
            if special_token != "[PAD]":
                masked_text = masked_text.replace(special_token, "")

        return masked_text.strip()

    @staticmethod
    def format_feature_attribution_scores(attributions: List[tuple]) -> pd.DataFrame:
        """
        Utility for formatting attribution scores for style token mask selection.

        Sorts a given List[tuple], where tuples represent (token, score), by the
        normalized absolute value of each token score and adds a cumulative sum
        column used for threshold-based selection.
        """
        df = pd.DataFrame(attributions, columns=["token", "score"])
        df["abs_norm"] = df["score"].abs() / df["score"].abs().sum()
        df = df.sort_values(by="abs_norm", ascending=False)
        df["cumulative"] = df["abs_norm"].cumsum()
        return df

    @staticmethod
    def cosine_similarity(tensor1: torch.Tensor, tensor2: torch.Tensor) -> List[float]:
        """
        Calculate cosine similarity on pairs of embeddings.

        Can handle 1D tensors for a single pair, or 2D tensors with corresponding
        indices for a matrix operation on multiple pairs.
        """
        assert tensor1.shape == tensor2.shape

        # ensure 2D tensors
        if tensor1.ndim == 1:
            tensor1 = tensor1.unsqueeze(0)
            tensor2 = tensor2.unsqueeze(0)

        cos_sim = torch.nn.CosineSimilarity(dim=1, eps=1e-6)
        return [round(val, 4) for val in cos_sim(tensor1, tensor2).tolist()]

    @staticmethod
    def mean_pooling(model_output, attention_mask):
        """
        Perform mean pooling over token embeddings to create a sentence embedding.
        Here we take the attention mask into account for correct averaging over
        active token positions.

        CODE BORROWED FROM:
        https://www.sbert.net/examples/applications/computing-embeddings/README.html#sentence-embeddings-with-transformers
        """
        # First element of model_output contains all token embeddings
        token_embeddings = model_output[0]
        input_mask_expanded = (
            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        )
        sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
        sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
        return sum_embeddings / sum_mask
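

# ---------------------------------------------------------------------------
# Minimal usage sketch. The model identifiers below are hypothetical
# placeholders; swap in the actual fine-tuned style classifier and SBERT
# checkpoints used by the project.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    scorer = ContentPreservationScorer(
        cls_model_identifier="path/or/hub-id-of-style-classifier",  # placeholder
        sbert_model_identifier="sentence-transformers/all-MiniLM-L6-v2",  # assumption
    )

    inputs = ["the movie was an absolute disaster from start to finish"]
    outputs = ["the movie ran from start to finish"]

    result = scorer.calculate_content_preservation_score(
        input_text=inputs,
        output_text=outputs,
        threshold=0.3,
        mask_type="pad",
        return_all=True,
    )
    print(result["scores"])
    print(result["masked_input_text"])
    print(result["masked_output_text"])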