19/12/2018

Java + Spark + Hive (II)

A continuación se incluye la clase que crea la sentencia ejecutada para la creación de las tablas, además de la validación de cambios en el esquema de la tabla destino en siguientes ejecuciones y la recuperación de los tipos de datos y los datos de la tabla origen y su inserción en la tabla destino.

 /**
 * 
 */
package io.process.read;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SaveMode;

import io.process.create.JSONReadObject;
import io.process.create.DestinationElement;
import io.process.create.OriginElement;

import org.apache.log4j.Logger;

import org.apache.spark.sql.hive.*;
/**
 * @author suppli3r
 *
 */
public class SparkTableReader {

  private static final Logger LOGGER = Logger.getLogger(SparkTableReader.class);

  private final static String SEPARADOR = ".";
  private final static String OPEN = "(";
  private final static String CLOSE = ")";
  private final static String ESPACIO = " ";
  private final static String COMA = ",";


  /**
   * @param args
   */
  public static void main(String[] args) throws Exception{


    LOGGER.info(">>> SparkTableReader.main");

    if (args.length<1){
      LOGGER.error("NO se ha pasado como parámetro el fichero json de configuración");
      throw new Exception("NO se ha pasado como parámetro el fichero json de configuración");
    }else {
      LOGGER.warn("El fichero de entrada se recibe como parámetro.... " + args[0]);
      LOGGER.info("Chequeo de la existencia del fichero JSON pasado como parámetro");
      Configuration conf = new Configuration();

      Path path = new Path(args[0]);
      try {
        FileSystem fs = path.getFileSystem(conf);
        LOGGER.info("se ha creado el FileSystem");
        FSDataInputStream inputStream = fs.open(path);

      }catch(Exception e){
        LOGGER.error(e.getMessage());
        LOGGER.error("El fichero no está disponible " + args[0]);
        //System.exit(-1);
        throw new Exception("El fichero no está disponible " + args[0]);

      }
    }
    // Para hacer funcionar esta clase en un cluster local en necesario descomentar la siguiente línea y comentar la posterior
	//SparkConf conf = new SparkConf().setMaster("local").setAppName("Table Reader");
    SparkConf conf = new SparkConf();
	
    JavaSparkContext sc = new JavaSparkContext(conf);

    HiveContext hiveContext = new HiveContext(sc.sc());
    JSONReadObject[] a_jsonObjects = JSONRead.jsonFileToObject(args[0]);

    for(JSONReadObject jsonObject: a_jsonObjects){

        process(hiveContext, jsonObject);
    }
  }


  /**
  */
  private static void process(HiveContext hiveContext, JSONReadObject jsonObject) throws Exception{


    // ya tenemos el objeto con el fichero JSON de definición cargado
    // ahora debemos comprobar si existe o no la tabla de destino
    hiveContext.sql((String.format("USE %s", jsonObject.getDestination_db())));

	// comprobamos si existe latabla destino
	String[] tableNames = hiveContext.tableNames();
	Boolean tableExists = false;
	for(String table: tableNames) {
		LOGGER.info("Table = " + table);
		if(table.equalsIgnoreCase(jsonObject.getDestination_table())) {
			tableExists = true;
		}
    }
	// recuperamos los tipos de datos
	addDataType(hiveContext, jsonObject);
	//esto es un apaño para tener los datos recuperados desde el JSON en un Map con clave columnName
	jsonObject.createMaps();
	

	LOGGER.info("tableExists = " + tableExists);
	if(!tableExists){

	  //CREATE TABLE
	  String create_query = makeCreateTable(jsonObject);
		
	  hiveContext.sql(create_query);
		
	  LOGGER.info("EXECUTED OK");
		
	  hiveContext.sql((String.format("USE %s", jsonObject.getDestination_db())));
	  LOGGER.info( String.format("USE %s", jsonObject.getDestination_db()));

	}else{

	  //CHECK TABLE SCHEMA
	  validateCurrentSchema(hiveContext, jsonObject);
	}
	// copiamos los datos desde Origen a Destino
	// eso lo tenemos que hacer independientemente de si se crea la tabla, se modifica o no
	fromOriginToDestination(hiveContext, jsonObject);
	//TODO averiguar si es necesario cerrar el SparkContest
	//TODO sc.close();

  }
  /**
   * Se informan los tipos de datos de las columnas recuperando esa información
   * de las tablas origen
   * @param hiveContext
   * @param jsonObject
   */
  private static void addDataType(HiveContext hiveContext, JSONReadObject jsonObject) throws Exception{

    LOGGER.info(">>> addDataType");
    hiveContext.sql((String.format("USE %s", jsonObject.getOrigin_db())));

    LOGGER.info((String.format("Use %s", jsonObject.getOrigin_db())));

    try {
      scala.Tuple2<String, String>[] columns = hiveContext.table(jsonObject.getOrigin_tableName()).dtypes();
      LOGGER.info("columns.length = " + columns.length);
      // recorremos el schema de la tabla origin
      for(scala.Tuple2<String, String> tmp : columns) {

        LOGGER.info("tmp._1 " + tmp._1);	// nombre_columna
        LOGGER.info("tmp._2 " + tmp._2);
        // comparamos cada una de las columnas que están en el fichero de input json con las columnas de la tabla ya existente
        
        for(OriginElement tmp_origin: jsonObject.getL_origin_elements()){
          if(tmp_origin.getColumnName().startsWith("static:")){
            LOGGER.info("Hay columnas que NO existen en la tabla origen y que deben llegar a la tabla destino...");
            LOGGER.info(tmp_origin.getColumnName());
          }else {
            if (tmp_origin.getColumnName().equalsIgnoreCase(tmp._1)) {
              tmp_origin.setDataType(tmp._2.replace("Type", "").replace("Integer", "int"));
              // informamos el tipo de dato que corresponde al destino recuperando la info
              // de la tabla origin
              break;
            }
          }
        }
      }
    }catch(Throwable e){
      if(e.getClass().getName().equals("org.apache.spark.sql.catalyst.analysis.NoSuchTableException")) {
        LOGGER.error("La tabla origen de la que debemos recuperar los TIPOS de datos NO existe: " + jsonObject.getOrigin_tableName());
        LOGGER.error("Se debe revisar la configuración del fichero de configuración y contrastar los orígenes de datos en él definidos");
        LOGGER.error("Finaliza el proceso con error");
        //System.exit(-1);
        throw new Exception("La tabla origen de la que debemos recuperar los TIPOS de datos NO existe: " + jsonObject.getOrigin_tableName());
      }else{
        throw new Exception("Se ha producido un error NO ESPERADO " + e.getMessage());
      }
    }

    LOGGER.info("<<< addDataType");

  }



