Track data lineage: Transform & Run#

Reference: Transform, Run

What is a Run?#

Files are transformed by instances of Run: they are the inputs and outputs of runs.

Conversely, the source of a File is always a run!

A Run itself points to the Transform object that’s being run, either a notebook or a pipeline.

import lamindb as ln
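
Schematically, a run sits between a transform and its files. Here is a minimal sketch using the constructors demonstrated later in this guide:

run = ln.Run(transform=transform)  # a run executes a transform
file = ln.File(filepath, source=run)  # a file's source is the run that created it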

Notebook run#

The metadata of Jupyter notebooks is detected automatically, and ln.Run assumes global_context=True: we don’t need to keep track of the run record ourselves but can access it via ln.context:

ln.track()
ℹ️ Instance: testuser1/mydata
ℹ️ User: testuser1
ℹ️ Added notebook: Transform(id='1LCd8kco9lZU', v='0', name='08-run', type=notebook, title='Track data lineage: `Transform` & `Run`', created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))
ℹ️ Added run: Run(id='VbVAEqftAYKe1jl2QnS0', transform_id='1LCd8kco9lZU', transform_v='0', created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))
ln.context.run
Run(id='VbVAEqftAYKe1jl2QnS0', transform_id='1LCd8kco9lZU', transform_v='0', created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))

Let us query the transform through which file “iris_new” was ingested:

ln.select(ln.Transform).join(ln.Run).join(ln.File, name="iris_new").first()
Transform(id='OdlFhFWW7qg3', v='0', name='04-memory', type=notebook, title='Track in-memory data objects', created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 15, 58))

Alternatively, you can query for the file and access the transform through its source run:

with ln.Session() as ss:
    file = ss.select(ln.File, name="iris_new").one()
    print(file.source.transform)
[session open] Transform(id='OdlFhFWW7qg3', v='0', name='04-memory', type=notebook, title='Track in-memory data objects', created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 15, 58))
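
Note that file.source itself is the run record; its transform attribute resolves to the notebook above. A quick sketch of inspecting the run directly:

with ln.Session() as ss:
    file = ss.select(ln.File, name="iris_new").one()
    print(file.source)  # the Run that created the file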

Pipeline run#

filepath = ln.dev.datasets.file_fastq()

When working with a pipeline, we’ll register it before running it.

transform = ln.add(ln.Transform(name="10x scRNA-seq nextseq", v="1"))

transform
Transform(id='g0mYW16n', v='1', name='10x scRNA-seq nextseq', type=pipeline, created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))

We can then use ln.track() as before (if we don’t register a pipeline with the correct name, we’ll be asked to):

ln.track(transform=transform)
ℹ️ Instance: testuser1/mydata
ℹ️ User: testuser1
ℹ️ Loaded transform: Transform(id='g0mYW16n', v='1', name='10x scRNA-seq nextseq', type=pipeline, created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))
ℹ️ Added run: Run(id='cUu9I6OzZJgZkyrK4D4h', transform_id='g0mYW16n', transform_v='1', created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))
file_fastq = ln.File(filepath)
ln.add(file_fastq)
File(id='E1hNK7YnrfYwc8F9vC3G', name='input', suffix='.fastq.gz', size=16, hash='QDkCIyDtWe8tlrS9zG8gnw', source_id='cUu9I6OzZJgZkyrK4D4h', storage_id='8Pj12JLb', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))

We can also manually pass a run and not use the global run context (ln.context) set by ln.track:

run = ln.Run(transform=transform, name="ingest-fastq")
ln.File(filepath, source=run)
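
To persist it, we’d pass the resulting record to ln.add as before, a minimal sketch:

file = ln.File(filepath, source=run)  # same call as above, now assigned
ln.add(file)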

Track run inputs#

While run outputs are automatically tracked as data sources, run inputs aren’t.

However, you can simply pass is_run_input=True when loading a File.

Let’s register a downstream pipeline:

ln.track(transform=ln.Transform(name="Cell Ranger", v="7"))
ℹ️ Instance: testuser1/mydata
ℹ️ User: testuser1
ℹ️ Added transform: Transform(id='BQNTTPRP', v='7', name='Cell Ranger', type=pipeline, created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))
ℹ️ Added run: Run(id='M4vLFDKQmOA90LYs8fNL', transform_id='BQNTTPRP', transform_v='7', created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))

Let’s query input data for this pipeline, a fastq.

To process it in the pipeline, we need to load() it (download it from the cloud and access its on-disk or in-memory representation).

To track it as an input for the current run, set is_run_input=True.

with ln.Session() as ss:
    file_fastq = ss.select(ln.File, name="input").one()
    file_fastq.load(is_run_input=True)
file_fastq.targets
[Run(id='M4vLFDKQmOA90LYs8fNL', transform_id='BQNTTPRP', transform_v='7', created_by='DzTjkKse', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))]
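
The targets attribute lists the runs for which this file is an input; here, that’s the “Cell Ranger” run we just created. We then add the file to persist the updated record: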
ln.add(file_fastq)
File(id='E1hNK7YnrfYwc8F9vC3G', name='input', suffix='.fastq.gz', size=16, hash='QDkCIyDtWe8tlrS9zG8gnw', source_id='cUu9I6OzZJgZkyrK4D4h', storage_id='8Pj12JLb', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))
output_filepath = ln.dev.datasets.file_bam()
output_filepath
PosixPath('output.bam')
file = ln.File(output_filepath)

ln.add(file)
File(id='0z1lsm4wUo2XamCsq2jJ', name='output', suffix='.bam', size=14, hash='rGSwtSEKB65DaaQq740p6A', source_id='M4vLFDKQmOA90LYs8fNL', storage_id='8Pj12JLb', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))

Data lineage#

Now let’s track from which files that the output.bam file is generated, aka, the input file of the run that produced file output.bam

with ln.Session() as ss:
    run = ss.select(ln.Run).join(ln.File, name="output", suffix=".bam").one()
    assert run.inputs[0].name == "input"
    print(run.inputs)
[[session open] File(id='E1hNK7YnrfYwc8F9vC3G', name='input', suffix='.fastq.gz', size=16, hash='QDkCIyDtWe8tlrS9zG8gnw', source_id='cUu9I6OzZJgZkyrK4D4h', storage_id='8Pj12JLb', created_at=datetime.datetime(2023, 3, 30, 23, 16, 57))]
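
Conversely, we could list all outputs of a run by selecting files whose source is that run. A hedged sketch, assuming select accepts the source_id field shown in the File records above and offers an .all() executor alongside the .one() and .first() used earlier:

with ln.Session() as ss:
    run = ss.select(ln.Run).join(ln.File, name="output", suffix=".bam").one()
    print(ss.select(ln.File, source_id=run.id).all())  # files produced by this run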