{ "cells": [ { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "# eQTL discovery pipeline for the GTEx Consortium: RNA-seq preprocessing\n", "\n", "This notebook converts the RNA-seq data processing workflow of [eQTL discovery pipeline for the GTEx Consortium](https://github.com/broadinstitute/gtex-pipeline/tree/63b13b8ced25cf8ab8e7a26f40a495e523630a9b/qtl) (version July 31, 2017), originally written in Python, R and [WDL](https://software.broadinstitute.org/wdl/) into a **single, self-contained SoS script with narratives**." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "kernel": "SoS" }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "
RevisionAuthorDateMessage
0479b41Gao Wang2018-06-13Update workflows
5cfbedeGao Wang2018-03-01Fix f-string issue
19e1665Gao Wang2018-03-01Fix UTF8 coding issue
c71a760Gao Wang2018-03-01File renaming
c083e47Gao Wang2018-02-28Update examples
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%revisions -s -n 5" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "The workflow perform two analysis on RNA-seq data:\n", "\n", "* Quantile normalization\n", "* PEER factor anlaysis\n", "\n", "The [WDL](https://software.broadinstitute.org/wdl/) system is developed by Broad institute. Its syntax particularly focuses on human readability, and has recently became a [github official language](https://software.broadinstitute.org/wdl/blog?id=10509).\n", "\n", "Caution that this example aims to demonstrate migrating workflows written in another workflow system to SoS, for readers to compare between them code organization, readibility, syntax and interface to remote computing environment. The data involved are private thus cannot be provided. However, these data will become available in the future at https://gtexportal.org/home/. In the mean time, readers interested in adopting this example for their own analysis can edit `[global]` section to input their own examples and execute the pipeline. We are happy to provide support with it [on github](https://github.com/vatlab/sos-docs/issues)." ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "## Data preprocessing" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true, "kernel": "SoS" }, "outputs": [], "source": [ "[global]\n", "rna_rpkm = path(\"~/Documents/GTEx/gtex7/rna-seq/GTEx_Data_2016-01-15_v7_RNA-seq_RNA-SeQCv1.1.8_gene_rpkm.gct.gz\")\n", "rna_cnts = path(\"~/Documents/GTEx/gtex7/rna-seq/GTEx_Data_2016-01-15_v7_RNA-seq_RNA-SeQCv1.1.8_gene_reads.gct.gz\")\n", "genotype = path(\"~/Documents/GTEx/gtex7/variant_calls/GTEx_Analysis_2016-01-15_v7_WholeGenomeSeq_635Ind_PASS_AB02_GQ20_HETX_MISS15_PLINKQC.PIR.vcf.gz\")\n", "sample_attr = path(\"~/Documents/GTEx/gtex7/sample_annotations/GTEx_Analysis_2016-01-15_v7_SampleAttributesDS.txt\")\n", "cwd = path(\"~/Documents/GTEx\")" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "Please note that here we have specified paths to pre-existing directories in the `[global]` section, with `parameter` keyword. This setup allows users to configure all paths in one places for a “default” run, and optionally can configure them from command-line or `%sosrun` magic. To elaborate, for example:\n", "\n", "```\n", "[global]\n", "parameter: cwd = path('~/Documents/GTEx')\n", "```\n", "\n", "can be override by command argument `--cwd /path/to/new/dir`. An alternative implementation is to use configuration file, eg, create a file called `config.yml` and write:\n", "\n", "```\n", "cwd: path('~/Documents/GTEx')\n", "```\n", "and execute with `%sosrun -c config.yml`. The downside is that a `config.yml` file will have to be maintained." ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "### Data file format conversion\n", "Code chunk below is prototyping codes to convert RNA-seq file to HDF5 format. It is not needed when the normalization workflow is executed (next section) because normalization workflow performs format conversion on original data. But it is useful to have here, in case the original data will have to be processed separately." 
] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true, "kernel": "SoS" }, "outputs": [], "source": [ "# Convert RNA-seq data to HDF5 matrices\n", "[RNASeq_to_HDF5]\n", "# RNA-seq count data encoding\n", "parameter: dtype = 'np.uint32'\n", "output: f\"{_input[0]:nb}.hdf5\"\n", "task: workdir = cwd\n", "python: expand='${ }'\n", " import pandas as pd\n", " import numpy as np\n", " import re, os\n", " def load_data(fdata, fsample, dtype = np.float32):\n", " '''First col of expression data is ENCODE gene name, 2nd col is HUGO name'''\n", " head = pd.read_csv(fdata, skiprows = 2, sep = '\\t', nrows = 1)\n", " dt = {'Description': str, 'Name': str}\n", " dt.update({x: dtype for x in head.columns if x not in dt})\n", " data = pd.read_csv(fdata, compression='gzip', skiprows=2, \n", " index_col=0, header=0, dtype = dt, sep='\\t').drop('Description', 1)\n", " samples = pd.read_csv(fsample, dtype=str, delimiter='\\t', header=0)\n", " sample_dict = {}\n", " for row in samples[['SAMPID', 'SMTSD', 'SMAFRZE']].values:\n", " if row[2] == 'EXCLUDE':\n", " continue\n", " if row[1] not in sample_dict:\n", " sample_dict[row[1]] = []\n", " if row[0] in data.columns:\n", " sample_dict[row[1]].append(row[0])\n", " return data, dict((re.sub(\"[\\W\\d]+\", \"_\", k.strip()).strip('_'), v) for k, v in sample_dict.items() if len(v))\n", " #\n", " data, sample = load_data(${_input[0]:r}, ${_input[1]:r}, dtype = ${dtype})\n", " data = {k: data.loc[:, sample[k]] for k in sample}\n", " if os.path.isfile(${_output:r}):\n", " os.remove(${_output:r})\n", " for k in data:\n", " data[k].to_hdf(${_output:r}, k, mode = 'a', complevel = 9, complib = 'zlib')" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "### Quantile normalization\n", "\n", "Quantile normalization of RNA-seq data \n", "\n", "1. input are rpkm file (for normalization), count file (for QC) and vcf file (for removing samples that do not have genotypes)\n", "2. Expression values are quantile normalized to the average empirical distribution observed across samples\n", "3. For each gene, expression values are inverse quantile normalized to a standard normal distribution across samples\n", " * genes are selected based on expression thresholds of >0.1 RPKM in >10 samples and >5 reads in >10 samples\n", "\n", "It results in 4 **analysis ready expression data files** in HDF5 format of different versions / organizations of the same information: emperical quantile normalized and standard normal quantile normalized, saved as flat tables or grouped by tissues. Additionally 2 original Count and RPKM files are converted to HDF5 format grouped by tissues.\n", "\n", "Notice that the implementation is memory intensive. As a result we use `task` to specifically configure the required resource on a big memory server (`lab_server`) that is required to run it. See [remote tasks](https://vatlab.github.io/sos-docs/doc/documentation/Remote_Execution.html) for more details on remote computer configurations.\n", "\n", "See code chunk below for an implementation." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": true, "kernel": "SoS" }, "outputs": [], "source": [ "# Various versions of normalization\n", "[rnaseq_1]\n", "# Minimum RPKM required\n", "parameter: rpkm_cutoff = 0.1\n", "# Minimum read counts required\n", "parameter: read_cutoff = 5\n", "# Minimum sample counts required\n", "parameter: sample_cutoff = 10\n", "input: rna_rpkm, rna_cnts, genotype, sample_attr\n", "output: f\"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.std.flat.h5\",\n", " f\"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.flat.h5\", \n", " f\"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.std.h5\", \n", " f\"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.h5\",\n", " f\"{cwd:a}/rna_processed/{_input[0]:nnb}.h5\".replace('rpkm', 'reads'),\n", " f\"{cwd:a}/rna_processed/{_input[0]:nnb}.h5\"\n", "task: workdir = cwd, queue = \"lab_server\", walltime = \"20:00:00\", cores = 1, mem = \"90G\"\n", "python: expand='${ }'\n", "\n", " # Adopted by Gao Wang from:\n", " # https://github.com/broadinstitute/gtex-pipeline\n", " # Originally authored by Francois Aguet\n", "\n", " import numpy as np\n", " import pandas as pd\n", " import gzip\n", " import subprocess\n", " import scipy.stats as stats\n", " import re, os\n", "\n", " def annotate_tissue_data(data, fsample):\n", " '''Save data to tissue specific tables'''\n", " samples = pd.read_csv(fsample, dtype=str, delimiter='\\t', header=0)\n", " sample_dict = {}\n", " for row in samples[['SAMPID', 'SMTSD', 'SMAFRZE']].values:\n", " if row[2] == 'EXCLUDE':\n", " continue\n", " if row[1] not in sample_dict:\n", " sample_dict[row[1]] = []\n", " if row[0] in data.columns:\n", " sample_dict[row[1]].append(row[0])\n", " sample = dict((re.sub(\"[\\W\\d]+\", \"_\", k.strip()).strip('_'), v) for k, v in sample_dict.items() if len(v))\n", " data = {k: data.loc[:, sample[k]] for k in sample}\n", " return data\n", "\n", " def write_per_tissue_data(data, output):\n", " if os.path.isfile(output):\n", " os.remove(output)\n", " for k in data:\n", " data[k].to_hdf(output, k, mode = 'a', complevel = 9, complib = 'zlib')\n", "\n", " def get_donors_from_vcf(vcfpath):\n", " \"\"\"\n", " Extract donor IDs from VCF\n", " \"\"\"\n", " with gzip.open(vcfpath) as vcf:\n", " for line in vcf:\n", " if line.decode()[:2]=='##': continue\n", " break\n", " return line.decode().strip().split('\\t')[9:]\n", "\n", " def read_gct(gct_file, donor_ids, dtype):\n", " \"\"\"\n", " Load GCT as DataFrame\n", " First col of expression data is ENCODE gene name, 2nd col is HUGO name\n", " ======================================================================\n", " A more memory friendly version:\n", "\n", " head = pd.read_csv(fdata, skiprows = 2, sep = '\\t', nrows = 1)\n", " dt = {'Description': str, 'Name': str}\n", " dt.update({x: dtype for x in head.columns if x not in dt})\n", " data = pd.read_csv(fdata, compression='gzip', skiprows=2,\n", " index_col=0, header=0, dtype = dt, sep='\\t').drop('Description', 1)\n", "\n", " \"\"\"\n", " df = pd.read_csv(gct_file, sep='\\t', skiprows=2, index_col=0)\n", " df.drop('Description', axis=1, inplace=True)\n", " df.index.name = 'gene_id'\n", " return df[[i for i in df.columns if '-'.join(i.split('-')[:2]) in donor_ids]].astype(dtype, copy = True)\n", "\n", " def normalize_quantiles(M, inplace=False):\n", " \"\"\"\n", " Note: replicates behavior of R function normalize.quantiles from library(\"preprocessCore\")\n", "\n", " Reference:\n", " [1] Bolstad et al., Bioinformatics 19(2), pp. 
185-193, 2003\n",
    "\n",
    "        Adapted from https://github.com/andrewdyates/quantile_normalize\n",
    "        \"\"\"\n",
    "        if not inplace:\n",
    "            M = M.copy()\n",
    "\n",
    "        Q = M.argsort(axis=0)\n",
    "        m,n = M.shape\n",
    "\n",
    "        # compute quantile vector\n",
    "        quantiles = np.zeros(m)\n",
    "        for i in range(n):\n",
    "            quantiles += M[Q[:,i],i]\n",
    "        quantiles = quantiles / n\n",
    "\n",
    "        for i in range(n):\n",
    "            # Get equivalence classes; unique values == 0\n",
    "            dupes = np.zeros(m, dtype=int)\n",
    "            for j in range(m-1):\n",
    "                if M[Q[j,i],i]==M[Q[j+1,i],i]:\n",
    "                    dupes[j+1] = dupes[j]+1\n",
    "\n",
    "            # Replace column with quantile ranks\n",
    "            M[Q[:,i],i] = quantiles\n",
    "\n",
    "            # Set each equivalence class to its median\n",
    "            j = m-1\n",
    "            while j >= 0:\n",
    "                if dupes[j] == 0:\n",
    "                    j -= 1\n",
    "                else:\n",
    "                    idxs = Q[j-dupes[j]:j+1,i]\n",
    "                    M[idxs,i] = np.median(M[idxs,i])\n",
    "                    j -= 1 + dupes[j]\n",
    "            assert j == -1\n",
    "\n",
    "        if not inplace:\n",
    "            return M\n",
    "\n",
    "    def inverse_quantile_normalization(M):\n",
    "        \"\"\"\n",
    "        After quantile normalization of samples, standardize expression of each gene\n",
    "        \"\"\"\n",
    "        R = stats.mstats.rankdata(M,axis=1) # ties are averaged\n",
    "        Q = stats.norm.ppf(R/(M.shape[1]+1))\n",
    "        return Q\n",
    "\n",
    "    def normalize_expression(expression_df, counts_df, expression_threshold=0.1, count_threshold=5, min_samples=10, dtype = np.float32):\n",
    "        \"\"\"\n",
    "        Genes are thresholded based on the following expression rules:\n",
    "          >=min_samples with >expression_threshold expression values\n",
    "          >=min_samples with >count_threshold read counts\n",
    "        \"\"\"\n",
    "        # donor_ids = ['-'.join(i.split('-')[:2]) for i in expression_df.columns]\n",
    "        donor_ids = expression_df.columns\n",
    "\n",
    "        # expression thresholds\n",
    "        mask = ((np.sum(expression_df>expression_threshold,axis=1)>=min_samples) & (np.sum(counts_df>count_threshold,axis=1)>=min_samples)).values\n",
    "\n",
    "        # apply normalization\n",
    "        M = normalize_quantiles(expression_df.loc[mask].values, inplace=False)\n",
    "        R = inverse_quantile_normalization(M)\n",
    "\n",
    "        quant_std_df = pd.DataFrame(data=R, columns=donor_ids, index=expression_df.loc[mask].index, dtype = dtype)\n",
    "        quant_df = pd.DataFrame(data=M, columns=donor_ids, index=expression_df.loc[mask].index, dtype = dtype)\n",
    "        return quant_std_df, quant_df\n",
    "\n",
    "    class Environment:\n",
    "        def __init__(self):\n",
    "            self.expression_gct = ${_input[0]:ar}\n",
    "            self.counts_gct = ${_input[1]:ar}\n",
    "            self.vcf = ${_input[2]:ar}\n",
    "            self.attributes = ${_input[3]:ar}\n",
    "            self.prefix = ${_input[0]:nnbr}\n",
    "            self.output_dir = ${_output[0]:dr}\n",
    "            self.expression_threshold = ${rpkm_cutoff}\n",
    "            self.count_threshold = ${read_cutoff}\n",
    "            self.min_samples = ${sample_cutoff}\n",
    "\n",
    "    args = Environment()\n",
    "    print('Generating normalized expression files ... 
', end='', flush=True)\n",
    "    donor_ids = get_donors_from_vcf(args.vcf)\n",
    "    expression_df = read_gct(args.expression_gct, donor_ids, np.float32)\n",
    "    counts_df = read_gct(args.counts_gct, donor_ids, np.uint32)\n",
    "    quant_std_df, quant_df = normalize_expression(expression_df, counts_df,\n",
    "                                                  expression_threshold=args.expression_threshold,\n",
    "                                                  count_threshold=args.count_threshold,\n",
    "                                                  min_samples=args.min_samples)\n",
    "    print('Save to HDF5 format, full matrix and per tissue data ...', end='', flush=True)\n",
    "    prefix = os.path.join(args.output_dir, args.prefix)\n",
    "    quant_std_per_tissue = annotate_tissue_data(quant_std_df, args.attributes)\n",
    "    quant_per_tissue = annotate_tissue_data(quant_df, args.attributes)\n",
    "    expression_per_tissue = annotate_tissue_data(expression_df, args.attributes)\n",
    "    counts_per_tissue = annotate_tissue_data(counts_df, args.attributes)\n",
    "    quant_std_df.to_hdf(prefix + \".qnorm.std.flat.h5\", 'GTExV7', mode = 'w', complevel = 9, complib = 'zlib')\n",
    "    quant_df.to_hdf(prefix + \".qnorm.flat.h5\", 'GTExV7', mode = 'w', complevel = 9, complib = 'zlib')\n",
    "    write_per_tissue_data(quant_per_tissue, prefix + \".qnorm.h5\")\n",
    "    write_per_tissue_data(quant_std_per_tissue, prefix + \".qnorm.std.h5\")\n",
    "    write_per_tissue_data(expression_per_tissue, prefix + \".h5\")\n",
    "    write_per_tissue_data(counts_per_tissue, prefix.replace('rpkm', 'reads') + \".h5\")\n",
    "    print('done.')" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [
    "## PEER factor analysis\n",
    "The code chunk below installs the `PEER` R package." ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": true, "kernel": "SoS" }, "outputs": [], "source": [
    "# PEER package automatic installation\n",
    "[peer]\n",
    "output: f\"{cwd:a}/R_peer_source_1.3.tgz\"\n",
    "task: workdir = cwd\n",
    "download:\n",
    "    https://github.com/downloads/PMBio/peer/R_peer_source_1.3.tgz\n",
    "run:\n",
    "    # keep the downloaded archive, which is the declared output of this step\n",
    "    R CMD INSTALL R_peer_source_1.3.tgz" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [
    "The PEER factor analysis package has a number of configurable parameters. For this analysis I use default values hard-coded into the script (see the code chunk below, step `rnaseq_2`). These settings therefore cannot be configured via input parameters, though it is straightforward to implement that if needed.\n",
    "\n",
    "For every tissue I compute PEER factors using the top 10,000 expressed genes; the number of factors is chosen based on the per-tissue sample size, as sketched below. Each tissue takes 1 to 3 hours to complete."
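,
    "\n",
    "The number of hidden factors is a step function of the tissue sample size. The following is a minimal Python sketch mirroring the `getNumPeer` rule in the R code below:\n",
    "\n",
    "```\n",
    "import math\n",
    "\n",
    "def num_peer_factors(n_samples):\n",
    "    # sample-size rule, mirroring getNumPeer() in the R step below\n",
    "    if n_samples < 150:\n",
    "        return min(15, math.ceil(n_samples / 5))\n",
    "    elif n_samples < 250:\n",
    "        return 30\n",
    "    else:\n",
    "        return 35\n",
    "\n",
    "print([num_peer_factors(n) for n in (60, 149, 200, 400)])  # [12, 15, 30, 35]\n",
    "```"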
] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": true, "kernel": "SoS" }, "outputs": [], "source": [ "# Removing unwanted variation\n", "[rnaseq_2]\n", "depends: R_library('rhdf5'), R_library('peer')\n", "parameter: tissues = paths(get_output(f\"h5ls {_input[2]} | awk '{{print $1}}'\").strip().split('\\n'))\n", "input: for_each = ['tissues']\n", "output: f\"{cwd:a}/peer_analysis/{_tissues}_PEER_covariates.txt\", \n", " f\"{cwd:a}/peer_analysis/{_tissues}_PEER_alpha.txt\", \n", " f\"{cwd:a}/peer_analysis/{_tissues}_PEER_residuals.txt\"\n", "task: workdir = cwd, queue = \"uchicago_cluster\", walltime = \"30:00:00\", cores = 1, mem = \"8G\", trunk_size=1, trunk_workers=1\n", "R: expand='${ }'\n", " alphaprior_a=0.001\n", " alphaprior_b=0.01\n", " epsprior_a=0.1\n", " epsprior_b=10\n", " max_iter=1000\n", " use_genes = 10000\n", " expr.h5 = ${_input[2]:r}\n", "\n", " library(peer, quietly=TRUE) # https://github.com/PMBio/peer\n", " library(rhdf5, quietly=TRUE)\n", "\n", " WriteTable <- function(data, filename, index.name) {\n", " datafile <- file(filename, open = \"wt\")\n", " on.exit(close(datafile))\n", " header <- c(index.name, colnames(data))\n", " writeLines(paste0(header, collapse=\"\\t\"), con=datafile, sep=\"\\n\")\n", " write.table(data, datafile, sep=\"\\t\", col.names=FALSE, quote=FALSE)\n", " }\n", "\n", " loadTable <- function(filename, group, auto_transpose = FALSE) {\n", " obj <- h5read(filename, group)\n", " dat <- obj$block0_values\n", " rownames(dat) <- obj$axis0\n", " colnames(dat) <- obj$axis1\n", " if (ncol(dat) > nrow(dat) && auto_transpose) dat <- t(dat)\n", " return(dat)\n", " }\n", "\n", " getNumPeer <- function(ss) {\n", " if (ss<150) return (min(15, ceiling(ss / 5)))\n", " else if (ss >=150 && ss < 250) return(30)\n", " else return(35)\n", " }\n", "\n", " whichpart <- function(x, n) {\n", " nx <- length(x)\n", " p <- nx-n\n", " xp <- sort(x, partial=p)[p]\n", " which(x > xp)\n", " }\n", "\n", " getTopGenes <- function(gmat, num = 1000) {\n", " if (ncol(M) <= num) {\n", " return(gmat)\n", " } else {\n", " top.index <- whichpart(total.expr <- colSums(gmat), num)\n", " return(gmat[,top.index])\n", " }\n", " }\n", "\n", " cat(\"PEER: loading expression data ... \")\n", " # rows are number of samples. 
columns are number of genes\n",
    "    M <- as.matrix(loadTable(expr.h5, \"/${_tissues}\"))\n",
    "    M <- getTopGenes(M, use_genes)\n",
    "    n <- getNumPeer(nrow(M))\n",
    "    cat(\"done.\\n\")\n",
    "\n",
    "    # run PEER\n",
    "    cat(paste0(\"PEER: estimating \", n, \" hidden confounders for tissue \", ${_tissues:r}, \"\\n\"))\n",
    "    model <- PEER()\n",
    "    invisible(PEER_setNk(model, n))\n",
    "    invisible(PEER_setPhenoMean(model, M))\n",
    "    invisible(PEER_setPriorAlpha(model, alphaprior_a, alphaprior_b))\n",
    "    invisible(PEER_setPriorEps(model, epsprior_a, epsprior_b))\n",
    "    invisible(PEER_setNmax_iterations(model, max_iter))\n",
    "    # if(!is.null(covs)) {\n",
    "    #     invisible(PEER_setCovariates(model, covs))\n",
    "    # }\n",
    "    time <- system.time(PEER_update(model))\n",
    "\n",
    "    X <- PEER_getX(model) # samples x PEER factors\n",
    "    A <- PEER_getAlpha(model) # PEER factors x 1\n",
    "    R <- t(PEER_getResiduals(model)) # genes x samples\n",
    "\n",
    "    # add relevant row/column names\n",
    "    cn <- paste0(\"InferredCov\", 1:ncol(X))\n",
    "    rownames(X) <- rownames(M)\n",
    "    colnames(X) <- cn\n",
    "    rownames(A) <- cn\n",
    "    colnames(A) <- \"Alpha\"\n",
    "    A <- as.data.frame(A)\n",
    "    A$Relevance <- 1.0 / A$Alpha\n",
    "    rownames(R) <- colnames(M)\n",
    "    colnames(R) <- rownames(M)\n",
    "\n",
    "    # write results\n",
    "    cat(\"PEER: writing results ... \")\n",
    "    WriteTable(t(X), ${_output[0]:r}, \"ID\") # format(X, digits=6)\n",
    "    WriteTable(A, ${_output[1]:r}, \"ID\")\n",
    "    WriteTable(R, ${_output[2]:r}, \"ID\")\n",
    "    cat(\"done.\\n\")" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [
    "## Execute RNA-seq workflow\n",
    "See the execution cell below." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "kernel": "SoS" }, "outputs": [], "source": [ "%sosrun rnaseq" ] } ], "metadata": { "kernelspec": { "display_name": "SoS", "language": "sos", "name": "sos" }, "language_info": { "codemirror_mode": "sos", "file_extension": ".sos", "mimetype": "text/x-sos", "name": "sos", "nbconvert_exporter": "sos_notebook.converter.SoS_Exporter", "pygments_lexer": "sos" }, "sos": { "default_kernel": "SoS", "kernels": [ [ "SoS", "sos", "", "", "" ] ], "panel": { "displayed": true, "height": 0, "style": "side" }, "version": "0.18.5" } }, "nbformat": 4, "nbformat_minor": 2 }