06/03/2019

Incluir una nueva columna en una tabla preexistente en Impala con Spark

La principal consideración a tener en cuenta para añadir una nueva columna en una tabla que ya existe es que el método no genera la columna sobre el DataFrame actual sino que la genera sobre uno nuevo.

// para poder utilizar la función current_timestamp es necesario realizar la 
// importación
import org.apache.spark.sql.functions._

// se define la configuración Spark
val sparkConf = new SparkConf().setAppName(APP_NAME)

// se define el StreamingContext con una frecuencia de 10 segundos    
val ssc = new StreamingContext(sparkConf, Seconds(10))

// se configura el HiveContext
val hc = new HiveContext(ssc.sparkContext)

val table:DataFrame = hc.table( tableName )
val operacionDf:DataFrame = hc.read.schema( table.schema ).json( jsonRDD )

// se añade el current_timestamp en una nueva columna creada para tal efecto
val newDf:DataFrame = operacionDf.withColumn( "aggr_time", date_format( current_timestamp(), "yyyy-MM-dd hh:mm:ss" ) )

newDf.write.mode( SaveMode.Append ).insertInto( tableName )