Arguments¶
Arguments are dynamic values for parameter key-value pairs. Read about parameters in Parameterization first.
There are several argument types including:
SimpleArg
: simple wrapper for a value, useful only for testing
FuncArg
: represents the return value of a given function
Arg
: represents a session parameter
Return
: represents the return value of a task
EnvArg
: represents an environment variable
CliArg
: represents a command line argument for the whole program
There are also arguments that simply represent a Rocketry component:
Task
: represents a task instance
Session
: represents the session instance
Arguments are lazy in the sense that their value is determined only when needed, i.e. when a task is about to start or when a condition is about to be inspected.
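The laziness described above can be pictured with a plain-Python sketch. This is not Rocketry's implementation: `LazyValue`, `get_value`, and `read_setting` are hypothetical names, introduced only to show that the wrapped function runs when the value is requested and not when the wrapper is created.

```python
class LazyValue:
    """Hypothetical stand-in for an argument: wraps a function, calls it on demand."""

    def __init__(self, func):
        self.func = func  # nothing is computed here

    def get_value(self):
        # The value is produced only at this point ("materialization")
        return self.func()


calls = []  # records every time read_setting actually runs


def read_setting():
    calls.append("called")
    return "a value"


lazy = LazyValue(read_setting)
calls_before = len(calls)   # read_setting has not run yet
value = lazy.get_value()    # now it runs
calls_after = len(calls)
```

Creating the wrapper leaves `calls` empty. The function runs only on the `get_value()` call, which is the same moment at which a task would receive its argument.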
Note
If the value of a parameter pair is not an argument instance, the value is passed as-is.
Pipelining¶
Arguments can also be pipelined. This is useful if you have multiple sources for a parameter and would like to get the one that is available.
from rocketry.args import Arg, CliArg, EnvArg, SimpleArg

# Sources are tried from left to right; the first one that has a value is used
env_arg = EnvArg('ROCKETRY_ENV') >> CliArg('--env') >> Arg('env') >> SimpleArg('test')

@app.task()
def do_things(arg=env_arg):
    ...
The above takes the value of the environment variable ROCKETRY_ENV if it exists. Otherwise it takes the value of the CLI argument --env if that exists, then the session parameter env if that exists, and if none of these exist, it takes the value test.
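The fallback order can be sketched in plain Python without Rocketry. This only illustrates the idea behind `>>` and is not the library's implementation: the `Source` class, the `MISSING` sentinel, the `session_params` dict, and the `ROCKETRY_ENV_SKETCH` variable name are all hypothetical.

```python
import os

MISSING = object()  # hypothetical sentinel meaning "this source has no value"


class Source:
    """Hypothetical value source that can be chained with >>."""

    def __init__(self, *lookups):
        self.lookups = list(lookups)

    def __rshift__(self, other):
        # a >> b: try a's lookups first, then b's
        return Source(*self.lookups, *other.lookups)

    def get_value(self):
        for lookup in self.lookups:
            value = lookup()
            if value is not MISSING:
                return value
        raise KeyError("no source had a value")


session_params = {}  # stands in for session parameters; empty in this sketch

env = Source(lambda: os.environ.get("ROCKETRY_ENV_SKETCH", MISSING))
param = Source(lambda: session_params.get("env", MISSING))
fallback = Source(lambda: "test")

pipeline = env >> param >> fallback

os.environ.pop("ROCKETRY_ENV_SKETCH", None)
value_without_env = pipeline.get_value()   # falls through to the last source

os.environ["ROCKETRY_ENV_SKETCH"] = "prod"
value_with_env = pipeline.get_value()      # the first source now wins
```

The lookups run each time `get_value()` is called, so the result follows the environment at that moment and not at the time the pipeline was built.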
SimpleArg¶
SimpleArg is an argument whose value is simply the given value. It is mostly useful in testing.
from rocketry.args import SimpleArg

@app.task()
def do_things(arg=SimpleArg('a value')):
    assert arg == 'a value'
Arg¶
Arg is an argument that represents a session parameter. Its value is the value of the session parameter with the given name.
from rocketry.args import Arg

@app.task()
def do_things(arg=Arg('my_session_param')):
    assert arg == 'a value'

# Set the session parameter that the task reads
app.params(my_session_param="a value")
Note
You can also pass default, which is used if the session parameter is not found: Arg('myparam', default="a value")
FuncArg¶
FuncArg is an argument whose value is the return value of a given function, called when the argument is materialized.
from rocketry.args import FuncArg

def get_value():
    return 'a value'

@app.task()
def do_things(arg=FuncArg(get_value)):
    assert arg == 'a value'
EnvArg¶
EnvArg is an argument whose value is the value of a given environment variable.
import os
from rocketry.args import EnvArg

@app.task()
def do_things(arg=EnvArg("MY_ARG")):
    assert arg == 'a value'

# The variable is read when the task starts, so it can be set after the definition
os.environ['MY_ARG'] = 'a value'
Note
You can also pass default, which is used if the environment variable is not found: EnvArg('MY_ARG', default="a value")
CliArg¶
CliArg is an argument whose value is the value of a given CLI argument.
from rocketry.args import CliArg

@app.task()
def do_things(arg=CliArg("--myparam")):
    assert arg == 'a value'
Then call the program:
python myscript.py --myparam "a value"
Note
You can also pass default, which is used if the CLI argument is not found: CliArg('--myparam', default="a value")
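A comparable lookup with a default can be written with the standard library's argparse. This is only an analogue for experimenting with the behavior, not how CliArg is implemented; the argument lists passed to the parser are made up for the illustration.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--myparam", default="a value")

# parse_known_args tolerates arguments the parser does not know about,
# which suits a flag that belongs to the whole program
given, _ = parser.parse_known_args(["--myparam", "from cli", "--other", "x"])
missing, _ = parser.parse_known_args([])

value_when_given = given.myparam
value_when_missing = missing.myparam
```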
Return¶
Return is an argument whose value is the return value of another task. It is useful for pipelining tasks' outputs.
from rocketry.args import Return

@app.task()
def do_first():
    ...
    return 'a value'

@app.task()
def do_second(arg=Return(do_first)):
    assert arg == 'a value'
Note
You can also pass default, which is used if the return value is not found: Return('mytask', default="a value")
Task¶
Task is an argument that will have the task instance as its value when materialized. It is useful for advanced metatasks that manipulate other tasks.
from rocketry.args import Task

@app.task()
def do_things(arg=Task()):
    ...
Alternatively, you can specify another task to use:
from rocketry.args import Task

@app.task()
def do_other():
    ...

@app.task()
def do_things(arg=Task(do_other)):
    ...
TerminationFlag¶
TerminationFlag is an argument that will have a threading.Event as its value. The event is set when the task is to be terminated. It is useful for creating threaded tasks that obey the termination process.
from rocketry.args import TerminationFlag
from rocketry.exc import TaskTerminationException

@app.task()
def do_second(flag=TerminationFlag()):
    while True:
        # Check the flag on every iteration so that termination is noticed promptly
        if flag.is_set():
            raise TaskTerminationException("Task was terminated")
        ... # Do things
Note
The task should raise rocketry.exc.TaskTerminationException if the flag was set, as otherwise the task is considered to be successful (instead of terminated).
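The cooperative pattern behind this can be tried with the standard library alone. The sketch below does not use Rocketry: `worker`, `flag`, and the `Terminated` exception are hypothetical stand-ins for a threaded task, the value of TerminationFlag, and TaskTerminationException.

```python
import threading


class Terminated(Exception):
    """Hypothetical stand-in for rocketry.exc.TaskTerminationException."""


outcome = []  # records how the worker ended


def worker(flag):
    try:
        while True:
            # wait() returns True as soon as the event is set,
            # otherwise False once the timeout has passed
            if flag.wait(timeout=0.01):
                raise Terminated("Task was terminated")
            # ... one unit of work would go here
    except Terminated:
        outcome.append("terminated")


flag = threading.Event()
thread = threading.Thread(target=worker, args=(flag,))
thread.start()

flag.set()              # request termination
thread.join(timeout=5)  # the worker notices the flag and stops
```

A thread cannot be stopped from the outside, so the loop has to check the event itself. Raising a dedicated exception, and not simply returning, is what lets the caller tell a terminated run apart from a successful one.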
Session¶
Session is an argument that will have the session instance as its value when materialized. It is useful for advanced metatasks that operate on the session, i.e. for runtime APIs or other external communication with the scheduler.
from rocketry.args import Session

@app.task()
def do_things(arg=Session()):
    ...