当前位置: 首页>前端>正文

java 根据内容构建modbus java实现基于内容的推荐


目录

  • (1)基于内容推荐原理
  • (2)基于内容推荐优势与应用场景
  • (3)基于内容推荐架构图详解
  • (4)基于内容推荐TF-IDF详解
  • (5)构建基于内容推荐开发环境
  • (6)内容推荐开发步骤与数据演化详解
  • (7)内容推荐余弦相似度算法详解
  • (8)Spark稀疏向量SparseVector详解
  • (9)基于Spark 内容推荐开发
  • (9.1)RDD基于稀疏向量SparseVector实现
  • (9.2)DF基于分布式矩阵实现
  • (9.3)基于业务数据实现内容推荐
  • (10)总结


(1)基于内容推荐原理

概念理解
基于内容的推荐是在推荐引擎出现之初应用最为广泛的推荐机制,它的核心思想是根据推荐物品或内容的元数据,发现物品或者内容的相关性,然后基于用户以往的喜好记录,推荐给用户相似的物品。

基于内容推荐机制的基本原理

下图中给出了基于内容推荐的一个典型的例子,电影推荐系统,首先我们需要对电影的元数据有一个建模,这里只简单的描述了一下电影的类型;然后通过电影的元数据发现电影间的相似度,因为类型都是“爱情,浪漫”电影A和C被认为是相似的电影(当然,只根据类型是不够的,要得到更好的推荐,我们还可以考虑电影的导演,演员等等);最后实现推荐,对于用户A,他喜欢看电影A,那么系统就可以给他推荐类似的电影C

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_spark,第1张

基于内容推荐存在的几个缺点

这种基于内容的推荐机制的好处在于它能很好的建模用户的口味,能提供更加精确的推荐。但它也存在以下几个问题:

  1. 某些物品的特征提取比较难,例如:图像、音乐、电影,如果提供这些物品的人没有提供元数据(例如风格、演员、导演、作者等等),自动提取特征比较不容易
  2. 过于专门化。永远不会推荐和用户曾经喜欢的物品不相干的物品,完全没有利用其他用户的喜好来提高对此用户的推荐质量
  3. 对于新用户有冷启动的问题。刚出现的用户的用户画像为空,无法做出推荐

基于内容推荐的举例

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_spark_02,第2张

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_java 根据内容构建modbus_03,第3张

(2)基于内容推荐优势与应用场景

基于内容推荐的几个优势
基于内容的推荐一般是推荐系统的起步阶段,而且会持续存在,它的重要性不可取代。因为:

  1. 为某一用户做推荐的时候不需要使用其他用户的数据。
  2. 产品冷启动阶段,新的物品要被推荐出去,首选内容推荐。
  3. 可解释性好,产品的特征决定了推荐值

如何处理冷启动

冷启动在推荐系统中非常常见。在基于内容的推荐算法中,一旦一个新用户来了,由于他还没有购买任何的物品,所以无法给他推荐任何物品的。

  1. 推荐目前热度最高的商品;
  2. 让用户自己标记一下自己喜欢的商品类型(APP新用户)

基于内容推荐的应用场景
基于内容推荐的方法特别适用于文本领域,比如新闻的推荐等等。核心在于把商品描述以及内容更好的利用起来。

(3)基于内容推荐架构图详解

基于内容推荐架构图:

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_apache_04,第4张

(4)基于内容推荐TF-IDF详解

内容推荐算法

对于基于内容的推荐系统,最简单的推荐算法当然是计算相似性即可,如何计算相似性,那么首先需要将内容转换成了向量(即特征向量)的形式,然后再计算两个向量之间的相似度。

关于特征值的提取,我们有三种方式:TFIDF、Word2Vec、CountVectorizer

TF一IDF原理

TF一IDF (term frequency一inverse document frequency)是非常常用的加权方法,用以评估一字词对于一个文件集或一个语料库中的其中一份文件的重要程度。字词的重要性随着它在文件中出现的次数成正比增加,但同时会随着它在语料库中出现的频率成反比下降。

TF意思是词频(Term Frequency)
IDF意思是逆文本频率指数(Inverse Document Frequency)

词频指的是词语在全部内容中出现的频率,频率越高表示词语在文章中的内容重要程度越高。

