Difference: DataMining_WP_160_Data_Lake ( vs. 1)

Revision 12015-06-04 - CescJulbe

Line: 1 to 1
Added:
>
>

TASK DESCRIPTION

Goal

The goal of this task is to make Gaia Archive data available for the Data Mining framework defining a set of procedures and policies.

Technology

Data mining chosen technologies are those belonging to the Hadoop Ecosystem, specifically Spark for Cluster computing, so the Data Repository ("Data Lake" from now on) have to store data in a Hadoopcompatible file format and in a distributed file system (Hadoop Distributed File System - HDFS). Spark provides we integrated APIs to manage and store data in Parquet, a columnar storage format available to any project in the Hadoop ecosystem. Some tests to deal with this format in our project have been done.

DATA SOURCE

Gaia archive data repository is GACS (Gaia Archive Core Systems), a relational DB repository. Data for the DM framework will be read from this repository and stored in a distributed Storage system after going through a conversion process.

Data Conversion

Relational data from GACS can be exported into a Spark/Hadoop friendly format in order to be transformed into a format that can be stored in HDFS and compatible with Hadoop ecosystem services. Spark is well compatible and integrated with Parquet.

Test in Python:

A small subset of data from GACS (catalogueSource gbin, GOG-14) have been exported into ASCII with "|" separator:

  solution_id|source_id|ref_epoch|alpha|alpha_error_old|delta|...
  1926561401929728|2020156414416125990|2010|258.192356672547|1.26341425776958e-07|-39.1387601828524|...
  1926561401929728|2020156413476601894|2010|258.192350480703|1.29114628530812e-07|-39.1387587609943|...
  1926561401929728|2020156418711093277|2010|258.175967940379|1.6687185474118e-07|-39.1180686531723|...

Using Spark python console some code can transform this ascii file format into parquet:

  from pyspark.sql import SQLContext, Row
  
  sqc = SQLContext(sc)
  lines = sc.textFile("path_to_dataset/gacs_dataset_file1")
  parts = lines.map(lambda l: l.split("|"))
  
  sources = parts.map(lambda s: Row(solution_id = long(s[0]), source_id = long(s[1]), ref_epoch = int(s[2]), alpha = float(s[3]), alpha_error_old = float(s[4]), delta =
  float(s[5]), delta_error_old = float(s[6]), varpi = float(s[7]), varpi_error = float(s[8]), mu_alpha_star = float(s[9]), mu_alpha_star_error = float(s[10]), mu_delta =
  float(s[11]), mu_delta_error = float(s[12]), radial_velocity = float(s[13]), radial_velocity_error =float(s[14]), mu_r = float(s[15]), mu_rerror = float(s[16]),
  matched_observations =float(s[17]), observed = bool(s[18]), superseded = bool(s[19]), n_obs = s[20], n_outliers = s[21], f2 = float(s[22]), chi2 = s[23], delta_q = 
  float(s[24]), excess_noise = float(s[25]), excess_noise_sig = float(s[26]), params_solved = float(s[27]), rank_defect = float(s[28]), decomposed_n = s[29], primary_flag = 
  bool(s[30]), relegation_factor = float(s[31]), astrometric_weight = s[32], g_mean_const_flag = float(s[33]), g_mean_const_level = float(s[34]), g_mean_mag_error = 
  float(s[35]), g_mean_mean_mag = float(s[36]), g_mean_n_obs = int(s[37]), rv_constancy_probability = float(s[38]), random_index = long(s[39]), alpha_error = float(s[40]), 
  delta_error = float(s[41])))
  
  schemaSources = sqc.inferSchema(sources)
  schemaSources.registerTempTable("sources")
  result = sqc.sql("SELECT source_id, alpha FROM sources WHERE alpha > 258.18")
  
  result.count()
  847
  lines.count()
  3999
  firstSourceId = result.take(5)[0].source_id
  2020156414416125990

