This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Merge pull request #158 from CSCfi/dev
release-1.8.0
blankdots committed Nov 25, 2020
2 parents 35d4cc4 + a599cde commit d894eec
Showing 5 changed files with 111 additions and 69 deletions.
2 changes: 1 addition & 1 deletion beacon_api/conf/config.ini
@@ -7,7 +7,7 @@
title=GA4GHBeacon at CSC

# Version of the Beacon implementation
version=1.7.2
version=1.8.0

# Author of this software
author=CSC developers
141 changes: 85 additions & 56 deletions beacon_api/utils/db_load.py
@@ -37,6 +37,7 @@
"""

import os
import sys
import argparse
import json
import itertools
@@ -46,6 +47,7 @@
import asyncpg
from cyvcf2 import VCF

from pathlib import Path
from datetime import datetime

from .logging import LOG
@@ -280,19 +282,19 @@ def _chunks(self, iterable, size):
for first in iterator:
yield itertools.chain([first], itertools.islice(iterator, size - 1))

async def load_datafile(self, vcf, datafile, dataset_id, n=1000):
async def load_datafile(self, vcf, datafile, dataset_id, n=1000, min_ac=1):
"""Parse data from datafile and send it to be inserted."""
LOG.info(f"Read data from {datafile}")
try:
LOG.info("Generate database queue(s)")
data = self._chunks(vcf, n)
for record in data:
await self.insert_variants(dataset_id, list(record))
await self.insert_variants(dataset_id, list(record), min_ac)
LOG.info(f"{datafile} has been processed")
except Exception as e:
LOG.error(f"AN ERROR OCCURRED WHILE GENERATING DB QUEUE -> {e}")

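For context, `_chunks` batches the cyvcf2 iterator lazily so that `insert_variants` receives at most `n` records at a time. Below is a standalone sketch of the same `itertools` pattern, with plain integers standing in for VCF records (not part of this commit):

import itertools

def chunks(iterable, size):
    """Yield lazy chunks of `size` items without materialising the whole iterable."""
    iterator = iter(iterable)
    for first in iterator:
        yield itertools.chain([first], itertools.islice(iterator, size - 1))

for chunk in chunks(range(7), 3):
    print(list(chunk))  # [0, 1, 2] then [3, 4, 5] then [6]
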
async def insert_variants(self, dataset_id, variants):
async def insert_variants(self, dataset_id, variants, min_ac):
"""Insert variant data to the database."""
LOG.info(f"Received {len(variants)} variants for insertion to {dataset_id}")
try:
@@ -301,67 +303,82 @@ async def insert_variants(self, dataset_id, variants):
LOG.info("Insert variants into the database")
for variant in variants:
# params = (frequency, count, actual variant Type)
# Nothing interesting on the variant with no aaf
# because none of the samples have it
if variant.aaf > 0:
params = self._unpack(variant)
# Coordinates that are read from VCF are 1-based,
# cyvcf2 reads them as 0-based, and they are inserted into the DB as such

# We Process Breakend Records into a different table for now
if params[5] != []:
# await self.insert_mates(dataset_id, variant, params)
# Most likely there will be only one BND per Record
for bnd in params[5]:
params = self._unpack(variant)
# Coordinates that are read from VCF are 1-based,
# cyvcf2 reads them as 0-based, and they are inserted into the DB as such

# params may carry single variants [1] or packed variants [20, 15, 10, 1]
# The first check is enough to prune single variants; packed variants below the threshold must still be removed afterwards
if params[1][0] >= min_ac:
# Remove packed variants that don't meet the minimum allele count requirements
# Packed variants are always ordered from largest to smallest, so this loop pops values
# from the right (small) end until no values below the threshold remain
while params[1][-1] < min_ac:
params[0].pop() # aaf
params[1].pop() # ac
params[2].pop() # vt
params[3].pop() # alt
if len(params[5]) > 0:
params[5].pop() # bnd

# Nothing interesting on the variant with no aaf
# because none of the samples have it
if variant.aaf > 0:

# We Process Breakend Records into a different table for now
if params[5] != []:
# await self.insert_mates(dataset_id, variant, params)
# Most likely there will be only one BND per Record
for bnd in params[5]:
await self._conn.execute(
"""INSERT INTO beacon_mate_table
(datasetId, chromosome, chromosomeStart, chromosomePos,
mate, mateStart, matePos, reference, alternate, alleleCount,
callCount, frequency, "end")
SELECT ($1), ($2), ($3), ($4),
($5), ($6), ($7), ($8), t.alt, t.ac, ($11), t.freq, ($13)
FROM (SELECT unnest($9::varchar[]) alt, unnest($10::integer[]) ac,
unnest($12::float[]) freq) t
ON CONFLICT (datasetId, chromosome, mate, chromosomePos, matePos)
DO NOTHING""",
dataset_id,
variant.CHROM.replace("chr", ""),
variant.start,
variant.ID,
bnd[0].replace("chr", ""),
bnd[1],
bnd[6],
variant.REF,
params[3],
params[1],
params[4],
params[0],
variant.end,
)
else:
await self._conn.execute(
"""INSERT INTO beacon_mate_table
(datasetId, chromosome, chromosomeStart, chromosomePos,
mate, mateStart, matePos, reference, alternate, alleleCount,
callCount, frequency, "end")
SELECT ($1), ($2), ($3), ($4),
($5), ($6), ($7), ($8), t.alt, t.ac, ($11), t.freq, ($13)
FROM (SELECT unnest($9::varchar[]) alt, unnest($10::integer[]) ac,
unnest($12::float[]) freq) t
ON CONFLICT (datasetId, chromosome, mate, chromosomePos, matePos)
DO NOTHING""",
"""INSERT INTO beacon_data_table
(datasetId, chromosome, start, reference, alternate,
"end", aggregatedVariantType, alleleCount, callCount, frequency, variantType)
SELECT ($1), ($2), ($3), ($4), t.alt, ($6), ($7), t.ac, ($9), t.freq, t.vt
FROM (SELECT unnest($5::varchar[]) alt, unnest($8::integer[]) ac,
unnest($10::float[]) freq, unnest($11::varchar[]) as vt) t
ON CONFLICT (datasetId, chromosome, start, reference, alternate)
DO NOTHING""",
dataset_id,
variant.CHROM.replace("chr", ""),
variant.start,
variant.ID,
bnd[0].replace("chr", ""),
bnd[1],
bnd[6],
variant.REF,
params[3],
variant.end,
variant.var_type.upper(),
params[1],
params[4],
params[0],
variant.end,
params[2],
)
else:
await self._conn.execute(
"""INSERT INTO beacon_data_table
(datasetId, chromosome, start, reference, alternate,
"end", aggregatedVariantType, alleleCount, callCount, frequency, variantType)
SELECT ($1), ($2), ($3), ($4), t.alt, ($6), ($7), t.ac, ($9), t.freq, t.vt
FROM (SELECT unnest($5::varchar[]) alt, unnest($8::integer[]) ac,
unnest($10::float[]) freq, unnest($11::varchar[]) as vt) t
ON CONFLICT (datasetId, chromosome, start, reference, alternate)
DO NOTHING""",
dataset_id,
variant.CHROM.replace("chr", ""),
variant.start,
variant.REF,
params[3],
variant.end,
variant.var_type.upper(),
params[1],
params[4],
params[0],
params[2],
)

LOG.debug("Variants have been inserted")

LOG.debug("Variants have been inserted")
except Exception as e:
LOG.error(f"AN ERROR OCCURRED WHILE ATTEMPTING TO INSERT VARIANTS -> {e}")

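To see the new `--min_allele_count` pruning outside the diff context, here is a minimal standalone sketch of the same idea. The function name `prune_rare_alts` and the sample values are illustrative only; `params` mirrors the parallel per-ALT lists produced by `_unpack()` (aaf, ac, vt, alt, call count, bnd), ordered by descending allele count as the comments above state:

def prune_rare_alts(params, min_ac):
    """Drop per-ALT entries whose allele count falls below min_ac."""
    aaf, ac, vt, alt, call_count, bnd = params
    if not ac or ac[0] < min_ac:
        return None  # no ALT allele reaches the threshold; skip the record
    # Lists are sorted from largest to smallest count, so trim from the right.
    while ac and ac[-1] < min_ac:
        aaf.pop()
        ac.pop()
        vt.pop()
        alt.pop()
        if bnd:
            bnd.pop()
    return aaf, ac, vt, alt, call_count, bnd

# With min_ac=10 only the two common ALT alleles survive:
packed = ([0.4, 0.3, 0.02], [20, 15, 1], ["snp", "snp", "snp"], ["A", "T", "G"], 50, [])
print(prune_rare_alts(packed, 10))  # ([0.4, 0.3], [20, 15], ['snp', 'snp'], ['A', 'T'], 50, [])
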
@@ -379,6 +396,7 @@ async def init_beacon_db(arguments=None):
"""Run database operations here."""
# Fetch command line arguments
args = parse_arguments(arguments)
validate_arguments(args)

# Initialise the database connection
db = BeaconDB()
@@ -400,12 +418,22 @@ async def init_beacon_db(arguments=None):
dataset_id = await db.load_metadata(vcf, args.metadata, args.datafile)

# Insert data into the database
await db.load_datafile(vcf, args.datafile, dataset_id)
await db.load_datafile(vcf, args.datafile, dataset_id, min_ac=int(args.min_allele_count))

# Close the database connection
await db.close()


def validate_arguments(arguments):
"""Check that given arguments are valid."""
if not Path(arguments.datafile).is_file():
sys.exit(f"Could not find datafile: {arguments.datafile}")
if not Path(arguments.metadata).is_file():
sys.exit(f"Could not find metadata file: {arguments.metadata}")
if not arguments.min_allele_count.isdigit():
sys.exit(f"Minimum allele count --min_allele_count must be a positive integer, received: {arguments.min_allele_count}")

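A quick, hypothetical illustration of how the new validation is meant to be used; `argparse.Namespace` stands in for parsed CLI arguments and the paths below are made up:

from argparse import Namespace
from beacon_api.utils.db_load import validate_arguments

# Exits with a readable message if either file is missing
# or if --min_allele_count is not a digit string.
args = Namespace(datafile="data/example.vcf.gz",
                 metadata="data/example_metadata.json",
                 min_allele_count="20")
validate_arguments(args)
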

def parse_arguments(arguments):
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
@@ -415,7 +443,8 @@ def parse_arguments(arguments):
)
parser.add_argument("datafile", help=".vcf file containing variant information")
parser.add_argument("metadata", help=".json file containing metadata associated to datafile")
parser.add_argument("--samples", default=None, help="comma separated string of samples to process")
parser.add_argument("--samples", default=None, help="comma separated string of samples to process. EXPERIMENTAL")
parser.add_argument("--min_allele_count", default="1", help="minimum allele count can be raised to ignore rare variants. Default value is 1")
return parser.parse_args(arguments)


31 changes: 22 additions & 9 deletions docs/instructions.rst
@@ -160,10 +160,11 @@ Starting PostgreSQL using Docker:
.. code-block:: console
cd beacon-python
docker run -e POSTGRES_USER=beacon \
docker run -d \
-e POSTGRES_USER=beacon \
-e POSTGRES_PASSWORD=beacon \
-e POSTGRES_DB=beacondb \
-v "$PWD/data":/docker-entrypoint-initdb.d
-v "$PWD/data":/docker-entrypoint-initdb.d \
-p 5432:5432 postgres:11.6
.. hint:: If one has their own database the ``beacon_init`` utility can be skipped,
@@ -182,19 +183,25 @@ For loading datasets to database we provide the ``beacon_init`` utility:

.. code-block:: console
╰─$ beacon_init --help
usage: beacon_init [-h] datafile metadata
$ beacon_init --help
usage: beacon_init [-h] [--samples SAMPLES]
[--min_allele_count MIN_ALLELE_COUNT]
datafile metadata
Load datafiles with associated metadata into the beacon database. See example
data and metadata files in the /data directory.
positional arguments:
datafile .vcf file containing variant information
metadata .json file containing metadata associated to datafile
datafile .vcf file containing variant information
metadata .json file containing metadata associated to datafile
optional arguments:
--samples comma separated string of samples to process
-h, --help show this help message and exit
-h, --help show this help message and exit
--samples SAMPLES comma separated string of samples to process.
EXPERIMENTAL
--min_allele_count MIN_ALLELE_COUNT
minimum allele count can be raised to ignore rare
variants. Default value is 1
As an example, a dataset metadata could be:

@@ -221,12 +228,18 @@ For loading data into the database we can proceed as follows:
$ beacon_init data/ALL.chrMT.phase3_callmom-v0_4.20130502.genotypes.vcf.gz data/example_metadata.json
For loading data into the database from selected samples only we can proceed as follows:
(EXPERIMENTAL) For loading data into the database from selected samples only we can proceed as follows:

.. code-block:: console
$ beacon_init data/ALL.chrMT.phase3_callmom-v0_4.20130502.genotypes.vcf.gz data/example_metadata.json --samples HG0001,HG0002,HG0003
For ignoring rare alleles, set a minimum allele count with ``--min_allele_count``:

.. code-block:: console
$ beacon_init data/ALL.chrMT.phase3_callmom-v0_4.20130502.genotypes.vcf.gz data/example_metadata.json --min_allele_count 20
.. note:: One dataset can have multiple files; to add more files to one dataset, repeat the command above.
The parameters ``callCount`` and ``variantCount`` from the metadata file reflect values for the entire dataset.
These values can be initialised with ``0`` if they are not known and updated later in the ``beacon_dataset_counts_table`` table, as sketched below.
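A hedged sketch of how these counts might be updated with ``asyncpg`` once the real values are known; the connection parameters follow the Docker example above, and the exact column names of ``beacon_dataset_counts_table`` are assumed:

.. code-block:: python

    import asyncio
    import asyncpg

    async def update_counts(dataset_id, call_count, variant_count):
        # Overwrite the dataset-wide counts for an already loaded dataset.
        conn = await asyncpg.connect(user="beacon", password="beacon",
                                     database="beacondb", host="localhost")
        await conn.execute(
            """UPDATE beacon_dataset_counts_table
               SET callCount=$2, variantCount=$3
               WHERE datasetId=$1""",
            dataset_id, call_count, variant_count)
        await conn.close()

    asyncio.run(update_counts("DATASET1", 2504, 5008))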
4 changes: 2 additions & 2 deletions tests/test_basic.py
@@ -59,15 +59,15 @@ async def create_tables(self, sql_file):
"""Mimic create_tables."""
pass

async def insert_variants(self, dataset_id, variants, len_samples):
async def insert_variants(self, dataset_id, variants, min_ac):
"""Mimic insert_variants."""
pass

async def load_metadata(self, vcf, metafile, datafile):
"""Mimic load_metadata."""
pass

async def load_datafile(self, vcf, datafile, datasetId):
async def load_datafile(self, vcf, datafile, datasetId, n=1000, min_ac=1):
"""Mimic load_datafile."""
return ["datasetId", "variants"]

2 changes: 1 addition & 1 deletion tests/test_db_load.py
@@ -312,7 +312,7 @@ async def test_insert_variants(self, db_mock, mock_log):
db_mock.return_value = Connection()
await self._db.connection()
db_mock.assert_called()
await self._db.insert_variants('DATASET1', ['C'])
await self._db.insert_variants('DATASET1', ['C'], 1)
# Should assert logs
mock_log.info.mock_calls = ['Received 1 variants for insertion to DATASET1',
'Insert variants into the database']