逆文本频率指数能够反映在所有参加对比的文章中,词语出现的频率;词语的这个值越大,它在所有文章中出现的频率越低,词语越重要,也就是物以稀为贵的意思。

计算方法如下:
TF=该词在该文章出现次数/该文章总词数
IDF=Ig(所有文章数量/包含该词的文章数)
TF一IDF=TF和IDF乘积

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_spark_05,第5张

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_spark_06,第6张

取对数(Log)的意义

  1. 缩小数据的绝对数值,方便计算。例如,每个数据项的值都很大,许多这样的值进行计算可能对超过常用数据类型的取值范围,这时取对数,就把数值缩小了,例如TF一IDF计算时,由于在大规模语料库中,很多词的频率是非常大的数字。
  2. 取对数之后不会改变数据的性质和相关关系

TF一IDF原理总结
TF刻画了词语对某篇文档的重要性,IDF刻画了词语对整个文档集的重要性。
TF创建了计算相似形的初级向量,IDF优化了这个初级向量,形成能够比较准确的计算相似形的结果向量。

TF创建了内容的初级画像,IDF (去除停用词)使内容画像越来越清晰。

(5)构建基于内容推荐开发环境

基于小说内容推荐的实施过程

1.准备小说(txt文本)

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_java 根据内容构建modbus_07,第7张

2.构建工程,下载IKAnalyzer6.5.0.jar分词插件包,将下载的本地jar包install到Maven库中去

mvn install:install-file -DgroupId=com.ikanalyzer -DartifactId=analyzer -Dversion=1.0 
-Dpackaging=jar -Dfile=/Users/caizhengjie/Desktop/大数据项目实战三/IKAnalyzer6.5.0.jar

3.配置pom文件

<properties>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.7</spark.version>

        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>

        <dependency>
            <groupId>com.ikanalyzer</groupId>
            <artifactId>analyzer</artifactId>
            <version>1.0</version>
        </dependency>

    </dependencies>

(6)内容推荐开发步骤与数据演化详解

开发思路:

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_apache_08,第8张

数据模型转换过程:

文本                 分词                   计算TF(词频)              计算TF-IDF
1.我喜欢吃喜欢玩  ->  1.我 喜欢 吃 喜欢 玩  -> 6[1,3,4,5],[1,2,1,1]  ->  6[1,3,4,5],[1*log(2/2),2*log(2/2),1*log(2/1),1*log(2/1)]  ->  计算相似度
2.我喜欢旅游  ->   2.我 喜欢 旅游    ->      6[1,2,3],[1,1,1]    ->    6[1,2,3],[1*log(2/2),1*log(2/2),1*log(2/1)]

解释:

假设:“我喜欢吃喜欢玩” 这句话分词分了5个部分“我 喜欢 吃 喜欢 玩”,我将这5个词放到6个桶里面,“我”放在1号桶里面,“喜欢”放在3号桶里面,“吃”放在4号桶里面,“玩”放在5号桶里面
即“6[1,3,4,5],[1,2,1,1]” 6就表示桶的数量,“[1,3,4,5]”表示每个词放在桶的位置,“[1,2,1,1]”表示没个词的词频数,由此可以计算TF-IDF,从而计算相似度

(7)内容推荐余弦相似度算法详解

余弦相似度原理

余弦相似度用向量空间中两个向量夹角的余弦值作为衡量两个个体间差异的大小。余弦值越接近1,就表明夹角越接近0度,也就是两个向量越相似,这就叫"余弦相似性”。

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_spark_09,第9张

两个向量间的余弦值可以通过使用欧几里得点积公式得出:

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_scala_10,第10张

举例:

文章内容向量A:{3,2,1}

文章内容向量B:{1,1,0}

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_spark_11,第11张

(8)Spark稀疏向量SparseVector详解

RDD基于稀疏向量SparseVector实现

1.表示形式
稀疏向量:(4,[0,2,3],[1.0,3.0,4.0])
对应原始向量:(1.0, 0.0, 3.0, 4.0)
稀疏向量分成三个对应元素:(size,indices,values)

说明:其中size是原始向量的长度, indices是原始向量中非零位置的索引下标,values是原始向量中非零下标对应的值,其中indices和values两个数组的长度必须一致。

