app module¶
AsyncApp
¶
Source code in async_app/app.py
class AsyncApp:
    """Registry and runner for asyncio tasks described by plain dicts.

    Task descriptions are registered with :meth:`add_task_description` and
    sorted into four kinds: ``init``, ``continuous``, ``periodic`` and
    ``cleanup``. :meth:`run` registers the cleanup hooks first, then runs the
    init tasks to completion, and finally runs the continuous and periodic
    tasks together.
    """

    # Accepted spellings of a task kind, mapped to the canonical kind that is
    # used as key of ``self.task_descriptions``.
    _KIND_ALIASES = {
        "init": "init",
        "initialize": "init",
        "continuous": "continuous",
        "continuously": "continuous",
        "periodic": "periodic",
        "periodical": "periodic",
        "periodically": "periodic",
        "cleanup": "cleanup",
        "teardown": "cleanup",
    }

    def __init__(self, **kwargs):
        """Set up the empty task registry and process the default options.

        Args:
            **kwargs: Options forwarded to :meth:`process_default_options`.
        """
        self.name = app_name
        logger.info(f"Initializing AsyncApp {self.name}")
        logger.debug(f"My kwargs: {kwargs}")
        # registered task descriptions, grouped by canonical kind
        self.task_descriptions = {
            "init": [],
            "continuous": [],
            "periodic": [],
            "cleanup": [],
        }
        # every asyncio task created so far (read by the task monitor)
        self.tasks = []
        self.results = []
        # uid -> task name of every periodic task
        self.periodicals = {}
        # uid -> deque of perf_counter timestamps of monitored periodic tasks
        self.periodicals_timing = {}
        self.periodicals_timing_maxlen = 21
        self.process_default_options(**kwargs)

    def process_default_options(self, **kwargs):
        """Register the built-in monitors whose frequency option is positive.

        Args:
            **kwargs: May contain ``process_monitoring_frequency``,
                ``system_monitoring_frequency``, ``task_monitoring_frequency``
                and ``periodicals_monitoring_frequency``. Options that are
                missing, ``None`` or not greater than zero are skipped.
        """
        logger.info("Processing default async_app options.")
        monitor_mapping = {
            "process_monitoring_frequency": process_monitor,
            "system_monitoring_frequency": system_monitor,
            "task_monitoring_frequency": self.task_monitor,
            "periodicals_monitoring_frequency": self.periodicals_monitor,
        }
        for key, monitoring_function in monitor_mapping.items():
            # If the async_app options are not used, the expected values might
            # be absent (or None); both mean "monitor disabled".
            monitoring_frequency = kwargs.get(key)
            if monitoring_frequency is None or not monitoring_frequency > 0:
                continue
            self.add_task_description(
                {
                    "kind": "periodic",
                    "function": monitoring_function,
                    "frequency": monitoring_frequency,
                }
            )

    def add_monitoring_ts(self, uid):
        """Append the current ``time.perf_counter()`` value to the timing of ``uid``."""
        self.periodicals_timing[uid].append(time.perf_counter())

    def add_task_description(self, task_description):
        """Add a task to the to-do list, including its args and kwargs.

        The dict is normalized in place: ``name`` is derived from the
        function, ``uid`` is set to a fresh UUID4 string and ``kind`` is
        rewritten to its canonical spelling. For periodic tasks a
        ``call_every`` value is converted to ``frequency = 1 / call_every``.
        Descriptions with an unknown kind are logged and not added.

        Args:
            task_description: Dict with at least the keys ``function`` and
                ``kind``.

        Raises:
            KeyError: If ``function`` or ``kind`` is missing.
        """
        logger.info(f"Adding new task with {task_description=}")
        function = task_description["function"]
        # functools.partial objects and other callable instances do not have
        # a __name__; fall back to the type name instead of raising.
        task_description["name"] = getattr(
            function, "__name__", type(function).__name__
        )
        task_description["uid"] = str(uuid.uuid4())

        kind = task_description["kind"]
        # str() keeps a non-string kind on the "unknown kind" path
        canonical_kind = self._KIND_ALIASES.get(str(kind).lower())
        if canonical_kind is None:
            logger.error(f"Unknown task kind '{kind}' detected. Not adding task.")
            return

        task_description["kind"] = canonical_kind
        if canonical_kind == "periodic" and "call_every" in task_description:
            task_description["frequency"] = 1 / task_description["call_every"]
        self.task_descriptions[canonical_kind].append(task_description)

    async def create_tasks(self, kind):
        """Create the asyncio tasks for all registered descriptions of ``kind``.

        ``init`` and ``continuous`` functions are scheduled directly when they
        are coroutine functions and through ``asyncio.to_thread`` otherwise.
        ``periodic`` functions are wrapped with ``periodical``. ``cleanup``
        functions are not turned into tasks; they are registered with
        ``asyncio_atexit``.

        Args:
            kind: Canonical task kind, a key of ``self.task_descriptions``.

        Returns:
            The list of newly created tasks (empty for ``cleanup``). The same
            tasks are also appended to ``self.tasks``.

        Raises:
            KeyError: If ``kind`` is not a known kind or a periodic
                description has no ``frequency``.
        """
        task_descriptions = self.task_descriptions[kind]
        logger.debug(f"Task descriptions of kind '{kind}': {task_descriptions}")
        tasks = []
        for task_description in task_descriptions:
            logger.debug(f"Creating asyncio task for {task_description=}")
            # mandatory properties
            task_kind = task_description["kind"]
            function = task_description["function"]
            name = task_description["name"]
            uid = task_description["uid"]
            # optional properties
            args = task_description.get("args", ())
            kwargs = task_description.get("kwargs", {})

            if task_kind in ("init", "continuous"):
                if asyncio.iscoroutinefunction(function):
                    awaitable = function(*args, **kwargs)
                else:
                    # Pass the callable itself: to_thread() has to invoke it
                    # in the worker thread, not receive its return value.
                    awaitable = asyncio.to_thread(function, *args, **kwargs)
                tasks.append(asyncio.create_task(awaitable, name=name))
            elif task_kind == "periodic":
                frequency = task_description["frequency"]
                if task_description.get("monitor", False):
                    # NaN-filled ring buffer that add_monitoring_ts() fills
                    # with timestamps
                    self.periodicals_timing[uid] = deque(
                        [float("nan")] * self.periodicals_timing_maxlen,
                        maxlen=self.periodicals_timing_maxlen,
                    )
                    monitoring_callback = functools.partial(
                        self.add_monitoring_ts, uid
                    )
                else:
                    monitoring_callback = None
                wrapped = periodical(frequency, monitoring_callback)(function)
                tasks.append(
                    asyncio.create_task(wrapped(*args, **kwargs), name=name)
                )
                self.periodicals[uid] = name
            elif task_kind == "cleanup":
                asyncio_atexit.register(
                    functools.partial(function, *args, **kwargs)
                )
            else:
                # add_task_description() only stores canonical kinds, so this
                # is reached only if a description was modified afterwards
                logger.warning(
                    f"Unknown task kind detected: kind={task_kind!r}. Task will not be executed!"
                )
        # extend self.tasks for the task monitor to work properly
        self.tasks.extend(tasks)
        return tasks

    async def run_tasks(self, tasks):
        """Wait for ``tasks`` and log how each one finishes.

        Each task is reported as ``completed``, ``failed`` or ``cancelled``
        under its own task name. Exceptions raised by the tasks are not
        propagated. A cancellation of this coroutine itself is logged and
        swallowed.

        Args:
            tasks: Iterable of asyncio tasks (or other awaitables).
        """
        logger.info("Processing tasks")
        pending = {asyncio.ensure_future(task) for task in tasks}
        try:
            # asyncio.wait() rejects an empty set, hence the loop condition
            while pending:
                done, pending = await asyncio.wait(
                    pending, return_when=asyncio.FIRST_COMPLETED
                )
                for task in done:
                    if isinstance(task, asyncio.Task):
                        task_name = task.get_name()
                    else:
                        task_name = repr(task)
                    # cancelled() must be checked first: exception() raises
                    # CancelledError on a cancelled task
                    if task.cancelled():
                        status = "cancelled"
                    elif task.exception() is not None:
                        status = "failed"
                    else:
                        status = "completed"
                    now = dt.datetime.now()
                    logger.debug(f"Task {task_name} {status} at {now.isoformat()}")
        except asyncio.CancelledError:
            logger.info("Task processing was cancelled")

    async def run(self):
        """Run the application: cleanup hooks, init tasks, then main tasks.

        After all continuous and periodic tasks have finished, the name,
        result and exception of every created task are logged as JSON.
        """
        # make sure to register the cleanup tasks first
        await self.create_tasks("cleanup")
        # the init tasks have to finish before anything else is started
        init_tasks = await self.create_tasks("init")
        await self.run_tasks(init_tasks)
        # only after that run the main tasks
        continuous_tasks = await self.create_tasks("continuous")
        periodic_tasks = await self.create_tasks("periodic")
        await self.run_tasks(continuous_tasks + periodic_tasks)
        logger.info("All work is done. Here's the outcome")

        results = []
        for task in self.tasks:
            result = {
                "name": task.get_name(),
                "result": None,
                "exception": None,
            }
            if not task.done():
                # In case of a manual exit neither a result nor an exception
                # has been set yet.
                logger.warning(
                    f"Task {task.get_name()} could not be queried for results or exceptions."
                )
            elif task.cancelled():
                # exception() and result() would raise CancelledError here
                result["exception"] = "cancelled"
            else:
                exception = task.exception()
                if exception is None:
                    result["result"] = task.result()
                else:
                    result["exception"] = str(exception)
            results.append(result)
        # default=str is deliberate: this is log output only, and an
        # arbitrary task result must not crash the final report
        logger.info(
            f"Results: {json.dumps(results, indent=log_indent, default=str)}"
        )

    async def task_monitor(self):
        """Log and publish which tasks are running, done or failed.

        Cancelled tasks are reported in the ``failed`` bucket. The record is
        logged, then published and stored through ``app_messenger`` under
        ``"<app_name>:task_monitor"``.
        """
        tasks_running = []
        tasks_done = []
        tasks_failed = []
        for task in self.tasks:
            task_name = task.get_name()
            if not task.done():
                tasks_running.append(task_name)
            elif task.cancelled() or task.exception() is not None:
                # cancelled() is checked first because exception() raises
                # CancelledError on a cancelled task
                tasks_failed.append(task_name)
            else:
                tasks_done.append(task_name)
        record = {
            "running": {
                "count": len(tasks_running),
                "tasks": tasks_running,
            },
            "done": {
                "count": len(tasks_done),
                "tasks": tasks_done,
            },
            "failed": {
                "count": len(tasks_failed),
                "tasks": tasks_failed,
            },
        }
        logger.debug(json.dumps(record, indent=4))
        await app_messenger.publish(f"{app_name}:task_monitor", record)
        await app_messenger.set(f"{app_name}:task_monitor", record)

    def periodicals_monitor(self):
        """Log the measured call frequency of every monitored periodic task.

        The frequency is the inverse of the mean difference between the
        recorded timestamps. The timing buffers start out filled with NaN, so
        the value is NaN as long as a buffer still contains NaN entries.
        """
        record = {}
        for _uuid, periodical_timing in self.periodicals_timing.items():
            ts = np.array(periodical_timing)
            f = 1 / np.diff(ts).mean()
            task_name = self.periodicals[_uuid]
            record[task_name] = f
        logger.debug(json.dumps(record, indent=log_indent))

    def exit(self, *args):
        """Exit hook: log the request and clear ``app_state.keep_running``.

        Args:
            *args: Accepted and ignored.
        """
        logger.info("Exit requested. GoodBye")
        app_state.keep_running = False
