Dimensionality Reduction - PCA exercise
--
Cesc Julbe - 2014-12-04
Sample data
Using GOG, we have simulated a set of spectra for two level-4 HTM regions, N12020 and N13020. This resulted in 212,193 BP/RP spectra, each characterized by 480 wavelength points, so our analysis consists of obtaining the 480 principal components for this set.
GOG output files are in gbin format. For this exercise, our files follow the MDB data model version 15.2.4, and the specific MDB table generated with the spectra is MDB/CU5/PhotPipe/Spec/SampledMeanSpec, which contains the BP and RP spectra and their corresponding errors. In this exercise, we have performed the PCA on the BP spectra. The RP procedure would be exactly the same as the one described in this twiki entry, but selecting the corresponding column from the gbin file.
The gbin package consists of 8192 gbin files with a total size of about 2 GB. These files were uploaded to the HDFS file system of the cluster.
Cluster information
The hardware platform used to perform this analysis is a 16-node cluster provided by CESCA (http://www.csuc.cat/en/).
Each node has 2 CPUs with 2 cores each, 16 GB of RAM and about 250 GB of storage capacity. More information about the cluster can be found at
http://www.csuc.cat/en/research/supercomputing/machines/comparison-of-technical-specifications (AMD Opteron 275).
Implementation
To perform the PCA we have used two big data software frameworks:
- Apache Mahout (https://mahout.apache.org/): a framework on top of Hadoop and MapReduce that provides out-of-the-box features for mathematical analysis, clustering, recommendation and other tasks.
- Spark (http://spark.apache.org/): a data processing framework that achieves high performance by loading datasets in memory and is fully integrated with Hadoop.
Spark code and results
Spark provides a shell where this operation can be performed using the Scala language. There is no need to create an external project.
Here we describe the PCA implemented using the standard PCA and SVD Spark libraries. Before starting, the data has to be converted into a Spark-compatible format; we had already implemented a converter to read data from a gbin file.
import gaia.cu9.datamining.etl.io.gbin.GbinInputFormat
import gaia.cu1.mdb.cu5.photpipe.spec.dm.SampledMeanSpec
import org.apache.hadoop.io.NullWritable
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val spectraFile = sc.newAPIHadoopFile("hdfs://HadoopLogin:8020//home/gaia/fluxes/*.gbin", classOf[GbinInputFormat[SampledMeanSpec]], classOf[NullWritable], classOf[SampledMeanSpec])
val data = spectraFile.map(meanSpec => meanSpec._2.getBpFlux)
val vectors = data.map(Vectors.dense(_))
val matrix = new RowMatrix(vectors)
val pca = matrix.computePrincipalComponents(480)
The result is stored in a local matrix called pca (an org.apache.spark.mllib.linalg.Matrix, not a distributed RDD) of size 480x480, with one principal component per column. The operation took about 170 sec.
14/06/06 13:07:09 INFO SparkContext: Job finished: aggregate at RowMatrix.scala:211, took 169.954155682 s
pca: org.apache.spark.mllib.linalg.Matrix =
-4.00420288267922E-5 -4.004732828277735E-5 -2.914906370820691E-5
-8.189079864694138E-5 -8.144893245998208E-5 -7.025422852738328E-5
-1.2323383756027135E-4 -1.2679821107108857E-4 -1.0484161471824294E-4
-1.6490016094683775E-4 -1.7286176392295344E-4 -1.416274986321131E-4
-2.0627822666985452E-4 -2.17201750752136E-4 -1.7676173725768914E-4
-2.492458576566869E-4 -2.643732287592871E-4 -2.1364901109913227E-4
-2.915295264318785E-4 -3.153366141952794E-4 -2.6260651222933254E-4
-3.3393428000782013E-4 -3.6008093091447385E-4 -2.7659922578484794E-4
-3.3555687071740934E-4 -3.6614803017705574E-4 -2.788329627827693E-4
-3.3636338341949126E-4 -3.699501178177391E-4 -2.867077656510642E-4
-3.386180084531634E-4 ...
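As a reminder of what the call above computes, the sketch below applies the same idea to a toy dataset in plain Scala: it builds the sample covariance matrix and extracts its leading eigenvector by power iteration. This is only an illustration under assumptions: the toy data, the helper names covariance and leadingEigenvector, and the choice of power iteration are ours, while Spark's own implementation is distributed and returns all requested components.

```scala
// Toy PCA in plain Scala (no Spark needed); all names here are illustrative.

// Sample covariance matrix of row-wise observations (n - 1 denominator).
def covariance(rows: Array[Array[Double]]): Array[Array[Double]] = {
  val n = rows.length
  val d = rows.head.length
  val mean = Array.tabulate(d)(j => rows.map(_(j)).sum / n)
  Array.tabulate(d, d) { (i, j) =>
    rows.map(r => (r(i) - mean(i)) * (r(j) - mean(j))).sum / (n - 1)
  }
}

// Leading eigenvector of a symmetric matrix by power iteration.
def leadingEigenvector(m: Array[Array[Double]], iterations: Int = 100): Array[Double] = {
  var v = Array.fill(m.length)(1.0 / math.sqrt(m.length))
  for (_ <- 0 until iterations) {
    val w = m.map(row => row.zip(v).map { case (a, b) => a * b }.sum)
    val norm = math.sqrt(w.map(x => x * x).sum)
    v = w.map(_ / norm)
  }
  v
}

// Four 2-dimensional "spectra" lying exactly on the line y = 2x.
val toy = Array(Array(1.0, 2.0), Array(2.0, 4.0), Array(3.0, 6.0), Array(4.0, 8.0))
val firstComponent = leadingEigenvector(covariance(toy))
println(firstComponent.mkString(", "))
```

For this toy input the first component points along the direction (1, 2), the line on which all the points lie.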
In addition, we can calculate the SVD (singular value decomposition) of the same set of spectra, obtaining the U, s and V matrices with only one extra command.
val svd = matrix.computeSVD(480, true)
...
14/06/06 13:20:59 INFO SparkContext: Job finished: aggregate at RowMatrix.scala:211, took 168.406853701 s
svd: org.apache.spark.mllib.linalg.SingularValueDecomposition[org.apache.spark.mllib.linalg.distributed.RowMatrix,org.apache.spark.mllib.linalg.Matrix] =
SingularValueDecomposition(null,[7.80663030834246E7,1.7458560961467907E7,2163031.4335677503],-4.004535712853752E-5 -4.00370862934582E-5 -2.896906653281333E-5
-8.187578803927481E-5 -8.14632667156534E-5 -7.02052108565443E-5
-1.2321410901761936E-4 -1.2681381042241313E-4 -1.0471294890612715E-4
-1.6486836968282857E-4 -1.7289028947677516E-4 -1.4152449357476837E-4
-2.062396874703414E-4 -2.172351432147507E-4 -1.766119728099808E-4
-2.4919493174741636E-4 -2.6441997263363103E-4 -2.1352655576548513E-4
-2.914703764548057E-4 -3.153886542403815E-4 -2.6243404621194565E-4
-3.3386192350613324E-4 -3.60148298069...
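The vector in the middle of this output holds the singular values. As a small aid for reading them, the sketch below (plain Scala, with the three singular values copied from the output above) squares them and prints each one's share of the total. Note the assumptions: only the three values displayed are used, so the shares are relative to those three and not to the full spectrum, and because computeSVD does not centre the data they are not exactly PCA variance fractions.

```scala
// Singular values as printed in the computeSVD output above.
val singularValues = Array(7.80663030834246E7, 1.7458560961467907E7, 2163031.4335677503)

// Squared singular values are the eigenvalues of A^T A.
val squared = singularValues.map(s => s * s)
val shares = squared.map(_ / squared.sum)

shares.zipWithIndex.foreach { case (share, i) =>
  println(f"component $i: ${share * 100}%.3f %% of the summed squares")
}
```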
Mahout code and results
Mahout does not provide a console; it is executed as a Java program. To perform PCA (through SVD) using Mahout, the gbin files first have to be converted into a Mahout-specific format. To do so, we used Spark again to create and store the data as Mahout expects it.
import gaia.cu9.datamining.etl.io.gbin.GbinInputFormat
import gaia.cu1.mdb.cu5.photpipe.spec.dm.SampledMeanSpec
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.mahout.math.SequentialAccessSparseVector
import org.apache.mahout.math.VectorWritable
import org.apache.spark.SparkContext._
val spectraFile = sc.newAPIHadoopFile("hdfs://HadoopLogin:8020//home/gaia/fluxes/*.gbin", classOf[GbinInputFormat[SampledMeanSpec]], classOf[NullWritable], classOf[SampledMeanSpec])
val data = spectraFile.map(meanSpec => meanSpec._2.getBpFlux)
// Copy each flux array into a Mahout vector and pair it with a NullWritable key
val mahoutFormat = data.map(arr => {
  val vector = new SequentialAccessSparseVector(arr.length)
  for (i <- 0 until arr.length) vector.set(i, arr(i))
  (NullWritable.get, new VectorWritable(vector))
})
mahoutFormat.saveAsNewAPIHadoopFile("hdfs://HadoopLogin:8020//home/gaia/mahout-in", classOf[NullWritable], classOf[VectorWritable], classOf[SequenceFileOutputFormat[NullWritable, VectorWritable]])
This Spark code transforms the gbin files into the Mahout-specific format needed to perform PCA and SVD.
Outside the Spark shell, we execute the following steps:
mahout svd --input hdfs://HadoopLogin:8020/home/gaia/mahout-in --output hdfs://HadoopLogin:8020/home/gaia/mahout-svd --numRows 212193 --numCols 480 --rank 3
14/06/12 10:16:36 INFO lanczos.LanczosSolver: 2 passes through the corpus so far...
14/06/12 10:16:36 INFO lanczos.LanczosSolver: Lanczos iteration complete - now to diagonalize the tri-diagonal auxiliary matrix.
14/06/12 10:16:36 INFO lanczos.LanczosSolver: Eigenvector 0 found with eigenvalue 0.0
14/06/12 10:16:36 INFO lanczos.LanczosSolver: Eigenvector 1 found with eigenvalue 5038435.408718643
14/06/12 10:16:36 INFO lanczos.LanczosSolver: Eigenvector 2 found with eigenvalue 7.805355283216032E7
14/06/12 10:16:36 INFO lanczos.LanczosSolver: LanczosSolver finished.
14/06/12 10:16:36 INFO decomposer.DistributedLanczosSolver: Persisting 3 eigenVectors and eigenValues to: hdfs://HadoopLogin:8020/home/gaia/mahout-svd/rawEigenvectors
14/06/12 10:16:36 INFO driver.MahoutDriver: Program took 3723916 ms (Minutes: 62.065266666666666)
PCA is calculated through SVD, so there is a second step to obtain the eigenvectors.
mahout cleansvd --eigenInput hdfs://HadoopLogin:8020/home/gaia/mahout-svd/rawEigenvectors --corpusInput hdfs://HadoopLogin:8020/home/gaia/mahout-in --output hdfs://HadoopLogin:8020/home/gaia/mahout-out
14/06/12 11:52:03 INFO mapreduce.Job: map 87% reduce 29%
14/06/12 11:52:22 INFO mapreduce.Job: map 88% reduce 29%
14/06/12 11:52:38 INFO mapreduce.Job: map 89% reduce 29%
14/06/12 11:52:41 INFO mapreduce.Job: map 89% reduce 30%
14/06/12 11:52:58 INFO mapreduce.Job: map 90% reduce 30%
14/06/12 11:53:16 INFO mapreduce.Job: map 91% reduce 30%
14/06/12 11:53:34 INFO mapreduce.Job: map 92% reduce 30%
14/06/12 11:53:36 INFO mapreduce.Job: map 92% reduce 31%
14/06/12 11:53:53 INFO mapreduce.Job: map 93% reduce 31%
14/06/12 11:54:10 INFO mapreduce.Job: map 94% reduce 31%
14/06/12 11:54:30 INFO mapreduce.Job: map 95% reduce 31%
14/06/12 11:54:33 INFO mapreduce.Job: map 95% reduce 32%
14/06/12 11:54:47 INFO mapreduce.Job: map 96% reduce 32%
14/06/12 11:55:07 INFO mapreduce.Job: map 97% reduce 32%
14/06/12 11:55:24 INFO mapreduce.Job: map 98% reduce 32%
14/06/12 11:55:27 INFO mapreduce.Job: map 98% reduce 33%
14/06/12 11:55:42 INFO mapreduce.Job: map 99% reduce 33%
14/06/12 11:56:01 INFO mapreduce.Job: map 100% reduce 33%
14/06/12 11:56:10 INFO mapreduce.Job: map 100% reduce 78%
14/06/12 11:56:11 INFO mapreduce.Job: map 100% reduce 100%
14/06/12 11:56:12 INFO mapreduce.Job: Job job_1402390993644_0027 completed successfully
14/06/12 11:56:12 INFO mapreduce.Job: Counters: 50
...
14/06/12 11:56:13 INFO driver.MahoutDriver: Program took 5571674 ms (Minutes: 92.86123333333333)
The logs show that Mahout ran two MapReduce-based steps (svd and cleansvd), which took about 62 and 93 minutes respectively, roughly 155 minutes together.
The results can be analyzed, once again, from the Spark shell:
val mahoutOutputSvd = sc.newAPIHadoopFile("hdfs://HadoopLogin:8020/home/gaia/mahout-svd", classOf[SequenceFileInputFormat[IntWritable, VectorWritable]], classOf[IntWritable], classOf[VectorWritable])
val mahoutOutput = sc.newAPIHadoopFile("hdfs://HadoopLogin:8020/home/gaia/mahout-out/cleanEigenvectors", classOf[SequenceFileInputFormat[IntWritable, VectorWritable]],
classOf[IntWritable],classOf[VectorWritable])
mahoutOutput.first
14/06/18 12:15:04 INFO FileInputFormat: Total input paths to process : 1
14/06/18 12:15:04 INFO SparkContext: Starting job: take at <console>:29
14/06/18 12:15:04 INFO DAGScheduler: Got job 0 (take at <console>:29) with 1 output partitions (allowLocal=true)
14/06/18 12:15:04 INFO DAGScheduler: Final stage: Stage 0(take at <console>:29)
14/06/18 12:15:04 INFO DAGScheduler: Parents of final stage: List()
14/06/18 12:15:04 INFO DAGScheduler: Missing parents: List()
14/06/18 12:15:04 INFO DAGScheduler: Computing the requested partition locally
14/06/18 12:15:04 INFO NewHadoopRDD: Input split: hdfs://HadoopLogin:8020/home/gaia/mahout-out/cleanEigenvectors:0+4012
14/06/18 12:15:04 INFO SparkContext: Job finished: take at <console>:29, took 0.230572378 s
res1: Array[(org.apache.hadoop.io.IntWritable, org.apache.mahout.math.VectorWritable)] = Array((0,e|2| = |6.093306439441717E15|, err = 1.5579878154836635E-4:{0:-2.699761848800714E-4,
1:-2.2716366930287612E-4,2:-1.8476785073793741E-4,3:-1.420408773122884E-4,4:-9.963242018020199E-5,5:-5.557804792632324E-5,6:-1.2130739881599524E-5,7:3.130559935257337E-5,
8:3.30493199847959E-5,9:3.393263450902214E-5,10:3.628557624773059E-5,11:3.8377388951731115E-5,12:4.1385491810895534E-5,13:4.511718384279595E-5,14:4.96437231777454E-5,
15:5.412644528993846E-5,16:5.9729602646271147E-5,17:6.642902040950031E-5,18:7.235904871905414E-5,19:7.872337130510405E-5,20:8.518972573559606E-5,21:9.050844605197622E-5,
22:9.557688077454363E-5,23:1.0003418094785282E-4,24:1.0442567445644735E-4,25:1.0809805060085087E-4,26:1.107954...
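One quick consistency check between the two frameworks: the value printed as e|2| by Mahout is close to the square of the largest singular value reported by Spark. The sketch below (plain Scala, numbers copied from the outputs above) computes the relative difference. It assumes that Mahout's cleaned eigenvalue refers to the matrix A^T A, which is our reading of the numbers and is not stated in the logs.

```scala
val sparkLargestSingularValue = 7.80663030834246E7   // from the Spark computeSVD output
val mahoutCleanEigenvalue     = 6.093306439441717E15 // e|2| in the cleanEigenvectors output

// Square the Spark singular value and compare it with the Mahout value.
val sparkSquared = sparkLargestSingularValue * sparkLargestSingularValue
val relativeDifference = math.abs(sparkSquared - mahoutCleanEigenvalue) / sparkSquared
println(f"Spark s^2 = $sparkSquared%.6e, Mahout = $mahoutCleanEigenvalue%.6e, relative difference = $relativeDifference%.2e")
```

The two values agree to well within 0.1%, although this is only a check on one eigenvalue and not a validation of the eigenvectors.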
So, in the Mahout implementation, although the SVD processing is done by Mahout, the data setup and result analysis are done using the Spark shell.
Larger test
We have extended the test to a bigger sample of spectra. We generated 2,088,535 spectra with 480 flux values each, which is about 10 times the size of the previous test. With Spark, we generated the PCA components in 293 sec. Our hardware provider estimated a gain of a factor of 30 per node when moving from the current cluster to a more advanced one, so with 32 nodes instead of the current 16 we could get a gain of about x60 in rough numbers. A full-sky PCA over the BP end-of-mission spectra (2e9 spectra) could then take about 4800 sec. with Spark, assuming linear scaling (memory considerations have not been taken into account and are still to be evaluated).
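The estimate above can be reproduced with a few lines of arithmetic. The sketch below (plain Scala) assumes that run time scales linearly with the number of spectra and that the x60 gain applies in full; both are assumptions of the projection and not measured results. The variable names are ours.

```scala
val measuredSpectra = 2088535.0 // spectra in the larger test
val measuredSeconds = 293.0     // Spark PCA time for the larger test
val fullSkySpectra  = 2e9       // BP end-of-mission spectra
val perNodeGain     = 30.0      // estimate from the hardware provider
val nodeRatio       = 32.0 / 16.0

// Scale the measured time linearly, then divide by the expected hardware gain.
val totalGain = perNodeGain * nodeRatio
val projectedSeconds = measuredSeconds * (fullSkySpectra / measuredSpectra) / totalGain
println(f"total gain: x$totalGain%.0f, projected time: $projectedSeconds%.0f s")
```

This gives a figure somewhat below 4800 s, consistent with the rough estimate quoted in the text.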
Conclusions
The Spark setup for this exercise was far better in both usability and performance. Spark processing time was about 170 seconds (with about 168 sec. more we can calculate the SVD as well), while Mahout processing took about 9240 seconds, 54 times more.
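The timing comparison can be checked directly against the durations reported in the logs above. The sketch below (plain Scala, variable names are ours) adds the two Mahout program times and divides by the Spark PCA time.

```scala
val mahoutSvdMs      = 3723916L      // "Program took" for mahout svd
val mahoutCleanSvdMs = 5571674L      // "Program took" for mahout cleansvd
val sparkPcaSeconds  = 169.954155682 // Spark PCA job time

val mahoutSeconds = (mahoutSvdMs + mahoutCleanSvdMs) / 1000.0
val ratio = mahoutSeconds / sparkPcaSeconds
println(f"Mahout: $mahoutSeconds%.0f s (${mahoutSeconds / 60}%.1f min), ratio to Spark: $ratio%.1f")
```

With the exact log values the total is close to 9300 s and the ratio close to 55, in line with the rounded figures quoted in the text.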
Spark results could be validated, at a smaller scale, against an R implementation. Mahout's results have not been validated yet, due to their more complex output format.
This exercise was performed with only a small subset of BP spectra and using a not very powerful cluster (16 'old' nodes). A linear projection of these results shows that a much more powerful cluster is needed to perform a PCA over the full sky (1.2E9 sources). Spark requires a lot of memory but offers excellent performance, especially compared to Mahout's more classical MapReduce approach. According to some benchmarks, even when short of memory, its performance is better than Mahout's.
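To give a sense of the memory requirement mentioned above, the sketch below (plain Scala) estimates the size of the flux matrix when held as dense double-precision values. It assumes 8 bytes per value and ignores any JVM or Spark overhead, so real usage would be higher; the cluster total simply multiplies the 16 nodes by their 16 GB of RAM. The names are ours.

```scala
val bytesPerValue = 8.0   // assumption: dense Double storage, no overhead
val fluxValues    = 480.0 // wavelength points per spectrum
val gb            = 1e9

// Size in GB of a dense matrix with the given number of spectra.
def matrixGb(spectra: Double): Double = spectra * fluxValues * bytesPerValue / gb

val sampleGb     = matrixGb(212193.0) // sample used in this exercise
val fullSkyGb    = matrixGb(1.2e9)    // full sky, as quoted in the conclusions
val clusterRamGb = 16 * 16.0          // 16 nodes x 16 GB

println(f"sample: $sampleGb%.2f GB, full sky: $fullSkyGb%.0f GB, cluster RAM: $clusterRamGb%.0f GB")
```

Under these assumptions the sample fits comfortably in the cluster's memory, while the full-sky matrix would exceed it by more than an order of magnitude.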