09/05/2024

Lectura de Elastic Search con Scala

Método para realizar la consulta de datos en Elastic Search, hay que añadir la query que se quiere ejecutar, que será de un tipo parecido al siguiente


var query = {"query":
  {"bool":
    {"must": [
      { "match": { "condicion1": "valor1" }},
      { "match": { "condicion2": "valor2" }},
      { "bool": { "should": [ { "match": { "condicion3": "valor3" }},
                              { "match": { "condicion4": "valor4" }}
                            ]
                }
      }]
    }
  }
}

Idealmente, la query estará en un fichero properties; de no ser así, será necesario incluirla en el código como un String, y para ello será necesario escapar las comillas dobles con \»

En el caso de la creación del Array[Column] con las columnas a recuperar de la consulta, he incluído un ejemplo, en el que habría que escapar de nuevo las dobles comillas para que el String sea válido. El contenido de schema sería similar a: col1;col2;col3;col4;»}

var schema = getColumnsFromES({"properties":  [
    {"name":"col1","type":"keyword","alias":"alias1","isArray":"false"}, 
    {"name":"col2","type":"keyword","alias":"alias2","isArray":"false"}, 
    {"name":"col3","type":"keyword","alias":"alias3","isArray":"false"}, 
    {"name":"col4","type":"keyword","alias":"alias4","isArray":"false"}, 
    {"name":"col5","type":"keyword","alias":"alias5","isArray":"false"}]}")

def getColumnsFromES(schemaES: String): Array[Column] = {
    var result = Array.empty[Column]
    val jsonObject = new Gson().fromJson(schemaES, classOf[JsonObject])
    val jsonArray: JsonArray = jsonObject.getAsJsonArray("properties")
    var logColumns = ""
    jsonArray.forEach(
      elem => {
        val name = elem.getAsJsonObject.get("name").getAsString
        val alias = elem.getAsJsonObject.get("alias").getAsString
        logColumns = logColumns + name + ";"
        if (elem.getAsJsonObject.get("isArray").getAsString == "true") {
          result = result :+ new Column(name).alias(alias)
        } else {
          result = result :+ new Column(name).cast(StringType).alias(alias)
        }
      }
    )
    logger.info("Columns for json metadata mapping:" + logColumns)
    result
}

def getInfoFromElasticSearch(
      spark,            // : SparkSession
      eks_master,       // : String es.nodes : listado de hosts de elastic
      "9200",           // : String es.port : connection port
      "user",           // : String es.net.http.auth.user : connection user
      "password",       // : String es.net.http.auth.pass : user password
      query,            // : String es.query : query
      schema,           // : Array[Column] schema
      collections,      // : Array[String] es.read.field.as.array.include
    ): DataFrame = {

    var df: DataFrame = null
    try {
      // fill the options to read
      var jMap = Map[String, String]()
      jMap += ("es.nodes" -> hostname)
      jMap += ("es.port" -> port)
      jMap += ("es.net.http.auth.user" -> user)
      jMap += ("es.net.http.auth.pass" -> pass)
      jMap += ("es.read.field.empty.as.null" -> "no")
      jMap += ("es.nodes.wan.only" -> "true")
      jMap += ("es.mapping.date.rich" -> "false")
      jMap += ("es.scroll.size" -> "5000")
      jMap += ("es.input.max.docs.per.partition" -> "10000")
      jMap += ("es.query" -> query)
      if (collections!= null && collections.length > 0) {
        jMap += ("es.read.field.as.array.include" -> collections.mkString(","))
      }

    df = spark
        .read
        .format("org.elasticsearch.spark.sql")
        .options(jMap)          // Map[String, String]
        .load("trace.form.cont.uti.corp.*")
        .select(schema: _*)     // Array[Column]
    } catch {
      case ce: Exception =>
        logger.error("Error reading data from elasticsearch", ce)

    df
}