add_task_description(self, task_description)
¶
Add a task to the to-do list, including its args and kwargs.
Source code in async_app/app.py
def add_task_description(self, task_description):
    """Add a task to the to-do list, including its args and kwargs.

    The dict is normalized in place: ``name`` is derived from the function,
    ``uid`` is set to a fresh UUID4 string and ``kind`` is rewritten to its
    canonical spelling (``init``, ``continuous``, ``periodic`` or
    ``cleanup``). For periodic tasks a ``call_every`` value is converted to
    ``frequency = 1 / call_every``. Descriptions with an unknown kind are
    logged and not added.

    Args:
        task_description: Dict with at least the keys ``function`` and
            ``kind``.

    Raises:
        KeyError: If ``function`` or ``kind`` is missing.
    """
    # accepted spellings of a kind -> canonical kind
    kind_aliases = {
        "init": "init",
        "initialize": "init",
        "continuous": "continuous",
        "continuously": "continuous",
        "periodic": "periodic",
        "periodical": "periodic",
        "periodically": "periodic",
        "cleanup": "cleanup",
        "teardown": "cleanup",
    }
    logger.info(f"Adding new task with {task_description=}")
    function = task_description["function"]
    # functools.partial objects and other callable instances do not have a
    # __name__; fall back to the type name instead of raising.
    task_description["name"] = getattr(
        function, "__name__", type(function).__name__
    )
    task_description["uid"] = str(uuid.uuid4())

    kind = task_description["kind"]
    # str() keeps a non-string kind on the "unknown kind" path
    canonical_kind = kind_aliases.get(str(kind).lower())
    if canonical_kind is None:
        logger.error(f"Unknown task kind '{kind}' detected. Not adding task.")
        return

    task_description["kind"] = canonical_kind
    if canonical_kind == "periodic" and "call_every" in task_description:
        task_description["frequency"] = 1 / task_description["call_every"]
    self.task_descriptions[canonical_kind].append(task_description)
