21/04/2022

Consolidación de datos

¿Qué hacer para consolidar información en la partición de una tabla? ¿Cómo actualizar deltas?

Opciones, seguramente habrá muchas, aunque una que he usado recientemente me parece razonablemente sencilla y efectiva. La idea es montar un dataset que incluya los datos previos y los datos de la nueva partición, chequear que en las nuevas columnas hay o no información y en función de si existe esta nueva información actualizamos sobre las «columnas» antiguas los nuevos valores; después sólo queda la tarea de eliminar las nuevas columnas para hacer coincidir la estructura del dataset con lo esperado en el schema de la base de datos destino.

Ahora toca incluir el código en el que se hace todo esto…., esto se puede hacer en una única instrucción de código, pero queda poco claro, razón por la que voy a hacerlo en varios pasos, que sin duda se verá mejor.



/**
 * Método que realiza la consolidación de los datos, sobreescribiendo los   
 * nuevos valores sobre los registros anteriores si existen 
 * Nuestro método recibe como parámetros los datos en forma de JavaRDD, en 
 * vez de Dataset por lo que necesitamos transformarlos antes de trabajar
 * con ellos
 * @param sparkSession el sparkSession
 * @param previous datos previos, el acumulado/consolidado
 * @param actual datos de la fecha informada
 * @return Dataset de datos consolidados
 */

public static Dataset<Row> consolidate(SparkSession sparkSession, JavaRDD<Row> previous, JavaRDD<Row> actual) {

   // nombres de las columnas de nuestra tabla destino
   String schemaString = "ID CAMPO1 CAMPO2 DATA_DATE";

   List<StructField> fields = new ArrayList<>(4);

   // Hacemos un split por el espacio con los nombres de las columnas y creamos un StructField con el nombre y en estos casos el tipo String
   for (String fieldName : schemaString.split(" ")) {
      StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
      fields.add(field);
   }

   // Creamos el StructType con los StructField que acabamos de crear
   StructType schema = DataTypes.createStructType(fields);

   // se chequea si el JavaRDD tiene datos o no antes de crear el dataset
   if(null != actual) {

      dsActual = sparkSession.createDataFrame(actual, schema);
	
   }else {
	
      // Creamos el Dataset vacío con la estructura definida	    
      dsActual = sparkSession.createDataFrame(new ArrayList<>(), schema);
   }


   // renombramos las columnas antes de hacer el JOIN
   dsActual = dsActual.select(
		dsActual.col(ID),
		dsActual.col(CAMPO1).alias(NEW_CAMPO1), 
		dsActual.col(CAMPO2).alias(NEW_CAMPO2), 
		dsActual.col(DATA_DATE).alias(NEW_DATA_DATE));
   List<String> join = new ArrayList<String>();	
   join.add(ID);

   //antes de llegar aquí habremos pasado el JavaRDD previous a Dataset dsPrevious 
   Dataset<Row> consolidated = dsPrevious.join(dsActual, scala.collection.JavaConversions.asScalaBuffer(join).toList(), 
   "full_outer");

   consolidated = consolidated.withColumn(CAMPO1, when(col(NEW_DATA_DATE).isNull(),col(CAMPO1)).otherwise(col(NEW_CAMPO1)))
.withColumn(CAMPO2, when(col(NEW_DATA_DATE).isNull(), col(CAMPO2)).otherwise(col(NEW_CAMPO2)));

   // se borran las columnas de la nueva fecha de procesamiento
   consolidated = consolidated.drop(NEW_CAMPO1,NEW_CAMPO2,NEW_DATA_DATE);

   return consolidated;
}