Source code for ewoksxrpd.tasks.log
import logging
import time
from contextlib import contextmanager
from typing import Any
from typing import Generator
from typing import Iterator
from typing import Optional
from typing import Tuple
from ewoksdata.data.contextiterator import contextiterator
_logger = logging.getLogger(__name__)
[docs]
@contextiterator
def zip_with_progress(
*iterators: Iterator[Iterator[Any]],
message: str = "Progress %d/%s",
nmax: Optional[int] = None,
period: float = 5,
logger=None,
iter_time_name: str = "iteration",
) -> Generator[Tuple[Any], None, None]:
"""Like python's zip but will progress logging when iterating over the result.
In addition it yields a context manager which allows for timing different sections
in the context as well as the yielding time itself:
.. code-block::python
for v1, v2, tmcontext in zip_with_progress([...], [...], iter_time_name="read"):
with tmcontext("process"):
...
with tmcontext("write"):
...
"""
ctimer = _ContextTimer(iter_time_name)
if logger is None:
logger = _logger
i = 0
t0 = time.time()
if nmax is None:
nmax = "?"
try:
for tpl in zip(*iterators):
yield (*tpl, ctimer.tmcontext)
i += 1
if (time.time() - t0) > period:
t0 = time.time()
_logger.info(f"{message} (ONGOING: {ctimer})", i, nmax)
finally:
_logger.info(f"{message} (FINISHED: {ctimer})", i, nmax)
class _ContextTimer:
def __init__(self, rest_context: str = "other") -> None:
self._total_t0 = time.perf_counter_ns()
self._context_time = dict()
self._rest_context = rest_context
@contextmanager
def tmcontext(self, name: str) -> Generator[None, None, None]:
context_t0 = time.perf_counter_ns()
try:
yield
finally:
context_t1 = time.perf_counter_ns()
self._context_time.setdefault(name, 0)
self._context_time[name] += context_t1 - context_t0
def __str__(self) -> str:
dt_total = time.perf_counter_ns() - self._total_t0
dt_sum = sum([dt for dt in self._context_time.values()])
dt_rest = dt_total - dt_sum
dts = [
f"{name}={self.strftime(dt, dt_total)}"
for name, dt in self._context_time.items()
]
dts.append(f"{self._rest_context}={self.strftime(dt_rest, dt_total)}")
dts.append(f"TOTAL={self.strftime(dt_total, dt_total)}")
return ", ".join(dts)
@staticmethod
def strftime(dt_ns: int, dt_total: int) -> str:
m, s = divmod(int(dt_ns * 1e-9 + 0.5), 60)
h, m = divmod(m, 60)
f = int(dt_ns / dt_total * 100 + 0.5)
return f"{h:02d}:{m:02d}:{s:02d} ({f:d}%%)"