You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

278 lines
9.0 KiB

import copy
import io
import time
from enum import Enum
from typing import List, NoReturn
class TimeType(Enum):
# 秒
second = 's'
# 毫秒
millisecond = 'ms'
# 纳秒
nanosecond = 'ns'
class BaseWatch(object):
@staticmethod
def _timestamp_to_nanos(t: float) -> int:
# 获取微秒级
return int(round(t * 1000000))
@staticmethod
def _timestamp_to_millis(t: float) -> int:
# 获取毫秒级
return int(round(t * 1000))
class StopWatch(BaseWatch):
class TaskInfo(BaseWatch):
def __init__(self, task_name: str, timestamp: float):
self.task_name = task_name
self.timestamp = timestamp
def get_time_nanos(self) -> int:
return self._timestamp_to_nanos(self.timestamp)
def get_time_millis(self) -> int:
return self._timestamp_to_millis(self.timestamp)
def get_time_seconds(self) -> float:
return self.timestamp
__current_task_name: str or None = None
__start_time_timestamp: float = None
__total_time_timestamp: float = 0.0
__last_task_info: TaskInfo = None
__task_info: List[TaskInfo] = None
__task_count: int = None
def __init__(
self,
unique_id: str,
keep_task_list: bool = True,
time_type: TimeType = TimeType.millisecond,
):
"""
stop_watch计时器
**线程不安全** 请在使用时 实例化新对象进行计时
:param unique_id: 计时器的唯一标识,一般起比较有特点的名称
:param keep_task_list: 是否保存单个计时点(计时任务)的信息
:param time_type: 计时器输出的时间单位 默认毫秒
eg:
sw = StopWatch('计时器唯一名称', time_type=TimeType.second)
sw.start('我是计时点1')
time.sleep(1)
# 计时点1计时结束
sw.stop()
sw.start('我是计时点2')
time.sleep(2)
# 计时点2计时结束
sw.stop()
print(sw.pretty_print())
output:
stopWatch [我是一个计时器]: running time = 3.0048 s
---------------------------------------------
s % Task name
---------------------------------------------
0000001.0043 033.42% 我是计时点1
0000002.0005 066.58% 我是计时点2
"""
self.__unique_id = unique_id
self.__keep_task_list = keep_task_list
self.__time_type = time_type
self.__task_info = list()
self.__task_count = 0
def start(self, task_name: str = '') -> NoReturn:
"""
开始计时
:param task_name: 计时任务(计时点)的名称
:return
"""
if self.__current_task_name is not None:
raise ValueError('Can\'t start StopWatch: it\'s already running')
self.__current_task_name = task_name
self.__start_time_timestamp = time.time()
def stop(self) -> NoReturn:
"""
停止任务计时
:return
"""
if self.__current_task_name is None:
raise ValueError('Can\'t stop StopWatch: it\'s not running')
last_time: float = time.time() - self.__start_time_timestamp
self.__total_time_timestamp += last_time
self.__last_task_info = self.TaskInfo(self.__current_task_name, last_time)
if self.__keep_task_list:
self.__task_info.append(self.__last_task_info)
self.__task_count += self.__task_count
self.__current_task_name = None
def is_running(self) -> bool:
"""
计时器是否运行
"""
return self.__current_task_name is None
@property
def current_task_name(self) -> str or None:
"""
返回当前任务名称
:return:
"""
return self.__current_task_name
def get_last_task_time_nanos(self) -> int:
"""
获取最后一次计时任务的时间(微秒)
:return:
"""
if self.__last_task_info is None:
raise ValueError('No tasks run: can\'t get last task interval')
return self._timestamp_to_nanos(self.__last_task_info.timestamp)
def get_last_task_time_millis(self) -> int:
"""
获取最后一次计时任务的时间(毫秒)
:return:
"""
if self.__last_task_info is None:
raise ValueError('No tasks run: can\'t get last task interval')
return self._timestamp_to_millis(self.__last_task_info.timestamp)
def get_last_task_time_seconds(self) -> float:
"""
获取最后一次计时任务的时间(秒)
:return:
"""
if self.__last_task_info is None:
raise ValueError('No tasks run: can\'t get last task interval')
return self.__last_task_info.timestamp
def get_last_task_name(self) -> str:
"""
获取最后一次计时任务的名称
:return:
"""
if self.__last_task_info is None:
raise ValueError('No tasks run: can\'t get last task name')
return self.__last_task_info.task_name
def get_last_task_info(self) -> TaskInfo:
"""
返回最后一个任务
:return:
"""
return self.__last_task_info
def get_total_time_nanos(self) -> int:
"""
获取当前stop_watch总计时的时间(微秒)
:return:
"""
return self._timestamp_to_nanos(self.__total_time_timestamp)
def get_total_time_millis(self) -> int:
"""
获取当前stop_watch总计时的时间(毫秒)
:return:
"""
return self._timestamp_to_millis(self.__total_time_timestamp)
def get_total_time_seconds(self) -> float:
"""
获取当前stop_watch总计时的时间(秒)
:return:
"""
return self.__total_time_timestamp
def get_task_count(self) -> int:
"""
获取当前stop_watch对象记录的任务数
:return:
"""
return self.__task_count
def get_task_info(self) -> List[TaskInfo]:
"""
获取当前stop_watch对象记录的全部任务
:return:
"""
if not self.__keep_task_list:
raise RuntimeError('task info is not being kept!')
return copy.deepcopy(self.__task_info)
def short_summary(self, time_type: TimeType = None) -> str:
"""
获取当前统计时间数据简述
:param time_type: 计时器输出的时间单位, 如果在此传入,则覆盖掉类构造函数中传入的 time_type
:return:
"""
time_type = time_type or self.__time_type
if time_type == TimeType.millisecond:
running_time = self.get_total_time_millis()
elif time_type == TimeType.nanosecond:
running_time = self.get_total_time_nanos()
else:
running_time = round(self.get_total_time_seconds(), 4)
return f'StopWatch [{self.__unique_id}]: running time = {running_time} {time_type.value}'
def pretty_print(self, time_type: TimeType = None) -> str:
"""
获取当前统计时间数据详情 并生成字符串
:param time_type: 计时器输出的时间单位, 如果在此传入,则覆盖掉类构造函数中传入的 time_type
:return:
"""
time_type = time_type or self.__time_type
sb = io.StringIO()
sb.write(self.short_summary())
sb.write('\n')
if not self.__keep_task_list:
sb.write('No task info kept')
else:
sb.write('---------------------------------------------\n')
sb.write(f'{time_type.value} % Task name\n')
sb.write('---------------------------------------------\n')
for task_info in self.__task_info:
if time_type == TimeType.millisecond:
sb.write(f'{str(task_info.get_time_millis()).zfill(12)} ')
elif time_type == TimeType.nanosecond:
sb.write(f'{str(task_info.get_time_nanos()).zfill(12)} ')
else:
sb.write(f'{format(task_info.get_time_seconds(), ".4f").zfill(12)} ')
percent = task_info.get_time_seconds() / self.get_total_time_seconds()
sb.write(f'{str(format(percent * 100, ".2f")).zfill(6)}% ')
sb.write(f'{task_info.task_name}\n')
return sb.getvalue()
def __str__(self):
sb = io.StringIO()
sb.write(self.short_summary())
if not self.__keep_task_list:
sb.write('; no task info kept')
else:
for task_info in self.__task_info:
sb.write(f'; [')
sb.write(f'{task_info.task_name}')
sb.write('] took ')
sb.write(f'{task_info.get_time_nanos()} ns')
sb.write(f' = {round(task_info.get_time_seconds() / self.get_total_time_seconds(), 2)}%')
return sb.getvalue()