¿Cómo deberíamos realizar un borrado de datos en HDFS?
Depende de si queremos borrar todos los datos de una partición o si queremos eliminar sólo una parte de los mismos. En el primer caso, deberíamos hacer algo similar a esto:
def deleteHDFS(spark: SparkSession,
entityName: String,
configMap: Map[String, String],
partitionDate: String): Unit = {
val hadoopConf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.get(hadoopConf)
// Creamos el array en función del valor de la variable
val tableName: Array[String] = entityName match {
// SI necesitamos borrar una única "tabla"
case TYPE_1 =>Array(configMap("kuduTableName1").split("\\.")(1))
// SI necesitamos borrar dos o más "tablas"
case TYPE_2 => Array(configMap("kuduTableName1").split("\\.")(1), configMap("kuduTableName2").split("\\.")(1))
case _ => Array.empty[String] // Array vacío si no coincide
}
// para cada nombre de tabla que debemos tratar, hacemos el borrado de la partición que corresponde al día
tableName.foreach { name =>
// Ruta de la partición en HDFS. En nuestro caso, las rutas coinciden con los nombres de tableName
val partitionPath = new Path(configMap("pathRoot") + "/" + name.toUpperCase + "/date=" + partitionDate)
logger.info("partitionPath = " + partitionPath)
try {
logger.info("SE VA A REALIZAR EL BORRADO SI EXISTE DE LA CARPETA ")
if (fs.exists(partitionPath)) {
logger.info("fs.exists " + fs.exists(partitionPath))
fs.delete(partitionPath, true) // true = borrado recursivo
logger.info(s"Partition deleted: $partitionPath")
} else {
logger.info(partitionPath + " does not exist!")
}
}catch {
case ce: Exception =>
logger.error("An error has ocurred trying to remove an HDFS path", ce)
}
}