2.Spark的使用方法
(1) Vector.sparse(4,(0,2,3),(1.0,3.0,4.0))
(2) Vector.sparse(4,(0,1.0),(2,3.0),(3,4.0))

(9)基于Spark 内容推荐开发

(9.1)RDD基于稀疏向量SparseVector实现

代码实现:

package com.similarity

import com.java.similarty.IKUtils
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TFRdd {


  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .appName(this.getClass.getName)
      .master("local[5]")
      .getOrCreate

    val sc = spark.sparkContext;

    val sentenceData = sc.makeRDD(Seq(
      (0,"我喜欢,吃喜欢玩"),
      (1,"我喜欢旅游"),
      (2,"我爱玩"),
      (3,"我爱旅游"),
      (4,"明天下雨")
    ))

    // 分词
    val data_doc = sentenceData.map(x => {
      val novelSeq = IKUtils.ikAnalyzer(x._2).split(" ")
      (x._1,novelSeq)
    })


    // 求TF的特征向量
    val hashingTF = new HashingTF(Math.pow(2,18).toInt)
    val docTF = data_doc.map{
      case (novelName, novelSeq) =>{
        val novelTF = hashingTF.transform(novelSeq)
        (novelName,novelTF)
      }
    }

    for (elem <- docTF.collect()) {
      println(elem._1 + ":" + elem._2)
    }

    docTF.cache()

    // 求IDF的特征向量
    val idf = new IDF().fit(docTF.values)

    val docTFidf = docTF.mapValues(v => idf.transform(v))

    for (elem <- docTFidf.collect()) {
      println(elem._1 + ":" + elem._2)
    }
  }
}

计算TF结果:

0:(262144,[52820,105739,163166,219629],[1.0,1.0,2.0,1.0])
1:(262144,[163166,163936,219629],[1.0,1.0,1.0])
2:(262144,[55961,219629],[1.0,1.0])
3:(262144,[163936,173195,219629],[1.0,1.0,1.0])
4:(262144,[86061,98761],[1.0,1.0])

计算TF-IDF结果:

0:(262144,[52820,105739,163166,219629],[1.0986122886681098,1.0986122886681098,1.3862943611198906,0.1823215567939546])
1:(262144,[163166,163936,219629],[0.6931471805599453,0.6931471805599453,0.1823215567939546])
2:(262144,[55961,219629],[1.0986122886681098,0.1823215567939546])
3:(262144,[163936,173195,219629],[0.6931471805599453,1.0986122886681098,0.1823215567939546])
4:(262144,[86061,98761],[1.0986122886681098,1.0986122886681098])

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_java 根据内容构建modbus_12,第12张

公式中使用log函数,当词出现在所有文档中时,它的IDF值变为0(使用IDF(带对数函数)之后,由于log(1)=0,则通过TF一IDF计算出的权重就为0)Spark.mllib 中实现词频率统计使用特征hash的方式,原始特征通过hash函数,映射到一个索引值。后面只需要统计这些索引值的频率,就可以知道对应词的频率这种方式避免设计一个全局1对1的词到索引的映射,这个映射在映射大量语料库时需要花费更长的时间但需要注意,通过hash的方式可能会映射到同一个值的情况,即不同的原始特征通过Hash映射后是同一个值。为了降低这种情况出现的概率,我们只能对特征向量升维。提高hash表的桶数,默认特征维度是2^20 = 1,048,576.

(9.2)DF基于分布式矩阵实现

分布式矩阵

分布式矩阵由长整型的行列索引值和双精度浮点型的元素值组成。它可以分布式地存储在一个或多个RDD上,MLlib提供了三种分布式矩阵的存储方案:行矩阵RowMatrix、索引行矩阵IndexedRowMatrix、坐标矩阵CoordinateMatrix和分块矩阵Block Matrix。 它们都属于org.apache.spark.mllib.linalg.distributed包。

通过分布式矩阵的方式,可以提高相似度计算的效率,达到推荐系统的实现需求。