  /**
   * Método que construye la sentencia de creación de la tabla destino
   * @param jsonObject
   * @return
   */
  private static String makeCreateTable(JSONReadObject jsonObject){

    LOGGER.info("makeCreateTable " + jsonObject.toString());
    StringBuffer ddl = new StringBuffer();
    ddl.append("CREATE EXTERNAL TABLE ");

    String DESTINATION_TABLE = jsonObject.getDestination_table();
    String DESTINATION_DB = jsonObject.getDestination_db();
    ddl.append(DESTINATION_DB);
    ddl.append(SEPARADOR);
    ddl.append(DESTINATION_TABLE);
    ddl.append(OPEN);
    List<DestinationElement> l_destination_elements = jsonObject.getL_destination_elements();

    SortedMap<String, OriginElement> map_origin = jsonObject.getM_origin();
    for( DestinationElement destination: l_destination_elements){

      //validamos las columnas que se deben crear sin información
      if( destination.getColumnName().startsWith("static:") ) {
        ddl.append(destination.getColumnName().substring(destination.getColumnName().indexOf(":")+1));
        ddl.append(ESPACIO);
        if (destination.getDataType() == null || destination.getDataType().equals("")){
          ddl.append("string");
                    ddl.append(COMA);
         }else{
          // debe llegar siempre un tipo de dato válido, sino fallará
          ddl.append(destination.getDataType());
                    ddl.append(COMA);
        }
      }else {


        String alias = destination.getAlias();
        OriginElement origin = map_origin.get(destination.getColumnName());

        if (null != alias && !alias.equals("")) {
          ddl.append(alias);
          ddl.append(ESPACIO);
          ddl.append(origin.getDataType());
          ddl.append(COMA);
        } else {
          ddl.append(origin.getColumnName());
          ddl.append(ESPACIO);
          ddl.append(origin.getDataType());
          ddl.append(COMA);
        }
      }
    }

    ddl.deleteCharAt(ddl.lastIndexOf(COMA));
    ddl.append(CLOSE);
    ddl.append(" STORED AS PARQUET");
    // actualmente la LOCATION viene definida por la base de datos destino
    // si se quisiera modificar, para lo que sería necesario tener permisos en la nueva ruta
    // sólo seria necesario descomentar las siguientes líneas de código

    //ddl.append(" LOCATION '");
        //ddl.append(jsonObject.getDestination_path());
        //ddl.append("'");


    LOGGER.info("ddl = " + ddl.toString());
    return ddl.toString();
  }

