eQTL discovery pipeline for the GTEx Consortium: RNA-seq preprocessing
This notebook converts the RNA-seq data processing workflow of the eQTL discovery pipeline for the GTEx Consortium (version July 31, 2017), originally written in Python, R and WDL, into a single, self-contained SoS script with narratives.
%revisions -s -n 5
The workflow performs two analyses on RNA-seq data:
- Quantile normalization
- PEER factor analysis
The WDL system is developed by the Broad Institute. Its syntax focuses particularly on human readability, and it has recently become an officially recognized language on GitHub.
Note that this example aims to demonstrate how to migrate workflows written in another workflow system to SoS, so that readers can compare the two in terms of code organization, readability, syntax and interface to remote computing environments. The data involved are private and thus cannot be provided. However, these data will become available in the future at https://gtexportal.org/home/. In the meantime, readers interested in adapting this example for their own analysis can edit the [global] section to point to their own data and execute the pipeline. We are happy to provide support for it on GitHub.
[global]
rna_rpkm = path("~/Documents/GTEx/gtex7/rna-seq/GTEx_Data_2016-01-15_v7_RNA-seq_RNA-SeQCv1.1.8_gene_rpkm.gct.gz")
rna_cnts = path("~/Documents/GTEx/gtex7/rna-seq/GTEx_Data_2016-01-15_v7_RNA-seq_RNA-SeQCv1.1.8_gene_reads.gct.gz")
genotype = path("~/Documents/GTEx/gtex7/variant_calls/GTEx_Analysis_2016-01-15_v7_WholeGenomeSeq_635Ind_PASS_AB02_GQ20_HETX_MISS15_PLINKQC.PIR.vcf.gz")
sample_attr = path("~/Documents/GTEx/gtex7/sample_annotations/GTEx_Analysis_2016-01-15_v7_SampleAttributesDS.txt")
cwd = path("~/Documents/GTEx")
Please note that here we have specified paths to pre-existing files and directories in the [global] section. This setup allows users to configure all paths in one place for a “default” run. When a path is declared with the parameter keyword, it can optionally be overridden from the command line or the %sosrun magic. For example:
[global]
parameter: cwd = path('~/Documents/GTEx')
can be overridden by the command-line argument --cwd /path/to/new/dir. An alternative is to use a configuration file, e.g., create a file called config.yml and write:
cwd: path('~/Documents/GTEx')
and execute with %sosrun -c config.yml. The downside is that a config.yml file has to be maintained.
Data file format conversion
The code chunk below is prototype code that converts the RNA-seq files to HDF5 format. It is not needed when the normalization workflow (next section) is executed, because that workflow performs the format conversion on the original data itself. It is kept here in case the original data have to be processed separately.
# Convert RNA-seq data to HDF5 matrices
[RNASeq_to_HDF5]
# RNA-seq count data encoding
parameter: dtype = 'np.uint32'
output: f"{_input[0]:nb}.hdf5"
task: workdir = cwd
python: expand='${ }'
import pandas as pd
import numpy as np
import re, os
def load_data(fdata, fsample, dtype = np.float32):
'''First col of expression data is the Ensembl (GENCODE) gene ID, 2nd col is the HUGO gene name'''
head = pd.read_csv(fdata, skiprows = 2, sep = '\t', nrows = 1)
dt = {'Description': str, 'Name': str}
dt.update({x: dtype for x in head.columns if x not in dt})
data = pd.read_csv(fdata, compression='gzip', skiprows=2,
index_col=0, header=0, dtype = dt, sep='\t').drop('Description', axis=1)
samples = pd.read_csv(fsample, dtype=str, delimiter='\t', header=0)
sample_dict = {}
for row in samples[['SAMPID', 'SMTSD', 'SMAFRZE']].values:
if row[2] == 'EXCLUDE':
continue
if row[1] not in sample_dict:
sample_dict[row[1]] = []
if row[0] in data.columns:
sample_dict[row[1]].append(row[0])
return data, dict((re.sub(r"[\W\d]+", "_", k.strip()).strip('_'), v) for k, v in sample_dict.items() if len(v))
#
data, sample = load_data(${_input[0]:r}, ${_input[1]:r}, dtype = ${dtype})
data = {k: data.loc[:, sample[k]] for k in sample}
if os.path.isfile(${_output:r}):
os.remove(${_output:r})
for k in data:
data[k].to_hdf(${_output:r}, k, mode = 'a', complevel = 9, complib = 'zlib')
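The conversion step above stores one table per tissue, using a sanitized version of the tissue name as the HDF5 key. The sketch below isolates that grouping logic in plain Python so it can be tried without the GTEx files. The function names `tissue_key` and `group_samples_by_tissue` and the sample rows are hypothetical and only serve as illustration; the regular expression and the `EXCLUDE` rule are taken from the chunk above.

```python
import re
from collections import defaultdict


def tissue_key(name):
    """Collapse runs of non-word characters and digits into '_' (same regex as above)."""
    return re.sub(r"[\W\d]+", "_", name.strip()).strip("_")


def group_samples_by_tissue(rows, available):
    """Group sample IDs by sanitized tissue name.

    rows: iterable of (SAMPID, SMTSD, SMAFRZE) tuples from the sample annotation.
    available: sample IDs that are present as columns in the expression matrix.
    """
    groups = defaultdict(list)
    for sample_id, tissue, freeze in rows:
        if freeze == "EXCLUDE":
            continue  # samples flagged for exclusion are skipped
        if sample_id in available:
            groups[tissue_key(tissue)].append(sample_id)
    return dict(groups)


# Hypothetical annotation rows, for illustration only
rows = [
    ("S1", "Whole Blood", "RNASEQ"),
    ("S2", "Brain - Anterior cingulate cortex (BA24)", "RNASEQ"),
    ("S3", "Whole Blood", "EXCLUDE"),
    ("S4", "Whole Blood", "RNASEQ"),
]
groups = group_samples_by_tissue(rows, available={"S1", "S2", "S3"})
print(groups)
```

Because digits are removed along with punctuation, two tissue names that differ only by digits would map to the same key, which is worth checking when applying this to other annotation files.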
Quantile normalization
Quantile normalization of RNA-seq data
- Inputs are the RPKM file (for normalization), the count file (for QC) and the VCF file (for removing samples that do not have genotypes)
- Expression values are quantile normalized to the average empirical distribution observed across samples
- For each gene, expression values are inverse quantile normalized to a standard normal distribution across samples
- Genes are selected based on expression thresholds of >0.1 RPKM in at least 10 samples and >5 reads in at least 10 samples
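The steps in this list can be sketched on a toy matrix. This is a simplified illustration and not the workflow's implementation: it uses the standard library's `statistics.NormalDist` in place of `scipy.stats`, it does not average tied values within a sample during quantile normalization, and the helper names and toy values are made up for the example.

```python
import numpy as np
from statistics import NormalDist


def filter_genes(rpkm, counts, rpkm_cutoff=0.1, read_cutoff=5, min_samples=10):
    """Mask of genes (rows) exceeding both cutoffs in at least min_samples samples."""
    expressed = (rpkm > rpkm_cutoff).sum(axis=1) >= min_samples
    covered = (counts > read_cutoff).sum(axis=1) >= min_samples
    return expressed & covered


def quantile_normalize(M):
    """Give every sample (column) the same distribution: the mean of the sorted columns.

    Simplification: tied values within a column are not averaged here.
    """
    order = np.argsort(M, axis=0)
    mean_quantiles = np.sort(M, axis=0).mean(axis=1)
    out = np.empty(M.shape, dtype=float)
    for j in range(M.shape[1]):
        out[order[:, j], j] = mean_quantiles
    return out


def average_ranks(values):
    """1-based ranks of a 1-D array, with ties sharing their average rank."""
    _, inverse, counts = np.unique(values, return_inverse=True, return_counts=True)
    last_rank = np.cumsum(counts)
    return (last_rank - (counts - 1) / 2.0)[inverse]


def inverse_normal_transform(M):
    """Per gene (row): rank across samples, then map rank / (n + 1) to N(0, 1) quantiles."""
    ranks = np.apply_along_axis(average_ranks, 1, M)
    to_normal = np.vectorize(NormalDist().inv_cdf)
    return to_normal(ranks / (M.shape[1] + 1))


# Toy data (hypothetical): 4 genes x 3 samples
rpkm = np.array([[5.0, 4.0, 3.0],
                 [2.0, 1.0, 4.0],
                 [0.0, 0.05, 0.2],
                 [4.0, 2.0, 8.0]])
counts = np.array([[50, 40, 30],
                   [20, 10, 40],
                   [0, 1, 6],
                   [40, 20, 80]])

keep = filter_genes(rpkm, counts, min_samples=2)  # small min_samples for the toy data
qnorm = quantile_normalize(rpkm[keep])            # samples now share one distribution
qnorm_std = inverse_normal_transform(qnorm)       # each gene mapped to normal quantiles
print(keep)
print(qnorm)
print(qnorm_std.round(3))
```

After quantile normalization every column contains the same set of values, so ties across samples within a gene are common. This is why the rank step averages ties before mapping to normal quantiles.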
This results in four analysis-ready expression data files in HDF5 format, which hold different versions and organizations of the same information: empirical quantile normalized and standard normal quantile normalized, each saved as a flat table or grouped by tissue. Additionally, the two original count and RPKM files are converted to HDF5 format, grouped by tissue.
Note that the implementation is memory intensive. We therefore use task to explicitly request the required resources on a big-memory server (lab_server). See remote tasks for more details on remote computer configurations.
See code chunk below for an implementation.
# Various versions of normalization
[rnaseq_1]
# Minimum RPKM required
parameter: rpkm_cutoff = 0.1
# Minimum read counts required
parameter: read_cutoff = 5
# Minimum sample counts required
parameter: sample_cutoff = 10
input: rna_rpkm, rna_cnts, genotype, sample_attr
output: f"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.std.flat.h5",
f"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.flat.h5",
f"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.std.h5",
f"{cwd:a}/rna_processed/{_input[0]:nnb}.qnorm.h5",
f"{cwd:a}/rna_processed/{_input[0]:nnb}.h5".replace('rpkm', 'reads'),
f"{cwd:a}/rna_processed/{_input[0]:nnb}.h5"
task: workdir = cwd, queue = "lab_server", walltime = "20:00:00", cores = 1, mem = "90G"
python: expand='${ }'
# Adapted by Gao Wang from:
# https://github.com/broadinstitute/gtex-pipeline
# Originally authored by Francois Aguet
import numpy as np
import pandas as pd
import gzip
import subprocess
import scipy.stats as stats
import re, os
def annotate_tissue_data(data, fsample):
'''Save data to tissue specific tables'''
samples = pd.read_csv(fsample, dtype=str, delimiter='\t', header=0)
sample_dict = {}
for row in samples[['SAMPID', 'SMTSD', 'SMAFRZE']].values:
if row[2] == 'EXCLUDE':
continue
if row[1] not in sample_dict:
sample_dict[row[1]] = []
if row[0] in data.columns:
sample_dict[row[1]].append(row[0])
sample = dict((re.sub(r"[\W\d]+", "_", k.strip()).strip('_'), v) for k, v in sample_dict.items() if len(v))
data = {k: data.loc[:, sample[k]] for k in sample}
return data
def write_per_tissue_data(data, output):
if os.path.isfile(output):
os.remove(output)
for k in data:
data[k].to_hdf(output, k, mode = 'a', complevel = 9, complib = 'zlib')
def get_donors_from_vcf(vcfpath):
"""
Extract donor IDs from VCF
"""
with gzip.open(vcfpath) as vcf:
for line in vcf:
if line.decode()[:2]=='##': continue
break
return line.decode().strip().split('\t')[9:]
def read_gct(gct_file, donor_ids, dtype):
"""
Load GCT as DataFrame
First col of expression data is the Ensembl (GENCODE) gene ID, 2nd col is the HUGO gene name
======================================================================
A more memory friendly version:
head = pd.read_csv(fdata, skiprows = 2, sep = '\t', nrows = 1)
dt = {'Description': str, 'Name': str}
dt.update({x: dtype for x in head.columns if x not in dt})
data = pd.read_csv(fdata, compression='gzip', skiprows=2,
index_col=0, header=0, dtype = dt, sep='\t').drop('Description', axis=1)
"""
df = pd.read_csv(gct_file, sep='\t', skiprows=2, index_col=0)
df.drop('Description', axis=1, inplace=True)
df.index.name = 'gene_id'
return df[[i for i in df.columns if '-'.join(i.split('-')[:2]) in donor_ids]].astype(dtype, copy = True)
def normalize_quantiles(M, inplace=False):
"""
Note: replicates behavior of R function normalize.quantiles from library("preprocessCore")
Reference:
[1] Bolstad et al., Bioinformatics 19(2), pp. 185-193, 2003
Adapted from https://github.com/andrewdyates/quantile_normalize
"""
if not inplace:
M = M.copy()
Q = M.argsort(axis=0)
m,n = M.shape
# compute quantile vector
quantiles = np.zeros(m)
for i in range(n):
quantiles += M[Q[:,i],i]
quantiles = quantiles / n
for i in range(n):
# Get equivalence classes; unique values == 0
dupes = np.zeros(m, dtype=int)
for j in range(m-1):
if M[Q[j,i],i]==M[Q[j+1,i],i]:
dupes[j+1] = dupes[j]+1
# Replace column with quantile ranks
M[Q[:,i],i] = quantiles
# Average together equivalence classes
j = m-1
while j >= 0:
if dupes[j] == 0:
j -= 1
else:
idxs = Q[j-dupes[j]:j+1,i]
M[idxs,i] = np.median(M[idxs,i])
j -= 1 + dupes[j]
assert j == -1
if not inplace:
return M
def inverse_quantile_normalization(M):
"""
After quantile normalization of samples, standardize expression of each gene
"""
R = stats.mstats.rankdata(M,axis=1) # ties are averaged
Q = stats.norm.ppf(R/(M.shape[1]+1))
return Q
def normalize_expression(expression_df, counts_df, expression_threshold=0.1, count_threshold=5, min_samples=10, dtype = np.float32):
"""
Genes are thresholded based on the following expression rules:
>=min_samples with >expression_threshold expression values
>=min_samples with >count_threshold read counts
"""
# donor_ids = ['-'.join(i.split('-')[:2]) for i in expression_df.columns]
donor_ids = expression_df.columns
# expression thresholds
mask = ((np.sum(expression_df>expression_threshold,axis=1)>=min_samples) & (np.sum(counts_df>count_threshold,axis=1)>=min_samples)).values
# apply normalization
M = normalize_quantiles(expression_df.loc[mask].values, inplace=False)
R = inverse_quantile_normalization(M)
quant_std_df = pd.DataFrame(data=R, columns=donor_ids, index=expression_df.loc[mask].index, dtype = dtype)
quant_df = pd.DataFrame(data=M, columns=donor_ids, index=expression_df.loc[mask].index, dtype = dtype)
return quant_std_df, quant_df
class Environment:
def __init__(self):
self.expression_gct = ${_input[0]:ar}
self.counts_gct = ${_input[1]:ar}
self.vcf = ${_input[2]:ar}
self.attributes = ${_input[3]:ar}
self.prefix = ${_input[0]:nnbr}
self.output_dir = ${_output[0]:dr}
self.expression_threshold = ${rpkm_cutoff}
self.count_threshold = ${read_cutoff}
self.min_samples = ${sample_cutoff}
args = Environment()
print('Generating normalized expression files ... ', end='', flush=True)
donor_ids = get_donors_from_vcf(args.vcf)
expression_df = read_gct(args.expression_gct, donor_ids, np.float32)
counts_df = read_gct(args.counts_gct, donor_ids, np.uint32)
quant_std_df, quant_df = normalize_expression(expression_df, counts_df,
expression_threshold=args.expression_threshold,
count_threshold=args.count_threshold,
min_samples=args.min_samples)
print('Save to HDF5 format, full matrix and per tissue data ...', end='', flush=True)
prefix = os.path.join(args.output_dir, args.prefix)
quant_std_per_tissue = annotate_tissue_data(quant_std_df, args.attributes)
quant_per_tissue = annotate_tissue_data(quant_df, args.attributes)
expression_per_tissue = annotate_tissue_data(expression_df, args.attributes)
counts_per_tissue = annotate_tissue_data(counts_df, args.attributes)
quant_std_df.to_hdf(prefix + ".qnorm.std.flat.h5", 'GTExV7', mode = 'w', complevel = 9, complib = 'zlib')
quant_df.to_hdf(prefix + ".qnorm.flat.h5", 'GTExV7', mode = 'w', complevel = 9, complib = 'zlib')
write_per_tissue_data(quant_per_tissue, prefix + ".qnorm.h5")
write_per_tissue_data(quant_std_per_tissue, prefix + ".qnorm.std.h5")
write_per_tissue_data(expression_per_tissue, prefix + ".h5")
write_per_tissue_data(counts_per_tissue, prefix.replace('rpkm', 'reads') + ".h5")
print('done.')
# PEER package automatic installation
[peer]
output: f"{cwd:a}/R_peer_source_1.3.tgz"
task: workdir = cwd
download:
https://github.com/downloads/PMBio/peer/R_peer_source_1.3.tgz
run:
R CMD INSTALL R_peer_source_1.3.tgz
rm -f R_peer_source_1.3.tgz
The PEER factor analysis package has a number of configurable parameters. For this analysis I use default values hard-coded into the script (see code chunk below, step rnaseq_2). These settings therefore cannot be configured through input parameters, although it would be straightforward to expose them.
For every tissue I compute PEER factors using the top 10,000 expressed genes. Each tissue takes 1 to 3 hours to complete.
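Two small rules determine the input to PEER in the rnaseq_2 step: how many hidden factors to estimate for a given sample size, and which genes to keep. The Python sketch below mirrors the R helpers getNumPeer and getTopGenes from that step for illustration only. The function names are hypothetical, and unlike the R helper (which keeps genes strictly above a cutoff value) this version always returns exactly `num` genes even when totals are tied.

```python
import math
import numpy as np


def num_peer_factors(n_samples):
    """Number of PEER factors as a function of sample size (rule from getNumPeer)."""
    if n_samples < 150:
        return min(15, math.ceil(n_samples / 5))
    if n_samples < 250:
        return 30
    return 35


def top_expressed_genes(expr, num=10000):
    """Keep the `num` genes (columns) with the largest total expression.

    expr is a samples x genes matrix; the original column order is preserved.
    """
    if expr.shape[1] <= num:
        return expr
    totals = expr.sum(axis=0)
    top = np.sort(np.argpartition(totals, -num)[-num:])
    return expr[:, top]


# Toy matrix (hypothetical): 2 samples x 4 genes
expr = np.array([[1.0, 5.0, 3.0, 0.0],
                 [2.0, 6.0, 1.0, 0.0]])
print(top_expressed_genes(expr, num=2))
print([num_peer_factors(n) for n in (50, 100, 200, 300)])
```

With this rule, tissues with fewer than 75 samples get one factor per five samples, and the number of factors is capped at 15, 30 or 35 depending on the sample size bracket.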
# Removing unwanted variation
[rnaseq_2]
depends: R_library('rhdf5'), R_library('peer')
parameter: tissues = paths(get_output(f"h5ls {_input[2]} | awk '{{print $1}}'").strip().split('\n'))
input: for_each = ['tissues']
output: f"{cwd:a}/peer_analysis/{_tissues}_PEER_covariates.txt",
f"{cwd:a}/peer_analysis/{_tissues}_PEER_alpha.txt",
f"{cwd:a}/peer_analysis/{_tissues}_PEER_residuals.txt"
task: workdir = cwd, queue = "uchicago_cluster", walltime = "30:00:00", cores = 1, mem = "8G", trunk_size=1, trunk_workers=1
R: expand='${ }'
alphaprior_a=0.001
alphaprior_b=0.01
epsprior_a=0.1
epsprior_b=10
max_iter=1000
use_genes = 10000
expr.h5 = ${_input[2]:r}
library(peer, quietly=TRUE) # https://github.com/PMBio/peer
library(rhdf5, quietly=TRUE)
WriteTable <- function(data, filename, index.name) {
datafile <- file(filename, open = "wt")
on.exit(close(datafile))
header <- c(index.name, colnames(data))
writeLines(paste0(header, collapse="\t"), con=datafile, sep="\n")
write.table(data, datafile, sep="\t", col.names=FALSE, quote=FALSE)
}
loadTable <- function(filename, group, auto_transpose = FALSE) {
obj <- h5read(filename, group)
dat <- obj$block0_values
rownames(dat) <- obj$axis0
colnames(dat) <- obj$axis1
if (ncol(dat) > nrow(dat) && auto_transpose) dat <- t(dat)
return(dat)
}
getNumPeer <- function(ss) {
if (ss<150) return (min(15, ceiling(ss / 5)))
else if (ss >=150 && ss < 250) return(30)
else return(35)
}
whichpart <- function(x, n) {
nx <- length(x)
p <- nx-n
xp <- sort(x, partial=p)[p]
which(x > xp)
}
getTopGenes <- function(gmat, num = 1000) {
if (ncol(gmat) <= num) {
return(gmat)
} else {
top.index <- whichpart(total.expr <- colSums(gmat), num)
return(gmat[,top.index])
}
}
cat("PEER: loading expression data ... ")
# rows are samples, columns are genes
M <- as.matrix(loadTable(expr.h5, "/${_tissues}"))
M <- getTopGenes(M, use_genes)
n = getNumPeer(nrow(M))
cat("done.\n")
# run PEER
cat(paste0("PEER: estimating hidden confounders (", n, " for tissue ", ${_tissues:r} , ")\n"))
model <- PEER()
invisible(PEER_setNk(model, n))
invisible(PEER_setPhenoMean(model, M))
invisible(PEER_setPriorAlpha(model, alphaprior_a, alphaprior_b))
invisible(PEER_setPriorEps(model,epsprior_a, epsprior_b))
invisible(PEER_setNmax_iterations(model, max_iter))
# if(!is.null(covs)) {
# invisible(PEER_setCovariates(model, covs))
# }
time <- system.time(PEER_update(model))
X <- PEER_getX(model) # samples x PEER factors
A <- PEER_getAlpha(model) # PEER factors x 1
R <- t(PEER_getResiduals(model)) # genes x samples
# add relevant row/column names
c <- paste0("InferredCov",1:ncol(X))
rownames(X) <- rownames(M)
colnames(X) <- c
rownames(A) <- c
colnames(A) <- "Alpha"
A <- as.data.frame(A)
A$Relevance <- 1.0 / A$Alpha
rownames(R) <- colnames(M)
colnames(R) <- rownames(M)
# write results
cat("PEER: writing results ... ")
WriteTable(t(X), ${_output[0]:r}, "ID") # format(X, digits=6)
WriteTable(A, ${_output[1]:r}, "ID")
WriteTable(R, ${_output[2]:r}, "ID")
cat("done.\n")
%sosrun rnaseq