- 🐙 What is this API ?
- ✨ Production and development links
- 🔨 Installation
- 🚀 Build
- 🤖 Usage
- 😵💫 Key Considerations and Limitations
- 🐳 Docker
- 💯 Tests
- ☑️ Code analysis and consistency
- 📈 Releases & Changelog
- 🎬 GitHub Actions
- ©️ License
- ❤️ Contributors
Welcome to CPG Flow!
This API provides a set of tools and workflows for managing population genomics data pipelines, designed to streamline the processing, analysis, and storage of large-scale genomic datasets. It facilitates automated pipeline execution, enabling reproducible research while integrating with cloud-based resources for scalable computation.
CPG Flow supports various stages of genomic data processing, from raw data ingestion to final analysis outputs, making it easier for researchers to manage and scale their population genomics workflows.
The API constructs a DAG (Directed Acyclic Graph) structure from a set of chained stages. This DAG structure then forms the pipeline.
The production version of this API is documented at populationgenomics.github.io/cpg-flow/.
The documentation is updated automatically when a commit is pushed on the alpha (prerelease) or main (release) branch.
The packages are hosted on:
To include cpg-flow in your python project simply install either the latest stable version as layed out in the PyPi package page. =
This is as simple as running the following in your project python environment
pip install cpg-flowFor a specific version
pip install cpg-flow==0.1.2We recommend making the appropriate choice for your individual project. Simply including cpg-flow in your dependency management system of choice will install the latest stable relase. But if neccessary you can pin the version. For example in your pyproject.toml file simply include the following:
dependencies = [
    "cpg-flow",         # latest OR
    "cpg-flow==0.1.2",  # pinned version
]These instructions are for contributors and developers on the cpg-flow project repository. Follow the following steps to setup your environment for development.
To install this project, you will need to have Python and uv installed on your machine:
We use uv for dependency management which can sync your environment locally with the following command:
# Install the package using uv
uv syncHowever, to setup for development we recommend using the makefile setup which will do that for you.
make init-dev # installs pre-commit as a hookTo install cpg-flow locally for testing the code as an editable dependency
make install-localThis will install cpg-flow a an editable dependency in your environment. However, sometimes it can be useful to test the package post-build.
make install-buildThis will build and install the package as it would be distributed.
You can confirm which version of cpg-flow is installed by running
uv pip show cpg-flowFor an Editable package it should show the repo location on your machine under the Editable: key.
Name: cpg-flow
Version: 0.1.2
Location: /Users/whoami/cpg-flow/.venv/lib/python3.10/site-packages
Editable project location: /Users/whoami/cpg-flow
Requires: cpg-utils, grpcio, grpcio-status, hail, loguru, ipywidgets, metamist, networkx, plotly, pre-commit, pyyaml
Required-by:The build version (static until you rebuild) will look like the following.
Name: cpg-flow
Version: 0.1.2
Location: /Users/whoami/cpg-flow/.venv/lib/python3.10/site-packages
Requires: cpg-utils, grpcio, grpcio-status, hail, ipywidgets, loguru, metamist, networkx, plotly, pre-commit, pyyaml
Required-by:To try out the pre-installed cpg-flow in a Docker image, find more information in the Docker section.
To build the project, run the following command:
make buildTo make sure that you're actually using the installed build we suggest calling the following to install the build wheel.
make install-buildThis project provides the framework to construct pipelines but does not offer hosting the logic of any pipelines themselves. This approach offers the benefit of making all components more modular, manageable and decoupled. Pipelines themselves are hosted in a separate repository.
The test_workflows_shared repository acts as a template and demonstrates how to structure a pipeline using CPG Flow.
The components required to build pipelines with CPG Flow:
This file contains the configuration settings to your pipeline. This file allows the pipeline developer to define settings such as:
- what stages will be run or skipped
- what dataset to use
- what access level to use
- any input cohorts
- sequencing type
[workflow]
dataset = 'fewgenomes'
# Note: for fewgenomes and sandbox mentioning datasets by name is not a security risk
# DO NOT DO THIS FOR OTHER DATASETS
input_cohorts = ['COH2142']
access_level = 'test'
# Force stage rerun
force_stages = [
    'GeneratePrimes', # the first stage
    'CumulativeCalc', # the second stage
    'FilterEvens', # the third stage
    'BuildAPrimePyramid', # the last stage
]
# Show a workflow graph locally or save to web bucket.
# Default is false, set to true to show the workflow graph.
show_workflow = true
# ...For a full list of supported config options with documentation, see defaults.toml
This .toml file will be may be named anything, as long as it is correctly passed to the analysis-runner invocation. The analysis-runner supplies its own default settings, and combines it with the settings from this file, before submitting a job.
This file would store the workflow definition as a list of stages, and then run said workflow:
 import os
 from pathlib import Path
 from cpg_flow.workflow import run_workflow
 from cpg_utils.config import set_config_paths
 from stages import BuildAPrimePyramid, CumulativeCalc, FilterEvens, GeneratePrimes
 CONFIG_FILE = str(Path(__file__).parent / '<YOUR_CONFIG>.toml')
 def run_cpg_flow(dry_run=False):
    #See the 'Key Considerations and Limitations' section for notes on the definition of the `workflow` variable.
    # This represents the flow of the DAG
     workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid]
     config_paths = os.environ['CPG_CONFIG_PATH'].split(',')
     # Inserting after the "defaults" config, but before user configs:
     set_config_paths(config_paths[:1] + [CONFIG_FILE] + config_paths[1:])
     run_workflow(stages=workflow, dry_run=dry_run)
 if __name__ == '__main__':
   run_cpg_flow()The workflow definition here forms a DAG (Directed Acyclic Graph) structure.
