目录
- 1.使用scala读取文件,生成hfile,bulkload
- 2.展示一下java代码,不如scala快
- 3.暂时认为最靠谱的put批量操作
如果你支持scala,恭喜你,很容易
一般写spark程序,文件路径都设置成hdfs路径,不容易报错;要不然容易踩坑,(⊙o⊙)…我踩坑了、将文件上传到hdfs吧。文件路径就写成 hdfs:///
注意:
1.使用spark的saveAsNewAPIHadoopFile一定要对rowkey,列族,子列 进行排序,否则执行的时候会报错: java.io.IOException:added a key lexically larger than previous.current call =…
2.执行spark的时候报错
Exception in thread “main” java.lang.noclassDefErro:org/apache/zookeeper/KeeperException或者noSuchMethod或者其他的noclassDef…
百度的时候会说是jar包冲突了,但是我实际解决的时候是缺少依赖,导入相应依赖就可以了,将scope设置成comple 不要provided。当你真正导入对应的jar包的时候,再考虑jar包冲突。
3.Exception in thread “main” java.lang.noclassDefFoundErro:org/apache/spark/sparkConf
明明导入了spark的jar包,还报错。 我的解决办法是 把spark对应的依赖放在其他依赖的上面,问题完全解决。因为别的依赖可能会含有spark的依赖。依赖冲突解决办法之一: 对多版本冲突的jar,显式定义 maven有依赖传递有限规则,显式有限,传递依赖靠后。
1.使用scala读取文件,生成hfile,bulkload
import cn.doitedu.commons.utils.SparkUtil
import com.sun.mail.smtp.DigestMD5
import org.apache.commons.codec.digest.DigestUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{ConnectionFactory, TableDescriptor, TableDescriptorBuilder}
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, TableOutputFormat}
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession
object Data2Hbase {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("")
.master("local")
//配置序列化方式为Kryo
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
import spark.implicits._
// 加载数据
val tagsDF = spark.read.parquet("路径")
val conf = HBaseConfiguration.create()
//配置zookeeper
conf.set("hbase.zookeeper.quorum", "?01:2181,?t02:2181。。。")
//配置hbase表名
conf.set(TableOutputFormat.OUTPUT_TABLE, "表名")
//配置文件系统
conf.set("fs.defaultFS", "hdfs://?:8020/")
val job = Job.getInstance(conf)
// 指定rowkey类型
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
val tableDesc = TableDescriptorBuilder.newBuilder(TableName.valueOf("profile_tags")).build()
HFileOutputFormat2.configureIncrementalLoadMap(job, tableDesc)
/**
* 整理成 (k,v) 元组 ,
* k(就是hbase表中的rowkye)是 ImmutableBytesWritable类型,
* v(就是hbase表中的一个qualifier+value=> cell)是 KeyValue类型
*/
val kvRdd = tagsDF.rdd.map(row => {
val d = row.getAs[Long]("列名").toString
val rowkey = 根据你的rowkey规则
val l = row.getAs[String]("列名")
val e = row.getAs[String]("列名")
val u = row.getAs[String]("列名")
val t = row.getAs[Double]("列名")
(rowkey,1, e, u, t, weight)
})
// 对数据按hbase的要求排序: 先按rowkey,再按列族,再按qualifier(要按照字符串来排序,按照数排序的话,负数的首位是1,转成bytes数组反而更大)
.sortBy(tp => (tp._1, tp._2, tp._3, tp._4))
.map(tp => {
val keyvalue = new KeyValue(tp._1.getBytes(), "列族".getBytes, "子列族(列名)".getBytes, Bytes.toBytes(tp._5))
// rowkey: ImmutableBytesWritable
val rowkey = new ImmutableBytesWritable(tp._1.getBytes())
(rowkey, keyvalue)
})
kvRdd.saveAsNewAPIHadoopFile("hdfs://存储hfile路径",
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
job.getConfiguration
)
spark.close()
val conn = ConnectionFactory.createConnection(conf)
val admin = conn.getAdmin
val table = conn.getTable(TableName.valueOf("表名"))
val locator = conn.getRegionLocator(TableName.valueOf("表名"))
val loader = new LoadIncrementalHFiles(conf)
loader.doBulkLoad(new Path(("hdfs://目录")), admin, table, locator)
}
}
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
2.展示一下java代码,不如scala快
注意!!!!!!!!这里的rowkey是 自定义的rowkey+列名。 如果你的rowkey不是这么设计的,就别试了。试了很浪费时间。
import com.alibaba.fastjson.JSONObject;
import com.xc.engine.core.utils.HbaseConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;
import scala.Tuple2;
import scala.Tuple3;
import java.io.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
public class Data2Hbae {
public static void main(String[] args) throws Exception {
//hbase表名
String tableName = "表名";
//创建 hbase配置
Configuration conf = HBaseConfiguration.create();
String zookeeper_servers = "node1:2181,node2:2181,node3:2181";
//配置zookeeper, 通过zookeeper获取元数据, 以便将hfile文件推送到hbase的regionServer中
conf.set("hbase.zookeeper.quorum", zookeeper_servers);
//配置输出到hbase的表名
conf.set(org.apache.hadoop.hbase.mapred.TableOutputFormat.OUTPUT_TABLE, tableName);
conf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName);
//创建hbase连接
Connection conn = ConnectionFactory.createConnection(conf);
//取得hbase表操作对象
TableName tableName1 = TableName.valueOf(tableName);
Table table = conn.getTable(tableName1);
//根据hbase连接取得region在集群中的信息
RegionLocator regionLocator = conn.getRegionLocator(tableName1);
SparkConf sparkConf = new SparkConf()
.setAppName("t1")
.setMaster("local")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> load = sparkSession.read()
.format("csv")
.option("header", "true")
.load("输入文件路径");
//配置hfile的保存路径
String hfilePath = "hdfs://node2.trms.com:8020/data/myhfile";
//将数据处理成Tuple2<ImmutableBytesWritable, KeyValue> 类型, 并根据ImmutableBytesWritable对象进行排序,需要保证ImmutableBytesWritable唯一(可以使用rowKey, cf, qf相结合)
load.toJSON().toJavaRDD()
.flatMap(new FlatMapFunction<String, Tuple3<String, String, String>>() {
@Override
public Iterator<Tuple3<String, String, String>> call(String s) throws Exception {
ArrayList<Tuple3<String, String, String>> tuple3s = new ArrayList<>();
JSONObject jsonObject = JSONObject.parseObject(s);
String name = jsonObject.getString("name");
for (String key : jsonObject.keySet()) {
tuple3s.add(Tuple3.apply(name, key, jsonObject.getString(key)));
}
return tuple3s.iterator();
}
})
.mapToPair(new PairFunction<Tuple3<String, String, String>, ImmutableBytesWritable, KeyValue>() {
@Override
public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple3<String, String, String> tp3) throws Exception {
String rowKeyStr = tp3._1();
String q = tp3._2();
String value = tp3._3();
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(rowKeyStr+q));
KeyValue keyValue = new KeyValue(Bytes.toBytes(rowKeyStr), Bytes.toBytes("f0"), Bytes.toBytes(q), Bytes.toBytes(value));
keyValue.setTimestamp(System.currentTimeMillis());
return Tuple2.apply(rowKey, keyValue);
}
})
.coalesce(1)
.sortByKey()
.saveAsNewAPIHadoopFile(hfilePath, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);
//使用bulkloader把Hfile加载进HBASE中
//根据hbase conf配置创建bulkLoader对象
LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
//将hfile推送至hbase的regionServer中
bulkLoader.doBulkLoad(new Path(hfilePath), conn.getAdmin(), table, regionLocator);
}
}
3.暂时认为最靠谱的put批量操作
如果你按照上面的1.2点进行批量操作,当你面临rowkey 列族 子列族排序的时候,真的太懊恼了。这个例子是按照rowkey ,把迭代器中的数据放在list里面,也相当于批量操作了。 对于几百条数据,响应速度也就是一两秒。速度也还可以。要不就自己实现一个排序,这种方法比较简单,大部分不支持scala的公司应该都在用。
下面是我按照前两点方法,排序的时候报错了。所以我选择了用put方法。因为我们此次需求是只需要导入hbase一次,如果持续导入,也可以用批量put方法。
注:
1.conn,htable等连接,要写在spark的worker端。写在driver端的数据要支持序列化,否则在worker端不能调用。会报异常:
Exception in thread “main” org.apache.spark.sparkException:Task not serializable 异常
关于hbase相关配置都跟上面相同
第一步:加载数据,按指定切割符号区分
第二步:同foreachpartition代替foreach,因为foreachpartition是行动算子,可以直接出发程序执行。
把hbase相关配置放在这里面、
while(迭代器hasnext){
Tuple3?? = 迭代器.next
String a = ?.(1);
String a = ?.(2);
…
//如果用多个 next直接操作会报错!! 例如这种就是错误的 :String a = 迭代器.next.(1);
String a = 迭代器.next.(2);…
}
主要代码:
Table table = HbaseUtils.getTable("teacher");
// 构建一个put对象
Put put1 = new Put("002".getBytes());
// 添加一列
// 参数一 列族 参数二 对应的属性 参数三 值
put1.addColumn("base_info1".getBytes(), Bytes.toBytes("age"), Bytes.toBytes("38"));
Put put2 = new Put("002".getBytes());
// 添加一列
// 参数一 列族 参数二 对应的属性 参数三 值
put2.addColumn(Bytes.toBytes("base_info1"), "gender".getBytes(), Bytes.toBytes("male"));
ArrayList<Put> puts = new ArrayList<>();
puts.add(put1);
puts.add(put2);
table.put(puts);
// 表关闭
table.close();
pom