create_tasks(self, kind)
async
¶
Initialize tasks of kind 'kind'.
Source code in async_app/app.py
async def create_tasks(self, kind):
    """Create the asyncio tasks for all registered descriptions of ``kind``.

    ``init`` and ``continuous`` functions are scheduled directly when they are
    coroutine functions and through ``asyncio.to_thread`` otherwise.
    ``periodic`` functions are wrapped with ``periodical``. ``cleanup``
    functions are not turned into tasks; they are registered with
    ``asyncio_atexit``.

    Args:
        kind: Canonical task kind, a key of ``self.task_descriptions``.

    Returns:
        The list of newly created tasks (empty for ``cleanup``). The same
        tasks are also appended to ``self.tasks``.

    Raises:
        KeyError: If ``kind`` is not a known kind or a periodic description
            has no ``frequency``.
    """
    task_descriptions = self.task_descriptions[kind]
    logger.debug(f"Task descriptions of kind '{kind}': {task_descriptions}")
    tasks = []
    for task_description in task_descriptions:
        logger.debug(f"Creating asyncio task for {task_description=}")
        # mandatory properties
        task_kind = task_description["kind"]
        function = task_description["function"]
        name = task_description["name"]
        uid = task_description["uid"]
        # optional properties
        args = task_description.get("args", ())
        kwargs = task_description.get("kwargs", {})

        if task_kind in ("init", "continuous"):
            if asyncio.iscoroutinefunction(function):
                awaitable = function(*args, **kwargs)
            else:
                # Pass the callable itself: to_thread() has to invoke it in
                # the worker thread, not receive its return value.
                awaitable = asyncio.to_thread(function, *args, **kwargs)
            tasks.append(asyncio.create_task(awaitable, name=name))
        elif task_kind == "periodic":
            frequency = task_description["frequency"]
            if task_description.get("monitor", False):
                # NaN-filled ring buffer that add_monitoring_ts() fills with
                # timestamps
                self.periodicals_timing[uid] = deque(
                    [float("nan")] * self.periodicals_timing_maxlen,
                    maxlen=self.periodicals_timing_maxlen,
                )
                monitoring_callback = functools.partial(self.add_monitoring_ts, uid)
            else:
                monitoring_callback = None
            wrapped = periodical(frequency, monitoring_callback)(function)
            tasks.append(asyncio.create_task(wrapped(*args, **kwargs), name=name))
            self.periodicals[uid] = name
        elif task_kind == "cleanup":
            asyncio_atexit.register(functools.partial(function, *args, **kwargs))
        else:
            # only reached if a stored description was modified after it had
            # been registered
            logger.warning(
                f"Unknown task kind detected: kind={task_kind!r}. Task will not be executed!"
            )
    # extend self.tasks for the task monitor to work properly
    self.tasks.extend(tasks)
    return tasks
