Spaces:
Runtime error
Runtime error
| import os | |
| import subprocess | |
| import concurrent.futures | |
| from PIL import Image | |
| from tqdm import tqdm | |
| from PIL import Image, ExifTags | |
| target_resolutions = [ | |
| (640, 1632), # 640 * 1632 = 1044480 | |
| (704, 1472), # 704 * 1472 = 1036288 | |
| (768, 1360), # 768 * 1360 = 1044480 | |
| (832, 1248), # 832 * 1248 = 1038336 | |
| (896, 1152), | |
| (960, 1088), # 960 * 1088 = 1044480 | |
| (992, 1056), # 992 * 1056 = 1047552 | |
| (1024, 1024), # 1024 * 1024 = 1048576 | |
| (1056, 992), # 1056 * 992 = 1047552 | |
| (1088, 960), # 1088 * 960 = 1044480 | |
| (1152, 896), | |
| (1248, 832), # 1248 * 832 = 1038336 | |
| (1360, 768), # 1360 * 768 = 1044480 | |
| (1472, 704), # 1472 * 704 = 1036288 | |
| (1632, 640), # 1632 * 640 = 1044480 | |
| # (768, 1360), # 768 * 1360 = 1044480 | |
| # (1472, 704), # 1472 * 704 = 1036288 | |
| # (1024, 1024), # 1024 * 1024 = 1048576 | |
| ] | |
| # 图像预处理 | |
| def apply_exif_orientation(image): | |
| try: | |
| for orientation in ExifTags.TAGS.keys(): | |
| if ExifTags.TAGS[orientation] == 'Orientation': | |
| break | |
| exif = image._getexif() | |
| if exif is not None: | |
| exif = dict(exif.items()) | |
| orientation_value = exif.get(orientation) | |
| if orientation_value == 3: | |
| image = image.rotate(180, expand=True) | |
| elif orientation_value == 6: | |
| image = image.rotate(270, expand=True) | |
| elif orientation_value == 8: | |
| image = image.rotate(90, expand=True) | |
| except (AttributeError, KeyError, IndexError, TypeError): | |
| # cases: image don't have getexif | |
| pass | |
| return image | |
| def convert_image_to_jpg(img, img_path): | |
| """Convert an Image object to JPG.""" | |
| # Remove extension from original filename and add .jpg | |
| base_name = os.path.splitext(img_path)[0] | |
| jpg_path = base_name + '.jpg' | |
| # Convert image to RGB if it is RGBA (or any other mode) | |
| if img.mode != 'RGB': | |
| img = img.convert('RGB') | |
| img.save(jpg_path, format='JPEG', quality=100) | |
| def process_image(img_path): | |
| try: | |
| if img_path.lower().endswith((".jpg", ".png", ".bmp", ".gif", ".tif", ".tiff", ".jpeg", ".webp")): | |
| img = Image.open(img_path) | |
| img = apply_exif_orientation(img) # Apply the EXIF orientation | |
| # Convert to 'RGB' if it is 'RGBA' or any other mode | |
| img = img.convert('RGB') | |
| # 计算原图像的宽高比 | |
| original_aspect_ratio = img.width / img.height | |
| # 找到最接近原图像宽高比的目标分辨率 | |
| target_resolution = min(target_resolutions, key=lambda res: abs(original_aspect_ratio - res[0] / res[1])) | |
| # 计算新的维度 | |
| if img.width / target_resolution[0] < img.height / target_resolution[1]: | |
| new_width = target_resolution[0] | |
| new_height = int(img.height * target_resolution[0] / img.width) | |
| else: | |
| new_height = target_resolution[1] | |
| new_width = int(img.width * target_resolution[1] / img.height) | |
| # 等比缩放图像 | |
| img = img.resize((new_width, new_height), Image.LANCZOS) | |
| # 计算裁剪的区域 | |
| left = int((img.width - target_resolution[0]) / 2) | |
| top = int((img.height - target_resolution[1]) / 2) | |
| right = int((img.width + target_resolution[0]) / 2) | |
| bottom = int((img.height + target_resolution[1]) / 2) | |
| # 裁剪图像 | |
| img = img.crop((left, top, right, bottom)) | |
| # 转换并保存图像为JPG格式 | |
| convert_image_to_jpg(img, img_path) | |
| except Exception as e: | |
| print(f"Error processing image {img_path}: {e}") | |
| return None | |
| def delete_non_jpg_files(folder_path): | |
| """Delete all non-jpg image files in a directory, but keep txt files.""" | |
| for dirpath, dirnames, filenames in os.walk(folder_path): | |
| for filename in filenames: | |
| if not filename.lower().endswith((".jpg", ".txt")): | |
| file_path = os.path.join(dirpath, filename) | |
| try: | |
| os.remove(file_path) | |
| except Exception as e: | |
| print(f"Error occurred while deleting file : {file_path}. Error : {str(e)}") | |
| def process_images_in_folder(folder_path): | |
| """ | |
| Process all images in the given folder according to the target resolutions, | |
| then delete all non-jpg files except for .txt files. | |
| """ | |
| processed_files = [] | |
| file_list = [os.path.join(dirpath, filename) | |
| for dirpath, dirnames, filenames in os.walk(folder_path) | |
| for filename in filenames] | |
| with concurrent.futures.ThreadPoolExecutor() as executor: | |
| list(tqdm(executor.map(process_image, file_list), total=len(file_list))) | |
| delete_non_jpg_files(folder_path) | |
| return f"Processed images in folder: {folder_path}" | |
| # 失败检查 | |
| def run_script(folder_path, keywords): | |
| keywords = keywords if keywords else "sorry,error" | |
| result = subprocess.run( | |
| [ | |
| 'python', './lib/Failed_Tagging_File_Screening.py', | |
| '--image_path', folder_path, | |
| '--keywords', keywords | |
| ], | |
| capture_output=True, text=True | |
| ) | |
| return result.stdout if result.stdout else "No Output", result.stderr if result.stderr else "No Error" | |