To generate a plot of the DAG,
show_workflow = Trueshould be included in the config. The DAG plot generated from the pipeline definition is available in the logs via the job URL. To find the link to the plot, search the Logs section for the string: "INFO - Link to the graph:".
There are some key considerations and limitations to take into account when designing the DAG:
A Stage represents a node in the DAG. The stages can be abstracted from either a DatasetStage, CohortStage, MultiCohortStage, or a SequencingGroupStage.
The stage definition should use the @stage decorator to optionally set:
- dependent stages (this is used to build the DAG)
- analysis keys (this determines what outputs should be written to metamist)
- the analysis type (this determines the analysis-type to be written to metamist)
All stages require an expected_outputs class method definition, that sets the expected output path location for a given Target such as a SequencingGroup, Dataset, Cohort, or MultiCohort.
Also required, is a queue_jobs class method definition that calls pipeline jobs, and stores the results of these jobs to the paths defined in expected_outputs.
It is good practice to separate the Stage definitions into their own files, to keep the code compact, and manageable.
from cpg_flow.stage import SequencingGroupStage, StageInput, StageOutput, stage
from cpg_flow.targets.sequencing_group import SequencingGroup
from jobs import cumulative_calc
WORKFLOW_FOLDER = 'prime_pyramid'
# ...
# This stage depends on the `GeneratePrimes` stage, and requires outputs from that stage.
@stage(required_stages=[GeneratePrimes], analysis_keys=['cumulative'], analysis_type='custom')
class CumulativeCalc(SequencingGroupStage):
 def expected_outputs(self, sequencing_group: SequencingGroup):
     return {
         'cumulative': sequencing_group.dataset.prefix() / WORKFLOW_FOLDER / f'{sequencing_group.id}_cumulative.txt',
     }
 def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None:
     input_txt = inputs.as_path(sequencing_group, GeneratePrimes, 'primes')
     b = get_batch()
     cumulative_calc_output_path = str(self.expected_outputs(sequencing_group).get('cumulative', ''))
     # We define a job instance from the `cumulative_calc` job definition.
     job_cumulative_calc = cumulative_calc(b, sequencing_group, input_txt, cumulative_calc_output_path)
     jobs = [job_cumulative_calc]
     return self.make_outputs(
         sequencing_group,
         data=self.expected_outputs(sequencing_group),
         jobs=jobs,
     )
# ...There is a key consideration to take into account when writing the stages:
Every Stage requires a collection of jobs that will be executed within. It is good practice to store these jobs in their own files, as the definitions can often get long.
from cpg_flow.targets.sequencing_group import SequencingGroup
from hailtop.batch import Batch
from hailtop.batch.job import Job
def cumulative_calc(
    b: Batch,
    sequencing_group: SequencingGroup,
    input_file_path: str,
    output_file_path: str,
) -> list[Job]:
    title = f'Cumulative Calc: {sequencing_group.id}'
    job = b.new_job(name=title)
    primes_path = b.read_input(input_file_path)
    cmd = f"""
    primes=($(cat {primes_path}))
    csum=0
    cumulative=()
    for prime in "${{primes[@]}}"; do
        ((csum += prime))
        cumulative+=("$csum")
    done
    echo "${{cumulative[@]}}" > {job.cumulative}
    """
    job.command(cmd)
    print('-----PRINT CUMULATIVE-----')
    print(output_file_path)
    b.write_output(job.cumulative, output_file_path)
    return jobOnce these required components are written, the pipeline is ready to be executed against this framework.
