数据湖之AI探索

举报
Alice215 发表于 2020/08/31 15:43:22 2020/08/31
【摘要】 近几年,人工智能越来越多的应用到人们的生活的各个领域,比如网购、金融、医疗、客服等等,其日渐成熟,离不开大数据的支撑。模型训练需要海量的数据参与,并且之后也需要使用得到的模型进行海量数据的分析与预测,以及模型的实时修正,这些都需要足够的计算资源支撑,云与AI的结合,为海量数据的机器学习提供了一个便利的平台,用户无需自己购买机器搭建计算平台,可以直接使用云上已有的大数据服务进行业务相关的实现。...

       近几年,人工智能越来越多的应用到人们的生活的各个领域,比如网购、金融、医疗、客服等等,其日渐成熟,离不开大数据的支撑。模型训练需要海量的数据参与,并且之后也需要使用得到的模型进行海量数据的分析与预测,以及模型的实时修正,这些都需要足够的计算资源支撑,云与AI的结合,为海量数据的机器学习提供了一个便利的平台,用户无需自己购买机器搭建计算平台,可以直接使用云上已有的大数据服务进行业务相关的实现。


       下面举例了一个使用华为云的数据湖探索DLI提供的serverless服务进行图片分类的例子,图片数据从DIS服务的通道中流式读取,然后使用Spark进行图片分类,并将分类结果写入到OBS服务中,代码如下:

package org.apache.spark.streaming.dli.example
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
import org.opencv.core._
import org.{tensorflow => tf}
import java.nio.file.{Files, Paths}
import java.util.Base64
import org.opencv.imgcodecs.Imgcodecs
import org.opencv.imgproc.Imgproc
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.dis.{ConsumerStrategies, DISUtils}
object DLISparkStreamingExample extends Serializable {
  private val log = LoggerFactory.getLogger(DLISparkStreamingExample.getClass)
  def main(args: Array[String]): Unit = {
    if (args.length < 8) {
      println(s"args is wrong, should be [endpoint region ak sk projectId streamName startingOffsets duration]".stripMargin)
      return
    }
    val (endpoint, region, ak, sk, projectId, streamName, startingOffsets, duration, moxingFile, w, h, savePath)
    = (args(0), args(1), args(2), args(3), args(4), args(5), args(6), args(7), args(8), args(9), args(10), args(11))
    val sparkConf = new SparkConf()
      .setAppName("Spark streaming DIS Assign example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.sql.codegen.wholeStage", "false")
      .set("spark.hadoop.fs.obs.access.key", ak)
      .set("spark.hadoop.fs.obs.secret.key", sk)
    val ssc = new StreamingContext(sparkConf, Seconds(duration.toInt))
    val params = Map(
      DISUtils.PROPERTY_ENDPOINT -> endpoint,
      DISUtils.PROPERTY_REGION_ID -> region,
      DISUtils.PROPERTY_AK -> ak,
      DISUtils.PROPERTY_SK -> sk,
      DISUtils.PROPERTY_PROJECT_ID -> projectId)
    val stream = DISUtils.createDirectStream[String, String](
      ssc,
      ConsumerStrategies.Assign[String, String](streamName, params, startingOffsets))
    val sqlContext = SQLContext.getOrCreate(ssc.sparkContext)
    import sqlContext.implicits._
    var index = 1
    stream.foreachRDD { rdd =>
      val result = rdd.map(_.value()).map { picture =>
        val pp = Base64.getDecoder.decode(picture)
        val re = classificationPredict(safeResize(pp, w.toInt, h.toInt), moxingFile)
        (re.head._1, re.head._2)
      }
      result.toDF("pv", "type").write.option("header", "true").csv(s"obs://$savePath/$index")
      index = index + 1
    }
    ssc.start()
    ssc.awaitTermination()
  }
  def readPic(path: String): Array[Byte] = {
    try {
      val modelData = Files.readAllBytes(Paths.get(path))
      modelData
    } catch {
      case e: Exception =>
        e.printStackTrace()
        new Array[Byte](0)
    }
  }
  def classificationPredict(imgData: Array[Byte], moxingPath: String): Array[(Float, Int)] = {
    val graph = new tf.Graph()
    val moxingResource = DLISparkStreamingExample.getClass.getClassLoader.getResource(moxingPath)
    if (moxingResource != null) {
      val moxingPath = moxingResource.getPath
      graph.importGraphDef(Files.readAllBytes(Paths.get(moxingPath)))
      log.info("moxingPath: " + moxingResource.getPath)
    } else {
      log.info("moxingPath: null")
    }
    val session = new tf.Session(graph)
    // get input and output
    val opIter = graph.operations()
    var inputName: String = null
    if (opIter.hasNext) {
      inputName = opIter.next().name() + ":0"
    }
    var outputName: String = null
    while (opIter.hasNext) {
      outputName = opIter.next().name() + ":0"
    }
    // model predict
    var y_pred: Array[(Float, Int)] = null
    if (inputName != null && outputName != null) {
      val x = tf.Tensor.create(imgData)
      val y = session.runner().feed(inputName, x).fetch(outputName).run().get(0)
      val scores = Array.ofDim[Float](y.shape()(0).toInt, y.shape()(1).toInt)
      y.copyTo(scores)
      y_pred = scores.map(x => (x.max, x.indexOf(x.max)))
    }
    session.close()
    y_pred
  }
  def fromBytes(bytes: Array[Byte]): Mat = {
    Imgcodecs.imdecode(new MatOfByte(bytes: _*), Imgcodecs.CV_LOAD_IMAGE_UNCHANGED)
  }
  def safeResize(bytes: Array[Byte], w: Int, h: Int): Array[Byte] = {
    try {
      resize(bytes, w, h)
    } catch {
      case t: UnsatisfiedLinkError =>
        nu.pattern.OpenCV.loadShared()
        try {
          System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
        } catch {
          case f: UnsatisfiedLinkError =>
        }
        resize(bytes, w, h)
      case e: Throwable =>
        resize(bytes, w, h)
    }
  }
  def resize(bytes: Array[Byte], w: Int, h: Int): Array[Byte] = {
    val img = fromBytes(bytes)
    val resizedImg = resize_transform(img, w, h)
    toBytes(resizedImg)
  }
  def resize_transform(img: Mat, w: Int, h: Int): Mat = {
    val resizedImg = new Mat
    val newSize = new Size(w, h)
    Imgproc.resize(img, resizedImg, newSize)
    resizedImg
  }
  def toBytes(img: Mat): Array[Byte] = {
    // Output quality
    val imageQuality = new MatOfInt(Imgcodecs.CV_IMWRITE_JPEG_QUALITY, 95)
    // Encode image for sending back
    val matOfByte = new MatOfByte
    Imgcodecs.imencode(".jpg", img, matOfByte, imageQuality)
    matOfByte.toArray
  }
}



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。