Controlling Runtime

You may want to control the scheduling session at runtime in several cases, for example to:

  • Create maintenance tasks.

  • Command the scheduler from a user interface.

  • Command the scheduler from an external program.

Interaction with the scheduling session goes through the session instance. You can access it with the rocketry.args.Session argument:

from rocketry.args import Session

@app.task()
def do_things(session=Session()):
    ...

Alternatively, you can access the session through app.session. For larger applications, however, using the argument is advised.

Manipulating the Session

To restart the scheduler in a task:

from rocketry.args import Session

@app.task()
def restart(session=Session()):
    session.restart()

To shut down the scheduler in a task:

from rocketry.args import Session

@app.task()
def shut_down(session=Session()):
    session.shut_down()

Manipulating Other Tasks

To remove a task from the scheduler:

from rocketry.args import Session

@app.task()
def do_things():
    ...

@app.task()
def remove_task(session=Session()):
    session.remove_task("do_things")

To create a task:

from rocketry.conds import daily
from rocketry.args import Session

def do_things():
    ...

@app.task()
def create_task(session=Session()):
    session.create_task(func=do_things, start_cond=daily)

Accessing a Task

You can also access other tasks at runtime. This is useful for inspecting the attributes of another task, including its status and logs. You can also set a task running through its task instance.

You can use the Task argument:

from rocketry.args import Task

@app.task()
def do_on_self(this_task=Task()):
    ...

@app.task()
def do_on_other(another_task=Task(do_on_self)):
    ...

Or you can use the session:

from rocketry.args import Session

@app.task()
def do_things(session=Session()):
    task = session["do_things"]
    ...

Task Queue

A task queue is a list of tasks that are run one after another. Rocketry also supports run-specific parameters, so you can create a parametrized task queue as well.

Here is an example:

from rocketry.args import Session
import asyncio

def next_task():
    "Get next task from the queue"
    yield 'do_things', {}
    yield 'do_report', {'report_date': '2022-01-01'}

@app.task(on_startup=True, execution="async")
async def task_queue(session=Session()):
    queue = next_task()
    for task_name, params in queue:
        # Look the task up through the injected session
        task = session[task_name]
        task.run(**params)

        # Wait until the task finishes
        while task.is_running:
            await asyncio.sleep(5)
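
The queue pattern above can also be tried out without a scheduler. The following is a minimal sketch that uses only asyncio and a hypothetical StandInTask class (it is not Rocketry's Task class, and run_queue, run_count and events are names made up for this illustration). It assumes that run() only requests a run and that the work starts a moment later. Under that assumption, polling is_running right after run() could end before the task has started, so the sketch waits on a completion counter instead:

```python
import asyncio


class StandInTask:
    """Hypothetical stand-in for a scheduled task (not Rocketry's Task)."""

    def __init__(self, name, events, duration=0.02):
        self.name = name
        self.events = events        # shared list recording start/end order
        self.duration = duration
        self.is_running = False
        self.run_count = 0          # number of completed runs
        self.last_params = None
        self._job = None

    def run(self, **params):
        # Assumption: this only requests a run; the work begins slightly later.
        self.last_params = params
        self._job = asyncio.get_running_loop().create_task(self._execute())

    async def _execute(self):
        await asyncio.sleep(0.01)   # delay before the task actually starts
        self.is_running = True
        self.events.append(("start", self.name))
        await asyncio.sleep(self.duration)
        self.is_running = False
        self.events.append(("end", self.name))
        self.run_count += 1


async def run_queue(tasks, queue, poll=0.005):
    """Run queued (task_name, params) pairs strictly one after another."""
    finished = []
    for task_name, params in queue:
        task = tasks[task_name]
        runs_before = task.run_count
        task.run(**params)
        # Wait until this particular run has completed.
        while task.run_count == runs_before:
            await asyncio.sleep(poll)
        finished.append((task_name, params))
    return finished


def demo():
    events = []
    tasks = {
        "do_things": StandInTask("do_things", events),
        "do_report": StandInTask("do_report", events),
    }
    queue = [
        ("do_things", {}),
        ("do_report", {"report_date": "2022-01-01"}),
    ]
    finished = asyncio.run(run_queue(tasks, queue))
    return finished, tasks, events
```

Calling demo() returns the finished queue entries, the stand-in tasks and the recorded start/end events, which makes it easy to check that the second task starts only after the first one has ended. Whether the same timing concern applies to a real scheduling session depends on how the scheduler picks up requested runs, so treat this as an illustration of the waiting logic only.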