All pipelines can only be exclusively run using the analysis-runner package which grants the user appropriate permissions based on the dataset and access level defined above. analysis-runner requires a repo, commit and the entrypoint file, and then runs the code inside a "driver" image on Hail Batch, logging the invocation to metamist for future audit and reproducibility.
Therefore, the pipeline code needs to be pushed to a remote version control system, for analysis-runner to be able to pull it for execution. A job can then be submitted:
analysis-runner \
  --image "australia-southeast1-docker.pkg.dev/cpg-common/images/cpg_flow:1.0.0" \
  --dataset "fewgenomes" \
  --description "cpg-flow_test" \
  --access-level "test" \
  --output-dir "cpg-flow_test" \
  --config "<YOUR_CONFIG>.toml" \
  workflow.pyIf the job is successfully created, the analysis-runner output will include a job URL. This driver job will trigger additional jobs, which can be monitored via the /batches page on Hail. Monitoring these jobs helps verify that the workflow ran successfully. When all expected jobs complete without errors, this confirms the successful execution of the workflow and indicates that the cpg_flow package is functioning as intended.
See the Docker section for instruction on pulling valid images releases.
The framework exclusively relies on backward traversal. If a stage is not explicitly or indirectly linked to one of the final stages through the required_stages parameter of the @stage decorator, it will not be included in the workflow. In other words, stages that are not reachable from a final stage are effectively ignored. This backward discovery approach ensures that only the stages directly required for the specified final stages are included, optimizing the workflow by excluding irrelevant or unused stages.
The workflow definition serves as a lookup table for the final stages. If a final stage is not listed in this definition, it will not be part of the workflow, as there is no mechanism for forward discovery to identify it.
workflow = [GeneratePrimes, CumulativeCalc, FilterEvens, BuildAPrimePyramid]The expected_outputs method is called for every stage in the workflow, even if the config.toml configures the stage to be skipped. This ensures that the workflow can validate or reference the expected outputs of all stages.
Since this method may depend on workflow-specific configuration settings, these settings must be present in the workflow configuration, regardless of whether the stage will run. To avoid issues, it is common practice to include dummy values for such settings in the default configuration. This is not the intended behaviour and is marked as an area of improvement in a future release.
The API uses the results of the expected_outputs method to determine whether a stage needs to run. A stage is scheduled for execution only if one or more Path objects returned by expected_outputs do not exist in Google Cloud Platform (GCP). If a returned Path object exists, the stage is considered to have already run successfully, and is therefore skipped.
For outputs such as Matrix Tables (.mt), Hail Tables (.ht), or Variant Datasets (.vds), which are complex structures of thousands of files, the check is performed on the object/_SUCCESS file to verify that the output was written completely. However, it has been observed that the object/_SUCCESS file may be written multiple times during processing, contrary to the expectation that it should only be written once after all associated files have been fully processed.
String outputs from the expected_outputs method are not checked by the API. This is because string outputs cannot reliably be assumed to represent valid file paths and may instead correspond to other forms of outputs.
When the expected_outputs check determines that one or more required files do not exist, and the stage is not configured to be skipped, the queue_jobs method is invoked to define the specific work that needs to be scheduled in the workflow.
The queue_jobs method runs within the driver image, before any jobs in the workflow are executed. Because of this, it cannot access or read files generated by earlier stages, as those outputs have not yet been created. The actual outputs from earlier jobs only become available as the jobs are executed during runtime.
When the queue_jobs method schedules a collection of jobs to Hail Batch, one or more jobs are returned from the method, and the framework sets an explicit dependency between these jobs, and all jobs from the Stages set in the required_stages parameter. Therefore, all jobs that run in a Stage must be returned within queue_jobs to ensure no jobs start out of sequence. As an example:
# test_workflows_shared/cpg_flow_test/jobs/filter_evens.py
def filter_evens(
    b: Batch,
    inputs: StageInput,
    previous_stage: Stage,
    sequencing_groups: list[SequencingGroup],
    input_files: dict[str, dict[str, Any]],
    sg_outputs: dict[str, dict[str, Any]],
    output_file_path: str,
) -> list[Job]:
    title = 'Filter Evens'
    # Compute the no evens list for each sequencing group
    sg_jobs = []
    sg_output_files = []
    for sg in sequencing_groups:  # type: ignore
        job = b.new_job(name=title + ': ' + sg.id)
        ...
        cmd = f"""
        ...
        """
        job.command(cmd)
        b.write_output(job.sg_no_evens_file, no_evens_output_file_path)
        sg_jobs.append(job)
    # Merge the no evens lists for all sequencing groups into a single file
    job = b.new_job(name=title)
    job.depends_on(*sg_jobs)
    inputs = ' '.join([b.read_input(f) for f in sg_output_files])
    job.command(f'cat {inputs} >> {job.no_evens_file}')
    b.write_output(job.no_evens_file, output_file_path)
    # ALL jobs are returned back to `queue_jobs`
    # including new jobs created within this job.
    all_jobs = [job, *sg_jobs]
    return all_jobsThese steps are restricted to CPG members only. Anyone will have access to the code in this public repositry and can build a version of cpg-flow themselves. The following requires authentication with the CPG's GCP.