Save and query to/from Parquet file:

  schemaSources.saveAsParquetFile("/data_lake/parquetFormat")
  parquetFile = sqc.parquetFile("/data_lake/parquetFormat")
  parquetFile.registerTempTable("sourcesParquet")
  
  resultParquet = sqc.sql("SELECT source_id, alpha FROM sources WHERE alpha > 258.18")
  
  resultParquet.count()
  847
  lines.count()
  3999
  firstSourceId = resultParquet.take(5)[0].source_id
  2020156414416125990

Query results can also be stored in parquet:

  resultParquet.saveAsParquetFile("/data_lake/parquetFormat/queryResult")

Test in Scala:

Single file experiment

  // sc is an existing SparkContext.
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  
  // Create an RDD
  val gacsData = sc.textFile("/home/cescjulbe/GACS_DUMP/gog_splits/xaa")
  
  // The schema is encoded in a string
  val schemaString = "solution_id source_id ref_epoch alpha alpha_error_old delta delta_error_old varpi varpi_error mu_alpha_star mu_alpha_star_error mu_delta mu_delta_error
  radial_velocity radial_velocity_error mu_r mu_rerror matched_observations observed superseded n_obs n_outliers f2 chi2 delta_q excess_noise excess_noise_sig params_solved    
  rank_defect decomposed_n primary_flag relegation_factor astrometric_weight g_mean_const_flag g_mean_const_level g_mean_mag_error g_mean_mean_mag g_mean_n_obs       
  rv_constancy_probability random_index alpha_error delta_error"
  
  // Import Spark SQL data types and Row.
  import org.apache.spark.sql._
  
  import org.apache.spark.sql.types.StructField
  import org.apache.spark.sql.types.StructType
  import org.apache.spark.sql.types.StringType
  
  // Generate the schema based on the string of schema
  val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
  
  // Convert records of the RDD (gacs rows) to Rows.
  val rowRDD = gacsData.map(_.split('|')).map(p => Row(p(0).trim, p(1).trim, p(2).trim,p(3).trim, p(4).trim, p(5).trim,p(6).trim, p(7).trim, p(8).trim,p(9).trim, p(10).trim, 
  p(11).trim,p(12).trim, p(13).trim, p(14).trim,p(15).trim, p(16).trim, p(17).trim,p(18).trim, p(19).trim, p(20).trim,p(21).trim, p(22).trim, p(23).trim,p(24).trim, 
  p(25).trim, p(26).trim,p(27).trim, p(28).trim, p(29).trim,p(30).trim, p(31).trim,p(32).trim, p(33).trim, p(34).trim,p(35).trim, p(36).trim,p(37).trim, p(38).trim, 
  p(39).trim, p(40).trim,p(41).trim))
  
  // Apply the schema to the RDD.
  val gacsDataFrame = sqlContext.createDataFrame(rowRDD, schema)
     
  // Register the DataFrames as a table.
  gacsDataFrame.registerTempTable("gacsData")
     
  // SQL statements can be run by using the sql methods provided by sqlContext.
  val results = sqlContext.sql("SELECT * FROM gacsData")
     
  // Or more field specific search
  val results = sqlContext.sql("SELECT alpha, delta FROM gacsData where alpha>258.174")
     
  // Return 858 rows
     
  // ...ETC...
  val results = sqlContext.sql("SELECT alpha, delta FROM gacsData where alpha>258.174 and delta<-39.13")
     
  // return 19 rows

