Spaces:
Paused
Paused
| import asyncio | |
| import logging | |
| import platform | |
| import re | |
| from contextlib import AsyncExitStack | |
| from pathlib import Path | |
| from typing import Literal, Optional, Self | |
| from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright | |
| from playwright.async_api import TimeoutError as PlaywrightTimeoutError | |
| from playwright_stealth import StealthConfig, stealth_async | |
| from pydantic import Field | |
| from tenacity import before_sleep_log, retry, stop_after_delay, wait_exponential | |
| from proxy_lite.browser.bounding_boxes import POI, BoundingBox, Point, annotate_bounding_boxes | |
| from proxy_lite.logger import logger | |
| import base64 | |
SELF_CONTAINED_TAGS = [
    # many of these are non-interactive but keeping them anyway
    "area",
    "base",
    "br",
    "col",
    "embed",
    "hr",
    "img",
    "input",
    "link",
    "meta",
    "param",
    "source",
    "track",
    "wbr",
]

# Longest attribute value / text body kept before it is cut and suffixed with "…".
_MAX_FIELD_LEN = 2500
# Any flavour of line break; replaced with "⏎" so each element stays on one line.
_LINE_BREAK_RE = re.compile(r"\r\n|\r|\n")


def _clip(value: str) -> str:
    """Truncate *value* to at most ``_MAX_FIELD_LEN`` characters, ending in "…"."""
    if len(value) > _MAX_FIELD_LEN:
        return value[: _MAX_FIELD_LEN - 1] + "…"
    return value


def element_as_text(
    mark_id: int,
    tag: Optional[str] = None,
    text: Optional[str] = None,
    **raw_attributes,
) -> str:
    """Return a one-line, HTML-like text representation of a single element.

    Args:
        mark_id: Index shown in square brackets in front of the element.
        tag: Tag name; lower-cased before rendering. A missing tag is
            rendered as an empty tag name instead of raising.
        text: Text content of the element; ``None`` is treated as empty.
        **raw_attributes: Attributes to render. ``None`` values are skipped,
            ``True`` renders the bare attribute name, ``False`` is omitted,
            and anything else is rendered as ``name="value"``.

    Returns:
        ``- [id] <tag attrs/>`` for a text-less tag listed in
        ``SELF_CONTAINED_TAGS``, otherwise ``- [id] <tag attrs>text</tag>``.
        Values and text longer than 2500 characters are truncated, and line
        breaks are replaced with "⏎".
    """
    rendered = []
    for name, value in raw_attributes.items():
        if value is None:
            continue
        if isinstance(value, bool):
            # False boolean attributes are ignored entirely.
            if value:
                rendered.append(name)
        else:
            rendered.append(f'{name}="{_clip(str(value))}"')

    # Leading space separates the attributes from the tag name; rstrip()
    # collapses the lone space left over when there are no attributes.
    attributes = (" " + " ".join(rendered)).rstrip()

    # `tag` defaults to None, so guard before lower-casing.
    tag = (tag or "").lower()
    text = _clip(text or "")

    # sub-out line breaks so elements are easier to distinguish
    attributes = _LINE_BREAK_RE.sub("⏎", attributes)
    text = _LINE_BREAK_RE.sub("⏎", text)

    if tag in SELF_CONTAINED_TAGS:
        if not text:
            return f"- [{mark_id}] <{tag}{attributes}/>"
        # Unexpected, but keep the text rather than dropping it: fall through
        # to the open/close form below.
        logger.warning(
            f"Got self-contained element '{tag}' which contained text '{text}'.",
        )
    return f"- [{mark_id}] <{tag}{attributes}>{text}</{tag}>"
