patterns module¶
periodical(frequency=1, monitoring_cb=None)
¶
A decorator to call a function periodically
Source code in async_app/patterns.py
def periodical(frequency=1, monitoring_cb=None):
"""A decorator to call a function periodically"""
call_every = 1 / frequency
def inner_func(f):
@wraps(f)
async def wrapper(*args, **kwargs):
f_is_async = True if asyncio.iscoroutinefunction(f) else False
while app_state.keep_running:
tic = time.perf_counter()
if f_is_async:
await f(*args, **kwargs)
else:
f(*args, **kwargs)
if monitoring_cb:
monitoring_cb()
toc = time.perf_counter()
sleep_for = max(0, call_every - (toc - tic))
await asyncio.sleep(sleep_for)
return wrapper
return inner_func