import sys
import numpy as np
import datetime as dt
import os
import glob
import time
from spacekit.logger.log import Logger, SPACEKIT_LOG
class Stopwatch:
def __init__(self, func, log=None, out="."):
self.func = func
self.log = log
self.out = out
self.lap = 0
self.laps = {}
self.ps = self.func.__name__
self.t0 = None
self.p0 = None
self.t1 = None
self.p1 = None
self.walltime = None
self.clocktime = None
self.delta = None
if self.log is None:
self.log = Logger(self.ps, console_log_level="info").setup_logger()
def clockit(self):
def wrap(*args, **kwargs):
self.start()
result = self.func(*args, **kwargs)
self.stop()
return result
return wrap
def start(self):
self.t0, self.p0 = time.time(), time.process_time()
self.record(self.t0, "STARTED")
def stop(self):
self.t1, self.p1 = time.time(), time.process_time()
self.record(self.t1, "COMPLETED")
self.duration()
self.log.info(f"WALL [{self.ps}] : {self.walltime}")
self.log.info(f"CLOCK [{self.ps}] : {self.clocktime}")
def record(self, t, info):
timestring = dt.datetime.fromtimestamp(t).strftime("%m/%d/%Y - %H:%M:%S")
self.log(f"{timestring} [i] {info} [{self.ps}]")
if self.delta:
self.log.info(f"Duration [{self.ps}] : {self.delta[0]} {self.delta[1]}\n")
self.lap += 1
self.laps[self.lap] = self.delta
self.reset()
def reset(self):
self.t0 = None
self.t1 = None
self.p0 = None
self.p1 = None
self.delta = None
self.walltime = None
self.clocktime = None
def duration(self, wall=True):
"""calculates total duration from start to end of a process that finished running.
Parameters
----------
start : int
starting interval clocktime
end : int
end interval clocktime
prcname : str, optional
name of running process, by default ""
"""
self.walltime = np.round((self.t1 - self.t0), 2)
self.clocktime = np.round((self.p1 - self.p0), 2)
s = self.walltime if wall is True else self.clocktime
m = np.round((s / 60), 2)
h = np.round((m / 60), 2)
if s > 3600:
self.delta = (h, "HOURS")
elif s > 60:
self.delta = (m, "MINUTES")
else:
self.delta = (s, "SECONDS")
[docs]def proc_time(start, end, prcname=""):
"""calculates total duration from start to end of a process that finished running.
Parameters
----------
start : int
starting interval clocktime
end : int
end interval clocktime
prcname : str, optional
name of running process, by default ""
"""
duration = np.round((end - start), 2)
proc_time = np.round((duration / 60), 2)
if duration > 3600:
t = f"{np.round((proc_time / 60), 2)} hours."
elif duration > 60:
t = f"{proc_time} minutes."
else:
t = f"{duration} seconds."
print(f"Process [{prcname}] : {t}\n")
return t
# TODO: record laps (eg. for loading train test val image sets)
[docs]def stopwatch(prcname, t0=None, t1=None, out=".", log=True, subset_name=None):
"""Times a process from start to finish and (optionally) records the intervals and total duration in a text file on disk.
Parameters
----------
prcname : str
name of running process
t0 : int, optional
time.time timestamp start interval, by default None
t1 : int, optional
time.time timestamp end interval], by default None
out : str, optional
location to save recorded clocktimes, by default "."
log : bool, optional
record process clocktimes in a text file on disk, by default True
"""
lap = 0
if t1 is not None:
info = "COMPLETED"
t = t1
if t0 is not None:
lap = 1
else:
info = "STARTED"
t = t0
timestring = dt.datetime.fromtimestamp(t).strftime("%m/%d/%Y - %H:%M:%S")
message = f"{timestring} [i] {info} [{prcname}]"
print(message)
fname = "clocktime.txt" if subset_name is None else f"clocktime{subset_name}.txt"
if log is True:
sysout = sys.stdout
with open(f"{out}/{fname}", "a") as timelog:
sys.stdout = timelog
print(message)
if lap:
proc_time(t0, t1, prcname=prcname)
sys.stdout = sysout
def get_file_metrics(visit_path):
fpaths = glob.glob(f"{visit_path}/*.fits")
n_files = len(fpaths)
if n_files > 0:
total_size = sum([os.stat(fname).st_size for fname in fpaths])
return n_files, total_size
else:
return 0, 0
def timer(t0=None, clock=None):
if t0 is None:
return time.time(), time.process_time()
else:
walltime = time.time() - t0
clocktime = time.process_time() - clock
return walltime, clocktime
def clockit(func):
ps = func.__name__
def wrap(*args, **kwargs):
start = time.time()
start(ps, t0=start)
result = func(*args, **kwargs)
end = time.time()
stopwatch(ps, t0=start, t1=end)
print(end - start)
return result
return wrap
def record_metrics(
log_dir, visit, wall, clock, ps="svm", n_files=None, total_size=None
):
log_file = os.path.join(log_dir, f"{ps}-stats.txt")
metrics = {"visit": visit, "walltime": wall, "clocktime": clock}
if n_files:
metrics.update({"n_files": n_files, "total_size": total_size})
with open(log_file, "w") as lf:
for k, v in metrics.items():
lf.write(f"{k}: {v}\n")
def xtimer(fn):
def inner(*args, **kwargs):
start_time = time.perf_counter()
to_execute = fn(*args, **kwargs)
end_time = time.perf_counter()
execution_time = end_time - start_time
SPACEKIT_LOG.info('{0} took {1:.8f}s to execute'.format(fn.__name__, execution_time))
return to_execute
return inner