{"id":385,"date":"2022-04-21T12:52:16","date_gmt":"2022-04-21T11:52:16","guid":{"rendered":"http:\/\/justmakeit.es\/?p=385"},"modified":"2022-04-21T12:52:16","modified_gmt":"2022-04-21T11:52:16","slug":"consolidacion-de-datos","status":"publish","type":"post","link":"http:\/\/justmakeit.es\/?p=385","title":{"rendered":"Consolidaci\u00f3n de datos"},"content":{"rendered":"\n<p>\u00bfQu\u00e9 hacer para consolidar informaci\u00f3n en la partici\u00f3n de una tabla? \u00bfC\u00f3mo actualizar deltas?<\/p>\n\n\n\n<p>Opciones, seguramente habr\u00e1 muchas, aunque una que he usado recientemente me parece razonablemente sencilla y efectiva. La idea es montar un dataset que incluya los datos previos y los datos de la nueva partici\u00f3n, chequear que en las nuevas columnas hay o no informaci\u00f3n y en funci\u00f3n de si existe esta nueva informaci\u00f3n actualizamos sobre las \u00abcolumnas\u00bb antiguas los nuevos valores; despu\u00e9s s\u00f3lo queda la tarea de eliminar las nuevas columnas para hacer coincidir la estructura del dataset con lo esperado en el schema de la base de datos destino.<\/p>\n\n\n\n<p>Ahora toca incluir el c\u00f3digo en el que se hace todo esto&#8230;., esto se puede hacer en una \u00fanica instrucci\u00f3n de c\u00f3digo, pero queda poco claro, raz\u00f3n por la que voy a hacerlo en varios pasos, que sin duda se ver\u00e1 mejor.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>\n\n\/**\n * M\u00e9todo que realiza la consolidaci\u00f3n de los datos, sobreescribiendo los   \n * nuevos valores sobre los registros anteriores si existen \n * Nuestro m\u00e9todo recibe como par\u00e1metros los datos en forma de JavaRDD, en \n * vez de Dataset por lo que necesitamos transformarlos antes de trabajar\n * con ellos\n * @param sparkSession el sparkSession\n * @param previous datos previos, el acumulado\/consolidado\n * @param actual datos de la fecha informada\n * @return Dataset de datos consolidados\n *\/\n\npublic static Dataset&lt;Row> consolidate(SparkSession sparkSession, JavaRDD&lt;Row> previous, JavaRDD&lt;Row> actual) {\n\n   \/\/ nombres de las columnas de nuestra tabla destino\n   String schemaString = \"ID CAMPO1 CAMPO2 DATA_DATE\";\n\n   List&lt;StructField> fields = new ArrayList&lt;>(4);\n\n   \/\/ Hacemos un split por el espacio con los nombres de las columnas y creamos un StructField con el nombre y en estos casos el tipo String\n   for (String fieldName : schemaString.split(\" \")) {\n      StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);\n      fields.add(field);\n   }\n\n   \/\/ Creamos el StructType con los StructField que acabamos de crear\n   StructType schema = DataTypes.createStructType(fields);\n\n   \/\/ se chequea si el JavaRDD tiene datos o no antes de crear el dataset\n   if(null != actual) {\n\n      dsActual = sparkSession.createDataFrame(actual, schema);\n\t\n   }else {\n\t\n      \/\/ Creamos el Dataset vac\u00edo con la estructura definida\t    \n      dsActual = sparkSession.createDataFrame(new ArrayList&lt;>(), schema);\n   }\n\n\n   \/\/ renombramos las columnas antes de hacer el JOIN\n   dsActual = dsActual.select(\n\t\tdsActual.col(ID),\n\t\tdsActual.col(CAMPO1).alias(NEW_CAMPO1), \n\t\tdsActual.col(CAMPO2).alias(NEW_CAMPO2), \n\t\tdsActual.col(DATA_DATE).alias(NEW_DATA_DATE));\n   List&lt;String> join = new ArrayList&lt;String>();\t\n   join.add(ID);\n\n   \/\/antes de llegar aqu\u00ed habremos pasado el JavaRDD previous a Dataset dsPrevious \n   Dataset&lt;Row> consolidated = dsPrevious.join(dsActual, scala.collection.JavaConversions.asScalaBuffer(join).toList(), \n   \"<strong>full_outer<\/strong>\");\n\n   consolidated = consolidated.withColumn(CAMPO1, when(col(NEW_DATA_DATE).isNull(),col(CAMPO1)).otherwise(col(NEW_CAMPO1)))\n.withColumn(CAMPO2, when(col(NEW_DATA_DATE).isNull(), col(CAMPO2)).otherwise(col(NEW_CAMPO2)));\n\n   \/\/ se borran las columnas de la nueva fecha de procesamiento\n   consolidated = consolidated.drop(NEW_CAMPO1,NEW_CAMPO2,NEW_DATA_DATE);\n\n   return consolidated;\n}<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>\u00bfQu\u00e9 hacer para consolidar informaci\u00f3n en la partici\u00f3n de una tabla? \u00bfC\u00f3mo actualizar deltas? Opciones, seguramente habr\u00e1 muchas, aunque una &hellip; <a href=\"http:\/\/justmakeit.es\/?p=385\" class=\"btn btn-readmore\">Read More <span class=\"screen-reader-text\"> \u00abConsolidaci\u00f3n de datos\u00bb<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[6],"tags":[49,34],"class_list":["post-385","post","type-post","status-publish","format-standard","hentry","category-cosos","tag-dataset","tag-spark"],"_links":{"self":[{"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/posts\/385","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/justmakeit.es\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=385"}],"version-history":[{"count":3,"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/posts\/385\/revisions"}],"predecessor-version":[{"id":388,"href":"http:\/\/justmakeit.es\/index.php?rest_route=\/wp\/v2\/posts\/385\/revisions\/388"}],"wp:attachment":[{"href":"http:\/\/justmakeit.es\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=385"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/justmakeit.es\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=385"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/justmakeit.es\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=385"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}