行矩阵
行矩阵RowMatrix是最基础的分布式矩阵类型。每行是一个本地向量,行索引无实际意义(即无法直接使用)。数据存储在一个由行组成的RDD中,其中每一行都使用一个本地向量来进行存储。由于行是通过本地向量来实现的,故列数(即行的维度)被限制在普通整型(integer)的范围内。在实际使用时,由于单机处理本地向量的存储和通信代价,行维度更是需要被控制在一个更小的范围之内。

索引行矩阵
索引行矩阵IndexedRowMatrix与RowMatrix相似,但它的每一行都带有一个有意义的行索引值,这个索引值可以被用来识别不同行,或是进行诸如join之类的操作。其数据存储在一个由IndexedRow组成的RDD里,即每一行都是一个带长整型索引的本地向量。底层实现,是一个带行索引的RDD,这个RDD,每行是Long型索引和本地向量与RowMatrix类似,IndexedRowMatrix的实例可以通过RDD[IndexedRow]实例来创建。

坐标矩阵

CoordinateMatrix是一个基于矩阵项构成的RDD的分布式矩阵。每一个矩阵项MatrixEntry都是一个三元组:(i: Long,j: Long, value:Double),其中堤行索引,j是列索引,value是该位置的值。坐标矩阵一般在矩阵的两个维度都很大,且矩阵非常稀疏的时候使用。CoordinateMatrix实例可通过RDD[MatrixEntry]实例来创建,其中每一个矩阵项都是一个(rowlndex, collndex, elem)的三元组坐标矩阵可以通过transpose()方法对矩阵进行转置操作,并可以通过自带的tolndexedRowMatrix()方法转换成索引行矩阵IndexedRowMatrix。但目前暂不支持CoordinateMatrix的其他计算操作。

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_scala_13,第13张

代码实现:

package com.similarity

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, IndexedRowMatrix}
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object TFidfDF {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder
      .appName(this.getClass.getName)
      .master("local[5]")
      .getOrCreate

    // 数据源
    val sentenceData = spark.createDataFrame(Seq(
      (1,"我 喜欢 吃 喜欢 玩"),
      (2,"我 喜欢 旅游"),
      (5,"我 爱 旅游"),
      (6,"我 喜欢 你")
    )).toDF("id","sentence")

    // tokenizer将分成的词转换成Array,因为在下面的代码transform中需要的是Array
    val tokenizer = new Tokenizer()
      .setInputCol("sentence")
      .setOutputCol("words")

    // 将分成的词转换成Array,并重新返回一个表
    val wordsData = tokenizer.transform(sentenceData)

    wordsData.show(false)
    /**
     * +---+------------------+------------------------+
     * | id|          sentence|                   words|
     * +---+------------------+------------------------+
     * |  0|我 喜欢 吃 喜欢 玩|[我, 喜欢, 吃, 喜欢, 玩]|
     * |  1|      我 喜欢 旅游|        [我, 喜欢, 旅游]|
     * |  2|        我 爱 旅游|          [我, 爱, 旅游]|
     * |  3|        我 喜欢 你|          [我, 喜欢, 你]|
     * +---+------------------+------------------------+
     */

    // 空格分词,将每个词转换成Int型,并计算在其文档中的词频(TF)
    // setNumFeatures(200)表示将Hash分桶的数量设置为200个,可以根据你的词语数量来调整,一般来说,
    // 这个值越大不同的词被计算为一个Hash值的概率就越小,数据也更准确,但需要消耗更大的内存
    val hashModel = new HashingTF()
      .setInputCol("words")
      .setOutputCol("rawFeatures")
      .setNumFeatures(Math.pow(2,18).toInt)

    val featurizedData = hashModel.transform(wordsData)
    featurizedData.show(false)
    /**
     * +---+------------------+------------------------+-------------------------------------------------------+
     * |id |sentence          |words                   |rawFeatures                                            |
     * +---+------------------+------------------------+-------------------------------------------------------+
     * |0  |我 喜欢 吃 喜欢 玩|[我, 喜欢, 吃, 喜欢, 玩]|(262144,[52820,105739,163166,219629],[1.0,1.0,2.0,1.0])|
     * |1  |我 喜欢 旅游      |[我, 喜欢, 旅游]        |(262144,[163166,163936,219629],[1.0,1.0,1.0])          |
     * |2  |我 爱 旅游        |[我, 爱, 旅游]          |(262144,[163936,173195,219629],[1.0,1.0,1.0])          |
     * |3  |我 喜欢 你        |[我, 喜欢, 你]          |(262144,[129171,163166,219629],[1.0,1.0,1.0])          |
     * +---+------------------+------------------------+-------------------------------------------------------+
     */

    // 计算IDF
    val idf = new IDF()
      .setInputCol("rawFeatures")
      .setOutputCol("features")

    val idfModel = idf.fit(featurizedData)

    // 算出TF-IDF结果
    val tfidfData = idfModel.transform(featurizedData).select("id","features")

    tfidfData.show(false)

    /**
     * +---+-----------------------------------------------------------------------------------------------------+
     * |id |features                                                                                             |
     * +---+-----------------------------------------------------------------------------------------------------+
     * |0  |(262144,[52820,105739,163166,219629],[0.9162907318741551,0.9162907318741551,0.44628710262841953,0.0])|
     * |1  |(262144,[163166,163936,219629],[0.22314355131420976,0.5108256237659907,0.0])                         |
     * |2  |(262144,[163936,173195,219629],[0.5108256237659907,0.9162907318741551,0.0])                          |
     * |3  |(262144,[129171,163166,219629],[0.9162907318741551,0.22314355131420976,0.0])                         |
     * +---+-----------------------------------------------------------------------------------------------------+
     */

    // 通过分布式矩阵计算相似性
    val colSimilar : CoordinateMatrix = new IndexedRowMatrix(tfidfData.rdd.map{
      case Row(id : Int , v : org.apache.spark.ml.linalg.Vector) =>
        IndexedRow(id , Vectors.fromML(v))
    }).toCoordinateMatrix()
      .transpose()
      .toRowMatrix().columnSimilarities()

    val colSimlilarRow = colSimilar.entries.map(line => Row(line.i,line.j,line.value))

    val structFieldDF = Array(
      StructField("id",LongType,nullable = true),
      StructField("similar_id",LongType,nullable = true),
      StructField("score",DoubleType,nullable = true)
    )

    val structType = StructType(structFieldDF)

    // 计算相似性结果
    val result = spark.createDataFrame(colSimlilarRow,structType)
    result.sort(result("score").desc).show(false)

    /**
     * +---+----------+-------------------+
     * |id |similar_id|score              |
     * +---+----------+-------------------+
     * |2  |5         |0.4462193664666129 |
     * |1  |2         |0.1303511225242502 |
     * |2  |6         |0.09471720742031595|
     * |1  |6         |0.07704888495631654|
     * +---+----------+-------------------+
     */

    // 将结果写入到mysql数据库
    result.write
      .format("jdbc")
      .option("url","jdbc:mysql://bigdata-pro-m04:3306/db_novel")
      .option("dbtable","similar_df")
      .option("user","root")
      .option("password","199911")
      .option("driver","com.mysql.jdbc.Driver")
      .save()

  }
}

TF-IDF源码分析:

java 根据内容构建modbus java实现基于内容的推荐,java 根据内容构建modbus java实现基于内容的推荐_spark_14,第14张

(9.3)基于业务数据实现内容推荐

根据上面的内容推荐模版,实现小说的内容推荐

代码实现:

package com.similarity

