现象

在运行spark程序期间,在针对 dataFrame 的map操作中,产生了类似HashMap key重复的现象,如图所示

有两个重复健

这个问题导致了后续统计上的一系列问题

分析

起初我们是实际跟踪代码进行分析的,但是发现scala代码中没有任何问题,各种处理也非常合理

代码罗列如下,想看就看看,不看也能理解

    // 目标是针对businessid进行聚合,然后输出各个业务id下,销售天数
    // 获取数据的代码
    val dataframe = spark.sql("select businessid,date,money from table1")

    case class Stat() {
      // 针对每一个businessid的统计
      var moneyByDay: HashMap[String, Double] = HashMap[String, Double]()


      // 针对每一条记录的id进行相加
      def moneyByDayOp(data: (String, Double)) = {
        if (this.moneyByDay.contains(data._1)) {
          val tmpMoney = this.moneyByDay.get(data._1)
          val finalMoney = tmpMoney.getOrElse[Double](0) + data._2
          this.moneyByDay.remove(data._1)
          this.moneyByDay.put(data._1, finalMoney)
        } else {
          if (data._2 > 0)
            this.moneyByDay.put(data._1, data._2)
        }
      }
    }
    /**
      * 归并多条记录的结果
      **/
    def reduceStatModel(one: Stat, another: Stat): Stat = {
      one.moneyByDay ++= another.moneyByDay
      one
    }

    /**
      * 针对每条记录生成一个统计值
      **/
    import org.apache.spark.sql.Row
    def parseProductDay(data: Row): (Long, Stat) = {
      val result: Stat = new Stat
      val a: String = data.getAs[String]("date")
      result.moneyByDayOp(a, data.getAs[Long]("money"))
      (data.getAs[Long]("product_id"), result)
    }

    // 核心流程
    val finalRs = dataframe.rdd
      .map(line => parseProductDay(line))
      .reduceByKey(reduceStatModel(_, _))

    val hashMp = finalRs.collect()(0)._2.moneyByDay
    hashMp.put("20170727",1)
    

针对的存储,其实就是Stat类中的 moneyByDay对象,本质上是一个HashMap,并且通过泛型控制,key类型为String,value的类型为Double

放到集群上执行,是可以通过的,并且得到类似上图的结论

hashMap中包含重复的key,只能是两个可能

  1. key类型相同,但是可能原始字符串中有空格,或者不可见字符
  2. key类型不同,一个是字符串,另一个是其他数据类型

经过检查,1的可能性排除

问题原因是2,居然是2,排除了所有不可能之后,最后的真相即使再不可能,也是真的

原因

其实这种结论是意料之外,情理之中

众所周知,java的泛型检查是编译时的检查,实际运行时,容器类存储和运算都是将对象看做object进行处理的

对于基于jvm的scala,泛型本质上也是一个静态编译检查

上述代码,如果table1表中的字段date,其类型是string,那么万事ok,运算和结论都会正常

但是如果date是其他类型,比如int,那么就会产生问题

问题出现在上述代码的37行

 val a: String = data.getAs[String]("date")
// 这行代码是org.apache.spark.sql.Row对象的一个调用,目的是获取指定类型的字段,并且转化为指定类型

// 源码如下:


  /**
   * Returns the value at position i.
   * For primitive types if value is null it returns 'zero value' specific for primitive
   * ie. 0 for Int - use isNullAt to ensure that value is not null
   *
   * @throws ClassCastException when data type does not match.
   */
  def getAs[T](i: Int): T = get(i).asInstanceOf[T] // 此处是进行强制转化

  /**
   * Returns the value of a given fieldName.
   * For primitive types if value is null it returns 'zero value' specific for primitive
   * ie. 0 for Int - use isNullAt to ensure that value is not null
   *
   * @throws UnsupportedOperationException when schema is not defined.
   * @throws IllegalArgumentException when fieldName do not exist.
   * @throws ClassCastException when data type does not match.
   */
  def getAs[T](fieldName: String): T = getAs[T](fieldIndex(fieldName))

解决方案

  1. 将table1表的字段设计成string,代码可以运行通过

  2. 37行代码改为如下,转化string即可

     val a: String = String.valueOf(data.getAs[String]("date"))
    

疑问

在spark-shell中调用这段代码,其实是会报错的

val a = data.getAs[String]("date")

异常截图

但是为什么集群执行会通过?

并且返回了一个HashMap[String,Double],其中的key都是Int。。。每次调用foreach方法都会报错

普通的foreach操作,抛出了强转异常