21/10/2021

Más cosas con Datasets

Voy a crear un Dataset vacío para ir acumulando en él los resultados de distintas consultas que voy a ir realizando. Para ello utilizaremos el método .union(), pero es necesario que el Dataset tenga el schema que van a tener los Dataset con los que se va a utilizar ese .union().

// Definimos los nombres de las columnas de nuestro Dataset
String schemaString = "ID CAMPO RULEID";

List<StructField> fields = new ArrayList<>();

// Hacemos un split por el espacio con los nombres de las columnas y creamos un StructField con el nombre y en estos casos el tipo String
for (String fieldName : schemaString.split(" ")) {
    StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
    fields.add(field);
}

// Creamos el StructType con los StructField que acabamos de crear
StructType schema = DataTypes.createStructType(fields);

// Creamos el Dataset vacío con la estructura que acabamos de definir	    
Dataset<Row> emptyDataSet = sparkSession.createDataFrame(new ArrayList<>(), schema);

Una vez creado el emptyDataSet, podemos empezar a usarlo…

String query = "SELECT ID,CAMPO FROM TBL WHERE....";
String nombreColumna = "RULEID";
// Nuestro dataset que recuperamos de la ejecución de la query sólo tiene las columnas ID y CAMPO
Dataset temporal = sparkSession.sql(query);

// Incluimos la información que necesitamos tener 
temporal = temporal.withColumn(nombreColumna, functions.lit(valor));

// Hacemos el union de los datasets para acumular datos
emptyDataSet = emptyDataSet.union(temporal);