11/12/2018

Java + Spark + Hive

Últimamente me ha tocado trabajar en un desarrollo con tecnología Big Data en el que era necesario realizar la lectura de unas tablas Hive y convertirlas en otras tablas también de Hive con sólo algunos de los campos de las originales. Como aún estoy muy verde con Scala y su sintaxis me sigue resultando extraña, he decidido hacerlo en Java con Spark. Probablemente intente realizar este mismo desarrollo con Scala cuando haya finalizado con él y poder así comparar rendimiento y comportamiento de ambos.
Concretando partimos de 14 tablas originales y debemos construir 40 nuevas.
Para ello hemos decidido crear un fichero de definición de los orígenes y destinos de las tablas en formato JSON que tras procesar nos permiten crear y/o modificar las tablas de destino e insertar los datos desde los orígenes.

A continuación trataré de detallar los pasos seguidos para completar el desarrollo.

Fichero JSON de definición

{
"origin_db": "nombre_bd_origen",
"origin_tableName" : "nombre_tabla_origen",
"l_origin_elements": [
{ "columnName": "campo_1"},
{ "columnName": "campo_2"},
{ "columnName": "campo_3"}
],
"where": "",
"destination_db": "nombre_bd_destino",
"destination_table": "nombre_tabla_destino",
"l_destination_elements": [
{ "columnName": "campo_1",
"alias": "nombre_campo_destino"},
{ "columnName": "campo_2"},
{ "columnName": "campo_3"}
]
}

JSONReadObject.java

import java.io.Serializable;
import java.util.*;
import com.fasterxml.jackson.annotation.JsonProperty;

public class JSONReadObject implements Serializable{

@JsonProperty
private String origin_db;
@JsonProperty
private String origin_tableName;
@JsonProperty
private List l_origin_elements;
@JsonProperty
private String where; // conditions for filters
@JsonProperty
private String destination_db;
@JsonProperty
private String destination_table;
@JsonProperty
private List l_destination_elements;

// Estos dos atributos NO se cargan desde el JSON
// se cargan con la estructura ID, XXXElement tras la lectura de la info del JSON
private SortedMap m_origin;
private SortedMap m_destination;

public JSONReadObject() {
super();
}

public void createMaps(){
// recuperamos los listados de columnas de origin y destino
// ordenados y con clave ID
this.m_destination = this.destListToMap(l_destination_elements);
this.m_origin = this.oriListToMap(this.l_origin_elements);
}

public SortedMap getListColumns(){
SortedMap columns = new TreeMap();
for(OriginElement origin: this.getL_origin_elements()) {

if( null != this.getM_destination().get(origin.getColumnName()).getAlias() &&
!this.getM_destination().get(origin.getColumnName()).getAlias().equals("")) {
columns.put(this.getM_destination().get(origin.getColumnName()).getAlias().toUpperCase(), origin.getDataType());
}else{
columns.put(origin.getColumnName().toUpperCase(), origin.getDataType());
}
}
return columns;
}

@Override
//public String toString() {

//getters and setters
}

OriginElement.java

public class OriginElement implements Serializable {

private String columnName;
private String dataType; // esta información NO se recupera del JSON, sino que se recupera de la tabla a la que se referencia

//getters and setters
// default constructor
}

DestinationElement.java

public class DestinationElement implements Serializable {

private String columnName;
private String alias;

// getters and setters
// default constructor
}