class BrowserSession:
    """Async context manager wrapping a Playwright Chromium browser.

    Entering the context launches Chromium, opens a browser context with the
    configured viewport and registers the ``add_custom_select.js`` and
    ``find_pois.js`` init scripts. The session then exposes helpers to
    navigate, to collect the page's points of interest (POIs) and to interact
    with them by their mark id (their index in the POI lists).
    """

    def __init__(
        self,
        viewport_width: int = 1280,
        viewport_height: int = 720,
        headless: bool = True,
    ):
        """Store the configuration; no browser is started until ``__aenter__``."""
        self.viewport_width = viewport_width
        self.viewport_height = viewport_height
        self.headless = headless
        self.playwright: Playwright | None = None
        self.browser: Browser | None = None
        self.context: BrowserContext | None = None
        self._exit_stack: AsyncExitStack | None = None
        # Plain empty lists: this is not a pydantic model, so a pydantic
        # ``Field(default_factory=list)`` would be stored as a FieldInfo object
        # rather than a list.
        self.poi_elements: list = []
        self.poi_centroids: list[Point] = []
        self.bounding_boxes: list[BoundingBox] = []
        self.pois: list[POI] = []

    async def __aenter__(self) -> Self:
        """Launch the browser, open a context with one page and return self."""
        self._exit_stack = AsyncExitStack()
        try:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=self.headless)
            self.context = await self.browser.new_context(
                viewport={"width": self.viewport_width, "height": self.viewport_height},
            )
            # Ensure there's at least one page open
            if not self.context.pages:
                await self.context.new_page()
            self.context.set_default_timeout(60_000)
            self.current_page.set_default_timeout(60_000)
            await stealth_async(self.current_page, StealthConfig(navigator_user_agent=False))
            await self.context.add_init_script(
                path=Path(__file__).with_name("add_custom_select.js"),
            )
            await self.context.add_init_script(
                path=Path(__file__).with_name("find_pois.js"),
            )
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so release
            # whatever was already started before propagating the error.
            await self._close()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the browser, stop Playwright and close the exit stack."""
        await self._close()

    async def _close(self) -> None:
        """Tear down browser, Playwright and exit stack; each step runs even if an earlier one fails."""
        try:
            if self.browser:
                await self.browser.close()
        finally:
            try:
                if self.playwright:
                    await self.playwright.stop()
            finally:
                if self._exit_stack:
                    await self._exit_stack.aclose()

    # The methods of this class read ``self.current_page`` / ``self.current_url``
    # as attributes (e.g. ``self.current_page.url``), so both are properties.
    @property
    def current_page(self) -> Optional[Page]:
        """The most recently opened page of the context, or ``None`` if there is none."""
        if self.context and self.context.pages:
            return self.context.pages[-1]  # Return the most recently opened page
        return None

    @property
    def current_url(self) -> Optional[str]:
        """URL of the current page, or ``None`` when no page is open."""
        page = self.current_page
        return page.url if page else None

    # NOTE(review): the original comment here read "re-run for cases of mid-run
    # redirects" and the file imports tenacity's retry helpers without using
    # them; a retry decorator may have been lost from this method - confirm.
    async def process_iframe(self, iframe) -> Optional[tuple[dict, dict]]:
        """Collect the POIs inside one iframe.

        Returns:
            ``(poi, offset)`` where ``poi`` is the result of the in-frame
            ``findPOIsConvergence()`` call and ``offset`` is the iframe's
            rounded top-left corner as ``{"x": ..., "y": ...}``. Returns
            ``None`` when the iframe has no bounding box, is smaller than
            50px in either dimension, has no content frame, yields no POIs,
            or when any error occurs (the error is logged).
        """
        try:
            # Check iframe visibility and size
            bounding_box = await iframe.bounding_box()
            if not bounding_box:
                return None  # Skip if iframe is not visible
            if bounding_box["width"] < 50 or bounding_box["height"] < 50:
                return None
            frame = await iframe.content_frame()
            if not frame:
                return None
            poi = await frame.evaluate(
                """() => {
                    overwriteDefaultSelectConvergence();
                    return findPOIsConvergence();
                }""",
            )
            if not poi:
                return None
            iframe_offset = {"x": round(bounding_box["x"]), "y": round(bounding_box["y"])}
            return poi, iframe_offset
        except Exception as e:
            logger.error(f"Error processing iframe: {e}")
            return None

    async def _wait_for_page_ready(self) -> None:
        """Wait until the current page looks ready, using URL-specific heuristics.

        The initial ``domcontentloaded`` wait is mandatory: its failure is
        logged and re-raised. Every later wait is best-effort - a timeout is
        logged as a warning and execution continues.
        """
        try:
            # Wait for basic page load states to ensure the DOM is ready.
            # This is a fundamental wait that should always apply.
            await self.current_page.wait_for_load_state("domcontentloaded", timeout=60000)
            logger.debug(f"DEBUG: wait_for_load_state('domcontentloaded') completed for {self.current_page.url}.")
            current_url = self.current_page.url

            # Define common Salesforce URL patterns for different states
            login_url_patterns = [
                "login.salesforce.com",
                "identity.force.com",
                "auth.lightning.force.com",
                "setup.salesforce.com",  # Sometimes a setup login redirects here temporarily
                "my.salesforce.com",  # Custom-domain logins redirect here
            ]
            # Main Salesforce Lightning application base URL, typically seen after login.
            # Treated as an intermediate loading state before the specific target page.
            intermediate_app_url_pattern = "/one/one.app"

            # Classify the current page by its URL.
            is_on_login_page = any(pattern in current_url for pattern in login_url_patterns)
            is_on_intermediate_app_page = intermediate_app_url_pattern in current_url
            is_on_target_forecast_page = "/AccountForecastSettings/home" in current_url

            # --- CONDITIONAL WAITING LOGIC BASED ON URL ---
            if is_on_target_forecast_page:
                logger.info(f"INFO: Detected target Account Forecast Settings page: {current_url}. Waiting for content.")
                # When on the specific target page, wait for its spinners to go away...
                spinner_selectors = [
                    "div.slds-spinner_container",
                    "div.auraLoadingBox",
                    "div.dxp_axb_container",  # Main overlay
                    "div.slds-sprite-astro-x-large",  # Specific animated element itself
                ]
                for selector in spinner_selectors:
                    try:
                        await self.current_page.wait_for_selector(selector, state="hidden", timeout=5000)
                        logger.debug(f"DEBUG: Spinner element '{selector}' became hidden for {self.current_page.url}.")
                    except PlaywrightTimeoutError:
                        logger.warning(
                            f"DEBUGGING: Spinner element '{selector}' not detected or did not disappear on {self.current_page.url} within 5s."
                        )
                # ...then for a known element of the page to ensure content is there.
                try:
                    await self.current_page.wait_for_selector(
                        "h1.slds-page-header__title, h2, .account-forecast-settings-component, div[data-aura-rendered-by]",
                        state="visible",
                        timeout=15000,
                    )
                    logger.debug(f"DEBUG: Confirmed main page element visible for {self.current_page.url}.")
                except PlaywrightTimeoutError:
                    logger.warning(
                        f"DEBUGGING: Main page element not visible on {self.current_page.url} within 15s. This might indicate incomplete page load despite no spinner."
                    )
            elif is_on_login_page:
                logger.info(f"INFO: Detected Salesforce login page: {current_url}. Waiting for login elements.")
                # When on a login page, just wait for the login form elements to be visible
                try:
                    await self.current_page.wait_for_selector(
                        "input[type='email'], input[type='password'], input[type='submit'], #username, #password, #Login",
                        state="visible",
                        timeout=10000,
                    )
                    logger.debug(f"DEBUG: Login page elements visible on {self.current_page.url}.")
                except PlaywrightTimeoutError:
                    logger.warning(
                        f"DEBUGGING: Login page elements not visible on {self.current_page.url} within 10s. This may happen if elements are in an iframe or if page is extremely slow."
                    )
            elif is_on_intermediate_app_page:
                logger.info(
                    f"INFO: Detected intermediate Salesforce Lightning app loading page: {current_url}. Waiting for network idle and app spinner."
                )
                # This is the /one/one.app page or similar. Don't wait for specific content, just general load.
                try:
                    await self.current_page.wait_for_load_state("networkidle", timeout=30000)
                    logger.debug(f"DEBUG: Network idle detected on intermediate app page: {current_url}.")
                except PlaywrightTimeoutError:
                    logger.warning(f"DEBUGGING: Network idle timeout on intermediate app page: {current_url}. Proceeding anyway.")
                # Also try to wait for a common full-app spinner to disappear, if present
                try:
                    await self.current_page.wait_for_selector(
                        "div.app-spinner, div.auraLoadingBox", state="hidden", timeout=15000
                    )
                    logger.debug("DEBUG: App spinner on intermediate page became hidden.")
                except PlaywrightTimeoutError:
                    logger.warning("DEBUGGING: App spinner on intermediate page not found or did not disappear.")
            else:
                logger.info(f"INFO: Detected unhandled URL type: {current_url}. Performing generic body wait.")
                # Fallback for any other page, just wait for body to be visible
                try:
                    await self.current_page.wait_for_selector("body", timeout=5000, state="visible")
                    logger.debug(f"DEBUG: wait_for_selector('body', state='visible') completed for {self.current_page.url}.")
                except PlaywrightTimeoutError:
                    logger.warning(
                        f"DEBUGGING: Playwright Timeout (5s) on body selector for {self.current_page.url}. Continuing anyway."
                    )
        except PlaywrightTimeoutError as e:
            logger.error(f"ERROR: Timeout waiting for page readiness for {self.current_page.url}: {e}")
            raise  # Re-raise if essential waits fail (e.g., initial domcontentloaded)
        except Exception as e:
            logger.error(f"ERROR: An unexpected error occurred during page readiness check for {self.current_page.url}: {e}")
            raise

    async def update_poi(self) -> None:
        """Refresh the POI state from the current page and its iframes.

        Waits for the page to be ready, runs the injected POI-finding script
        on the main page and on up to 10 iframes, then rebuilds
        ``poi_elements``, ``poi_centroids``, ``bounding_boxes`` and ``pois``.
        Iframe coordinates are shifted by the iframe's offset so they are
        expressed relative to the main page. Errors while handling iframes
        are logged and do not abort the update.

        Raises:
            PlaywrightTimeoutError: if the initial DOM-ready wait times out.
            Exception: any other error raised while waiting for readiness or
                while evaluating the POI script on the main page.
        """
        await self._wait_for_page_ready()

        # Run the bounding box javascript code to highlight the points of interest on the page
        page_info = await self.current_page.evaluate(
            """() => {
                overwriteDefaultSelectConvergence();
                return findPOIsConvergence();
            }""",
        )
        # Get the points of interest on the page
        self.poi_elements = page_info["element_descriptions"]
        element_centroids = page_info["element_centroids"]

        try:
            # Select all iframes on the page and process the first few concurrently
            iframes = await self.current_page.query_selector_all("iframe")
            max_iframes = 10
            tasks = [asyncio.create_task(self.process_iframe(iframe)) for iframe in iframes[:max_iframes]]
            results = await asyncio.gather(*tasks)

            # Combine the points of interest from the iframes with the main page and adjust the centroids
            for iframe_poi, offset in (result for result in results if result is not None):
                self.poi_elements.extend(iframe_poi["element_descriptions"])
                for centroid in iframe_poi["element_centroids"]:
                    centroid["x"] += offset["x"]
                    centroid["y"] += offset["y"]
                    centroid["left"] += offset["x"]
                    centroid["top"] += offset["y"]
                    centroid["right"] += offset["x"]
                    centroid["bottom"] += offset["y"]
                element_centroids.extend(iframe_poi["element_centroids"])
        except Exception as e:
            logger.error(f"Error in finding iframes: {e}")

        # Get the centroids of the points of interest
        self.poi_centroids = [Point(x=xy["x"], y=xy["y"]) for xy in element_centroids]
        self.bounding_boxes = [BoundingBox(**xy, label=str(i)) for i, xy in enumerate(element_centroids)]
        self.pois = [
            POI(info=info, element_centroid=centroid, bounding_box=bbox)
            for info, centroid, bbox in zip(
                self.poi_elements,
                self.poi_centroids,
                self.bounding_boxes,
                strict=False,
            )
        ]

    # NOTE(review): kept as a regular method because nothing in this file shows
    # how it is accessed; confirm whether external callers expect a property.
    def poi_text(self) -> str:
        """Return all current POIs as text, one ``element_as_text`` line per element."""
        # Get all points of interest on the page as text
        texts = [element_as_text(mark_id=i, **element) for i, element in enumerate(self.poi_elements)]
        # Return formatted text of points of interest on page
        return "\n".join([txt for txt in texts if txt])

    async def screenshot(
        self,
        delay: float = 0.0,
        quality: int = 70,
        type: str = "jpeg",
        scale: str = "css",
    ) -> tuple[bytes, bytes]:
        """Refresh the POIs and capture the current page.

        Args:
            delay: Seconds to sleep before doing anything (skipped when <= 0).
            quality: Forwarded to Playwright's ``page.screenshot``.
            type: Image format forwarded to ``page.screenshot``.
            scale: Scale mode forwarded to ``page.screenshot``.

        Returns:
            ``(raw_image, annotated_image)`` where the second image is the
            result of ``annotate_bounding_boxes`` on the first.
        """
        if delay > 0.0:
            await asyncio.sleep(delay)
        await self.update_poi()
        # A single POI update + capture; callers that need precise timing on
        # highly dynamic pages should manage delays and updates themselves.
        img = await self.current_page.screenshot(type=type, quality=quality, scale=scale)
        annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes)
        return img, annotated_img

    async def goto(self, url: str) -> None:
        """Navigate the current page to *url*, waiting for ``domcontentloaded``."""
        await self.current_page.goto(url, wait_until="domcontentloaded")

    async def reload(self) -> None:
        """Reload the current page, waiting for ``domcontentloaded``."""
        await self.current_page.reload(wait_until="domcontentloaded")

    async def click_tab(self, mark_id: int) -> None:
        """Hover over POI *mark_id* and click it with the middle mouse button."""
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point, button="middle")

    async def click(self, mark_id: int) -> None:
        """Hover over POI *mark_id* and click it."""
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point)

    async def enter_text(self, mark_id: int, text: str, submit: bool = False) -> None:
        """Clear POI *mark_id*, click it and type *text*; press Enter if *submit*."""
        await self.clear_text_field(mark_id)
        await self.click(mark_id)
        await self.current_page.keyboard.type(text)
        if submit:
            await self.current_page.keyboard.press("Enter")

    async def scroll(
        self,
        direction: Literal["up", "down", "left", "right"],
        mark_id: Optional[int] = None,
    ) -> None:
        """Scroll by 80% of the viewport, or of POI *mark_id*'s bounding box.

        Without a *mark_id* the mouse is moved to ``(-1, -1)`` and the scroll
        distance is based on the viewport size; with one, the mouse hovers the
        POI and the distance is based on its bounding box.
        """
        if mark_id is None:
            point = Point(x=-1, y=-1)
            max_scroll_x = self.viewport_width
            max_scroll_y = self.viewport_height
        else:
            point: Point = self.poi_centroids[mark_id]
            bbox: BoundingBox = self.bounding_boxes[mark_id]
            max_scroll_x = bbox.right - bbox.left
            max_scroll_y = bbox.bottom - bbox.top
        await self.hover(point=point)
        scroll_x = int(max_scroll_x * 0.8)
        scroll_y = int(max_scroll_y * 0.8)
        is_vertical = direction in ("up", "down")
        reverse_scroll = direction in ("up", "left")
        # Multiplying by the booleans zeroes out the axis that is not scrolled.
        await self.current_page.mouse.wheel(
            scroll_x * (-1 if reverse_scroll else 1) * (not is_vertical),
            scroll_y * (-1 if reverse_scroll else 1) * is_vertical,
        )

    async def go_back(self) -> None:
        """Go back in history, closing the tab if that lands on ``about:blank``.

        Raises:
            Exception: if going back reaches ``about:blank`` and this is the
                only open page (the navigation is undone first).
        """
        # If there is no tab open then return
        if not self.current_page:
            return
        await self.current_page.go_back(wait_until="domcontentloaded")
        if self.current_page.url == "about:blank":
            if len(self.context.pages) <= 1:
                await self.current_page.go_forward(wait_until="domcontentloaded")
                raise Exception("There is no previous page to go back to.")
            await self.current_page.close()

    async def hover(self, point: Point) -> None:
        """Move the mouse to *point*."""
        await self.current_page.mouse.move(*point)

    async def focus(self, point: Point) -> None:
        """Focus the element located at *point*, if it is focusable."""
        # Focus on the element on the page at point (x, y)
        await self.current_page.evaluate(
            """
            ([x, y]) => {
                const element = document.elementFromPoint(x, y);
                if (element && element.focus) {
                    element.focus();
                }
            }""",
            tuple(point),
        )

    async def get_text(self, mark_id: int) -> str:
        """Return the ``value`` (or else ``textContent``) of marked element *mark_id*, or ``''``."""
        return await self.current_page.evaluate(
            """
            (mark_id) => {
                const element = marked_elements_convergence[mark_id];
                if (element && (element.value !== undefined || element.textContent !== undefined)) {
                    return element.value || element.textContent;
                }
                return '';
            }
            """,
            (mark_id,),
        )

    async def clear_text_field(self, mark_id: int) -> None:
        """Select and delete the text of POI *mark_id*; no-op if it is blank."""
        existing_text = await self.get_text(mark_id)
        if not existing_text.strip():
            return
        # Clear existing text only if it exists
        await self.click(mark_id)
        if platform.system() == "Darwin":  # selecting all text is OS-specific
            await self.click(mark_id)
            await self.current_page.keyboard.press("Meta+a")
            await self.current_page.keyboard.press("Backspace")
        else:
            await self.current_page.keyboard.press("Control+Home")
            await self.current_page.keyboard.press("Control+Shift+End")
            await self.current_page.keyboard.press("Backspace")

    async def open_new_tab_and_go_to(self, url: str) -> None:
        """Open a new tab, close the previously current one and navigate to *url*.

        The POI state is refreshed for the new page once navigation finishes.
        A failure to close the previous page is logged and otherwise ignored.
        """
        logger.info(f"Attempting to open a new tab and navigate to: {url}")
        # Remember the old page first: once the new page exists, `current_page`
        # (the last page of the context) already refers to the new one.
        previous_page = self.current_page
        new_page = await self.context.new_page()
        if previous_page is not None and previous_page != new_page:
            try:
                await previous_page.close()
                logger.debug("Closed previous page.")
            except Exception as e:
                logger.warning(f"Could not close previous page (might already be closed or detached): {e}")
        await new_page.goto(url, wait_until="domcontentloaded")
        logger.info(f"Successfully navigated to {url} in a new tab.")
        # update_poi works on self.current_page, which is new_page at this point.
        await self.update_poi()
if __name__ == "__main__":

    async def dummy_test():
        """Manual smoke test: open a page in a visible browser and save screenshots."""
        async with BrowserSession(headless=False) as session:
            tab = await session.context.new_page()
            await tab.goto("http://google.co.uk")
            # Give the page a few seconds to settle before capturing it.
            await asyncio.sleep(5)
            await tab.screenshot(path="example.png")
            await session.update_poi()
            # Only the annotated image (second item) is written out.
            annotated_image = (await session.screenshot())[1]
            Path("output.png").write_bytes(annotated_image)

    asyncio.run(dummy_test())