import com.java.similarty.IKUtils
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, IndexedRowMatrix}
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object NovelDFSimilar {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder
      .appName(this.getClass.getName)
      .master("local[5]")
      .getOrCreate

    val sc = spark.sparkContext

    // 数据路径
    val fileRdd = sc.wholeTextFiles("/Users/caizhengjie/Desktop/大数据项目实战三/novel_doc")

    // 创建表名
    val structFieldNovel = Array(
      StructField("novelName",StringType,nullable = true),
      StructField("novelContent",StringType,nullable = true)
    )

    val structTypeNovel = StructType(structFieldNovel)

    // 将novelName提取出来,将novelContent分词
    val ikwordRdd = fileRdd.map(eachNovel => {
      val novelName = eachNovel._1.split("/").last.replace(".txt","")
      val novelContent = IKUtils.ikAnalyzer(eachNovel._2)
      Row(novelName,novelContent)
    })

    // 创建novelDF数据源
    val novelDF = spark.createDataFrame(ikwordRdd,structTypeNovel)

    // tokenizer将分成的词转换成Array,因为在下面的代码transform中需要的是Array
    val tokenizer = new Tokenizer()
      .setInputCol("novelContent")
      .setOutputCol("words")

    // 将分成的词转换成Array,并重新返回一个表
    val wordsData = tokenizer.transform(novelDF)

    // 空格分词,将每个词转换成Int型,并计算在其文档中的词频(TF)
    // setNumFeatures(200)表示将Hash分桶的数量设置为200个,可以根据你的词语数量来调整,一般来说,
    // 这个值越大不同的词被计算为一个Hash值的概率就越小,数据也更准确,但需要消耗更大的内存
    val hashModel = new HashingTF()
      .setInputCol("words")
      .setOutputCol("rawFeatures")
      .setNumFeatures(Math.pow(2,20).toInt)

    val featurizedData = hashModel.transform(wordsData)

    // 计算IDF
    val idf = new IDF()
      .setInputCol("rawFeatures")
      .setOutputCol("features")

    val idfModel = idf.fit(featurizedData)

    // 算出TF-IDF结果
    val tfidfData = idfModel.transform(featurizedData).select("novelName","features")

    // 通过分布式矩阵计算相似性
    val colSimilar : CoordinateMatrix  = new IndexedRowMatrix(tfidfData.rdd.map{
      case Row(_,v : org.apache.spark.ml.linalg.Vector) => (Vectors.fromML(v))
    }.zipWithIndex.map{
      case (v , i ) => IndexedRow(i , v)
    }).toCoordinateMatrix()
      .transpose()
      .toRowMatrix().columnSimilarities()

    val colSimlilarRow = colSimilar.entries.map(line => Row(line.i,line.j,line.value))

    val structFieldDF = Array(
      StructField("id",LongType,nullable = true),
      StructField("similar_id",LongType,nullable = true),
      StructField("score",DoubleType,nullable = true)
    )

    val structType = StructType(structFieldDF)

    // 计算相似性结果
    val result = spark.createDataFrame(colSimlilarRow,structType)
    result.sort(result("score").desc).show(false)

    /**
     * +---+----------+--------------------+
     * |id |similar_id|score               |
     * +---+----------+--------------------+
     * |1  |13        |0.09571598783452588 |
     * |0  |6         |0.08929584987265798 |
     * |0  |3         |0.08605577188991365 |
     * |13 |17        |0.08217174035151106 |
     * |10 |17        |0.07877005714926918 |
     * |4  |5         |0.07764874673752648 |
     * |4  |17        |0.07732714194268539 |
     * |15 |17        |0.07039954411051563 |
     * |4  |10        |0.07037576188770689 |
     * |10 |15        |0.06676619187615601 |
     * |0  |10        |0.06055534696058648 |
     * |0  |17        |0.058161958421292986|
     * |2  |4         |0.05596547961792596 |
     * |10 |14        |0.055509500020517265|
     * |8  |15        |0.05404924627963231 |
     * |7  |15        |0.053519500617819946|
     * |4  |8         |0.05158017049023847 |
     * |2  |10        |0.04938876120139621 |
     * |14 |19        |0.04865702296296277 |
     * |10 |19        |0.04851523036427419 |
     * +---+----------+--------------------+
     */

    // 将结果写入到mysql数据库
    result.write
      .format("jdbc")
      .option("url","jdbc:mysql://bigdata-pro-m04:3306/db_novel")
      .option("dbtable","novel_similar_df")
      .option("user","root")
      .option("password","199911")
      .option("driver","com.mysql.jdbc.Driver")
      .save()
  }
}

(10)总结

TF-IDF:计算文本内容的特征值
相似度算法:余弦相似度 -> 修正余弦相似度

推荐计算过程:
原内容 -> ikanalyzer分词 -> TF向量(Spark => SparseVector) -> IDF向量 -> TF-IDF向量(Spark => SparseVector => Spark DF(RowMatrix)) -> 相似度

RDD基于稀疏向量通过余弦相似度算法计算相似度
DF基于分布式矩阵通过余弦相似度算法计算相似度


https://www.xamrdz.com/web/22d1942918.html

相关文章: