Writing data from Spark to Elasticsearch

bigdata张凯翔, published 2021/03/26 01:26:48

Any RDD can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the RDD should contain Map objects (Scala or Java), JavaBeans, or Scala case classes. If that is not the case, the data can easily be transformed within Spark, or a custom ValueWriter can be plugged in.

import org.apache.spark.SparkContext      // Spark Scala imports
import org.apache.spark.SparkContext._

import org.elasticsearch.spark._          // elasticsearch-hadoop Scala imports
...

val conf = ...
val sc = new SparkContext(conf)           // start Spark through its Scala API
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

// makeRDD creates an ad-hoc RDD from the given collection;
// any other RDD (Java or Scala) could be passed in instead
sc.makeRDD(
  Seq(numbers, airports)
).saveToEs("spark/docs")                  // index the two documents under spark/docs in Elasticsearch
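
The "val conf = ..." line above is deliberately left elided. A minimal sketch of what that configuration typically looks like when targeting Elasticsearch (the application name, master and node address below are placeholders, not values from the original article):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("spark-to-es")   // placeholder application name
  .setMaster("local[*]")       // placeholder master; point this at your cluster in practice
// let elasticsearch-hadoop create the target index if it does not exist yet
conf.set("es.index.auto.create", "true")
// comma-separated list of Elasticsearch nodes to connect to (placeholder address)
conf.set("es.nodes", "127.0.0.1")
conf.set("es.port", "9200")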

Writing Map objects to Elasticsearch

package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Write Map objects to Elasticsearch
// https://www.iteblog.com/archives/1728.html#id
object Spark2Es {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

    // Alternatively, build the SparkContext from a SparkSession:
    // val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // val sc: SparkContext = spark.sparkContext
    val sc = new SparkContext(conf)
    // sc.setLogLevel("warn")

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "ibex", "SFO" -> "San Fran")
    sc.makeRDD(Seq(numbers, airports)).saveToEs("itzkx/docs")
    sc.stop()
  }
}

Writing case class objects to Elasticsearch

package cn.itzkx.spark_es

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

// Write case class objects to Elasticsearch
case class Trip(departure: String, arrival: String)

object Spark2Esl {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

    // Alternatively, build the SparkContext from a SparkSession:
    // val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // val sc: SparkContext = spark.sparkContext
    val sc = new SparkContext(conf)

    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")
    val rdd1: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    rdd1.saveToEs("itzkx/class")

    // The call above writes upcomingTrip and lastWeekTrip to the _index itzkx under the
    // type class; saveToEs becomes available on the RDD through an implicit conversion.
    // elasticsearch-hadoop also exposes an explicit API for writing RDDs to Elasticsearch,
    // via org.elasticsearch.spark.rdd.EsSpark:
    val rdd2: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    EsSpark.saveToEs(rdd2, "spark/docs")
    sc.stop()
  }
}

Writing JSON strings to Elasticsearch

package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Write JSON strings to Elasticsearch
object Spark2Esla {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

    // Alternatively, build the SparkContext from a SparkSession:
    // val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // val sc: SparkContext = spark.sparkContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    val json1 = """{"id" : 1, "zkx" : "www.ibex.com", "weylin" : "ibex_hadoop"}"""
    val json2 = """{"id" : 2, "zkx" : "books.ibex.com", "weylin" : "ibex_hadoop"}"""
    sc.makeRDD(Seq(json1, json2)).saveJsonToEs("itzkx/json")
    sc.stop()
  }
}
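
Unlike saveToEs, saveJsonToEs assumes each string is already a valid JSON document and sends it to Elasticsearch as-is, without the field extraction applied to Maps or case classes.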

Dynamically setting the target type
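
elasticsearch-hadoop treats a field reference such as {media_type} in the resource string as a pattern that is resolved against each document at write time, so every record in the example below lands in the type named by its own media_type value.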

package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Dynamically set the target type per document
object Spark2Eslas {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

    // Alternatively, build the SparkContext from a SparkSession:
    // val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // val sc: SparkContext = spark.sparkContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd   = Map("media_type" -> "music", "title" -> "Surfing With The Alien")
    // {media_type} is resolved per document, so each record is written to its own type
    sc.makeRDD(Seq(game, book, cd)).saveToEs("itzkx/{media_type}")
    sc.stop()
  }
}

Specifying a custom document id

package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

object Spark2Eslast {
  /* In Elasticsearch the combination of _index/_type/_id uniquely identifies a document.
     If no id is supplied, Elasticsearch generates a globally unique one, about 20 characters
     long, for example:
       { "_index": "iteblog", "_type": "docs", "_id": "AVZy3d5sJfxPRwCjtWM-", "_score": 1,
         "_source": { "arrival": "Otopeni", "SFO": "San Fran" } }
     Such a string carries no meaning and is hard to remember, but the id can be set
     explicitly when inserting the data, as shown below. */
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

    // Alternatively, build the SparkContext from a SparkSession:
    // val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // val sc: SparkContext = spark.sparkContext
    val sc = new SparkContext(conf)

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    // Each element is an (id, document) pair: the ids 1, 2 and 3 are assigned explicitly,
    // so the otp document can later be fetched at /itzkx/2015/1.
    val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    airportsRDD.saveToEsWithMeta("itzkx/2015")

    // The id can also be taken from a field of the document itself, which suits
    // real-world (dynamic) data better:
    val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
    val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
    val rdd = sc.makeRDD(Seq(json1, json2))
    // es.mapping.id maps the document's id field onto each record's _id
    EsSpark.saveToEs(rdd, "itzkx/docs", Map("es.mapping.id" -> "id"))
    sc.stop()
  }
}

Supplying custom record metadata

package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.Metadata._

// Per-record metadata (id, ttl, version, ...) can also be supplied explicitly when writing:
object Spark2Eslasti {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("itbex").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")

    // Alternatively, build the SparkContext from a SparkSession:
    // val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // val sc: SparkContext = spark.sparkContext
    val sc = new SparkContext(conf)

    // the documents
    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
    // metadata for each document
    val otpMeta = Map(ID -> 1, TTL -> "3h")
    val mucMeta = Map(ID -> 2, VERSION -> "23")
    val sfoMeta = Map(ID -> 3)
    // pair every document with its metadata and write the result out
    val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    airportsRDD.saveToEsWithMeta("itzkx/2015")
    sc.stop()
  }
}
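
To check what was written, the index can be read back as an RDD. A brief sketch, assuming the SparkContext sc from the example above is still available (esRDD comes from the same org.elasticsearch.spark._ import):

import org.elasticsearch.spark._

// esRDD returns (document id, field map) pairs, which makes it easy to
// confirm that the explicit ids 1, 2 and 3 were applied
val airports = sc.esRDD("itzkx/2015")
airports.collect().foreach { case (id, doc) => println(s"$id -> $doc") }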

Writing a DataFrame to Elasticsearch

package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.sql._

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

object Spark2Eslastic {
  def main(args: Array[String]): Unit = {
    // Elasticsearch-related settings
    val master = "local"
    val conf = new SparkConf().setAppName("it").setMaster(master)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.220.75:9200,192.168.220.76:9200,192.168.220.77:9200")

    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // reuse the SparkContext held by the SparkSession
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._

    // Reusing the example from the Spark SQL documentation, the DataFrame could be
    // built from the Person case class:
    // val personDF: DataFrame = sc.textFile("people.txt")
    //   .map(_.split(","))
    //   .map(p => Person(p(0), p(1), p(2).trim.toInt)).toDF()

    // Here only the first and third columns are kept (sample record: zhangsan zhangsanfeng 108)
    val value = sc.textFile("people.txt").map(line => line.split(","))
    val people = value.map(p => (p(0), p(2)))
    val personDF: DataFrame = people.toDF("tax_rate_code", "percentage_rate")

    personDF.saveToEs("itzkx/personDF")
    personDF.printSchema()
    personDF.show()
    sc.stop()
  }
}

The pom.xml used by the examples above

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Spark_all</artifactId>
        <groupId>cn.itcast.Spark_all</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>Spark_Es</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
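
A note on the dependencies: elasticsearch-spark-20_2.11 is the connector artifact built for Spark 2.x and Scala 2.11, so its Scala suffix has to match the suffix of spark-core and spark-sql, and its version (6.4.1 here) is normally kept in line with the version of the Elasticsearch cluster being written to.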

Spark SQL: writing a DataFrame to Elasticsearch

// reusing the example from Spark SQL documentation

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._

import org.elasticsearch.spark.sql._
...

// sc = existing SparkContext
val sqlContext = new SQLContext(sc)

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

//  create DataFrame
val people = sc.textFile("people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1), p(2).trim.toInt))
  .toDF()

people.saveToEs("spark/people") 
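
For completeness, the index can also be loaded back as a DataFrame. A short sketch, assuming the same sqlContext and imports as above (esDF is provided by org.elasticsearch.spark.sql._):

import org.elasticsearch.spark.sql._

// load the spark/people resource back into a DataFrame and inspect it
val peopleDF = sqlContext.esDF("spark/people")
peopleDF.printSchema()
peopleDF.show()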

Source: www.jianshu.com, author: 百忍成金的虚竹. Copyright belongs to the original author; please contact the author before reprinting.

Original link: www.jianshu.com/p/abb687fb4cf4