  /**
   * Método que construye y ejecuta la sentencia de importación de datos desde origen a destino
   * @param hiveContext
   * @param jsonObject
   */
  private static void fromOriginToDestination(HiveContext hiveContext, JSONReadObject jsonObject){
    LOGGER.info(">>> recoverDataFromOrigin");

    StringBuffer dll = new StringBuffer();
    dll.append("SELECT ");
    // Primero recorremos los valores ya existentes en la tabla origen
    for(OriginElement tmp: jsonObject.getL_origin_elements()){

      dll.append(tmp.getColumnName());
      dll.append(COMA);
    }
    // después de recorrer todos los ya existentes, se procesan los valores estáticos nuevos
    // que se insertan con valor vacío
    for(DestinationElement destination : jsonObject.getL_destination_elements()){
      if( destination.getColumnName().startsWith("static:") ){
        dll.append(ESPACIO);
        dll.append("''");
        dll.append(COMA);
      }
    }

    dll.deleteCharAt(dll.lastIndexOf(COMA));
    dll.append(" FROM ");
    dll.append(jsonObject.getOrigin_db());
    dll.append(SEPARADOR);
    dll.append(jsonObject.getOrigin_tableName());

    if(null!=jsonObject.getWhere() && !jsonObject.getWhere().equals("")){

      LOGGER.info("SÍ Hay condición WHERE. Esto no está contemplado para la primera versión");
      dll.append(" WHERE ");
      dll.append(jsonObject.getWhere());
      dll.append(ESPACIO);
    }else{
      LOGGER.info("No hay condición WHERE");
    }

    StringBuffer dll2 = new StringBuffer();

    dll2.append(jsonObject.getDestination_db());
    dll2.append(SEPARADOR);
    dll2.append(jsonObject.getDestination_table());

    // v2.x de Spark ==> hiveContext.sql(dll.toString()).toDF().write().mode(SaveMode.Append).format("parquet").insertInto(dll2.toString());
    LOGGER.debug(dll.toString());
    LOGGER.debug(dll2.toString());
    hiveContext.sql(dll.toString()).toDF().write().mode(SaveMode.Append).format("parquet").saveAsTable(dll2.toString());

    LOGGER.info("<<< recoverDataFromOrigin");

  }

  /**
   * Método que comprueba si la tabla destino ya existe y si tiene o no algún nuevo campo
   * que se deba incluir en la propia tabla.
   * En caso de existir un nuevo campo, actualiza la tabla destino con dicho campo
   * @param hiveContext
   * @param jsonObject
   */
  private static void validateCurrentSchema(HiveContext hiveContext , JSONReadObject jsonObject){

    LOGGER.info(">>> SparkTableReader.validateCurrentSchema");

    StringBuffer dll = new StringBuffer();
    dll.append("SELECT * FROM ");
    dll.append(jsonObject.getDestination_db());
    dll.append(SEPARADOR);
    dll.append(jsonObject.getDestination_table());


    String[] current_columns = hiveContext.sql(dll.toString()).toDF().columns();
    LOGGER.info("current_columns = " + current_columns);

    SortedMap<String, String> new_columns = jsonObject.getListColumns();
    LOGGER.info("new_columns = " + new_columns);

    for(String column_name : current_columns ){
      LOGGER.info("column_name = " + column_name);
      boolean exists = false;
      if (new_columns.containsKey(column_name.toUpperCase())){

        LOGGER.info("Existe la columna " + column_name.toUpperCase());

        // vamos chequeando que las columnas existen en destino
        // y cuando se cumple la condición las eliminamos del listado de columnas del JSON
        new_columns.remove(column_name.toUpperCase());
      }
    }
    // tras borrar del listado de nuevas columnas las que ya se encuentran en la tabla destino
    // debemos realizar un ALTER TABLE con las que nos quedan, porque son nuevas
    if(new_columns.size()>0){
      Iterator<String> itera = new_columns.keySet().iterator();
      while(itera.hasNext()) {
        //for(SortedMap<String, String> new_column: new_columns){
        String key = itera.next();    // la key es el nombre de la columna
        String value = new_columns.get(key);    //el value es el type de la columna
        LOGGER.info("Hay una nueva columna que se debe añadir " + key);
        hiveContext.sql("ALTER TABLE "+ jsonObject.getDestination_db() + "."+jsonObject.getDestination_table() + " ADD COLUMNS (" + key + ESPACIO + value + ")");
        LOGGER.info("Se ha actualizado la tabla añadiendo la nueva columna...." + key + ESPACIO + value);
      }
    }

    LOGGER.info("<<< validateCurrentSchema");

  }
}

Queda pendiente incluir la clase que realiza la lectura del fichero JSON de configuración, que realmente no tiene mucha sustancia y que utiliza la librería Jackson para realizar la lectura y escritura.