Método para realizar la consulta de datos en Elastic Search, hay que añadir la query que se quiere ejecutar, que será de un tipo parecido al siguiente
var query = {"query":
{"bool":
{"must": [
{ "match": { "condicion1": "valor1" }},
{ "match": { "condicion2": "valor2" }},
{ "bool": { "should": [ { "match": { "condicion3": "valor3" }},
{ "match": { "condicion4": "valor4" }}
]
}
}]
}
}
}
Idealmente, la query estará en un fichero properties; de no ser así, será necesario incluirla en el código como un String, y para ello será necesario escapar las comillas dobles con \»
En el caso de la creación del Array[Column] con las columnas a recuperar de la consulta, he incluído un ejemplo, en el que habría que escapar de nuevo las dobles comillas para que el String sea válido. El contenido de schema sería similar a: col1;col2;col3;col4;»}
var schema = getColumnsFromES({"properties": [
{"name":"col1","type":"keyword","alias":"alias1","isArray":"false"},
{"name":"col2","type":"keyword","alias":"alias2","isArray":"false"},
{"name":"col3","type":"keyword","alias":"alias3","isArray":"false"},
{"name":"col4","type":"keyword","alias":"alias4","isArray":"false"},
{"name":"col5","type":"keyword","alias":"alias5","isArray":"false"}]}")
def getColumnsFromES(schemaES: String): Array[Column] = {
var result = Array.empty[Column]
val jsonObject = new Gson().fromJson(schemaES, classOf[JsonObject])
val jsonArray: JsonArray = jsonObject.getAsJsonArray("properties")
var logColumns = ""
jsonArray.forEach(
elem => {
val name = elem.getAsJsonObject.get("name").getAsString
val alias = elem.getAsJsonObject.get("alias").getAsString
logColumns = logColumns + name + ";"
if (elem.getAsJsonObject.get("isArray").getAsString == "true") {
result = result :+ new Column(name).alias(alias)
} else {
result = result :+ new Column(name).cast(StringType).alias(alias)
}
}
)
logger.info("Columns for json metadata mapping:" + logColumns)
result
}
def getInfoFromElasticSearch(
spark, // : SparkSession
eks_master, // : String es.nodes : listado de hosts de elastic
"9200", // : String es.port : connection port
"user", // : String es.net.http.auth.user : connection user
"password", // : String es.net.http.auth.pass : user password
query, // : String es.query : query
schema, // : Array[Column] schema
collections, // : Array[String] es.read.field.as.array.include
): DataFrame = {
var df: DataFrame = null
try {
// fill the options to read
var jMap = Map[String, String]()
jMap += ("es.nodes" -> hostname)
jMap += ("es.port" -> port)
jMap += ("es.net.http.auth.user" -> user)
jMap += ("es.net.http.auth.pass" -> pass)
jMap += ("es.read.field.empty.as.null" -> "no")
jMap += ("es.nodes.wan.only" -> "true")
jMap += ("es.mapping.date.rich" -> "false")
jMap += ("es.scroll.size" -> "5000")
jMap += ("es.input.max.docs.per.partition" -> "10000")
jMap += ("es.query" -> query)
if (collections!= null && collections.length > 0) {
jMap += ("es.read.field.as.array.include" -> collections.mkString(","))
}
df = spark
.read
.format("org.elasticsearch.spark.sql")
.options(jMap) // Map[String, String]
.load("trace.form.cont.uti.corp.*")
.select(schema: _*) // Array[Column]
} catch {
case ce: Exception =>
logger.error("Error reading data from elasticsearch", ce)
df
}