exit(self, *args)
¶
Exit hook.
Source code in async_app/app.py
def exit(self, *args):
    """Handle an exit request.

    Logs the request and clears the ``keep_running`` flag on ``app_state``.

    Args:
        *args: Accepted and ignored.
    """
    logger.info("Exit requested. GoodBye")
    app_state.keep_running = False
run_tasks(self, tasks)
async
¶
Run the given tasks and log how each one finishes.
Source code in async_app/app.py
async def run_tasks(self, tasks):
    """Wait for ``tasks`` and log how each one finishes.

    Each task is reported as ``completed``, ``failed`` or ``cancelled`` under
    its own task name. Exceptions raised by the tasks are not propagated. A
    cancellation of this coroutine itself is logged and swallowed.

    Args:
        tasks: Iterable of asyncio tasks (or other awaitables).
    """
    logger.info("Processing tasks")
    pending = {asyncio.ensure_future(task) for task in tasks}
    try:
        # asyncio.wait() rejects an empty set, hence the loop condition
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if isinstance(task, asyncio.Task):
                    task_name = task.get_name()
                else:
                    task_name = repr(task)
                # cancelled() must be checked first: exception() raises
                # CancelledError on a cancelled task
                if task.cancelled():
                    status = "cancelled"
                elif task.exception() is not None:
                    status = "failed"
                else:
                    status = "completed"
                now = dt.datetime.now()
                logger.debug(f"Task {task_name} {status} at {now.isoformat()}")
    except asyncio.CancelledError:
        logger.info("Task processing was cancelled")
task_monitor(self)
async
¶
A task monitor.
Source code in async_app/app.py
async def task_monitor(self):
    """Log and publish which tasks are running, done or failed.

    Cancelled tasks are reported in the ``failed`` bucket. The record is
    logged, then published and stored through ``app_messenger`` under
    ``"<app_name>:task_monitor"``.
    """
    tasks_running = []
    tasks_done = []
    tasks_failed = []
    for task in self.tasks:
        task_name = task.get_name()
        if not task.done():
            tasks_running.append(task_name)
        elif task.cancelled() or task.exception() is not None:
            # cancelled() is checked first because exception() raises
            # CancelledError on a cancelled task
            tasks_failed.append(task_name)
        else:
            tasks_done.append(task_name)
    record = {
        "running": {
            "count": len(tasks_running),
            "tasks": tasks_running,
        },
        "done": {
            "count": len(tasks_done),
            "tasks": tasks_done,
        },
        "failed": {
            "count": len(tasks_failed),
            "tasks": tasks_failed,
        },
    }
    logger.debug(json.dumps(record, indent=4))
    await app_messenger.publish(f"{app_name}:task_monitor", record)
    await app_messenger.set(f"{app_name}:task_monitor", record)