Data pipelines
In this project, a pipeline is a procedure that takes a Target and a list of DataProduct objects as inputs, performs some processing on the data (usually FITS files), and produces some outputs. A process is an instance of a pipeline for a particular target and set of input files.
Processes are run by selecting a number of data products on the target data page in tom_education and then selecting the pipeline to run. Processes run asynchronously, and a separate page shows the status and log output as the process progresses.
On completion, outputs from a process are saved in the TOM as either DataProduct or ReducedDatum objects. See the section below on Saving outputs for more details.
Defining a pipeline
Pipelines are defined by creating a subclass of tom_education.models.PipelineProcess somewhere within your project and referencing it from settings.py, e.g.:
...
TOM_EDUCATION_PIPELINES = {
    # name: pipeline class. The name will be shown in the UI
    'My pipeline': 'my_project.pipelines.MyPipeline'
}
...
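Each value in TOM_EDUCATION_PIPELINES is a dotted import path to a class. As a quick sanity check on the setting, such a path can be resolved with the standard library's importlib. The helpers resolve_dotted_path and resolve_pipelines below are hypothetical (they are not part of tom_education, which may resolve the setting differently), and standard-library classes stand in for real pipeline classes so the sketch runs outside a Django project.

```python
import importlib


def resolve_dotted_path(dotted_path):
    """Import and return the object named by a 'package.module.Name' string."""
    module_path, _, attr_name = dotted_path.rpartition('.')
    if not module_path:
        raise ImportError(f'{dotted_path!r} is not a dotted path')
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(f'{module_path!r} has no attribute {attr_name!r}') from exc


def resolve_pipelines(pipelines):
    """Map each UI name in a TOM_EDUCATION_PIPELINES-style dict to its class."""
    return {name: resolve_dotted_path(path) for name, path in pipelines.items()}


# Hypothetical setting: standard-library classes stand in for pipeline
# classes so this runs outside a Django project.
example_setting = {
    'Ordered pipeline': 'collections.OrderedDict',
    'Path pipeline': 'pathlib.Path',
}
resolved = resolve_pipelines(example_setting)
```

Inside a Django project, django.utils.module_loading.import_string does the same job as resolve_dotted_path.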
The following class serves as a minimal example showing the methods that must be defined.
from tom_dataproducts.models import DataProduct, ReducedDatum
from tom_education.models import PipelineProcess, PipelineOutput


class ExamplePipelineProcess(PipelineProcess):
    # Label used as a prefix for names of generated data products
    short_name = 'example'

    # Optional list of suffixes for input files. The process will fail if
    # trying to run with files that do not end in one of these suffixes.
    allowed_suffixes = ['.fits', '.fz']

    # Make this a proxy: we do not want this to be a concrete model (which
    # would use a separate DB table and require migrations)
    class Meta:
        proxy = True
        # app label is required if this class is defined outside of an app
        # listed in INSTALLED_APPS
        app_label = 'myapp'

    def do_pipeline(self, tmpdir):
        """
        This method does the actual work.

        ``tmpdir`` is a ``pathlib.Path`` object for a temporary directory which
        can be used to write outputs and other temporary files.

        This method should return a sequence of PipelineOutput objects (or
        tuples) for the output files that should be saved in the TOM.
        """
        # The ``Target`` object is available as ``self.target``
        ra = self.target.ra
        dec = self.target.dec

        # The input files are available as ``self.input_files``. This is a Django
        # ``ManyRelatedManager`` of ``DataProduct`` objects: use
        # ``self.input_files.all()`` to iterate over the inputs.
        for product in self.input_files.all():
            path = product.data.path
            # Do something with file...

        # Create some output files
        outcsv = tmpdir / 'my_output.csv'
        outcsv.write_text('data here')
        outpng = tmpdir / 'another_output.png'
        outpng.write_bytes(b'PNG data here')

        return [
            PipelineOutput(path=outcsv, output_type=ReducedDatum),
            (outpng, DataProduct)  # Can output a tuple instead of PipelineOutput
        ]
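The comment on allowed_suffixes says a process fails when an input file does not end in one of the listed suffixes. One way to picture that check is sketched below; find_disallowed is a hypothetical helper, and tom_education's own check may differ (for example in case handling, or in how a missing list is treated).

```python
from pathlib import Path


def find_disallowed(filenames, allowed_suffixes):
    """Return the inputs whose names do not end in any of ``allowed_suffixes``.

    ``str.endswith`` is used rather than ``Path.suffix`` so that a multi-part
    entry such as '.fits.fz' in ``allowed_suffixes`` also works.
    An empty or missing suffix list is treated as "no restriction"
    (an assumption for this sketch).
    """
    if not allowed_suffixes:
        return []
    suffixes = tuple(allowed_suffixes)
    return [name for name in filenames if not str(name).endswith(suffixes)]


inputs = [Path('frame1.fits'), Path('frame2.fits.fz'), Path('notes.txt')]
bad = find_disallowed(inputs, ['.fits', '.fz'])
```

Here 'frame2.fits.fz' is accepted through its final '.fz', while 'notes.txt' is reported.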
Saving outputs
Outputs are saved in the TOM as either DataProduct or ReducedDatum objects; the definitions of these models can be found in the tom_dataproducts source code.
DataProduct objects have a file field which can store arbitrary files. This makes them suitable for binary files such as FITS files and other images.
A ReducedDatum object represents a single piece of ‘reduced’ data, and stores its value in a free-form text field. This makes it suitable for text outputs such as CSV and plain text files.
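The guidance above (binary files as DataProduct, text outputs as ReducedDatum) can be encoded as a simple rule keyed on file suffix. This is a hypothetical helper, not part of tom_education; the set of text suffixes is an assumption, and string labels stand in for the real model classes so the sketch runs without a TOM.

```python
from pathlib import Path

# Hypothetical rule of thumb: suffixes treated as text outputs.
TEXT_SUFFIXES = {'.csv', '.txt', '.dat'}


def suggested_output_type(path):
    """Suggest 'ReducedDatum' for text outputs and 'DataProduct' otherwise."""
    if Path(path).suffix.lower() in TEXT_SUFFIXES:
        return 'ReducedDatum'
    return 'DataProduct'
```

Inside do_pipeline(), the label could be mapped back to the model class with a dict such as {'DataProduct': DataProduct, 'ReducedDatum': ReducedDatum}.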
To specify which output type should be used for each file, the do_pipeline() method of a PipelineProcess class should return a list of PipelineOutput objects (or tuples from which a PipelineOutput can be instantiated). PipelineOutput is a namedtuple class with the following fields:
path: A pathlib.Path object for the file to save.
output_type: Either DataProduct or ReducedDatum.
tag: (Optional) A string value to use for the tag field of DataProduct objects, or the data_type field of ReducedDatum objects.
If at least one DataProduct output is produced, a new DataProductGroup is created to hold these products.
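The tuple form, the optional tag field and the DataProductGroup rule can be illustrated with a stand-in namedtuple. Everything here is local to the sketch: DataProduct and ReducedDatum are empty placeholder classes, PipelineOutput is re-created from the documented fields under the assumption that tag defaults to None, and normalise_outputs and needs_group are hypothetical helpers mirroring the behaviour described above, not tom_education's code.

```python
from collections import namedtuple
from pathlib import Path


class DataProduct:
    """Stand-in for tom_dataproducts.models.DataProduct."""


class ReducedDatum:
    """Stand-in for tom_dataproducts.models.ReducedDatum."""


# Assumed shape mirroring the documented fields; 'tag' defaults to None.
PipelineOutput = namedtuple('PipelineOutput', ['path', 'output_type', 'tag'],
                            defaults=[None])


def normalise_outputs(outputs):
    """Turn each item returned by do_pipeline() into a PipelineOutput."""
    return [o if isinstance(o, PipelineOutput) else PipelineOutput(*o)
            for o in outputs]


def needs_group(outputs):
    """True if at least one output will be saved as a DataProduct."""
    return any(o.output_type is DataProduct for o in outputs)


outputs = normalise_outputs([
    PipelineOutput(path=Path('lightcurve.csv'), output_type=ReducedDatum,
                   tag='photometry'),
    (Path('stacked.fits'), DataProduct),  # no tag given
])
```

Because a namedtuple accepts positional arguments, a plain (path, output_type) or (path, output_type, tag) tuple carries the same information as a PipelineOutput.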
Errors
To stop a process and mark it as a failure, raise a tom_education.models.AsyncError exception:
from tom_education.models import AsyncError
...
class MyPipeline(PipelineProcess):
    ...
    def do_pipeline(self, tmpdir):
        raise AsyncError('Something went terribly wrong')
This will set the status field of the process to tom_education.models.ASYNC_STATUS_FAILED, and the given error message will be shown in the UI.
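A common use of AsyncError is to validate inputs up front so the user sees a clear message instead of a traceback. The helper below is hypothetical, and AsyncError is redefined locally as a stand-in so the sketch runs on its own; in a real pipeline you would import it from tom_education.models as shown above.

```python
from pathlib import Path


class AsyncError(Exception):
    """Stand-in for tom_education.models.AsyncError so this runs on its own."""


def require_nonempty_inputs(paths):
    """Raise AsyncError naming every input file that is missing or empty."""
    problems = [
        str(p) for p in paths
        if not Path(p).is_file() or Path(p).stat().st_size == 0
    ]
    if problems:
        raise AsyncError('Missing or empty input files: ' + ', '.join(problems))
```

Inside do_pipeline(), this could be called as require_nonempty_inputs([p.data.path for p in self.input_files.all()]) before any processing starts.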
Status updates
For long-running pipelines, or ones with several steps, it may be useful to update the status of a process as it progresses. For this, a PipelineProcess has a status field which can be set to any string value. The PipelineProcess must be saved with self.save() after the status is changed. The status is shown, and kept up to date, in the UI on the page for the process.
To avoid repetitive code for updating the status, performing some task and saving, the context manager self.update_status is available:
def do_pipeline(self, tmpdir):
    with self.update_status('Doing the first thing'):
        # do something
        ...
    with self.update_status('Done that - now doing the second thing'):
        # do something else
        ...
    ...
On successful completion of the process (that is, if do_pipeline() finishes without raising an AsyncError), the status is set to tom_education.models.ASYNC_STATUS_CREATED.
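The pattern that self.update_status saves you from writing (set the status, save, then run the block) can be sketched with contextlib.contextmanager. StatusDemo is a stand-in with only a status field and a save() method, and its update_status is an illustration of the idea rather than tom_education's implementation, which may do more (for example on exit).

```python
from contextlib import contextmanager


class StatusDemo:
    """Stand-in for a PipelineProcess: just a ``status`` field and ``save()``."""

    def __init__(self):
        self.status = ''
        self.saved_statuses = []  # records each saved status, for illustration

    def save(self):
        self.saved_statuses.append(self.status)

    @contextmanager
    def update_status(self, status):
        # Set and save the new status before the body of the ``with`` block
        # runs; exceptions raised in the body (e.g. AsyncError) propagate.
        self.status = status
        self.save()
        yield


proc = StatusDemo()
with proc.update_status('Doing the first thing'):
    pass  # do something
with proc.update_status('Done that - now doing the second thing'):
    pass  # do something else
```

Saving before the body runs is what lets the UI show the new status while the step is still in progress.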
Log output
More granular updates can be given by logging messages with self.log():
def do_pipeline(self, tmpdir):
    with self.update_status('Doing something important'):
        self.log('first sub-step')
        self.log('second sub-step')
        ...
    ...
Log output is also shown in the UI on the page for a process.
Flags
Pipelines may be parametrised by a number of boolean parameters (‘flags’), whose values are selected by the user when the process is scheduled and passed as keyword arguments to the do_pipeline() method. To define flags, set the flags attribute on the pipeline class:
class FlagDemo(PipelineProcess):
    # ``flags`` is a dictionary that maps flag names to their default values and
    # long descriptions
    flags = {
        'some_flag': {'default': True, 'long_name': 'Long name to be displayed in the UI'},
        'another_flag': {'default': False, 'long_name': 'Another flag'}
    }
    ...

    def do_pipeline(self, tmpdir, **flags):
        some_flag = flags.pop('some_flag')
        another_flag = flags.pop('another_flag')
        ...
    ...
Note that flag names should not contain whitespace.
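How the declared defaults and the user's choices end up as keyword arguments can be pictured with the sketch below. tom_education performs this step itself when a process is scheduled; resolve_flags and the module-level do_pipeline are hypothetical stand-ins, and rejecting unknown flag names is an extra check added for illustration.

```python
FLAG_SPEC = {
    'some_flag': {'default': True, 'long_name': 'Long name to be displayed in the UI'},
    'another_flag': {'default': False, 'long_name': 'Another flag'},
}


def resolve_flags(flag_spec, selected=None):
    """Merge the user's choices over the declared defaults.

    ``flag_spec`` has the same shape as a pipeline's ``flags`` attribute and
    ``selected`` maps flag names to the booleans chosen by the user.
    """
    selected = selected or {}
    for name in flag_spec:
        if any(ch.isspace() for ch in name):
            raise ValueError(f'flag name {name!r} must not contain whitespace')
    unknown = set(selected) - set(flag_spec)
    if unknown:
        raise ValueError(f'unknown flags: {sorted(unknown)}')
    return {name: bool(selected.get(name, spec['default']))
            for name, spec in flag_spec.items()}


def do_pipeline(tmpdir, **flags):
    """Stand-in for FlagDemo.do_pipeline(): returns the flags it received."""
    some_flag = flags.pop('some_flag')
    another_flag = flags.pop('another_flag')
    return some_flag, another_flag


result = do_pipeline('/tmp/work', **resolve_flags(FLAG_SPEC, {'another_flag': True}))
```

Since every declared flag is always passed, do_pipeline() can safely pop each one; the whitespace rule matters because flag names are used as Python keyword argument names.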
Status and log page
The status and log output of a pipeline process can be viewed at /pipeline/<PK>, where <PK> is the primary key of the process. Log output is updated every second, and can be scrolled automatically. This page also provides links to the data product group containing the outputs.