Writing data from Spark to Elasticsearch
[Abstract] Any RDD can be saved to Elasticsearch as long as its content can be translated into documents; in practice this means the RDD holds Map objects (Scala or Java), JavaBeans, or Scala case classes. If that is not the case, the data can easily be transformed in Spark, or you can plug in your own custom ValueWriter.
import org.apache.spark.SparkContext    // Spark Scala imports
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._        // elasticsearch-hadoop Scala imports
...
val conf = ...
val sc = new SparkContext(conf)         // start Spark through its Scala API
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
// makeRDD creates an ad-hoc RDD from the given collection; any other RDD (Java or Scala) can be passed in
sc.makeRDD(
  Seq(numbers, airports)
).saveToEs("spark/docs")
This indexes the two documents in Elasticsearch under the spark/docs resource.
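To sanity-check the write, elasticsearch-hadoop also exposes a read path through the same implicits. The sketch below is not part of the original article; it assumes the sc above and a reachable cluster, and uses the connector's esRDD method, which returns (document id, field map) pairs:

import org.elasticsearch.spark._

// Read the freshly indexed documents back; each element is (documentId, Map of fields)
val docs = sc.esRDD("spark/docs")
docs.collect().foreach { case (id, fields) => println(s"$id -> $fields") }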
Writing Map objects to Elasticsearch
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Write Map objects to Elasticsearch
// https://www.iteblog.com/archives/1728.html#id
object Spark2Es {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    // A SparkSession could be used instead:
    //   val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    //   val sc: SparkContext = spark.sparkContext
    val sc = new SparkContext(conf)
    // sc.setLogLevel("warn")
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "ibex", "SFO" -> "San Fran")
    sc.makeRDD(Seq(numbers, airports)).saveToEs("itzkx/docs")
    sc.stop()
  }
}
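Connector settings do not have to live only on the SparkConf: saveToEs also accepts a per-write options Map. A minimal sketch, assuming the sc, numbers and airports values from the example above:

// Per-write configuration extends/overrides the SparkConf settings for this call only
sc.makeRDD(Seq(numbers, airports))
  .saveToEs("itzkx/docs", Map("es.index.auto.create" -> "true"))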
Writing case class objects to Elasticsearch
package cn.itzkx.spark_es

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

// Write case class objects to Elasticsearch
case class Trip(departure: String, arrival: String)

object Spark2Esl {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)

    val upcomingTrip = Trip("OTP", "SFO")
    val lastWeekTrip = Trip("MUC", "OTP")

    // The implicit conversions from org.elasticsearch.spark._ give the RDD its saveToEs method;
    // this writes upcomingTrip and lastWeekTrip into the itzkx index under the class type.
    val rdd1: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    rdd1.saveToEs("itzkx/class")

    // elasticsearch-hadoop also provides an explicit API for writing RDDs
    // (requires import org.elasticsearch.spark.rdd.EsSpark):
    val rdd2: RDD[Trip] = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
    EsSpark.saveToEs(rdd2, "spark/docs")

    sc.stop()
  }
}
Writing JSON strings to Elasticsearch
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Write JSON strings to Elasticsearch
object Spark2Esla {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    val json1 = """{"id" : 1, "zkx" : "www.ibex.com", "weylin" : "ibex_hadoop"}"""
    val json2 = """{"id" : 2, "zkx" : "books.ibex.com", "weylin" : "ibex_hadoop"}"""
    // saveJsonToEs writes the strings as-is, without converting them
    sc.makeRDD(Seq(json1, json2)).saveJsonToEs("itzkx/json")

    sc.stop()
  }
}
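As in the case class example, the implicit saveJsonToEs has an explicit counterpart on EsSpark. A minimal sketch, assuming the sc, json1 and json2 values from above:

import org.elasticsearch.spark.rdd.EsSpark

// Explicit equivalent of rdd.saveJsonToEs("itzkx/json")
val jsonRdd = sc.makeRDD(Seq(json1, json2))
EsSpark.saveJsonToEs(jsonRdd, "itzkx/json")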
Setting the target type dynamically
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// Choose the target type dynamically, per document
object Spark2Eslas {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
    val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
    val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

    // {media_type} is resolved per record, so each document lands in the type named by its own field
    sc.makeRDD(Seq(game, book, cd)).saveToEs("itzkx/{media_type}")

    sc.stop()
  }
}
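The {field} pattern should also work when writing raw JSON strings, since the connector resolves the pattern against each document. The sketch below is an assumption along those lines, using hypothetical jsonGame/jsonBook values and the same sc as above:

// Hypothetical JSON documents carrying a media_type field that picks the target type
val jsonGame = """{"media_type" : "game", "title" : "FF VI", "year" : "1994"}"""
val jsonBook = """{"media_type" : "book", "title" : "Harry Potter", "year" : "2010"}"""
sc.makeRDD(Seq(jsonGame, jsonBook)).saveJsonToEs("itzkx/{media_type}")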
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.EsSpark

/* Custom document ids
 * In Elasticsearch the combination _index/_type/_id uniquely identifies a document.
 * If no id is specified, Elasticsearch generates a globally unique, 20-character one, for example:
 *   { "_index": "iteblog", "_type": "docs", "_id": "AVZy3d5sJfxPRwCjtWM-", "_score": 1,
 *     "_source": { "arrival": "Otopeni", "SFO": "San Fran" } }
 * Such ids carry no meaning and are hard to work with, so the id can be set explicitly on insert: */
object Spark2Eslast {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("iteblog").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // Seq((1, otp), (2, muc), (3, sfo)) assigns the ids 1, 2 and 3 to the documents,
    // so the otp document can later be fetched at /itzkx/2015/1.
    val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    airportsRDD.saveToEsWithMeta("itzkx/2015")

    // The id can also be taken from the documents themselves, which suits real-world, dynamic data better:
    val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
    val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
    val rdd = sc.makeRDD(Seq(json1, json2))
    // es.mapping.id maps the id field of each record to the document's _id
    EsSpark.saveToEs(rdd, "itzkx/docs", Map("es.mapping.id" -> "id"))

    sc.stop()
  }
}
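To confirm the custom ids took effect, the documents can be read back with a query; a minimal sketch, assuming the sc and the itzkx/2015 resource written above (esRDD accepts a URI query string):

// The returned pairs are (documentId, field map); the id of the Otopeni document should be "1"
val otopeni = sc.esRDD("itzkx/2015", "?q=iata:OTP")
otopeni.collect().foreach { case (id, fields) => println(s"$id -> $fields") }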
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd.Metadata._

// Custom record metadata: metadata such as ID, TTL or VERSION can be attached to each record on write
object Spark2Eslasti {
  def main(args: Array[String]): Unit = {
    val master = "local"
    val conf = new SparkConf().setAppName("itbex").setMaster(master)
    // Elasticsearch-related settings
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.47.100:9200,192.168.47.110:9200,192.168.47.120:9200")
    val sc = new SparkContext(conf)

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // Metadata for each document, keyed by the Metadata enum
    val otpMeta = Map(ID -> 1, TTL -> "3h")
    val mucMeta = Map(ID -> 2, VERSION -> "23")
    val sfoMeta = Map(ID -> 3)

    // Pair each document with its metadata and write them together
    val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    airportsRDD.saveToEsWithMeta("itzkx/2015")

    sc.stop()
  }
}
package cn.itzkx.spark_es

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.sql._

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

object Spark2Eslastic {
  def main(args: Array[String]): Unit = {
    // Elasticsearch-related settings
    val master = "local"
    val conf = new SparkConf().setAppName("it").setMaster(master)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.220.75:9200,192.168.220.76:9200,192.168.220.77:9200")

    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._

    // Building the DataFrame through the Person case class would look like:
    //   val personDF: DataFrame = sc.textFile("people.txt")
    //     .map(_.split(","))
    //     .map(p => Person(p(0), p(1), p(2).trim.toInt)).toDF()

    // Here only the first and third columns are kept, e.g. from a line "zhangsan,zhangsanfeng,108"
    val value = sc.textFile("people.txt").map(line => line.split(","))
    val people = value.map(p => (p(0), p(2)))
    val personDF: DataFrame = people.toDF("tax_rate_code", "percentage_rate")

    // org.elasticsearch.spark.sql._ adds saveToEs to DataFrames
    personDF.saveToEs("itzkx/personDF")
    personDF.printSchema()
    personDF.show()

    sc.stop()
  }
}
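The resource written above can also be loaded back as a DataFrame through the connector's Spark SQL data source; a minimal sketch, assuming the spark session and the itzkx/personDF resource from the example:

// Read the Elasticsearch resource back as a DataFrame via the connector's data source
val esDF = spark.read
  .format("org.elasticsearch.spark.sql")
  .load("itzkx/personDF")
esDF.show()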
The pom.xml used by the examples above:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>Spark_all</artifactId>
    <groupId>cn.itcast.Spark_all</groupId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>
  <artifactId>Spark_Es</artifactId>
  <dependencies>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>6.3.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_2.11</artifactId>
      <version>6.4.1</version>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass></mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Writing a DataFrame to Elasticsearch with Spark SQL
// reusing the example from Spark SQL documentation
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.elasticsearch.spark.sql._
...
// sc = existing SparkContext
val sqlContext = new SQLContext(sc)
// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)
// create DataFrame
val people = sc.textFile("people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1), p(2).trim.toInt))
  .toDF()
people.saveToEs("spark/people")
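For completeness, the same integration can read an index back into a DataFrame; a minimal sketch, assuming the sqlContext above and the spark/people resource (esDF comes from the org.elasticsearch.spark.sql._ import):

// Load the spark/people resource as a DataFrame
val peopleDF = sqlContext.esDF("spark/people")
peopleDF.show()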
Source: www.jianshu.com. Author: 百忍成金的虚竹. Copyright belongs to the original author; please contact the author for permission to republish.
Original article: www.jianshu.com/p/abb687fb4cf4