eQTL discovery pipeline for the GTEx Consortium: RNA-seq preprocessing

This notebook converts the RNA-seq data processing workflow of the eQTL discovery pipeline for the GTEx Consortium (version July 31, 2017), originally written in Python, R and WDL, into a single, self-contained SoS script with narratives.

In [1]:
%revisions -s -n 5
Revision Author Date Message
0479b41 Gao Wang 2018-06-13 Update workflows
5cfbede Gao Wang 2018-03-01 Fix f-string issue
19e1665 Gao Wang 2018-03-01 Fix UTF8 coding issue
c71a760 Gao Wang 2018-03-01 File renaming
c083e47 Gao Wang 2018-02-28 Update examples

The workflow performs two analyses on RNA-seq data:

  • Quantile normalization
  • PEER factor analysis

The WDL system is developed by the Broad Institute. Its syntax particularly focuses on human readability, and it has recently become an officially recognized language on GitHub.

Note that this example aims to demonstrate how to migrate workflows written in another workflow system to SoS, so that readers can compare the two in terms of code organization, readability, syntax and interface to remote computing environments. The data involved are private and thus cannot be provided; however, they will become available in the future at https://gtexportal.org/home/. In the meantime, readers interested in adapting this example for their own analysis can edit the [global] section to point to their own data and execute the pipeline. We are happy to provide support on GitHub.

Data preprocessing

In [1]:
[global]
rna_rpkm = path("~/Documents/GTEx/gtex7/rna-seq/GTEx_Data_2016-01-15_v7_RNA-seq_RNA-SeQCv1.1.8_gene_rpkm.gct.gz")
rna_cnts = path("~/Documents/GTEx/gtex7/rna-seq/GTEx_Data_2016-01-15_v7_RNA-seq_RNA-SeQCv1.1.8_gene_reads.gct.gz")
genotype = path("~/Documents/GTEx/gtex7/variant_calls/GTEx_Analysis_2016-01-15_v7_WholeGenomeSeq_635Ind_PASS_AB02_GQ20_HETX_MISS15_PLINKQC.PIR.vcf.gz")
sample_attr = path("~/Documents/GTEx/gtex7/sample_annotations/GTEx_Analysis_2016-01-15_v7_SampleAttributesDS.txt")
cwd = path("~/Documents/GTEx")

Please note that the paths to pre-existing files and directories are specified in the [global] section. Declaring them with the parameter keyword lets users configure all paths in one place for a “default” run, and optionally override them from the command line or the %sosrun magic. For example:

[global]
parameter: cwd = path('~/Documents/GTEx')

can be overridden by the command-line argument --cwd /path/to/new/dir. An alternative implementation is to use a configuration file, e.g., create a file called config.yml and write:

cwd: path('~/Documents/GTEx')

and execute with %sosrun -c config.yml. The downside is that a config.yml file will have to be maintained.

Data file format conversion

The code chunk below is prototype code that converts an RNA-seq file to HDF5 format. It is not needed when the normalization workflow (next section) is executed, because that workflow performs the format conversion on the original data itself. It is kept here in case the original data have to be processed separately.

In [2]:
# Convert RNA-seq data to HDF5 matrices
[RNASeq_to_HDF5]
# RNA-seq count data encoding
parameter: dtype = 'np.uint32'
output: f"{_input[0]:nb}.hdf5"
task: workdir = cwd
python: expand='${ }'
    import pandas as pd
    import numpy as np
    import re, os
    def load_data(fdata, fsample, dtype = np.float32):
        '''First col of expression data is ENCODE gene name, 2nd col is HUGO name'''
        head = pd.read_csv(fdata, skiprows = 2, sep = '\t', nrows = 1)
        dt = {'Description': str, 'Name': str}
        dt.update({x: dtype for x in head.columns if x not in dt})
        data = pd.read_csv(fdata, compression='gzip', skiprows=2, 
                           index_col=0, header=0, dtype = dt, sep='\t').drop('Description', axis=1)
        samples = pd.read_csv(fsample, dtype=str, delimiter='\t', header=0)
        sample_dict = {}
        for row in samples[['SAMPID', 'SMTSD', 'SMAFRZE']].values:
            if row[2] == 'EXCLUDE':
                continue
            if row[1] not in sample_dict:
                sample_dict[row[1]] = []
            if row[0] in data.columns:
                sample_dict[row[1]].append(row[0])
        return data, dict((re.sub(r"[\W\d]+", "_", k.strip()).strip('_'), v) for k, v in sample_dict.items() if len(v))
    #
    data, sample = load_data(${_input[0]:r}, ${_input[1]:r}, dtype = ${dtype})
    data = {k: data.loc[:, sample[k]] for k in sample}
    if os.path.isfile(${_output:r}):
        os.remove(${_output:r})
    for k in data:
        data[k].to_hdf(${_output:r}, k, mode = 'a', complevel = 9, complib = 'zlib')
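
The conversion step above stores one table per tissue, keyed by a sanitized version of the tissue label. As a minimal sketch of what that regular expression does, the snippet below applies the same substitution to a few labels written in the style of the SMTSD column. The labels and the helper name tissue_key are chosen here for illustration only and are not part of the pipeline.

```python
import re

def tissue_key(name):
    """Collapse runs of non-word characters and digits into '_' and trim stray underscores."""
    return re.sub(r"[\W\d]+", "_", name.strip()).strip("_")

# Illustrative tissue labels (assumed for this sketch)
examples = [
    "Adipose - Subcutaneous",
    "Brain - Anterior cingulate cortex (BA24)",
    "Cells - EBV-transformed lymphocytes",
]

keys = [tissue_key(label) for label in examples]
for label, key in zip(examples, keys):
    print(f"{label!r} -> {key!r}")
```

Because digits are part of the substituted character class, a label ending in "(BA24)" loses its numeric suffix, which is worth keeping in mind if two labels differ only by a number.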

Quantile normalization

Quantile normalization of RNA-seq data

  1. Inputs are the RPKM file (for normalization), the count file (for QC) and the VCF file (for removing samples that do not have genotypes)
  2. Expression values are quantile normalized to the average empirical distribution observed across samples
  3. For each gene, expression values are inverse quantile normalized to a standard normal distribution across samples
    • Genes are selected based on expression thresholds of >0.1 RPKM in at least 10 samples and >5 reads in at least 10 samples
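
The gene selection rule can be sketched on a toy matrix as follows. This is only an illustration under assumed data: the arrays and the helper name expressed_mask are made up, and the sample threshold is lowered to 3 so that the example fits in four samples. As in the normalization code, a gene passes when at least min_samples samples exceed each cutoff.

```python
import numpy as np

# Toy data: 3 genes (rows) x 4 samples (columns); all values are made up
rpkm = np.array([[0.50, 0.2, 0.0, 0.3],
                 [0.05, 0.0, 0.2, 0.0],
                 [1.00, 2.0, 3.0, 4.0]])
reads = np.array([[10, 8, 0, 7],
                  [1, 0, 6, 0],
                  [3, 4, 50, 60]])

def expressed_mask(rpkm, reads, rpkm_cutoff=0.1, read_cutoff=5, min_samples=10):
    """Return a boolean vector marking genes that pass both thresholds."""
    enough_rpkm = (rpkm > rpkm_cutoff).sum(axis=1) >= min_samples
    enough_reads = (reads > read_cutoff).sum(axis=1) >= min_samples
    return enough_rpkm & enough_reads

# min_samples is lowered to 3 only because the toy data has 4 samples
mask = expressed_mask(rpkm, reads, min_samples=3)
print(mask)
```

Here only the first gene is kept: the second fails the RPKM rule, and the third passes the RPKM rule but has too few samples with more than 5 reads.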

The workflow produces four analysis-ready expression data files in HDF5 format, which are different versions / organizations of the same information: empirical quantile normalized and standard normal quantile normalized, each saved as a flat table or grouped by tissue. Additionally, the two original count and RPKM files are converted to HDF5 format grouped by tissue.
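
To make the two normalization steps concrete, here is a small sketch on a 4-gene by 3-sample toy matrix. It is not the pipeline code: it assumes there are no ties within a sample (the implementation below handles those separately), it uses a simple quadratic-memory ranking that is only suitable for toy data, and the function names are chosen here for illustration.

```python
import numpy as np
from statistics import NormalDist

def quantile_normalize(M):
    """Map each column (sample) onto the mean of the sorted columns.

    Assumes no ties within a column.
    """
    ranks = np.argsort(np.argsort(M, axis=0), axis=0)   # 0-based rank within each column
    mean_quantiles = np.sort(M, axis=0).mean(axis=1)    # average empirical distribution
    return mean_quantiles[ranks]

def average_ranks(M):
    """1-based ranks within each row, with tied values sharing their average rank."""
    less = (M[:, :, None] > M[:, None, :]).sum(axis=2)
    equal = (M[:, :, None] == M[:, None, :]).sum(axis=2)
    return less + (equal + 1) / 2

def inverse_normal_transform(M):
    """Per gene (row): map rank / (n_samples + 1) through the standard normal quantile function."""
    n = M.shape[1]
    inv_cdf = np.vectorize(NormalDist().inv_cdf)
    return inv_cdf(average_ranks(M) / (n + 1))

# Toy expression matrix: rows are genes, columns are samples (values are made up)
M = np.array([[5.0, 4.0, 3.0],
              [2.0, 1.0, 4.0],
              [3.0, 6.0, 6.0],
              [4.0, 2.0, 8.0]])

Q = quantile_normalize(M)
Z = inverse_normal_transform(Q)
print(np.round(Q, 3))
print(np.round(Z, 3))
```

After the first step every sample shares the same set of values, so only the ordering of genes within a sample differs. The second step then works gene by gene, which is why two samples holding the same normalized value for a gene receive the same score.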

Note that the implementation is memory intensive. We therefore use task to request the resources it needs on a large-memory server (lab_server). See remote tasks for more details on remote computer configurations.

See code chunk below for an implementation.

In [3]:
# Various versions of normalization
[rnaseq_1]
# Minimum RPKM required
parameter: rpkm_cutoff = 0.1
# Minimum read counts required
parameter: read_cutoff = 5
# Minimum sample counts required
parameter: sample_cutoff = 10
input: rna_rpkm, rna_cnts, genotype, sample_attr
output: f"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.std.flat.h5",
        f"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.flat.h5", 
        f"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.std.h5", 
        f"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.h5",
        f"{cwd:a}/rna_processed/{_input[0]:nnb}.h5".replace('rpkm', 'reads'),
        f"{cwd:a}/rna_processed/{_input[0]:nnb}.h5"
task: workdir = cwd, queue = "lab_server", walltime = "20:00:00", cores = 1, mem = "90G"
python: expand='${ }'

    # Adapted by Gao Wang from:
    # https://github.com/broadinstitute/gtex-pipeline
    # Originally authored by Francois Aguet

    import numpy as np
    import pandas as pd
    import gzip
    import subprocess
    import scipy.stats as stats
    import re, os

    def annotate_tissue_data(data, fsample):
        '''Save data to tissue specific tables'''
        samples = pd.read_csv(fsample, dtype=str, delimiter='\t', header=0)
        sample_dict = {}
        for row in samples[['SAMPID', 'SMTSD', 'SMAFRZE']].values:
            if row[2] == 'EXCLUDE':
                continue
            if row[1] not in sample_dict:
                sample_dict[row[1]] = []
            if row[0] in data.columns:
                sample_dict[row[1]].append(row[0])
        sample = dict((re.sub(r"[\W\d]+", "_", k.strip()).strip('_'), v) for k, v in sample_dict.items() if len(v))
        data = {k: data.loc[:, sample[k]] for k in sample}
        return data

    def write_per_tissue_data(data, output):
        if os.path.isfile(output):
            os.remove(output)
        for k in data:
            data[k].to_hdf(output, k, mode = 'a', complevel = 9, complib = 'zlib')

    def get_donors_from_vcf(vcfpath):
        """
        Extract donor IDs from VCF
        """
        with gzip.open(vcfpath) as vcf:
            for line in vcf:
                if line.decode()[:2]=='##': continue
                break
        return line.decode().strip().split('\t')[9:]

    def read_gct(gct_file, donor_ids, dtype):
        """
        Load GCT as DataFrame
        First col of expression data is ENCODE gene name, 2nd col is HUGO name
        ======================================================================
        A more memory friendly version:

        head = pd.read_csv(fdata, skiprows = 2, sep = '\t', nrows = 1)
        dt = {'Description': str, 'Name': str}
        dt.update({x: dtype for x in head.columns if x not in dt})
        data = pd.read_csv(fdata, compression='gzip', skiprows=2,
                           index_col=0, header=0, dtype = dt, sep='\t').drop('Description', 1)

        """
        df = pd.read_csv(gct_file, sep='\t', skiprows=2, index_col=0)
        df.drop('Description', axis=1, inplace=True)
        df.index.name = 'gene_id'
        return df[[i for i in df.columns if '-'.join(i.split('-')[:2]) in donor_ids]].astype(dtype, copy = True)

    def normalize_quantiles(M, inplace=False):
        """
        Note: replicates behavior of R function normalize.quantiles from library("preprocessCore")

        Reference:
         [1] Bolstad et al., Bioinformatics 19(2), pp. 185-193, 2003

        Adapted from https://github.com/andrewdyates/quantile_normalize
        """
        if not inplace:
            M = M.copy()

        Q = M.argsort(axis=0)
        m,n = M.shape

        # compute quantile vector
        quantiles = np.zeros(m)
        for i in range(n):
            quantiles += M[Q[:,i],i]
        quantiles = quantiles / n

        for i in range(n):
            # Get equivalence classes; unique values == 0
            dupes = np.zeros(m, dtype=int)  # np.int was removed from recent NumPy releases
            for j in range(m-1):
                if M[Q[j,i],i]==M[Q[j+1,i],i]:
                    dupes[j+1] = dupes[j]+1

            # Replace column with quantile ranks
            M[Q[:,i],i] = quantiles

            # Average together equivalence classes
            j = m-1
            while j >= 0:
                if dupes[j] == 0:
                    j -= 1
                else:
                    idxs = Q[j-dupes[j]:j+1,i]
                    M[idxs,i] = np.median(M[idxs,i])
                    j -= 1 + dupes[j]
            assert j == -1

        if not inplace:
            return M

    def inverse_quantile_normalization(M):
        """
        After quantile normalization of samples, standardize expression of each gene
        """
        R = stats.mstats.rankdata(M,axis=1)  # ties are averaged
        Q = stats.norm.ppf(R/(M.shape[1]+1))
        return Q

    def normalize_expression(expression_df, counts_df, expression_threshold=0.1, count_threshold=5, min_samples=10, dtype = np.float32):
        """
        Genes are thresholded based on the following expression rules:
          >=min_samples with >expression_threshold expression values
          >=min_samples with >count_threshold read counts
        """
        # donor_ids = ['-'.join(i.split('-')[:2]) for i in expression_df.columns]
        donor_ids = expression_df.columns

        # expression thresholds
        mask = ((np.sum(expression_df>expression_threshold,axis=1)>=min_samples) & (np.sum(counts_df>count_threshold,axis=1)>=min_samples)).values

        # apply normalization
        M = normalize_quantiles(expression_df.loc[mask].values, inplace=False)
        R = inverse_quantile_normalization(M)

        quant_std_df = pd.DataFrame(data=R, columns=donor_ids, index=expression_df.loc[mask].index, dtype = dtype)
        quant_df = pd.DataFrame(data=M, columns=donor_ids, index=expression_df.loc[mask].index, dtype = dtype)
        return quant_std_df, quant_df

    class Environment:
        def __init__(self):
            self.expression_gct = ${_input[0]:ar}
            self.counts_gct = ${_input[1]:ar}
            self.vcf = ${_input[2]:ar}
            self.attributes = ${_input[3]:ar}
            self.prefix = ${_input[0]:nnbr}
            self.output_dir = ${_output[0]:dr}
            self.expression_threshold = ${rpkm_cutoff} 
            self.count_threshold = ${read_cutoff} 
            self.min_samples = ${sample_cutoff}

    args = Environment()
    print('Generating normalized expression files ... ', end='', flush=True)
    donor_ids = get_donors_from_vcf(args.vcf)
    expression_df = read_gct(args.expression_gct, donor_ids, np.float32)
    counts_df = read_gct(args.counts_gct, donor_ids, np.uint32)
    quant_std_df, quant_df = normalize_expression(expression_df, counts_df,
                                                  expression_threshold=args.expression_threshold, 
                                                  count_threshold=args.count_threshold, 
                                                  min_samples=args.min_samples)
    print('Save to HDF5 format, full matrix and per tissue data ...', end='', flush=True)
    prefix = os.path.join(args.output_dir, args.prefix)
    quant_std_per_tissue = annotate_tissue_data(quant_std_df, args.attributes)
    quant_per_tissue = annotate_tissue_data(quant_df, args.attributes)
    expression_per_tissue = annotate_tissue_data(expression_df, args.attributes)
    counts_per_tissue = annotate_tissue_data(counts_df, args.attributes)
    quant_std_df.to_hdf(prefix + ".qnorm.std.flat.h5",  'GTExV7', mode = 'w', complevel = 9, complib = 'zlib')
    quant_df.to_hdf(prefix + ".qnorm.flat.h5", 'GTExV7', mode = 'w', complevel = 9, complib = 'zlib')
    write_per_tissue_data(quant_per_tissue, prefix + ".qnorm.h5")
    write_per_tissue_data(quant_std_per_tissue, prefix + ".qnorm.std.h5")
    write_per_tissue_data(expression_per_tissue, prefix + ".h5")
    write_per_tissue_data(counts_per_tissue, prefix.replace('rpkm', 'reads') + ".h5")
    print('done.')
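
One detail of the step above that is easy to miss is how expression samples are matched to genotyped donors: read_gct keeps a column only if the first two dash-separated fields of its sample ID appear among the donor IDs read from the VCF header. The sketch below illustrates that rule with placeholder identifiers. The IDs and the helper name donor_of are hypothetical and not taken from the data.

```python
def donor_of(sample_id):
    """Donor ID = first two dash-separated fields of a sample ID."""
    return "-".join(sample_id.split("-")[:2])

# Placeholder identifiers, made up for illustration
vcf_donors = {"GTEX-AAAA", "GTEX-BBBB"}
gct_columns = [
    "GTEX-AAAA-0226-SM-XXXXX",
    "GTEX-BBBB-0011-R1a-SM-YYYYY",
    "GTEX-CCCC-0526-SM-ZZZZZ",
]

# Keep only expression columns whose donor has genotype data
kept = [c for c in gct_columns if donor_of(c) in vcf_donors]
print(kept)
```

The third column is dropped because its donor does not appear in the VCF, which mirrors how samples without genotypes are removed before normalization.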

PEER factor analysis

The code chunk below installs the PEER R package.

In [4]:
# PEER package automatic installation
[peer]
output: f"{cwd:a}/R_peer_source_1.3.tgz"
task: workdir = cwd
download:
    https://github.com/downloads/PMBio/peer/R_peer_source_1.3.tgz
run:
    R CMD INSTALL R_peer_source_1.3.tgz
    rm -f R_peer_source_1.3.tgz

The PEER factor analysis package has a number of configurable parameters. For this analysis I use default values hard-coded into the script (see the code chunk below, step rnaseq_2). These settings therefore cannot be configured through input parameters, though it would be straightforward to expose them.

For every tissue I compute PEER factors using the top 10,000 expressed genes. Each tissue takes 1 to 3 hours to complete.
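
As a rough illustration of what selecting the top expressed genes amounts to, the sketch below keeps the columns of a samples-by-genes matrix with the largest column sums. It is written in Python for consistency with the rest of the notebook and is not the R code used in the workflow. The function name top_genes and the toy data are assumptions, and unlike a strict greater-than comparison against a threshold it always returns exactly the requested number of genes, even with tied totals.

```python
import numpy as np

def top_genes(mat, num):
    """mat is samples x genes; keep the `num` genes with the largest total expression."""
    if mat.shape[1] <= num:
        return mat
    totals = mat.sum(axis=0)
    # argpartition places the indices of the `num` largest totals in the last `num` slots;
    # sorting them keeps the genes in their original column order
    keep = np.sort(np.argpartition(totals, -num)[-num:])
    return mat[:, keep]

# Toy matrix: 2 samples x 4 genes (values are made up)
toy = np.array([[1.0, 10.0, 3.0, 7.0],
                [2.0, 20.0, 1.0, 9.0]])
selected = top_genes(toy, 2)
print(selected)
```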

In [5]:
# Removing unwanted variation
[rnaseq_2]
depends: R_library('rhdf5'), R_library('peer')
parameter: tissues = paths(get_output(f"h5ls {_input[2]} | awk '{{print $1}}'").strip().split('\n'))
input: for_each = ['tissues']
output: f"{cwd:a}/peer_analysis/{_tissues}_PEER_covariates.txt", 
        f"{cwd:a}/peer_analysis/{_tissues}_PEER_alpha.txt", 
        f"{cwd:a}/peer_analysis/{_tissues}_PEER_residuals.txt"
task: workdir = cwd, queue = "uchicago_cluster", walltime = "30:00:00", cores = 1, mem = "8G", trunk_size=1, trunk_workers=1
R: expand='${ }'
    alphaprior_a=0.001
    alphaprior_b=0.01
    epsprior_a=0.1
    epsprior_b=10
    max_iter=1000
    use_genes = 10000
    expr.h5 = ${_input[2]:r}

    library(peer, quietly=TRUE)  # https://github.com/PMBio/peer
    library(rhdf5, quietly=TRUE)

    WriteTable <- function(data, filename, index.name) {
        datafile <- file(filename, open = "wt")
        on.exit(close(datafile))
        header <- c(index.name, colnames(data))
        writeLines(paste0(header, collapse="\t"), con=datafile, sep="\n")
        write.table(data, datafile, sep="\t", col.names=FALSE, quote=FALSE)
    }

    loadTable <- function(filename, group, auto_transpose = FALSE) {
      obj <- h5read(filename, group)
      dat <- obj$block0_values
      rownames(dat) <- obj$axis0
      colnames(dat) <- obj$axis1
      if (ncol(dat) > nrow(dat) && auto_transpose) dat <- t(dat)
      return(dat)
    }

    getNumPeer <- function(ss) {
      if (ss<150) return (min(15, ceiling(ss / 5)))
      else if (ss >=150 && ss < 250) return(30)
      else return(35)
    }

    whichpart <- function(x, n) {
      nx <- length(x)
      p <- nx-n
      xp <- sort(x, partial=p)[p]
      which(x > xp)
    }

    getTopGenes <- function(gmat, num = 1000) {
        if (ncol(gmat) <= num) {
            return(gmat)
        } else {
            top.index <- whichpart(total.expr <- colSums(gmat), num)
            return(gmat[,top.index])
        }
    }

    cat("PEER: loading expression data ... ")
    # rows are samples, columns are genes
    M <- as.matrix(loadTable(expr.h5, "/${_tissues}"))
    M <- getTopGenes(M, use_genes)
    n = getNumPeer(nrow(M))
    cat("done.\n")

    # run PEER
    cat(paste0("PEER: estimating hidden confounders (", n, " for tissue ", ${_tissues:r} , ")\n"))
    model <- PEER()
    invisible(PEER_setNk(model, n))
    invisible(PEER_setPhenoMean(model, M))
    invisible(PEER_setPriorAlpha(model, alphaprior_a, alphaprior_b))
    invisible(PEER_setPriorEps(model,epsprior_a, epsprior_b))
    invisible(PEER_setNmax_iterations(model, max_iter))
    # if(!is.null(covs)) {
    #   invisible(PEER_setCovariates(model, covs))
    # }
    time <- system.time(PEER_update(model))

    X <- PEER_getX(model)  # samples x PEER factors
    A <- PEER_getAlpha(model)  # PEER factors x 1
    R <- t(PEER_getResiduals(model))  # genes x samples

    # add relevant row/column names
    c <- paste0("InferredCov",1:ncol(X))
    rownames(X) <- rownames(M)
    colnames(X) <- c
    rownames(A) <- c
    colnames(A) <- "Alpha"
    A <- as.data.frame(A)
    A$Relevance <- 1.0 / A$Alpha
    rownames(R) <- colnames(M)
    colnames(R) <- rownames(M)

    # write results
    cat("PEER: writing results ... ")
    WriteTable(t(X), ${_output[0]:r}, "ID")  # format(X, digits=6)
    WriteTable(A, ${_output[1]:r}, "ID")
    WriteTable(R, ${_output[2]:r}, "ID")
    cat("done.\n")
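
The number of PEER factors in the step above depends only on the tissue sample size. The rule from getNumPeer is restated below as a short Python sketch so that the thresholds are easy to check at a glance. The function name num_peer_factors is chosen here for illustration.

```python
import math

def num_peer_factors(n_samples):
    """Mirror of the sample-size rule used by getNumPeer in the R step."""
    if n_samples < 150:
        return min(15, math.ceil(n_samples / 5))
    if n_samples < 250:
        return 30
    return 35

# A few sample sizes around the thresholds
for n in (70, 100, 149, 150, 249, 250):
    print(n, num_peer_factors(n))
```

Tissues with fewer than 75 samples therefore get fewer than 15 factors, while anything from 75 to 149 samples is capped at 15.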

Execute RNA-seq workflow

See execution cells below.

In [ ]:
%sosrun rnaseq