Execution
There are four methods to execute Rocketry tasks:
`main`
: Run synchronously in the main thread and process

`async`
: Run asynchronously

`thread`
: Run on a separate thread

`process`
: Run on a separate process
Example usage:
@app.task(execution="main")
def do_main():
    ...

@app.task(execution="async")
async def do_async():
    ...

@app.task(execution="thread")
def do_thread():
    ...

@app.task(execution="process")
def do_process():
    ...
Here is a summary of the methods:
| Execution | Parallelized? | Can be terminated? | Can modify the session? |
|---|---|---|---|
| `process` | Yes | Yes | No |
| `thread` | Partially | Yes if task supports | Yes |
| `async` | Partially | Yes if awaits | Yes |
| `main` | No | No | Yes |
If your problem is CPU bound (uses a lot of computational resources), you
should use `process` as the execution type. If your problem is IO bound, you
should use `async` if your IO libraries support async and `thread` if they
do not. If you wish to run your code completely synchronously, use `main`.
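The rules of thumb above can be condensed into a small decision helper. This is only a sketch: `choose_execution` and its parameters are hypothetical names used for illustration and are not part of Rocketry. Only the returned strings correspond to the execution values described in this section.

```python
def choose_execution(cpu_bound: bool = False, io_bound: bool = False,
                     async_io_available: bool = False) -> str:
    """Return an execution value following the rules of thumb above.

    Hypothetical helper for illustration; not part of Rocketry.
    """
    if cpu_bound:
        # Heavy computation: run in a separate process
        return "process"
    if io_bound:
        # Waiting on IO: prefer async when the IO libraries support it
        return "async" if async_io_available else "thread"
    # Otherwise run synchronously in the scheduler's own thread
    return "main"
```

The returned value could then be passed as the `execution` argument of `@app.task(...)`.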
Note that `process` cannot share memory with the scheduler engine, so
it cannot modify the scheduler's content at runtime. Therefore, if you wish to build APIs
or other ways to modify the scheduler at runtime, you should use either `main`,
`async` or `thread` as the execution type.
`main` and `async` are the least expensive in terms of the time and resources it
takes to initiate a task. `thread` has some overhead and `process` has
significantly more.
You can also set the default execution method, which is used if a task does not specify one:
app = Rocketry(execution="async")

@app.task()
def do_main():
    ...
Main
This execution method runs the task unparallelized in the main thread and process. The task can utilize async, but a task with this execution type blocks the scheduler from doing anything else while it is running. In other words, while such a task is running, the scheduler cannot check or launch other tasks in the meantime.
Note
The task can utilize async, but the scheduler will not continue scheduling until the task has completed (succeeded or failed).
For example, this task blocks the scheduler:
@app.task(execution="main")
async def do_async():
    ...
Warning
Tasks with execution `main` cannot be terminated. If they get stuck,
the scheduler will get stuck.
Useful if you wish to run synchronously and block all other execution in the scheduler in the meantime.
Async
This execution method runs the task asynchronously using the asyncio library.
The task should actually utilize async (that is, await something) in order to gain benefits over
the `main` execution method. Otherwise the task will block the scheduler
from doing other things such as launching other tasks, terminating tasks
or checking shutdown conditions.
import asyncio

@app.task(execution="async")
async def do_async():
    await asyncio.sleep(10)  # Do something async
Warning
Your task should also use async. Otherwise the task
may completely block the scheduler and the end result is the same
as running with execution as `main`. The task also cannot be
terminated if it does not `await`.
This does not benefit from async:
@app.task(execution="async")
def do_sync():
    ...
And neither does this (as it does not use `asyncio.sleep`):
import time

@app.task(execution="async")
async def do_sync():
    time.sleep(10)  # Do something without await
Warning
If a task with execution `async` gets stuck and cannot call `await`,
the scheduler will get stuck as well.
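The difference between a blocking and a cooperative async task can be observed with plain asyncio, without Rocketry. In the sketch below, the `heartbeat` coroutine is a hypothetical stand-in for the scheduler loop, and all function names are illustrative assumptions. It counts how often the heartbeat gets to run while each kind of task is executing.

```python
import asyncio
import time


async def heartbeat(ticks, interval=0.01):
    """Stand-in for the scheduler loop: records a tick whenever it gets to run."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_task():
    time.sleep(0.2)  # Blocks the event loop: nothing else can run meanwhile


async def cooperative_task():
    await asyncio.sleep(0.2)  # Yields control to the event loop while waiting


async def count_ticks(task_func):
    """Run task_func next to the heartbeat and count the heartbeat's ticks."""
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    await task_func()
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass  # Expected: the heartbeat was cancelled on purpose
    return len(ticks)


def compare():
    """Return the tick counts for the blocking and the cooperative task."""
    blocked = asyncio.run(count_ticks(blocking_task))
    cooperative = asyncio.run(count_ticks(cooperative_task))
    return blocked, cooperative
```

Calling `compare()` should return a smaller tick count for the blocking task than for the cooperative one, since the heartbeat only runs while the task is awaiting.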
Note
Due to the GIL (Global Interpreter Lock), only one thread can execute Python bytecode at a time. Therefore, pure Python code without any IO operations will not gain any performance benefit. However, if there are IO operations, performance may improve.
For an `async` task to be terminable, the task should await at some point.
Useful for IO bound problems or to integrate APIs.
Thread
This execution method runs the task on a separate thread using the threading library.
Note
Due to the GIL (Global Interpreter Lock), only one thread can execute Python bytecode at a time. Therefore, pure Python code without any IO operations will not gain any performance benefit. However, if there are IO operations, performance may improve.
For a `thread` task to be terminable, the task should check the termination flag:
from rocketry.args import TerminationFlag
from rocketry.exc import TaskTerminationException

@app.task(execution="thread")
def do_thread(flag=TerminationFlag()):
    while True:
        ...  # Do something
        if flag.is_set():
            raise TaskTerminationException()
Warning
If a task with execution `thread` never finishes and does not respect termination,
the scheduler cannot shut itself down.
Note
Terminated thread tasks should raise `TaskTerminationException` to signal that
they finished prematurely due to termination. Without the exception, the task
is considered to have run successfully.
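The cooperative termination pattern described above can be tried out with the standard library alone. This is a minimal sketch under stated assumptions: `threading.Event` stands in for the termination flag, `TerminatedError` is a hypothetical stand-in for `TaskTerminationException`, and `run_and_record` only imitates how an outcome might be recorded. It is not Rocketry's implementation.

```python
import threading


class TerminatedError(Exception):
    """Hypothetical stand-in for rocketry.exc.TaskTerminationException."""


def worker(flag, started):
    started.set()  # Tell the caller that the loop is running
    while True:
        # Do a unit of work here, then check the flag.
        # wait() returns True as soon as the flag has been set.
        if flag.wait(timeout=0.01):
            raise TerminatedError()


def run_and_record(flag, started, outcome):
    """Run the worker and record how it ended."""
    try:
        worker(flag, started)
    except TerminatedError:
        outcome.append("terminate")
    else:
        # Returning without the exception would look like a normal finish
        outcome.append("success")


def demo():
    flag = threading.Event()
    started = threading.Event()
    outcome = []
    thread = threading.Thread(target=run_and_record, args=(flag, started, outcome))
    thread.start()
    started.wait(timeout=1)
    flag.set()  # Request termination
    thread.join(timeout=1)
    return outcome
```

If the worker simply returned when the flag was set instead of raising, `run_and_record` would record `"success"`, which mirrors why the exception is needed to distinguish termination from a normal finish.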
Useful for IO bound problems where there is no async support.
Process
This execution method runs the task on a separate process using the multiprocessing library.
Warning
The process cannot share memory with the main scheduler, so you cannot
modify the state of the scheduler from tasks parallelized with `process`.
However, you may still pass parameters to the task.
Warning
You cannot pass parameters that cannot be pickled to tasks with
`process` as the execution type.
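Whether a value can be pickled can be checked up front with the standard `pickle` module. The helper below is a hypothetical sketch for illustration (`is_picklable` is not part of Rocketry), and it assumes the parameters are serialized with the standard pickle protocol, as the multiprocessing library does.

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if obj can be serialized with pickle, False otherwise."""
    try:
        pickle.dumps(obj)
    except Exception:  # pickle raises several exception types on failure
        return False
    return True


# Plain data such as dicts, strings and numbers can be pickled
plain_ok = is_picklable({"report_date": "2022-01-01"})

# Locks and lambda functions cannot be pickled
lock_ok = is_picklable(threading.Lock())
lambda_ok = is_picklable(lambda x: x)
```

Here `plain_ok` is `True` while `lock_ok` and `lambda_ok` are `False`, so values like the latter two are not suitable as parameters for `process` tasks.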
Warning
Especially with the `process` execution type, the code that runs the application must be
wrapped in an `if __name__ == "__main__":` block. Otherwise, launching a `process`
task may launch another instance of the scheduler. Also, do not wrap the
task function with other decorators when the execution type is `process`, as
serialization of the function would then fail when initiating the process.
Useful for CPU bound problems or for problems in which the code has a tendency to get stuck.
Multilaunch
By default, each task can have only one run in progress at a time. In other words, if a task is
already running, it cannot be started again even if its start condition is still true
or it was set to run using `task.run()`. This limitation is in place to make
sure the task logs are consistent.
To illustrate this, the task must finish before it can be started again:
| created | task_name | action |
|---|---|---|
| 1 | do_things | run |
| 2 | do_things | success |
| 3 | do_things | run |
| 4 | do_things | success |
However, in some cases this is not desirable and you might want to allow the same task to
start multiple times. This can be done by setting `multilaunch` to true either in the session
configuration or on an individual task:
app = Rocketry(config={'multilaunch': True})

@app.task(multilaunch=True)
def do_things():
    ...
This task can produce logs similar to the following:
| created | task_name | action |
|---|---|---|
| 1 | do_things | run |
| 2 | do_things | run |
| 3 | do_things | success |
| 4 | do_things | success |
There is also a `run_id` passed to the logs, which you can
use to identify individual runs and track when they finish.
from rocketry.log import MinimalRunRecord

app = Rocketry(config={'multilaunch': True})

# Setting the log record model
repo = app.session.get_repo()
repo.model = MinimalRunRecord

@app.task(multilaunch=True)
def do_things():
    ...
This will produce logs similar to the following:
| created | task_name | run_id | action |
|---|---|---|---|
| 1 | do_things | fe5018… | run |
| 2 | do_things | fab4db… | run |
| 3 | do_things | fe5018… | success |
| 4 | do_things | fab4db… | success |
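With a `run_id` in each record, interleaved runs can be matched up again. The sketch below shows one way to do this on plain dictionaries. The record layout mirrors the columns of the table above, while the function name `pair_runs` and the run IDs `run-a` and `run-b` are hypothetical. It does not use Rocketry's log API.

```python
def pair_runs(records):
    """Map each run_id to the 'created' values of its start and finish records."""
    runs = {}
    for record in records:
        run = runs.setdefault(record["run_id"], {"started": None, "finished": None})
        if record["action"] == "run":
            run["started"] = record["created"]
        else:
            # Any other action (for example "success") closes the run
            run["finished"] = record["created"]
    return runs


# Hypothetical records with the same shape as the table above
records = [
    {"created": 1, "task_name": "do_things", "run_id": "run-a", "action": "run"},
    {"created": 2, "task_name": "do_things", "run_id": "run-b", "action": "run"},
    {"created": 3, "task_name": "do_things", "run_id": "run-a", "action": "success"},
    {"created": 4, "task_name": "do_things", "run_id": "run-b", "action": "success"},
]

paired = pair_runs(records)
```

A run whose `finished` value is still `None` has started but has not yet logged a finishing action.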
By default, `run_id` is a Universally Unique Identifier (UUID).
You can also create custom run IDs based on, for example,
the arguments:
import json

def generate_run_id(task, params=None):
    return json.dumps(dict(params), default=str)

@app.task(multilaunch=True, func_run_id=generate_run_id)
def do_things(report_date):
    ...
You can also set the generator function as the session default:
app = Rocketry(config={'multilaunch': True, 'func_run_id': generate_run_id})
When the task is run (with a date parameter), the logs could look like:
| created | task_name | run_id | action |
|---|---|---|---|
| 1 | do_things | {"report_date": "2022-01-01"} | run |
| 2 | do_things | {"report_date": "2022-01-02"} | run |
| 3 | do_things | {"report_date": "2022-01-01"} | success |
| 4 | do_things | {"report_date": "2022-01-02"} | success |
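The run IDs in the table above can be reproduced by calling the generator function directly. This is a sketch under assumptions: a plain dict stands in for the task parameters and `None` for the task, which may differ from the objects Rocketry actually passes in.

```python
import datetime
import json


def generate_run_id(task, params=None):
    # Same logic as the generator above; params is treated as a mapping
    return json.dumps(dict(params), default=str)


# A plain dict is used as a stand-in for the task parameters (assumption)
run_id = generate_run_id(None, {"report_date": datetime.date(2022, 1, 1)})
print(run_id)  # {"report_date": "2022-01-01"}
```

The `default=str` argument is what converts the date object into a string, since `json.dumps` cannot serialize dates on its own.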