import asyncio import time from datetime import datetime, timedelta from graphgen.utils import logger class RPM: def __init__(self, rpm: int = 1000): self.rpm = rpm self.record = {"rpm_slot": self.get_minute_slot(), "counter": 0} @staticmethod def get_minute_slot(): current_time = time.time() dt_object = datetime.fromtimestamp(current_time) total_minutes_since_midnight = dt_object.hour * 60 + dt_object.minute return total_minutes_since_midnight async def wait(self, silent=False): current = time.time() dt_object = datetime.fromtimestamp(current) minute_slot = self.get_minute_slot() if self.record["rpm_slot"] == minute_slot: # check RPM exceed if self.record["counter"] >= self.rpm: # wait until next minute next_minute = dt_object.replace(second=0, microsecond=0) + timedelta( minutes=1 ) _next = next_minute.timestamp() sleep_time = abs(_next - current) if not silent: logger.info("RPM sleep %s", sleep_time) await asyncio.sleep(sleep_time) self.record = {"rpm_slot": self.get_minute_slot(), "counter": 0} else: self.record = {"rpm_slot": self.get_minute_slot(), "counter": 0} self.record["counter"] += 1 if not silent: logger.debug(self.record) class TPM: def __init__(self, tpm: int = 20000): self.tpm = tpm self.record = {"tpm_slot": self.get_minute_slot(), "counter": 0} @staticmethod def get_minute_slot(): current_time = time.time() dt_object = datetime.fromtimestamp(current_time) total_minutes_since_midnight = dt_object.hour * 60 + dt_object.minute return total_minutes_since_midnight async def wait(self, token_count, silent=False): current = time.time() dt_object = datetime.fromtimestamp(current) minute_slot = self.get_minute_slot() # get next slot, skip if self.record["tpm_slot"] != minute_slot: self.record = {"tpm_slot": minute_slot, "counter": token_count} return # check RPM exceed old_counter = self.record["counter"] self.record["counter"] += token_count if self.record["counter"] > self.tpm: logger.info("Current TPM: %s, limit: %s", old_counter, self.tpm) # wait until next minute next_minute = dt_object.replace(second=0, microsecond=0) + timedelta( minutes=1 ) _next = next_minute.timestamp() sleep_time = abs(_next - current) logger.warning("TPM limit exceeded, wait %s seconds", sleep_time) await asyncio.sleep(sleep_time) self.record = {"tpm_slot": self.get_minute_slot(), "counter": token_count} if not silent: logger.debug(self.record)