In the previous section we described how to build a cluster from commodity hardware. Next, we show how to run a statistical genetics problem in parallel across a cluster with MOSIX to reduce the computation time by the total number of cores in a cluster for a computationally intensive statistical genetics problem.
A genetics case study example
Higher resolution and lower cost genotyping arrays have ushered in large studies in humans called Genome Wide Association Studies, or GWAS [
12]. A popular GWAS goal is the identification of single nucleotide polymorphisms, or SNPs, which are variable bases in DNA sequences, that might predispose or alter the progression of disease. This is done by statistically associating disease phenotypes with SNPs. A GWAS can also link many other phenotypes to SNPs. One commonly studied, continuous phenotype is the messenger RNA expression level of a gene. An eQTL (for expression quantitative trait locus) study can identify SNPs that affect the messenger RNA levels (so-called expression traits) of protein-coding genes [
13]. eQTL may propose a mechanism when SNPs associated with disease have no known function, which is often the case with SNPs in noncoding DNA regions. Such SNPs have been shown to affect expression levels of genes with key pathological roles [
14]. Thus, an adequately-powered eQTL study could determine whether a SNP associated with disease also affected the expression levels of any known protein coding genes, and suggest a mechanistic link for further wet-laboratory experimentation. Because an eQTL experiment generally involves thousands of expression traits as the phenotypes of interest, it is generally more computationally intensive than single-phenotype designs.
Here we provide an example eQTL analysis with simulated data using code in the popular open source language R [
15]. This is a scalable method for parallelizing many tasks on a MOSIX-enabled computing network. Together, this example has approximately 25 lines of extra R code to parallelize the eQTL testing on a MOSIX-enabled system, and can be easily adapted to run other tasks or be coded in other languages. Other utilities, such as 'batch' [
16], can be used to streamline parallel execution via MOSIX. In our example, we first simulate a dataset with 100 samples, 1000 genotyped loci, and 1000 genes for eQTL association testing.
g.calls = data.frame(matrix(sample(c(rep(0,500), rep(1,500), rep(2,500)), 100*1000,
replace = TRUE), nrow = 1000)) ## row: genotype calls, column: samples
exprs <- matrix(rnorm(100*1000, 0, 1), nrow = 1000) # row: expression, column: sample
In this code, we first simulate a matrix of single nucleotide polymorphism (SNP) calls in g.calls with an additive coding, i.e., a count of the number of minor alleles an individual has at a locus. We then simulate a matrix of random mRNA expression data (a quantitative trait).
Next, we evenly split the job into batches to be run across the nodes via the function chopper. This helper function creates a string of the beginning and ending indices to the SNPs, starting at beg.index (1 in our example), ending at end.index (the number of genotyped loci in our example), for batches jobs (the number of cores in the cluster).
chopper = function(beg.index, end.index, batches){
a = seq(beg.index, end.index, round((end.index-beg.index)/batches))
b = unique(append(a[2:length(a)]-1, end.index))
return(paste(a, b, sep = "_"))
}
The chopper function is important because each node should process only a small portion of the genotype calls from g.calls. In real examples, one may also wish to chop the dataset into pieces as well. Next, we create a function that will process each of these datasets on each remote node.
par.fx = function(o.file) {
o.f = file(paste(o.file, "_p_val.temp", sep = ""), 'w') # output file
gcr = sort(as.integer(unlist(strsplit(o.file, '_')))) # start/stop
apply(g.calls[(gcr[1]:gcr[2]),], 1,
function(x){
p.vector = apply(exprs, 1, function(y){summary(lm(y ~as.integer(x)))$coeff[2, 4]})
cat(as.character(p.vector), file = o.f, sep = '\n')
})
}
The function par.fx accepts one parameter telling it what portion (i.e., which rows) of g.calls to process, creates an intermediate p-value file, does the actual association testing (an ANOVA of gene expression on genotype), and prints p-values to the intermediate file. Then we write out the dataset g.calls and the function to analyze the dataset par.fx out to disk.
save(g.calls, exprs, par.fx, file="Image.RData")
The amount of output typically generated by our computations (e.g., p-values) usually cannot be kept in memory and must be written to a file connection during computation.
Our next piece of code creates the start/stop indices for analysis, and then creates R script files that will be subsequently batched off. These scripts load in the "Image.RData" file created in the code above, and process part of it.
batch.subsets <- chopper(beg.index = 1, end.index = nrow(g.calls), batches = 56)
r.scripts = sapply(batch.subsets,
function(x) {
batch.script = file(paste("batch", "_", x, ".R", sep = ""), "w")
cat("load(", "\'", "Image.RData", "\' ", ")\n", sep = "", file = batch.script)
cat("par.fx(o.file=", "\'", x, "\'", ")\n", sep = "", file = batch.script)
return(summary(batch.script)$description)
close(batch.script)
})
Once these R script files are created, we then create a shell script for the MOSIX jobs, and run them with the command system.
shell.script <- file(paste(getwd(), "/", "Main.sh", sep = ""), "w")
for(i in 1:(length(r.scripts)))
cat("mosrun -e -b -q20 R --vanilla < ", r.scripts[i], " &\n", sep = "", file = shell.script) quiet = system(paste("source ", summary(shell.script)$description, sep = ""), intern = TRUE)
Lastly, once all of these scripts have finished running, we read in the intermediate files used, combine them into one, and clean up with the following code.
p.file = file("p.values.txt", "w")
t.p.files = list.files(getwd(), pattern="_p_val.temp") # files with ANOVA p-values
t.r.files = list.files(getwd(), pattern="batch_") # intermediate scripts for(i in 1:length(t.p.files)) {
cat(readLines(t.p.files[i]), file = p.file, sep = "\n") # input p-values, output to p.file file.remove(t.p.files[i], t.r.files[i]) # delete intermediate files
}
file.remove("Image.RData", "Main.sh")
While there is relatively little penalty for I/O of parallel jobs with MOSIX across a gigabit network switch when compared to running locally, we found it wise to estimate the I/O required for each process when the per-core files to be transferred exceeded 20 MB, as MOSIX does not monitor I/O when batching jobs. This can be easily accomplished by doing a trial run with one batch and observing the transfer time for the image to be loaded into memory of the executing node. Once a delay time is known, adding the sleep = N (where N is the number of seconds to delay before executing the next line) shell command to the main shell script before each mosrun call will force a delay between the start of MOSIX jobs. We have found this to be adequate in preventing I/O overloads for loading a large R image. Because adding a simple short delay to our script prevented any I/O overload, there was no need to resort to higher-throughput technologies for I/O between nodes. Although MOSIX jobs migrate away to be executed on free nodes, output files exist only on the originating node, making collection of results very easy.