Performing a larger scale tests with about 350Mb of ASCII data and 250 files

  // Create an RDD
  val gacsData = sc.textFile("/home/cescjulbe/GACS_DUMP/gog_splits/")
  
  gacsData.count
  
  // Res=1000000
  
  // Convert records of the RDD (gacs rows) to Rows.
  val rowRDD = gacsData.map(_.split('|')).map(p => Row(p(0).trim, p(1).trim, p(2).trim,p(3).trim, p(4).trim, p(5).trim,p(6).trim, p(7).trim, p(8).trim,p(9).trim, p(10).trim, 
  p(11).trim,p(12).trim, p(13).trim, p(14).trim,p(15).trim, p(16).trim, p(17).trim,p(18).trim, p(19).trim, p(20).trim,p(21).trim, p(22).trim, p(23).trim,p(24).trim, 
  p(25).trim, p(26).trim,p(27).trim, p(28).trim, p(29).trim,p(30).trim, p(31).trim,p(32).trim, p(33).trim, p(34).trim,p(35).trim, p(36).trim,p(37).trim, p(38).trim, 
  p(39).trim, p(40).trim,p(41).trim))
  
  rowRDD.count
  // Res=1000000
  
  // Repeating all over again
  // SQL statements can be run by using the sql methods provided by sqlContext.
  val results = sqlContext.sql("SELECT * FROM gacsData")
  results.count
  
  // res=100000
  
  // Or more specific search
  val results = sqlContext.sql("SELECT alpha, delta FROM gacsData where alpha>258.174")
  
  // Return 16982 rows
  
  // ...ETC...
  val results = sqlContext.sql("SELECT alpha, delta FROM gacsData where alpha>258.174 and delta<-39.13")
  
  // res 19 (again !)


Loading the data from parquet and convert it to be ingested into MlLib methods:

  results.saveAsParquetFile("home/cescjulbe/gacsParquetDataset1")
  
  // LOADING DATA FROM DISK
  
  val dataFromDisk = sqlContext.load("/home/cescjulbe/gacsParquetDataset1")
  dataFromDisk.registerTempTable("loadedData")
  val sqlFromDisk = sqlContext.sql("select * from loadedData")
  sqlFromDisk.count
  // >> res 3999
  
  // Let's convert these Rows to LabeledPoints to be used by MlLib libraries:
  /* This conversion is still a little crappy, many refactoring needed
   *    In this example we created a labeled point with value 0 from the row item
   *    and values 3 to 6 as features...it has little sense from relational point of view,
   *    just a sample of how to convert the row item to the labeled point*/
  
  val filtered = dataFromDisk.map { line =>
     var z = new Array[Double](4)
     z(0) = line.get(3).toString.toDouble
     z(1) = line.get(4).toString.toDouble
     z(2) = line.get(5).toString.toDouble
     z(3) = line.get(6).toString.toDouble
     LabeledPoint(line.get(0).toString.toDouble, Vectors.dense(z))
  }.cache()
  
  filtered.take(1)
  // item result
  res58: Array[org.apache.spark.mllib.regression.LabeledPoint] = Array((1.926561401929728E15,[258.192356672547,1.26341425776958E-7,-39.1387601828524,1.1221430319961E-7]))
  
  // Indeces to take may come from outside, external method:
  val indexToTake=5
  
  val filtered = dataFromDisk.map { line => 
    var z = new Array[Double](4)
    z(0) = line.get(3).toString.toDouble
    z(1) = line.get(4).toString.toDouble
    z(2) = line.get(indexToTake).toString.toDouble
    z(3) = line.get(6).toString.toDouble
    LabeledPoint(line.get(0).toString.toDouble, Vectors.dense(z))
  }.cache()
  
  /* Now this RDD is ready to be "consumed by the MlLib operation implemented by Spark */
  /* Similar conversion can be done for all specific MlLib requirements such as the 
  Vectors.sparse case as well.... */

Data source

Post GACS ingestion task. A format must be deined.

-- Cesc Julbe (erased user) - 2015-06-04

Comments

<--/commentPlugin-->
 
This site is powered by the TWiki collaboration platform Powered by PerlCopyright © 2008-2021 by the contributing authors. All material on this collaboration platform is the property of the contributing authors.
Ideas, requests, problems regarding TWiki? Send feedback