To pull and use the Docker image for the cpg-flow Python package, follow these steps:
- 
Authenticate with Google Cloud Registry: gcloud auth configure-docker australia-southeast1-docker.pkg.dev 
- 
Pull the Docker Image: - 
For alpha releases: docker pull australia-southeast1-docker.pkg.dev/cpg-common/images/cpg_flow:0.1.0-alpha.11 
- 
For main releases: docker pull australia-southeast1-docker.pkg.dev/cpg-common/images/cpg_flow:1.0.0 
 
- 
- 
Run the Docker Container: docker run -it australia-southeast1-docker.pkg.dev/cpg-common/images/cpg_flow:<tag> 
Temporary images are created for each commit and expire in 30 days. These images are useful for development and testing purposes.
- 
Example of pulling a temporary image: docker pull australia-southeast1-docker.pkg.dev/cpg-common/images-tmp/cpg_flow:991cf5783d7d35dee56a7ab0452d54e69c695c4e 
Members of the CPG can find the build images in the Google Cloud Registry under the following paths:
- Alpha and main releases: australia-southeast1-docker.pkg.dev/cpg-common/images/cpg_flow
- Temporary images: australia-southeast1-docker.pkg.dev/cpg-common/images-tmp/cpg_flow
Ensure you have the necessary permissions and are authenticated with Google Cloud to access these images.
Unit tests are run in the Test CI workflow for each branch.
We recommend frequently running the manual test workflow found in test_workflows_shared  specifically the cpg_flow_test workflow during development to ensure updates work with the CPG production environment.
Docummentation for running the tests are found in the repository readme.
Before testing, you must follow the installation steps.
In order to keep the code clean, consistent and free of bad python practices, more than Over 10 pre-commit hooks are enabled !
Complete list of all enabled rules is available in the .pre-commit-config.yaml file.
Before linting, you must follow the installation steps.
Then, run the following command
# Lint
pre-commit run --all-filesWhen setting up local linting for development you can also run the following once:
# Install the pre-commit hook
pre-commit install
# Or equivalently
make init || make init-devMultiple tools are set up to maintain the best code quality and to prevent vulnerabilities:
SonarQube summary is available here.
Releases on main branch are generated and published automatically, pre-releases on the alpha branch are also generated and published by:
It uses the conventional commit strategy.
This is enforced using the commitlint pre-commit hook that checks commit messages conform to the conventional commit standard.
We recommend installing and using the tool **commitizen in order to create commit messages. Once installed, you can use either cz commit or git cz to create a commitizen generated commit message.
Each change when a new release comes up is listed in the CHANGELOG.md file.
Also, you can keep up with changes by watching releases via the Watch GitHub button at the top of this page.
This project uses GitHub Actions to automate some boring tasks.
You can find all the workflows in the .github/workflows directory.
| Name | Description & Status | Triggered on | 
|---|---|---|
| Docker | Builds and pushes Docker images for the project. | pull_requestonmain, alphaandpushonmain, alphaandworkflow_dispatch | 
| Lint | Runs linting checks using pre-commit hooks. | push | 
| Package | Packages the project and publishes it to PyPI and GitHub Releases. | pushonmain, alpha | 
| Renovate | Runs Renovate to update dependencies. | scheduleandworkflow_dispatch | 
| Security Checks | Performs security checks using pip-audit. | workflow_dispatchandpush | 
| Test | Runs unit tests and generates coverage reports. | push | 
| Update Badges | Updates badges.yaml with test results and coverage. | workflow_run(completed) | 
| mkdocs | Deploys API documentation to GitHub Pages. | pushonalpha | 
This project is licensed under the MIT License.
There is no contributor yet. Want to be the first ?
If you want to contribute to this project, please read the contribution guide.

