{"id":311,"date":"2018-12-19T17:34:56","date_gmt":"2018-12-19T16:34:56","guid":{"rendered":"http:\/\/justmakeit.es\/?p=311"},"modified":"2018-12-19T17:34:56","modified_gmt":"2018-12-19T16:34:56","slug":"java-spark-hive-ii","status":"publish","type":"post","link":"http:\/\/justmakeit.es\/?p=311","title":{"rendered":"Java + Spark + Hive (II)"},"content":{"rendered":"\n<p>A continuaci\u00f3n se incluye la clase que crea la sentencia ejecutada para la creaci\u00f3n de las tablas, adem\u00e1s de la validaci\u00f3n de cambios en el esquema de la tabla destino en siguientes ejecuciones y la recuperaci\u00f3n de los tipos de datos y los datos de la tabla origen y su inserci\u00f3n en la tabla destino.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code> \/**\n * \n *\/\npackage io.process.read;\n\nimport java.io.IOException;\nimport java.util.*;\n\nimport org.apache.hadoop.conf.Configuration;\nimport org.apache.hadoop.fs.FSDataInputStream;\nimport org.apache.hadoop.fs.FileSystem;\nimport org.apache.hadoop.fs.Path;\nimport org.apache.spark.SparkConf;\nimport org.apache.spark.api.java.JavaSparkContext;\nimport org.apache.spark.sql.SaveMode;\n\nimport io.process.create.JSONReadObject;\nimport io.process.create.DestinationElement;\nimport io.process.create.OriginElement;\n\nimport org.apache.log4j.Logger;\n\nimport org.apache.spark.sql.hive.*;\n\/**\n * @author suppli3r\n *\n *\/\npublic class SparkTableReader {\n\n  private static final Logger LOGGER = Logger.getLogger(SparkTableReader.class);\n\n  private final static String SEPARADOR = \".\";\n  private final static String OPEN = \"(\";\n  private final static String CLOSE = \")\";\n  private final static String ESPACIO = \" \";\n  private final static String COMA = \",\";\n\n\n  \/**\n   * @param args\n   *\/\n  public static void main(String[] args) throws Exception{\n\n\n    LOGGER.info(\">>> SparkTableReader.main\");\n\n    if (args.length&lt;1){\n      LOGGER.error(\"NO se ha pasado como par\u00e1metro el fichero json de configuraci\u00f3n\");\n      throw new Exception(\"NO se ha pasado como par\u00e1metro el fichero json de configuraci\u00f3n\");\n    }else {\n      LOGGER.warn(\"El fichero de entrada se recibe como par\u00e1metro.... \" + args[0]);\n      LOGGER.info(\"Chequeo de la existencia del fichero JSON pasado como par\u00e1metro\");\n      Configuration conf = new Configuration();\n\n      Path path = new Path(args[0]);\n      try {\n        FileSystem fs = path.getFileSystem(conf);\n        LOGGER.info(\"se ha creado el FileSystem\");\n        FSDataInputStream inputStream = fs.open(path);\n\n      }catch(Exception e){\n        LOGGER.error(e.getMessage());\n        LOGGER.error(\"El fichero no est\u00e1 disponible \" + args[0]);\n        \/\/System.exit(-1);\n        throw new Exception(\"El fichero no est\u00e1 disponible \" + args[0]);\n\n      }\n    }\n    \/\/ Para hacer funcionar esta clase en un cluster local en necesario descomentar la siguiente l\u00ednea y comentar la posterior\n\t\/\/SparkConf conf = new SparkConf().setMaster(\"local\").setAppName(\"Table Reader\");\n    SparkConf conf = new SparkConf();\n\t\n    JavaSparkContext sc = new JavaSparkContext(conf);\n\n    HiveContext hiveContext = new HiveContext(sc.sc());\n    JSONReadObject[] a_jsonObjects = JSONRead.jsonFileToObject(args[0]);\n\n    for(JSONReadObject jsonObject: a_jsonObjects){\n\n        process(hiveContext, jsonObject);\n    }\n  }\n\n\n  \/**\n  *\/\n  private static void process(HiveContext hiveContext, JSONReadObject jsonObject) throws Exception{\n\n\n    \/\/ ya tenemos el objeto con el fichero JSON de definici\u00f3n cargado\n    \/\/ ahora debemos comprobar si existe o no la tabla de destino\n    hiveContext.sql((String.format(\"USE %s\", jsonObject.getDestination_db())));\n\n\t\/\/ comprobamos si existe latabla destino\n\tString[] tableNames = hiveContext.tableNames();\n\tBoolean tableExists = false;\n\tfor(String table: tableNames) {\n\t\tLOGGER.info(\"Table = \" + table);\n\t\tif(table.equalsIgnoreCase(jsonObject.getDestination_table())) {\n\t\t\ttableExists = true;\n\t\t}\n    }\n\t\/\/ recuperamos los tipos de datos\n\taddDataType(hiveContext, jsonObject);\n\t\/\/esto es un apa\u00f1o para tener los datos recuperados desde el JSON en un Map con clave columnName\n\tjsonObject.createMaps();\n\t\n\n\tLOGGER.info(\"tableExists = \" + tableExists);\n\tif(!tableExists){\n\n\t  \/\/CREATE TABLE\n\t  String create_query = makeCreateTable(jsonObject);\n\t\t\n\t  hiveContext.sql(create_query);\n\t\t\n\t  LOGGER.info(\"EXECUTED OK\");\n\t\t\n\t  hiveContext.sql((String.format(\"USE %s\", jsonObject.getDestination_db())));\n\t  LOGGER.info( String.format(\"USE %s\", jsonObject.getDestination_db()));\n\n\t}else{\n\n\t  \/\/CHECK TABLE SCHEMA\n\t  validateCurrentSchema(hiveContext, jsonObject);\n\t}\n\t\/\/ copiamos los datos desde Origen a Destino\n\t\/\/ eso lo tenemos que hacer independientemente de si se crea la tabla, se modifica o no\n\tfromOriginToDestination(hiveContext, jsonObject);\n\t\/\/TODO averiguar si es necesario cerrar el SparkContest\n\t\/\/TODO sc.close();\n\n  }\n  \/**\n   * Se informan los tipos de datos de las columnas recuperando esa informaci\u00f3n\n   * de las tablas origen\n   * @param hiveContext\n   * @param jsonObject\n   *\/\n  private static void addDataType(HiveContext hiveContext, JSONReadObject jsonObject) throws Exception{\n\n    LOGGER.info(\">>> addDataType\");\n    hiveContext.sql((String.format(\"USE %s\", jsonObject.getOrigin_db())));\n\n    LOGGER.info((String.format(\"Use %s\", jsonObject.getOrigin_db())));\n\n    try {\n      scala.Tuple2&lt;String, String>[] columns = hiveContext.table(jsonObject.getOrigin_tableName()).dtypes();\n      LOGGER.info(\"columns.length = \" + columns.length);\n      \/\/ recorremos el schema de la tabla origin\n      for(scala.Tuple2&lt;String, String> tmp : columns) {\n\n        LOGGER.info(\"tmp._1 \" + tmp._1);\t\/\/ nombre_columna\n        LOGGER.info(\"tmp._2 \" + tmp._2);\n        \/\/ comparamos cada una de las columnas que est\u00e1n en el fichero de input json con las columnas de la tabla ya existente\n        \n        for(OriginElement tmp_origin: jsonObject.getL_origin_elements()){\n          if(tmp_origin.getColumnName().startsWith(\"static:\")){\n            LOGGER.info(\"Hay columnas que NO existen en la tabla origen y que deben llegar a la tabla destino...\");\n            LOGGER.info(tmp_origin.getColumnName());\n          }else {\n            if (tmp_origin.getColumnName().equalsIgnoreCase(tmp._1)) {\n              tmp_origin.setDataType(tmp._2.replace(\"Type\", \"\").replace(\"Integer\", \"int\"));\n              \/\/ informamos el tipo de dato que corresponde al destino recuperando la info\n              \/\/ de la tabla origin\n              break;\n            }\n          }\n        }\n      }\n    }catch(Throwable e){\n      if(e.getClass().getName().equals(\"org.apache.spark.sql.catalyst.analysis.NoSuchTableException\")) {\n        LOGGER.error(\"La tabla origen de la que debemos recuperar los TIPOS de datos NO existe: \" + jsonObject.getOrigin_tableName());\n        LOGGER.error(\"Se debe revisar la configuraci\u00f3n del fichero de configuraci\u00f3n y contrastar los or\u00edgenes de datos en \u00e9l definidos\");\n        LOGGER.error(\"Finaliza el proceso con error\");\n        \/\/System.exit(-1);\n        throw new Exception(\"La tabla origen de la que debemos recuperar los TIPOS de datos NO existe: \" + jsonObject.getOrigin_tableName());\n      }else{\n        throw new Exception(\"Se ha producido un error NO ESPERADO \" + e.getMessage());\n      }\n    }\n\n    LOGGER.info(\"&lt;&lt;&lt; addDataType\");\n\n  }\n\n\n\n  \/**\n   * M\u00e9todo que construye la sentencia de creaci\u00f3n de la tabla destino\n   * @param jsonObject\n   * @return\n   *\/\n  private static String makeCreateTable(JSONReadObject jsonObject){\n\n    LOGGER.info(\"makeCreateTable \" + jsonObject.toString());\n    StringBuffer ddl = new StringBuffer();\n    ddl.append(\"CREATE EXTERNAL TABLE \");\n\n    String DESTINATION_TABLE = jsonObject.getDestination_table();\n    String DESTINATION_DB = jsonObject.getDestination_db();\n    ddl.append(DESTINATION_DB);\n    ddl.append(SEPARADOR);\n    ddl.append(DESTINATION_TABLE);\n    ddl.append(OPEN);\n    List&lt;DestinationElement> l_destination_elements = jsonObject.getL_destination_elements();\n\n    SortedMap&lt;String, OriginElement> map_origin = jsonObject.getM_origin();\n    for( DestinationElement destination: l_destination_elements){\n\n      \/\/validamos las columnas que se deben crear sin informaci\u00f3n\n      if( destination.getColumnName().startsWith(\"static:\") ) {\n        ddl.append(destination.getColumnName().substring(destination.getColumnName().indexOf(\":\")+1));\n        ddl.append(ESPACIO);\n        if (destination.getDataType() == null || destination.getDataType().equals(\"\")){\n          ddl.append(\"string\");\n                    ddl.append(COMA);\n         }else{\n          \/\/ debe llegar siempre un tipo de dato v\u00e1lido, sino fallar\u00e1\n          ddl.append(destination.getDataType());\n                    ddl.append(COMA);\n        }\n      }else {\n\n\n        String alias = destination.getAlias();\n        OriginElement origin = map_origin.get(destination.getColumnName());\n\n        if (null != alias &amp;&amp; !alias.equals(\"\")) {\n          ddl.append(alias);\n          ddl.append(ESPACIO);\n          ddl.append(origin.getDataType());\n          ddl.append(COMA);\n        } else {\n          ddl.append(origin.getColumnName());\n          ddl.append(ESPACIO);\n          ddl.append(origin.getDataType());\n          ddl.append(COMA);\n        }\n      }\n    }\n\n    ddl.deleteCharAt(ddl.lastIndexOf(COMA));\n    ddl.append(CLOSE);\n    ddl.append(\" STORED AS PARQUET\");\n    \/\/ actualmente la LOCATION viene definida por la base de datos destino\n    \/\/ si se quisiera modificar, para lo que ser\u00eda necesario tener permisos en la nueva ruta\n    \/\/ s\u00f3lo seria necesario descomentar las siguientes l\u00edneas de c\u00f3digo\n\n    \/\/ddl.append(\" LOCATION '\");\n        \/\/ddl.append(jsonObject.getDestination_path());\n        \/\/ddl.append(\"'\");\n\n\n    LOGGER.info(\"ddl = \" + ddl.toString());\n    return ddl.toString();\n  }\n\n  \/**\n   * M\u00e9todo que construye y ejecuta la sentencia de importaci\u00f3n de datos desde origen a destino\n   * @param hiveContext\n   * @param jsonObject\n   *\/\n  private static void fromOriginToDestination(HiveContext hiveContext, JSONReadObject jsonObject){\n    LOGGER.info(\">>> recoverDataFromOrigin\");\n\n    StringBuffer dll = new StringBuffer();\n    dll.append(\"SELECT \");\n    \/\/ Primero recorremos los valores ya existentes en la tabla origen\n    for(OriginElement tmp: jsonObject.getL_origin_elements()){\n\n      dll.append(tmp.getColumnName());\n      dll.append(COMA);\n    }\n    \/\/ despu\u00e9s de recorrer todos los ya existentes, se procesan los valores est\u00e1ticos nuevos\n    \/\/ que se insertan con valor vac\u00edo\n    for(DestinationElement destination : jsonObject.getL_destination_elements()){\n      if( destination.getColumnName().startsWith(\"static:\") ){\n        dll.append(ESPACIO);\n        dll.append(\"''\");\n        dll.append(COMA);\n      }\n    }\n\n    dll.deleteCharAt(dll.lastIndexOf(COMA));\n    dll.append(\" FROM \");\n    dll.append(jsonObject.getOrigin_db());\n    dll.append(SEPARADOR);\n    dll.append(jsonObject.getOrigin_tableName());\n\n    if(null!=jsonObject.getWhere() &amp;&amp; !jsonObject.getWhere().equals(\"\")){\n\n      LOGGER.info(\"S\u00cd Hay condici\u00f3n WHERE. Esto no est\u00e1 contemplado para la primera versi\u00f3n\");\n      dll.append(\" WHERE \");\n      dll.append(jsonObject.getWhere());\n      dll.append(ESPACIO);\n    }else{\n      LOGGER.info(\"No hay condici\u00f3n WHERE\");\n    }\n\n    StringBuffer dll2 = new StringBuffer();\n\n    dll2.append(jsonObject.getDestination_db());\n    dll2.append(SEPARADOR);\n    dll2.append(jsonObject.getDestination_table());\n\n    \/\/ v2.x de Spark ==> hiveContext.sql(dll.toString()).toDF().write().mode(SaveMode.Append).format(\"parquet\").insertInto(dll2.toString());\n    LOGGER.debug(dll.toString());\n    LOGGER.debug(dll2.toString());\n    hiveContext.sql(dll.toString()).toDF().write().mode(SaveMode.Append).format(\"parquet\").saveAsTable(dll2.toString());\n\n    LOGGER.info(\"&lt;&lt;&lt; recoverDataFromOrigin\");\n\n  }\n\n  \/**\n   * M\u00e9todo que comprueba si la tabla destino ya existe y si tiene o no alg\u00fan nuevo campo\n   * que se deba incluir en la propia tabla.\n   * En caso de existir un nuevo campo, actualiza la tabla destino con dicho campo\n   * @param hiveContext\n   * @param jsonObject\n   *\/\n  private static void validateCurrentSchema(HiveContext hiveContext , JSONReadObject jsonObject){\n\n    LOGGER.info(\">>> SparkTableReader.validateCurrentSchema\");\n\n    StringBuffer dll = new StringBuffer();\n    dll.append(\"SELECT * FROM \");\n    dll.append(jsonObject.getDestination_db());\n    dll.append(SEPARADOR);\n    dll.append(jsonObject.getDestination_table());\n\n\n    String[] current_columns = hiveContext.sql(dll.toString()).toDF().columns();\n    LOGGER.info(\"current_columns = \" + current_columns);\n\n    SortedMap&lt;String, String> new_columns = jsonObject.getListColumns();\n    LOGGER.info(\"new_columns = \" + new_columns);\n\n    for(String column_name : current_columns ){\n      LOGGER.info(\"column_name = \" + column_name);\n      boolean exists = false;\n      if (new_columns.containsKey(column_name.toUpperCase())){\n\n        LOGGER.info(\"Existe la columna \" + column_name.toUpperCase());\n\n        \/\/ vamos chequeando que las columnas existen en destino\n        \/\/ y cuando se cumple la condici\u00f3n las eliminamos del listado de columnas del JSON\n        new_columns.remove(column_name.toUpperCase());\n      }\n    }\n    \/\/ tras borrar del listado de nuevas columnas las que ya se encuentran en la tabla destino\n    \/\/ debemos realizar un ALTER TABLE con las que nos quedan, porque son nuevas\n    if(new_columns.size()>0){\n      Iterator&lt;String> itera = new_columns.keySet().iterator();\n      while(itera.hasNext()) {\n        \/\/for(SortedMap&lt;String, String> new_column: new_columns){\n        String key = itera.next();    \/\/ la key es el nombre de la columna\n        String value = new_columns.get(key);    \/\/el value es el type de la columna\n        LOGGER.info(\"Hay una nueva columna que se debe a\u00f1adir \" + key);\n        hiveContext.sql(\"ALTER TABLE \"+ jsonObject.getDestination_db() + \".\"+jsonObject.getDestination_table() + \" ADD COLUMNS (\" + key + ESPACIO + value + \")\");\n        LOGGER.info(\"Se ha actualizado la tabla a\u00f1adiendo la nueva columna....\" + key + ESPACIO + value);\n      }\n    }\n\n    LOGGER.info(\"&lt;&lt;&lt; validateCurrentSchema\");\n\n  }\n}<\/code><\/pre>\n\n\n\n<p>Queda pendiente incluir la clase que realiza la lectura del fichero JSON de configuraci\u00f3n, que realmente no tiene mucha sustancia y que utiliza la librer\u00eda Jackson para realizar la lectura y escritura.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>A continuaci\u00f3n se incluye la clase que crea la sentencia ejecutada para la creaci\u00f3n de las tablas, adem\u00e1s de la &hellip; <a href=\"http:\/\/justmakeit.es\/?p=311\" class=\"btn btn-readmore\">Read More <span class=\"screen-reader-text\"> \u00abJava + Spark + Hive (II)\u00bb<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[6],"tags":[37,36,35,34],"class_list":["post-311","post","type-post","status-publish","format-standard","hentry","category-cosos","tag-hive","tag-java","tag-json","tag-spark"],"_links":{"self":[{"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/posts\/311","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/justmakeit.es\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=311"}],"version-history":[{"count":3,"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/posts\/311\/revisions"}],"predecessor-version":[{"id":314,"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/posts\/311\/revisions\/314"}],"wp:attachment":[{"href":"http:\/\/justmakeit.es\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=311"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/justmakeit.es\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=311"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/justmakeit.es\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=311"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}