Exploring AI with the Data Lake
In recent years, artificial intelligence has found its way into more and more areas of daily life, such as online shopping, finance, healthcare, and customer service. Its growing maturity would not be possible without big data: model training consumes massive amounts of data, the resulting models are then used to analyze and make predictions over equally massive data sets, and the models themselves need to be corrected in real time, all of which demands sufficient compute resources. Combining the cloud with AI provides a convenient platform for machine learning at this scale: instead of buying machines and building their own compute platforms, users can implement their business logic directly on the big data services already available in the cloud.
Below is an example that uses the serverless Spark service provided by Huawei Cloud Data Lake Insight (DLI) to classify images. The image data is read as a stream from a DIS (Data Ingestion Service) channel, classified with Spark, and the classification results are written to OBS (Object Storage Service). The code is as follows:
package org.apache.spark.streaming.dli.example
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
import org.opencv.core._
import org.{tensorflow => tf}
import java.nio.file.{Files, Paths}
import java.util.Base64
import org.opencv.imgcodecs.Imgcodecs
import org.opencv.imgproc.Imgproc
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.dis.{ConsumerStrategies, DISUtils}
object DLISparkStreamingExample extends Serializable {
private val log = LoggerFactory.getLogger(DLISparkStreamingExample.getClass)
def main(args: Array[String]): Unit = {
if (args.length < 12) {
println("args is wrong, should be [endpoint region ak sk projectId streamName startingOffsets duration moxingFile width height savePath]")
return
}
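// Twelve arguments: DIS connection info, the frozen TensorFlow model file (moxingFile), the target image width/height, and the OBS output path.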
val (endpoint, region, ak, sk, projectId, streamName, startingOffsets, duration, moxingFile, w, h, savePath)
= (args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11))
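// Spark configuration: Kryo serialization, whole-stage codegen disabled, and the OBS credentials used when writing results.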
val sparkConf = new SparkConf()
.setAppName("Spark streaming DIS Assign example")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.sql.codegen.wholeStage", "false")
.set("spark.hadoop.fs.obs.access.key", ak)
.set("spark.hadoop.fs.obs.secret.key", sk)
val ssc = new StreamingContext(sparkConf, Seconds(duration.toInt))
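// Connection parameters for the DIS source.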
val params = Map(
DISUtils.PROPERTY_ENDPOINT -> endpoint,
DISUtils.PROPERTY_REGION_ID -> region,
DISUtils.PROPERTY_AK -> ak,
DISUtils.PROPERTY_SK -> sk,
DISUtils.PROPERTY_PROJECT_ID -> projectId)
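// Direct DStream that consumes the DIS stream from the given starting offsets.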
val stream = DISUtils.createDirectStream[String, String](
ssc,
ConsumerStrategies.Assign[String, String](streamName, params, startingOffsets))
val sqlContext = SQLContext.getOrCreate(ssc.sparkContext)
import sqlContext.implicits._
var index = 1
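// For each micro-batch: Base64-decode every record, resize the image, run the classifier, and keep the top (score, class index) pair.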
stream.foreachRDD { rdd =>
val result = rdd.map(_.value()).map { picture =>
val pp = Base64.getDecoder.decode(picture)
val re = classificationPredict(safeResize(pp, w.toInt, h.toInt), moxingFile)
(re.head._1, re.head._2)
}
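// Write this batch's results as a CSV file (with header) to a per-batch OBS path.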
result.toDF("pv", "type").write.option("header", "true").csv(s"obs://$savePath/$index")
index = index + 1
}
ssc.start()
ssc.awaitTermination()
}
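// Read a local image file into a byte array; returns an empty array on failure.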
def readPic(path: String): Array[Byte] = {
try {
val modelData = Files.readAllBytes(Paths.get(path))
modelData
} catch {
case e: Exception =>
e.printStackTrace()
new Array[Byte](0)
}
}
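// Load a frozen TensorFlow model (GraphDef) from the classpath, feed the image bytes to the graph's first operation, fetch its last operation, and return the top (score, class index) for each row of the output tensor.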
def classificationPredict(imgData: Array[Byte], moxingPath: String): Array[(Float, Int)] = {
val graph = new tf.Graph()
val moxingResource = DLISparkStreamingExample.getClass.getClassLoader.getResource(moxingPath)
if (moxingResource != null) {
val moxingPath = moxingResource.getPath
graph.importGraphDef(Files.readAllBytes(Paths.get(moxingPath)))
log.info("moxingPath: " + moxingResource.getPath)
} else {
log.info("moxingPath: null")
}
val session = new tf.Session(graph)
// get input and output
val opIter = graph.operations()
var inputName: String = null
if (opIter.hasNext) {
inputName = opIter.next().name() + ":0"
}
var outputName: String = null
while (opIter.hasNext) {
outputName = opIter.next().name() + ":0"
}
// model predict
var y_pred: Array[(Float, Int)] = null
if (inputName != null && outputName != null) {
val x = tf.Tensor.create(imgData)
val y = session.runner().feed(inputName, x).fetch(outputName).run().get(0)
val scores = Array.ofDim[Float](y.shape()(0).toInt, y.shape()(1).toInt)
y.copyTo(scores)
y_pred = scores.map(x => (x.max, x.indexOf(x.max)))
}
session.close()
y_pred
}
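// Decode encoded image bytes into an OpenCV Mat.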
def fromBytes(bytes: Array[Byte]): Mat = {
Imgcodecs.imdecode(new MatOfByte(bytes: _*), Imgcodecs.CV_LOAD_IMAGE_UNCHANGED)
}
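// Resize the image, loading the native OpenCV library on the first UnsatisfiedLinkError and retrying.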
def safeResize(bytes: Array[Byte], w: Int, h: Int): Array[Byte] = {
try {
resize(bytes, w, h)
} catch {
case t: UnsatisfiedLinkError =>
nu.pattern.OpenCV.loadShared()
try {
System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
} catch {
case f: UnsatisfiedLinkError =>
}
resize(bytes, w, h)
case e: Throwable =>
resize(bytes, w, h)
}
}
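// Decode the image, resize it, and re-encode it as JPEG bytes.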
def resize(bytes: Array[Byte], w: Int, h: Int): Array[Byte] = {
val img = fromBytes(bytes)
val resizedImg = resize_transform(img, w, h)
toBytes(resizedImg)
}
def resize_transform(img: Mat, w: Int, h: Int): Mat = {
val resizedImg = new Mat
val newSize = new Size(w, h)
Imgproc.resize(img, resizedImg, newSize)
resizedImg
}
def toBytes(img: Mat): Array[Byte] = {
// Output quality
val imageQuality = new MatOfInt(Imgcodecs.CV_IMWRITE_JPEG_QUALITY, 95)
// Encode image for sending back
val matOfByte = new MatOfByte
Imgcodecs.imencode(".jpg", img, matOfByte, imageQuality)
matOfByte.toArray
}
}
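The twelve program arguments are passed in order: endpoint, region, ak, sk, projectId, streamName, startingOffsets, duration, moxingFile, width, height, savePath. The job also assumes that each record value read from the DIS channel is a Base64-encoded image (for example, a JPEG). Below is a minimal sketch of how a producer might prepare such a value using only the JDK; the call that actually puts the record onto the DIS stream is omitted because it depends on the DIS SDK being used, and the file name cat.jpg is only a placeholder:
import java.nio.file.{Files, Paths}
import java.util.Base64

object PicturePayloadExample {
  // Read a local image and Base64-encode it the way the streaming job above
  // expects to find it in each DIS record value (see Base64.getDecoder there).
  def encodePicture(path: String): String = {
    val raw = Files.readAllBytes(Paths.get(path))
    Base64.getEncoder.encodeToString(raw)
  }

  def main(args: Array[String]): Unit = {
    // "cat.jpg" is a hypothetical local file used only for illustration.
    val payload = encodePicture("cat.jpg")
    // Hand `payload` to your DIS producer as the record value; the consuming
    // side is the stream.foreachRDD block in the job above.
    println(payload.take(64) + "...")
  }
}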