# NOTE(review): removed "Spaces / Runtime error" page-header residue — scrape
# artifact from the hosting page, not part of the module.
import json
import random

import pandas as pd
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
| SYSTEM_PROMPT_I2V = """ | |
| You are an expert in video captioning. You are given a structured video caption and you need to compose it to be more natural and fluent in English. | |
| ## Structured Input | |
| {structured_input} | |
| ## Notes | |
| 1. If there has an empty field, just ignore it and do not mention it in the output. | |
| 2. Do not make any semantic changes to the original fields. Please be sure to follow the original meaning. | |
| 3. If the action field is not empty, eliminate the irrelevant information in the action field that is not related to the timing action(such as wearings, background and environment information) to make a pure action field. | |
| ## Output Principles and Orders | |
| 1. First, eliminate the static information in the action field that is not related to the timing action, such as background or environment information. | |
| 2. Second, describe each subject with its pure action and expression if these fields exist. | |
| ## Output | |
| Please directly output the final composed caption without any additional information. | |
| """ | |
| SYSTEM_PROMPT_T2V = """ | |
| You are an expert in video captioning. You are given a structured video caption and you need to compose it to be more natural and fluent in English. | |
| ## Structured Input | |
| {structured_input} | |
| ## Notes | |
| 1. According to the action field information, change its name field to the subject pronoun in the action. | |
| 2. If there has an empty field, just ignore it and do not mention it in the output. | |
| 3. Do not make any semantic changes to the original fields. Please be sure to follow the original meaning. | |
| ## Output Principles and Orders | |
| 1. First, declare the shot_type, then declare the shot_angle and the shot_position fields in natural and fluent. | |
| 2. Second, eliminate information in the action field that is not related to the timing action, such as background or environment information if action is not empty. | |
| 3. Third, describe each subject with its pure action, appearance, expression, position if these fields exist. | |
| 4. Finally, declare the environment and lighting if the environment and lighting fields are not empty. | |
| ## Output | |
| Please directly output the final composed caption without any additional information. | |
| """ | |
| class StructuralCaptionDataset(torch.utils.data.Dataset): | |
| def __init__(self, input_csv, model_path, task): | |
| if isinstance(input_csv, pd.DataFrame): | |
| self.meta = input_csv | |
| else: | |
| self.meta = pd.read_csv(input_csv) | |
| self.task = task | |
| self.system_prompt = SYSTEM_PROMPT_T2V if self.task == 't2v' else SYSTEM_PROMPT_I2V | |
| self.tokenizer = AutoTokenizer.from_pretrained(model_path) | |
| def __len__(self): | |
| return len(self.meta) | |
| def __getitem__(self, index): | |
| row = self.meta.iloc[index] | |
| real_index = self.meta.index[index] | |
| struct_caption = json.loads(row["structural_caption"]) | |
| camera_movement = struct_caption.get('camera_motion', '') | |
| if camera_movement != '': | |
| camera_movement += '.' | |
| camera_movement = camera_movement.capitalize() | |
| fusion_by_llm = False | |
| cleaned_struct_caption = self.clean_struct_caption(struct_caption, self.task) | |
| if cleaned_struct_caption.get('num_subjects', 0) > 0: | |
| new_struct_caption = json.dumps(cleaned_struct_caption, indent=4, ensure_ascii=False) | |
| conversation = [ | |
| { | |
| "role": "user", | |
| "content": self.system_prompt.format(structured_input=new_struct_caption), | |
| }, | |
| ] | |
| text = self.tokenizer.apply_chat_template( | |
| conversation, | |
| tokenize=False, | |
| add_generation_prompt=True, | |
| enable_thinking=False, | |
| ) | |
| fusion_by_llm = True | |
| else: | |
| text = '-' | |
| return real_index, fusion_by_llm, text, '-', camera_movement | |
| def clean_struct_caption(self, struct_caption, task): | |
| raw_subjects = struct_caption.get('subjects', []) | |
| subjects = [] | |
| for subject in raw_subjects: | |
| subject_type = subject.get("TYPES", {}).get('type', '') | |
| subject_sub_type = subject.get("TYPES", {}).get('sub_type', '') | |
| if subject_type not in ["Human", "Animal"]: | |
| subject['expression'] = '' | |
| if subject_type == 'Human' and subject_sub_type == 'Accessory': | |
| subject['expression'] = '' | |
| if subject_sub_type != '': | |
| subject['name'] = subject_sub_type | |
| if 'TYPES' in subject: | |
| del subject['TYPES'] | |
| if 'is_main_subject' in subject: | |
| del subject['is_main_subject'] | |
| subjects.append(subject) | |
| to_del_subject_ids = [] | |
| for idx, subject in enumerate(subjects): | |
| action = subject.get('action', '').strip() | |
| subject['action'] = action | |
| if random.random() > 0.9 and 'appearance' in subject: | |
| del subject['appearance'] | |
| if random.random() > 0.9 and 'position' in subject: | |
| del subject['position'] | |
| if task == 'i2v': | |
| # just keep name and action, expression in subjects | |
| dropped_keys = ['appearance', 'position'] | |
| for key in dropped_keys: | |
| if key in subject: | |
| del subject[key] | |
| if subject['action'] == '' and ('expression' not in subject or subject['expression'] == ''): | |
| to_del_subject_ids.append(idx) | |
| # delete the subjects according to the to_del_subject_ids | |
| for idx in sorted(to_del_subject_ids, reverse=True): | |
| del subjects[idx] | |
| new_struct_caption = { | |
| 'num_subjects': len(subjects), | |
| 'subjects': subjects, | |
| 'shot_type': struct_caption.get('shot_type', ''), | |
| 'shot_angle': struct_caption.get('shot_angle', ''), | |
| 'shot_position': struct_caption.get('shot_position', ''), | |
| 'environment': struct_caption.get('environment', ''), | |
| 'lighting': struct_caption.get('lighting', ''), | |
| } | |
| if task == 't2v' and random.random() > 0.9: | |
| del new_struct_caption['lighting'] | |
| if task == 'i2v': | |
| drop_keys = ['environment', 'lighting', 'shot_type', 'shot_angle', 'shot_position'] | |
| for drop_key in drop_keys: | |
| del new_struct_caption[drop_key] | |
| return new_struct_caption | |
| class FusionCaptioner: | |
| def __init__(self, model_path): | |
| self.model = AutoModelForCausalLM.from_pretrained( | |
| model_path, | |
| torch_dtype="auto", | |
| device_map="cuda", | |
| ) | |
| self.model_path = model_path | |
| self.tokenizer = AutoTokenizer.from_pretrained(model_path) | |
| def __call__(self, structural_caption, task='t2v'): | |
| if isinstance(structural_caption, dict): | |
| structural_caption = json.dumps(structural_caption, ensure_ascii=False) | |
| else: | |
| structural_caption = json.dumps(json.loads(structural_caption), ensure_ascii=False) | |
| meta = pd.DataFrame([structural_caption], columns=['structural_caption']) | |
| dataset = StructuralCaptionDataset(meta, self.model_path, task) | |
| _, fusion_by_llm, text, original_text, camera_movement = dataset[0] | |
| if not fusion_by_llm: | |
| caption = original_text + " " + camera_movement | |
| return caption | |
| model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device) | |
| generated_ids = self.model.generate(**model_inputs, max_new_tokens=1024, temperature=0.1) | |
| generated_ids = [ | |
| output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids) | |
| ] | |
| result = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0] | |
| llm_caption = result + " " + camera_movement | |
| return llm_caption | |