当前位置: 首页>编程语言>正文

spark读取orc文件需要的依赖 spark读取hfile


目录

  • 1.使用scala读取文件,生成hfile,bulkload
  • 2.展示一下java代码,不如scala快
  • 3.暂时认为最靠谱的put批量操作


如果你支持scala,恭喜你,很容易

一般写spark程序,文件路径都设置成hdfs路径,不容易报错;要不然容易踩坑,(⊙o⊙)…我踩坑了、将文件上传到hdfs吧。文件路径就写成 hdfs:///

注意

1.使用spark的saveAsNewAPIHadoopFile一定要对rowkey,列族,子列 进行排序,否则执行的时候会报错: java.io.IOException:added a key lexically larger than previous.current call =…

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_spark读取orc文件需要的依赖,第1张

2.执行spark的时候报错

Exception in thread “main” java.lang.noclassDefErro:org/apache/zookeeper/KeeperException或者noSuchMethod或者其他的noclassDef…

百度的时候会说是jar包冲突了,但是我实际解决的时候是缺少依赖,导入相应依赖就可以了,将scope设置成comple 不要provided。当你真正导入对应的jar包的时候,再考虑jar包冲突。

3.Exception in thread “main” java.lang.noclassDefFoundErro:org/apache/spark/sparkConf
明明导入了spark的jar包,还报错。 我的解决办法是 把spark对应的依赖放在其他依赖的上面,问题完全解决。因为别的依赖可能会含有spark的依赖。依赖冲突解决办法之一: 对多版本冲突的jar,显式定义 maven有依赖传递有限规则,显式有限,传递依赖靠后。

1.使用scala读取文件,生成hfile,bulkload

import cn.doitedu.commons.utils.SparkUtil
import com.sun.mail.smtp.DigestMD5
import org.apache.commons.codec.digest.DigestUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{ConnectionFactory, TableDescriptor, TableDescriptorBuilder}
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, TableOutputFormat}
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession
object Data2Hbase {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("")
      .master("local")
      //配置序列化方式为Kryo
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    // 加载数据
    val tagsDF = spark.read.parquet("路径")
   
    val conf = HBaseConfiguration.create()
    //配置zookeeper
    conf.set("hbase.zookeeper.quorum", "?01:2181,?t02:2181。。。")
    //配置hbase表名
    conf.set(TableOutputFormat.OUTPUT_TABLE, "表名")
    //配置文件系统
    conf.set("fs.defaultFS", "hdfs://?:8020/")

    val job = Job.getInstance(conf)
    // 指定rowkey类型
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) 
    job.setMapOutputValueClass(classOf[KeyValue])

    val tableDesc = TableDescriptorBuilder.newBuilder(TableName.valueOf("profile_tags")).build()
    HFileOutputFormat2.configureIncrementalLoadMap(job, tableDesc)

    /**
      * 整理成 (k,v) 元组 ,
      * k(就是hbase表中的rowkye)是 ImmutableBytesWritable类型,
      * v(就是hbase表中的一个qualifier+value=> cell)是 KeyValue类型
      */
    val kvRdd = tagsDF.rdd.map(row => {
      val d = row.getAs[Long]("列名").toString
      val rowkey = 根据你的rowkey规则

      val l = row.getAs[String]("列名")
      val e = row.getAs[String]("列名")
      val u = row.getAs[String]("列名")
      val t = row.getAs[Double]("列名")
      (rowkey,1, e, u, t, weight)
    })
      // 对数据按hbase的要求排序: 先按rowkey,再按列族,再按qualifier(要按照字符串来排序,按照数排序的话,负数的首位是1,转成bytes数组反而更大)
      .sortBy(tp => (tp._1, tp._2, tp._3, tp._4))
      .map(tp => {
        val keyvalue = new KeyValue(tp._1.getBytes(), "列族".getBytes, "子列族(列名)".getBytes, Bytes.toBytes(tp._5))
        // rowkey: ImmutableBytesWritable
        val rowkey = new ImmutableBytesWritable(tp._1.getBytes())
        (rowkey, keyvalue)
      })

    kvRdd.saveAsNewAPIHadoopFile("hdfs://存储hfile路径",
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration
    )
    spark.close()
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin
    val table = conn.getTable(TableName.valueOf("表名"))
    val locator = conn.getRegionLocator(TableName.valueOf("表名"))

    val loader = new LoadIncrementalHFiles(conf)
    loader.doBulkLoad(new Path(("hdfs://目录")), admin, table, locator)
  }
}



        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.5</version>
        </dependency>



        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>

        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.13</version>
        </dependency>
		
		
		<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.5</version>
        </dependency>

2.展示一下java代码,不如scala快

注意!!!!!!!!这里的rowkey是 自定义的rowkey+列名。 如果你的rowkey不是这么设计的,就别试了。试了很浪费时间。

import com.alibaba.fastjson.JSONObject;
import com.xc.engine.core.utils.HbaseConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;
import scala.Tuple2;
import scala.Tuple3;

import java.io.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;

