Intermediate Tutorial

This is an intermediate-level tutorial. It covers aspects that you might not come across in very simple applications but will eventually need to know.

Running as Async

By default, app.run starts a new event loop. If you wish to integrate Rocketry with other async apps, such as FastAPI, you can instead call app.serve, which is an async method that starts the scheduler:

import asyncio
from rocketry import Rocketry

app = Rocketry(execution="async")

@app.task()
async def do_things():
    ...

async def main():
    "Launch Rocketry app (and possibly something else)"
    rocketry_task = asyncio.create_task(app.serve())
    # Start possibly other async apps
    await rocketry_task

if __name__ == "__main__":
    asyncio.run(main())
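
The same pattern extends to several coroutines running side by side. The following is a minimal sketch that uses only asyncio: fake_scheduler and other_service are hypothetical stand-ins for app.serve() and for another async app, so it illustrates the concurrency pattern rather than Rocketry itself.

```python
import asyncio

async def fake_scheduler(log):
    # Hypothetical stand-in for app.serve(): run a few "scheduling cycles"
    for cycle in range(3):
        log.append(f"scheduler cycle {cycle}")
        await asyncio.sleep(0)  # yield control to the other coroutine

async def other_service(log):
    # Hypothetical stand-in for another async app (e.g. a web server)
    for request in range(3):
        log.append(f"service request {request}")
        await asyncio.sleep(0)

async def main():
    log = []
    scheduler_task = asyncio.create_task(fake_scheduler(log))
    service_task = asyncio.create_task(other_service(log))
    # Wait until both coroutines have finished
    await asyncio.gather(scheduler_task, service_task)
    return log

log = asyncio.run(main())
```

Both coroutines share one event loop, which is why neither of them should block for long without awaiting.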

Session Configurations

There are several options to tune the scheduling session, and you might want to change some of the defaults depending on your project. For example, you might want to silence more errors in production than the default does, or change the default execution type.

Read more in the config handbook.

App Settings

There are various ways to configure the application, but the recommended pattern is to use the setup hook. Read more in the app settings cookbook.

Using the Condition API

Previously we have used only the string syntax for scheduling. For simple use cases that is sufficient, but as your project grows the strings may become a burden: code analyzers cannot identify typos or other problems in them, and they are hard to reuse. To fix these problems, there is a condition API that provides functions and instances that are quite similar to the components of the string syntax.

Here are some examples of what the condition API looks like:

from rocketry.conds import (
    every, hourly, daily,
    after_success,
    true, false
)

@app.task(every('10 seconds'))
def do_constantly():
    ...

@app.task(hourly)
def do_hourly():
    ...

@app.task(daily.between('08:00', '14:00'))
def do_daily():
    ...

@app.task(after_success(do_daily))
def do_after():
    ...

@app.task(true & false & ~(true | false))
def do_logic():
    ...

Read more about the condition API in the handbook. From now on, we switch to the condition API, but you are free to use the string syntax. Most things work very similarly in both.

Condition Logic

Previously we have introduced some ways to schedule tasks. Sometimes the existing options are not enough and you need to compose the scheduling logic from multiple conditions. For this purpose, there are logical operations:

  • &: AND operator

  • |: OR operator

  • ~: NOT operator

Using these is pretty simple:

from rocketry.conds import true, false

@app.task(true)
def do_constantly():
    ...

@app.task(false)
def do_never():
    ...

@app.task(true & false)
def do_and():
    ...

@app.task(true | false)
def do_or():
    ...

@app.task(~false)
def do_not():
    ...

@app.task((true | false) & ~(true | false))
def do_nested():
    ...

We used the conditions true and false, but you may replace these with other conditions (e.g. daily) from the previous examples. Also note how we can use parentheses to group and nest the conditions.
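
To see why the operators above compose the way they do, here is a minimal sketch of operator overloading in plain Python. The Cond class and its observe method are hypothetical and are not Rocketry's actual implementation; the sketch only shows how &, | and ~ can build a new condition out of existing ones.

```python
class Cond:
    """Hypothetical minimal condition; not Rocketry's actual class."""

    def __init__(self, check):
        self._check = check  # zero-argument callable returning a bool

    def observe(self):
        return bool(self._check())

    def __and__(self, other):
        # a & b is true only if both are true
        return Cond(lambda: self.observe() and other.observe())

    def __or__(self, other):
        # a | b is true if either is true
        return Cond(lambda: self.observe() or other.observe())

    def __invert__(self):
        # ~a is true if a is false
        return Cond(lambda: not self.observe())

true = Cond(lambda: True)
false = Cond(lambda: False)

# Same shape as the do_nested example: (True) & ~(True) evaluates to False
nested = (true | false) & ~(true | false)
```

In Python, ~ binds tighter than &, which in turn binds tighter than |, so parentheses are needed whenever you want a different grouping.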

Note

The operations are the same in the string syntax. This is valid condition syntax:

"(true & false) | (false & ~true)"

Pipelining

Rocketry supports two types of task pipelining:

  • Run a task after another has succeeded, failed or both

  • Put the return or output value of a task as an input argument to another

Run Task After Another

There are conditions that can be used for this purpose:

from rocketry.conds import after_success, after_fail, after_finish

@app.task()
def do_things():
    ...

@app.task(after_success(do_things))
def do_after_success():
    ...

@app.task(after_fail(do_things))
def do_after_fail():
    ...

@app.task(after_finish(do_things))
def do_after_fail_or_success():
    ...

Set Output as an Input

To pass the output of one task as an input to another, there is a dedicated argument type. We go through arguments and parametrization in more detail soon, but here is an example of pipelining task return values:

from rocketry.args import Return

@app.task()
def do_first():
    return 'Hello World'

@app.task()
def do_second(arg=Return(do_first)):
    # arg's value is "Hello World"
    ...

Of course, the second task is not guaranteed to run after the first. You can combine both approaches to achieve proper pipelining:

from rocketry.conds import daily, after_success
from rocketry.args import Return

@app.task(daily)
def do_first():
    return 'Hello World'

@app.task(after_success(do_first))
def do_second(arg=Return(do_first)):
    # arg's value is "Hello World"
    ...
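
Conceptually, combining the two means "run the second task only once the first has succeeded, and hand it the first task's return value". The following plain-Python sketch mimics that idea; run_task, returns and succeeded are hypothetical helpers for illustration and do not reflect how Rocketry stores results internally.

```python
returns = {}       # task name -> latest return value
succeeded = set()  # names of tasks whose latest run succeeded

def run_task(name, func, *args):
    """Run func and record its return value if it does not raise."""
    try:
        returns[name] = func(*args)
    except Exception:
        succeeded.discard(name)
        return False
    succeeded.add(name)
    return True

def do_first():
    return "Hello World"

def do_second(arg):
    return arg.upper()

run_task("do_first", do_first)

# Run the second task only after the first succeeded,
# and pass the first task's return value as its input
if "do_first" in succeeded:
    run_task("do_second", do_second, returns["do_first"])
```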

Parameterizing

Parameters are key-value pairs passed to the tasks. The value of a pair is called an argument. An argument can be derived from the return value of another task, from the return value of a function, or from a component of the scheduling framework.

There are also two scopes of parameters: session level and task level. Most of the time you are using session level parameters.

Here is an illustration of using a session level parameter:

from rocketry.args import Arg

# Setting parameters to the session
app.params(
    my_arg='Hello world'
)

@app.task()
def do_things(item = Arg('my_arg')):
    ...

We set a session-level parameter (my_arg) and used it in the task do_things. When the task is run, the function argument item gets the value of my_arg from the session-level parameters, which is "Hello world". This argument can be reused in multiple tasks because it was set at the session level.

Setting an argument at the task level only looks like this:

from rocketry.args import SimpleArg

@app.task()
def do_things(item = SimpleArg('Hello world')):
    ...

SimpleArg is just a placeholder argument that is simply the value that was passed (which is 'Hello world'). In the example above the argument is not reusable in other tasks.

Next we will cover some basic argument types that have more functionality.

Function Arguments

Function arguments are arguments whose values are derived from the return value of a function. To set a session-level function argument:

from rocketry.args import Arg

@app.param('my_arg')
def get_item():
    return 'hello world'

@app.task()
def do_things(item = Arg('my_arg')):
    ...

To set a task-level-only function argument:

from rocketry.args import FuncArg

def get_item():
    return 'hello world'

@app.task()
def do_things(item = FuncArg(get_item)):
    ...
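
One way to picture how such default values turn into real arguments is a resolver that inspects the function signature before the task is called. The sketch below is plain Python: the marker classes only borrow the names Arg, SimpleArg and FuncArg, and resolve_kwargs is a hypothetical helper, so none of it is Rocketry's actual implementation.

```python
import inspect

class Arg:
    """Hypothetical marker: look the value up from session-level parameters."""
    def __init__(self, key):
        self.key = key

class SimpleArg:
    """Hypothetical marker: carry a fixed, task-level value."""
    def __init__(self, value):
        self.value = value

class FuncArg:
    """Hypothetical marker: call a function to get the value at run time."""
    def __init__(self, func):
        self.func = func

def resolve_kwargs(func, session_params):
    """Turn marker defaults in func's signature into concrete values."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        default = param.default
        if isinstance(default, Arg):
            kwargs[name] = session_params[default.key]
        elif isinstance(default, SimpleArg):
            kwargs[name] = default.value
        elif isinstance(default, FuncArg):
            kwargs[name] = default.func()
    return kwargs

session_params = {"my_arg": "Hello world"}  # shared by all tasks

def get_item():
    return "hello world"

def do_things(item=Arg("my_arg"), fixed=SimpleArg(42), computed=FuncArg(get_item)):
    return item, fixed, computed

result = do_things(**resolve_kwargs(do_things, session_params))
```

Only the Arg marker depends on the shared session_params dictionary, which is what makes a session-level parameter reusable across tasks.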

Meta Arguments

Meta arguments are arguments that contain a component of the scheduling system. These are useful when you need to manipulate the session in a task (e.g. shut down the scheduler or add/delete tasks) or manipulate some tasks (e.g. force a run or change attributes).

An example of the session argument:

from rocketry.args import Session

@app.task()
def manipulate_session(session = Session()):
    ...

An example of the task argument:

from rocketry.args import Task

@app.task()
def manipulate_task(this_task=Task(), another_task=Task('do_things')):
    ...

This is more advanced and we will get to the usage of these later.

Custom Conditions

Creating custom conditions is easy and you can combine your conditions with other conditions using simple logic. Simply call the condition wrapper in the app:

from rocketry.conds import daily

@app.cond()
def things_ready():
    ...
    return True  # or False, depending on whether things are ready

@app.task(daily & things_ready)
def do_things():
    ...

You can also pass arguments to the conditions:

from pathlib import Path
from rocketry.conds import daily

@app.cond()
def file_exists(file):
    return Path(file).is_file()

@app.task(daily & file_exists("myfile.csv"))
def do_things():
    ...

Note

You can pass the arguments as positional (e.g. file_exists("myfile.csv")) or as keyword arguments (e.g. file_exists(file="myfile.csv")).

You can also use Rocketry’s arguments:

from rocketry.args import Task
from rocketry.conds import daily

@app.cond()
def is_right_task(this_task=Task()):
    return this_task.name.startswith("do_")

@app.task(daily & is_right_task)
def do_things():
    ...

Warning

The conditions should be relatively simple and light. If your condition takes a long time to evaluate, it might slow down the scheduler. You can fix this by caching the value or by creating a task that runs in parallel and checks the value on behalf of the condition.
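
As an illustration of the caching option, here is a minimal sketch of a time-based cache for an expensive check, written with the standard library only. The decorator cached_for and the function slow_check are hypothetical names; whether the wrapped function is then registered with app.cond() is left to your application.

```python
import functools
import time

def cached_for(ttl_seconds):
    """Cache the result of a zero-argument check for ttl_seconds."""
    def decorator(check):
        state = {"value": None, "expires": None}

        @functools.wraps(check)
        def wrapper():
            now = time.monotonic()
            # Re-evaluate only on the first call or after the cache expired
            if state["expires"] is None or now >= state["expires"]:
                state["value"] = check()
                state["expires"] = now + ttl_seconds
            return state["value"]
        return wrapper
    return decorator

calls = []  # records how often the expensive check really runs

@cached_for(ttl_seconds=60)
def slow_check():
    calls.append(1)
    return True

first = slow_check()   # runs the expensive check
second = slow_check()  # served from the cache
```

The trade-off is staleness: the condition may report an outdated value for up to ttl_seconds.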

Task Logging

Rocketry uses Red Bird’s logging handler to implement a logger that can be read programmatically. Red Bird is a repository pattern library that abstracts database access from application code. This helps create a unified interface for reading the logs regardless of whether they are stored in a CSV file, an SQL database or a plain Python list in memory.

Log to Repository

By default, the logs are stored in a Python list and they are gone if the scheduler is restarted. In many cases this is undesirable because, after a restart, the scheduler does not know which tasks had already run, succeeded or failed. Therefore you might want to store the log records on disk by changing the default log repository.

The simplest way to configure the location of the logs is to pass the new repo as logger_repo:

from rocketry import Rocketry
from rocketry.log import MinimalRecord
from redbird.repos import CSVFileRepo

repo = CSVFileRepo(filename="tasks.csv", model=MinimalRecord)
app = Rocketry(logger_repo=repo)

In the example above, we changed the log records to go to a CSV file called tasks.csv. We also specified a log record format that contains the bare minimum. Read more about logging in the logging handbook.

Add Other Log Handlers

As the logger is simply an extension of the logging library, you can add other logging handlers as well:

import logging
from rocketry import Rocketry

app = Rocketry()

# Create a handler
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)

# Add the handler
task_logger = logging.getLogger('rocketry.task')
task_logger.addHandler(handler)

Warning

Make sure the logger rocketry.task has at least one redbird.logging.RepoHandler in handlers or the system cannot read the log information.
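
A quick way to inspect which handlers a logger has is to walk its handlers list. The sketch below uses only the standard logging module; has_handler_named is a hypothetical helper that matches handlers by class name (including base classes), so that redbird does not need to be imported for the check.

```python
import logging

def has_handler_named(logger, class_name):
    """Return True if any handler on the logger is of a class with this name."""
    return any(
        cls.__name__ == class_name
        for handler in logger.handlers
        for cls in type(handler).__mro__
    )

task_logger = logging.getLogger("rocketry.task")

# Add an extra handler without removing the ones already attached
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
task_logger.addHandler(stream_handler)

# In a real application, run this check after the Rocketry app is created
repo_handler_present = has_handler_named(task_logger, "RepoHandler")
```

Adding handlers with addHandler keeps the existing ones in place, whereas assigning a new list to the handlers attribute would drop them.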

Task Naming

Each task should have a unique name within the session. If a name is not given to a task, the name is derived from the arguments of the task.

For function tasks, if the name is not specified, it is set to the name of the function:

>>> @app.task()
... def do_things():
...     ...

>>> app.session[do_things].name
'do_things'

Warning

As the name must be unique, an error is raised if you try to create multiple tasks from the same function, or from multiple functions with the same name, without specifying a name.

You can pass the name yourself as well:

@app.task(name="mytask")
def do_things():
    ...

Note

If you use the decorator (@app.task()) to define a function task, the decorator returns the function itself due to pickling issues on some platforms. However, the task can be fetched from the session using just the function: session[do_things]. There is a special attribute (__rocketry__) in the task function that enables this.

Note

Task names are used in many conditions and in logging. They are essential in order to find out when a task started, failed or succeeded.