Exploring AI with the Data Lake
In recent years, artificial intelligence has been applied to more and more areas of everyday life, such as online shopping, finance, healthcare, and customer service. Its growing maturity depends on the support of big data: model training requires massive amounts of data, and the trained model is then used to analyze and predict over equally massive data sets and to be corrected in real time, all of which demands sufficient compute resources. Combining the cloud with AI provides a convenient platform for machine learning on massive data. Users no longer need to purchase machines and build their own compute platforms; they can directly use the big data services already available on the cloud to implement their business logic.
The following example uses the serverless service provided by Huawei Cloud Data Lake Insight (DLI) to classify images. The image data is read as a stream from a DIS channel, classified with Spark, and the classification results are written to OBS. The code is as follows:
package org.apache.spark.streaming.dli.example

import java.nio.file.{Files, Paths}
import java.util.Base64

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dis.{ConsumerStrategies, DISUtils}
import org.opencv.core._
import org.opencv.imgcodecs.Imgcodecs
import org.opencv.imgproc.Imgproc
import org.slf4j.LoggerFactory
import org.{tensorflow => tf}

object DLISparkStreamingExample extends Serializable {

  private val log = LoggerFactory.getLogger(DLISparkStreamingExample.getClass)

  def main(args: Array[String]): Unit = {
    if (args.length < 12) {
      println("args is wrong, should be [endpoint region ak sk projectId streamName " +
        "startingOffsets duration moxingFile w h savePath]")
      return
    }
    val (endpoint, region, ak, sk, projectId, streamName, startingOffsets, duration,
      moxingFile, w, h, savePath) =
      (args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7),
        args(8), args(9), args(10), args(11))

    val sparkConf = new SparkConf()
      .setAppName("Spark streaming DIS Assign example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.codegen.wholeStage", "false")
      .set("spark.hadoop.fs.obs.access.key", ak)
      .set("spark.hadoop.fs.obs.secret.key", sk)

    val ssc = new StreamingContext(sparkConf, Seconds(duration.toInt))

    // Connection parameters for the DIS channel
    val params = Map(
      DISUtils.PROPERTY_ENDPOINT -> endpoint,
      DISUtils.PROPERTY_REGION_ID -> region,
      DISUtils.PROPERTY_AK -> ak,
      DISUtils.PROPERTY_SK -> sk,
      DISUtils.PROPERTY_PROJECT_ID -> projectId)

    val stream = DISUtils.createDirectStream[String, String](
      ssc,
      ConsumerStrategies.Assign[String, String](streamName, params, startingOffsets))

    val sqlContext = SQLContext.getOrCreate(ssc.sparkContext)
    import sqlContext.implicits._

    var index = 1
    stream.foreachRDD { rdd =>
      // Each record value is a Base64-encoded image: decode it, resize it,
      // run the classification model, and keep the top score and class index.
      val result = rdd.map(_.value()).map { picture =>
        val pp = Base64.getDecoder.decode(picture)
        val re = classificationPredict(safeResize(pp, w.toInt, h.toInt), moxingFile)
        (re.head._1, re.head._2)
      }
      // Write the results of each batch to a separate directory on OBS
      result.toDF("pv", "type").write.option("header", "true").csv(s"obs://$savePath/$index")
      index = index + 1
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def readPic(path: String): Array[Byte] = {
    try {
      Files.readAllBytes(Paths.get(path))
    } catch {
      case e: Exception =>
        e.printStackTrace()
        new Array[Byte](0)
    }
  }

  // Load the TensorFlow model graph and run a prediction on one image,
  // returning (max score, index of max score) for each row of the output.
  def classificationPredict(imgData: Array[Byte], moxingPath: String): Array[(Float, Int)] = {
    val graph = new tf.Graph()
    val moxingResource = DLISparkStreamingExample.getClass.getClassLoader.getResource(moxingPath)
    if (moxingResource != null) {
      val moxingPath = moxingResource.getPath
      graph.importGraphDef(Files.readAllBytes(Paths.get(moxingPath)))
      log.info("moxingPath: " + moxingResource.getPath)
    } else {
      log.info("moxingPath: null")
    }
    val session = new tf.Session(graph)

    // Use the first operation of the graph as input and the last one as output
    val opIter = graph.operations()
    var inputName: String = null
    if (opIter.hasNext) {
      inputName = opIter.next().name() + ":0"
    }
    var outputName: String = null
    while (opIter.hasNext) {
      outputName = opIter.next().name() + ":0"
    }

    // Model predict
    var y_pred: Array[(Float, Int)] = null
    if (inputName != null && outputName != null) {
      val x = tf.Tensor.create(imgData)
      val y = session.runner().feed(inputName, x).fetch(outputName).run().get(0)
      val scores = Array.ofDim[Float](y.shape()(0).toInt, y.shape()(1).toInt)
      y.copyTo(scores)
      y_pred = scores.map(x => (x.max, x.indexOf(x.max)))
    }
    session.close()
    y_pred
  }

  def fromBytes(bytes: Array[Byte]): Mat = {
    Imgcodecs.imdecode(new MatOfByte(bytes: _*), Imgcodecs.CV_LOAD_IMAGE_UNCHANGED)
  }

  // Resize with a fallback that loads the native OpenCV library if it has
  // not been linked yet on this executor.
  def safeResize(bytes: Array[Byte], w: Int, h: Int): Array[Byte] = {
    try {
      resize(bytes, w, h)
    } catch {
      case t: UnsatisfiedLinkError =>
        nu.pattern.OpenCV.loadShared()
        try {
          System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
        } catch {
          case f: UnsatisfiedLinkError =>
        }
        resize(bytes, w, h)
      case e: Throwable =>
        resize(bytes, w, h)
    }
  }

  def resize(bytes: Array[Byte], w: Int, h: Int): Array[Byte] = {
    val img = fromBytes(bytes)
    val resizedImg = resize_transform(img, w, h)
    toBytes(resizedImg)
  }

  def resize_transform(img: Mat, w: Int, h: Int): Mat = {
    val resizedImg = new Mat
    val newSize = new Size(w, h)
    Imgproc.resize(img, resizedImg, newSize)
    resizedImg
  }

  def toBytes(img: Mat): Array[Byte] = {
    // Output quality
    val imageQuality = new MatOfInt(Imgcodecs.CV_IMWRITE_JPEG_QUALITY, 95)
    // Encode image for sending back
    val matOfByte = new MatOfByte
    Imgcodecs.imencode(".jpg", img, matOfByte, imageQuality)
    matOfByte.toArray
  }
}
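For reference, the streaming job above expects each record read from the DIS channel to carry a Base64-encoded image. The sketch below shows how such a record could be prepared on the producer side; the image path is hypothetical, and the step of actually putting the encoded string onto the DIS channel (for example with the DIS producer SDK) is omitted here.

import java.nio.file.{Files, Paths}
import java.util.Base64

object PictureRecordExample {
  def main(args: Array[String]): Unit = {
    // Read a local image and Base64-encode it; this is the format that
    // DLISparkStreamingExample decodes with Base64.getDecoder.decode.
    val imageBytes = Files.readAllBytes(Paths.get("/tmp/sample.jpg")) // hypothetical path
    val record = Base64.getEncoder.encodeToString(imageBytes)
    // The encoded string would then be sent to the DIS channel (not shown here).
    println(s"record length: ${record.length}")
  }
}

On the output side, each micro-batch of classification results is written to OBS as a CSV file with two columns: the top score and the predicted class index.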