public class Data2Hbae {
    public static void main(String[] args) throws Exception {
        //hbase表名
        String tableName = "表名";
        //创建 hbase配置
        Configuration conf = HBaseConfiguration.create();
        String zookeeper_servers = "node1:2181,node2:2181,node3:2181";
        //配置zookeeper, 通过zookeeper获取元数据, 以便将hfile文件推送到hbase的regionServer中
        conf.set("hbase.zookeeper.quorum", zookeeper_servers);
        //配置输出到hbase的表名
        conf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE, tableName);
        conf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName);

        //创建hbase连接
        Connection conn = ConnectionFactory.createConnection(conf);

        //取得hbase表操作对象
        TableName tableName1 = TableName.valueOf(tableName);
        Table table = conn.getTable(tableName1);

        //根据hbase连接取得region在集群中的信息
        RegionLocator regionLocator = conn.getRegionLocator(tableName1);

        SparkConf sparkConf = new SparkConf()
                .setAppName("t1")
                .setMaster("local")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");


        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();

        Dataset<Row> load = sparkSession.read()
                .format("csv")
                .option("header", "true")
                .load("输入文件路径");
                
        //配置hfile的保存路径
        String hfilePath = "hdfs://node2.trms.com:8020/data/myhfile";

        //将数据处理成Tuple2<ImmutableBytesWritable, KeyValue> 类型, 并根据ImmutableBytesWritable对象进行排序,需要保证ImmutableBytesWritable唯一(可以使用rowKey, cf, qf相结合)
        load.toJSON().toJavaRDD()
                .flatMap(new FlatMapFunction<String, Tuple3<String, String, String>>() {
                    @Override
                    public Iterator<Tuple3<String, String, String>> call(String s) throws Exception {
                        ArrayList<Tuple3<String, String, String>> tuple3s = new ArrayList<>();
                        JSONObject jsonObject = JSONObject.parseObject(s);
                        String name = jsonObject.getString("name");
                        for (String key : jsonObject.keySet()) {
                            tuple3s.add(Tuple3.apply(name, key, jsonObject.getString(key)));
                        }
                        return tuple3s.iterator();
                    }
                })
                .mapToPair(new PairFunction<Tuple3<String, String, String>, ImmutableBytesWritable, KeyValue>() {
                    @Override
                    public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple3<String, String, String> tp3) throws Exception {
                        String rowKeyStr = tp3._1();
                        String q = tp3._2();
                        String value = tp3._3();


                        ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(rowKeyStr+q));

                        KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKeyStr), Bytes.toBytes("f0"), Bytes.toBytes(q), Bytes.toBytes(value));
                        keyValue.setTimestamp(System.currentTimeMillis());
                        return Tuple2.apply(rowKey, keyValue);
                    }
                })
                .coalesce(1)
                .sortByKey()
                .saveAsNewAPIHadoopFile(hfilePath, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);

        //使用bulkloader把Hfile加载进HBASE中
        //根据hbase conf配置创建bulkLoader对象
        LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
        //将hfile推送至hbase的regionServer中
        bulkLoader.doBulkLoad(new Path(hfilePath), conn.getAdmin(), table, regionLocator);
    }
}

3.暂时认为最靠谱的put批量操作

如果你按照上面的1.2点进行批量操作,当你面临rowkey 列族 子列族排序的时候,真的太懊恼了。这个例子是按照rowkey ,把迭代器中的数据放在list里面,也相当于批量操作了。 对于几百条数据,响应速度也就是一两秒。速度也还可以。要不就自己实现一个排序,这种方法比较简单,大部分不支持scala的公司应该都在用。

下面是我按照前两点方法,排序的时候报错了。所以我选择了用put方法。因为我们此次需求是只需要导入hbase一次,如果持续导入,也可以用批量put方法。

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_spark_02,第2张

注:

1.conn,htable等连接,要写在spark的worker端。写在driver端的数据要支持序列化,否则在worker端不能调用。会报异常:

Exception in thread “main” org.apache.spark.sparkException:Task not serializable 异常

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_spark_03,第3张

关于hbase相关配置都跟上面相同

第一步:加载数据,按指定切割符号区分

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_spark读取orc文件需要的依赖_04,第4张

第二步:同foreachpartition代替foreach,因为foreachpartition是行动算子,可以直接出发程序执行。

把hbase相关配置放在这里面、

while(迭代器hasnext){

Tuple3?? = 迭代器.next

String a = ?.(1);
String a = ?.
(2);


//如果用多个 next直接操作会报错!! 例如这种就是错误的 :String a = 迭代器.next.(1);
String a = 迭代器.next.
(2);…

}

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_apache_05,第5张

主要代码:

Table table = HbaseUtils.getTable("teacher");
//  构建一个put对象
Put put1 = new Put("002".getBytes());
//  添加一列
//  参数一 列族  参数二 对应的属性   参数三 值
put1.addColumn("base_info1".getBytes(), Bytes.toBytes("age"), Bytes.toBytes("38"));

Put put2 = new Put("002".getBytes());
//  添加一列
//  参数一 列族  参数二 对应的属性   参数三 值
put2.addColumn(Bytes.toBytes("base_info1"), "gender".getBytes(), Bytes.toBytes("male"));

ArrayList<Put> puts = new ArrayList<>();
puts.add(put1);
puts.add(put2);

table.put(puts);
//  表关闭
table.close();

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_hadoop_06,第6张

pom

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_spark_07,第7张

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_spark_08,第8张

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_spark读取orc文件需要的依赖_09,第9张

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_hbase_10,第10张

spark读取orc文件需要的依赖 spark读取hfile,spark读取orc文件需要的依赖 spark读取hfile_hbase_11,第11张



https://www.xamrdz.com/lan/5fk1963609.html

相关文章: