Advanced Tutorial

This is an advanced-level tutorial.

Task Types

So far, we have only used FuncTask by passing a callable. There are other task types as well, which cover the most common use cases:

  • FuncTask: Executes a Python function

  • CommandTask: Executes a shell command

  • CodeTask: Executes raw code given as a string. Potentially dangerous, because the string is executed as is.

Here are the ways to initialize tasks:

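# FuncTask: decorate a function
@app.task('daily')
def do_things():
    ...

# FuncTask: pass an existing function
def run_things():
    ...

app.task('daily', func=run_things)

# FuncTask: use the function "main" defined in another file
app.task('daily', func_name="main", path="path/to/example.py")

# CommandTask: run a shell command
app.task('daily', command='echo "Hello world"')

# CodeTask: run code given as a string
app.task('daily', code='print("Hello world")')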
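To give a rough idea of what the three task types listed above amount to, the following sketch mimics each one with the Python standard library only. It is not how Rocketry implements them: the helper names run_func, run_command and run_code are hypothetical and exist only for this illustration. It also shows why running code from a string deserves care, since exec runs whatever the string contains.

```python
import subprocess


def run_func(func):
    # Roughly what a function task does: call a Python callable
    return func()


def run_command(command):
    # Roughly what a command task does: run a shell command
    # and capture what it prints to standard output
    completed = subprocess.run(
        command, shell=True, capture_output=True, text=True, check=True
    )
    return completed.stdout


def run_code(code):
    # Roughly what a code task does: execute a string as Python code.
    # exec() runs anything in the string, so never pass untrusted input.
    namespace = {}
    exec(code, namespace)
    return namespace


if __name__ == "__main__":
    print(run_func(lambda: "Hello world"))
    print(run_command('echo "Hello world"'))
    run_code('print("Hello world")')
```

Each helper is intentionally minimal: there is no scheduling, logging or error handling here, all of which the real task types take care of.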

Metatasks

The scheduler system can be modified at runtime. While it is running, you can:

  • shut down the scheduler

  • restart the scheduler

  • force a task to be run

  • disable a task

  • create, update or delete tasks

To do this, create a task that runs either as a separate thread or on the main loop, and pass the session or a task in the task parameters. Tasks parallelized as separate processes cannot alter the scheduling environment, because a separate process does not share memory with the scheduler.
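The memory limitation mentioned above is a general property of processes, not something specific to the scheduler. The following standard-library sketch illustrates it under two assumptions: the dictionary state is a hypothetical stand-in for the scheduler's in-memory state, and the "fork" start method is available (a POSIX system). A change made in a thread is visible to the caller, while a change made in a child process only affects the child's own copy.

```python
import multiprocessing
import threading

# Hypothetical stand-in for the scheduler's in-memory state
state = {"disabled": False}


def disable():
    # Mutates whichever copy of `state` the caller can see
    state["disabled"] = True


def run_in_thread():
    # A thread shares memory with its parent, so the change is visible
    state["disabled"] = False
    worker = threading.Thread(target=disable)
    worker.start()
    worker.join()
    return state["disabled"]


def run_in_process():
    # A child process works on its own copy, so the parent sees no change
    state["disabled"] = False
    ctx = multiprocessing.get_context("fork")  # assumes a POSIX system
    worker = ctx.Process(target=disable)
    worker.start()
    worker.join()
    return state["disabled"]


if __name__ == "__main__":
    print(run_in_thread())   # True
    print(run_in_process())  # False
```

This is why a task that needs to change the scheduler should run as a thread or on the main loop rather than as a separate process.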

To access the session:

from rocketry.args import Session

@app.task(execution="thread")
def do_shutdown(session=Session()):
    session.shutdown()

You can also access other tasks at runtime. To do so, use the Session or Task arguments.

Suppose we have the following task:

@app.task()
def do_things():
    ... # Just some task

To access this task using the Session argument:

from rocketry.args import Session

@app.task(execution="thread")
def read_task(session=Session()):
    # Get by name
    task = session['do_things']

    # Or by function
    task = session[do_things]
    ...

    # Or just loop over the tasks
    for task in session.tasks:
        if task.name == "do_things":
            break
    ...

To access this task using the Task argument:

from rocketry.args import Task

@app.task(execution="thread")
def read_task(task=Task(do_things)):
    ...

Access Task Logs

Now that we know how to access tasks at runtime, we can read the logs of our task.

Let’s take this again as an example:

@app.task()
def do_things():
    ...

Then we make a task that fetches this task and queries its logs:

from rocketry.args import Session

@app.task(execution="thread")
def read_logs(session=Session()):
    task = session['do_things']

    run_logs = task.logger.filter_by(action="run").all()
    success_logs = task.logger.filter_by(action="success").all()
    fail_logs = task.logger.filter_by(action="fail").all()
    ...