
En Spark 2.3 se puede realizar el siguiente ejercicio:
Crear una tabla con una estructura o schema definido como String para todos sus campos
spark.sql("CREATE TABLE IF NOT EXISTS " + databaseName + "." + tableName + " (col1 String, col2 String, col3 String) STORED AS PARQUET");
La ejecución del CREATE TABLE no nos devuelve el schema recién creado, ya que devuelve uno vacío, que no nos servirá para construir el Dataset. Por ello necesitamos ejecutar a continuación algo parecido a esto…
StructType schema = spark.sql(SELECT * FROM " + databaseName + "." + tableName + " LIMIT 1").schema();
A partir de este schema que acabamos de recuperar podemos crear un Dataset con el listado de datos que hemos recuperado del origen que sea…, para ello
Dataset<Row> dataset = spark.createDataFrame(cols, schema);
El objecto cols, en nuestro caso es un JavaRDD<Row> construido a partir de datos recuperados de MongoDB, pero podría venir de cualquier otro origen.
Una vez creado el Dataset, podemos modificar el tipo de los datos de alguna de las columnas de la siguiente manera.
dataset = dataset.withColumn("col1", dataset.col("col1").cast(DataTypes.LongType));
dataset = dataset.withColumn("col2", dataset.col("col2").cast(DataTypes.IntegerType));
Por último persistimos la información en Hive
dataset.write().mode(SaveMode.Overwrite).format("parquet").saveAsTable(tableName);
Por otro lado, la conexión a MongoDB desde Spark se realiza de la siguiente manera…
String spark_mongodb_input_uri = "mongodb://" + user + ":" + password + "@" + host + ":" + port + "/" + authenticationDatabaseName;
SparkSession spark = SparkSession.builder()
// .master("local[6]")// only for debug
.appName("mongo-db-connector").enableHiveSupport()
.config("spark.mongodb.input.uri", spark_mongodb_input_uri)
.config("spark.mongodb.input.database", spark_mongodb_input_database)
.config("spark.mongodb.input.collection", spark_mongodb_input_collection);
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
/* Start Example: Read data from MongoDB ************************/
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
/* End Example **************************************************/