Robust Applications¶
This section contains options and design patterns for creating robust applications that has monitors, persistence and tolerance for errors.
Silence Errors¶
One might not want to silence errors in development or in test but having error tolerance in production could be useful.
By default, errors caused by the conditions or tasks outside their execution crash the system. However, you can silence those errors:
from rocketry import Rocketry
from rocketry.log import LogRecord
app = Rocketry(config={
'silence_task_prerun': True,
'silence_task_logging': True,
'silence_cond_check': True
})
Note
The errors and their tracebacks are still logged to
logger called rocketry.scheduler
even if they are
silenced. Setting this logger up helps with diagnosing
the errors.
Warning
You should first address the errors than silence them. Silencing task logging errors can cause the scheduler to be not in sync with the real events.
Log to Database¶
Sometimes your application might crash. If this happens you might want to continue where it was left previously. You need to store the log records to a file or to a database to achieve this.
Task logging is done using Red Bird’s repositories. You can use any repository supported by the library or create your own. In this section we will go through some of them. Please consult Red Bird’s documentation for more.
To CSV¶
Logging to CSV files is a good option for simple projects:
from redbird.repos import SQLRepo
from rocketry.log import RunRecord
repo = CSVFileRepo(model=RunRecord, filename="tasks.csv")
repo.create()
app.session.set_repo(repo, delete_existing=True)
To SQL¶
Logging to relational database is a good option for larger projects that:
from redbird.repos import SQLRepo
from rocketry.log import RunRecord
from sqlalchemy import create_engine
engine = create_engine('sqlite://')
engine.execute("""CREATE TABLE log (
id INTEGER PRIMARY KEY,
created FLOAT,
task_name TEXT,
run_id TEXT,
action TEXT
)""")
repo = SQLRepo(model=RunRecord, table="log", engine=engine, id_field="id")
app.session.set_repo(repo, delete_existing=True)
Warning
SQLAlchemy’s ORM requires primary key.
It is adviced to create incremential index as created
could have multiple same values
if your system is fast enough.
Warning
If logging the task logs fail, the scheduler might not be in sync. Often the errors are caused by temporary connection failures and it might be useful to retry the log insertion:
from functools import wraps
def retry_func(func):
@wraps(func)
def wrapper(*args, **kwargs):
err = None
for i in range(3):
try:
return func(*arg, **kwargs)
except Exception as exc:
err = exc
raise err
return wrapper
repo = SQLRepo(model=LogRecord, table="log", engine=engine, id_field="id")
repo.emit = retry_func(repo.emit)
Logging Task Errors¶
Logging errors is often cruicial for diagnostic purposes and for quickly addressing failures. Because Rocketry simply extends logging library, you can direct the log task records to anywhere you wish.
Error Emails¶
Recommended way to send errors via email is to use Red Mail’s email handler and add it to the logger that handles the task logs. Red Mail is an advanced email sending library created by Rocketry’s author.
import logging
from redmail.log import EmailHandler
handler = EmailHandler(
host="localhost",
port=0,
sender="no-reply@example.com",
receivers=["me@example.com"],
subject="Task failed",
html="""
<h2>Task failed: {{ record.task_name }}</h2>
<code><pre>{{ record.exc_text }}</pre></code>
""",
)
handler.setLevel(logging.ERROR)
task_logger = logging.getLogger("rocketry.task")
task_logger.addHandler(handler)
First we created a logging handler that sends emails, then we set the level of this handler to log errors only and then we set this handler to Rocketry’s task logger.
Warning
Sometimes email sending might fail due to connection
problems. It might be safer to wrap the emit
method with a try-except block.
Logging Scheduler¶
You can also setup logging for the scheduler. This could be useful for additional diagnostics.
import logging
sched_logger = logging.getLogger("rocketry.scheduler")
sched_logger.addHandler(logging.StreamHandler())
Retry Failed¶
Sometimes you might want to retry a failed task. Of course, not all tasks are safe to retry but some might. For example,
from rocketry.conds import retry, daily
@app.task(daily | retry(3))
def do_things():
...
The above runs once a day but it will retry maximum of three times if it fails.
However, sometimes the task might run hours before it fails thus it might be useful to force a time window in which the task is allowed to run:
from rocketry.conds import retry, daily, time_of_day
@app.task((daily | retry(3)) & time_of_day.between("10:00", "12:00"))
def do_things():
...
The above runs once a day between 10:00 and 12:00. It will also retry maximum of three times if it fails and time is still between 10:00 and 12:00.