当前位置首页 > 学术论文 > 毕业论文
搜柄,搜必应! 快速导航 | 使用教程  [会员中心]

spark使用java读取hbase数据做分布式计算

文档格式:DOCX| 5 页|大小 12.57KB|积分 20|2022-10-05 发布|文档ID:158503754
第1页
下载文档到电脑,查找使用更方便 还剩页未读,继续阅读>>
1 / 5
此文档下载收益归作者所有 下载文档
  • 版权提示
  • 文本预览
  • 常见问题
  • spark使用java读取hbase数据做分布式计算由于spark提供的hbaseTest是scala版本,并没有提供java版我将scala版本改为java版本,并根据数据做了 些计算操作程序目的:查询出hbase满足条件的用户,统计各个等级个数代码如下,西面使用的hbase是0.94注释已经写详细:kage com.sdyc.ndspark.sys;import mons.logging.Log;import mons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.util.Base64;import org.apache.hadoop.hbase.util.Bytes;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;import java.io.ByteArrayOutputStream;import java.io.DataOutputStream;import java.io.IOException;import java.io.Serializable;import java.util.List;/***

    ** spark hbase 测试** Created with IntelliJ IDEA.* User: zhangdonghao* Date: 14-1-26* Time: 上午 9:24* To change this template use File | Settings | File Templates.* 
    ** @author zhangdonghao*/ public class HbaseTest implements Serializable {public Log log = LogFactory.getLog(HbaseTest.class);/*** 将 scan 编码,该方法 copy 自 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil** @param scan* @return* @throws IOException*/static String convertScanToString(Scan scan) throws IOException {ByteArrayOutputStream out = new ByteArrayOutputStream();DataOutputStream dos = new DataOutputStream(out); scan.write(dos);return Base64.encodeBytes(out.toByteArray()); }public void start() {//初始化sparkCon text,这里必须在jars参数里面放上Hbase 的 jar,// 否则会报 unread block data 异常JavaSparkContext sc = new JavaSparkContext("spark://nowledgedata-n3:7077", "hbaseTest","/home/hadoop/software/spark-0.8.1", new String[]{"target/ndspark.jar","target\\dependency\\hbase-0.94.6.jar"});//使用 HBaseConfiguration.create()生成 Configuration//必须在项目classpath下放上hadoop以及hbase的配置文 件。

    Configuration conf = HBaseConfiguration.create(); //设置查询条件,这里值返回用户的等级Scan scan = newScan(); scan.setStartRow(Bytes.toBytes("195861-1035177490")); scan.setStopRow(Bytes.toBytes("195861-1072173147")); scan.addFamily(Bytes.toBytes("info"));scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("levelCode"));try{//需要读取的 hbase 表名 String tableName = "usertable"; conf.set(TableInputFormat.INPUT_TABLE, tableName);conf.set(TableInputFormat.SCAN, convertScanToString(scan));//获得 hbase 查询结果 Result JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf,TableInputFormat.class, ImmutableBytesWritable.class,Result.class);//从 result 中取出用户的等级,并且每一个算一次 JavaPairRDD levels = hBaseRDD.map(new PairFunction, Integer, Integer>() {@Overridepublic Tuple2 call(Tuple 2 immutableBytesWritableResultTuple2) throw s Exception {byte[] o = immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("info"), Bytes.toBytes("levelCode"));if(o != null) {retur n new Tuple2(Bytes.toInt(o), 1);} return null;}});//数据累加JavaPairRDD counts = levels.reduceByKey(new Function2() {public Integer call(Integer i1, Integeri2) {return i1 + i2;}});//打印出最终结果List> output = counts.collect();for (Tuple2 tuple : output) { System.out.println(tuple._1 + ": "+ tuple._2);}} catch (Exception e) {log.warn(e);}}/*** spark 如果计算没写在 main 里面 , 实现的类必须继承 Serializable 接口,
    * 否则会报 Task not serializable: java.io.NotSerializableException 异常*/ public static void main(String[] args) throws InterruptedException {newHbaseTest().start();System.exit(0);}注意:如果使用的是hbase0.96.1.1-hadoop2convertScanToString 函数需要改为:/*** 将 scan 编码,该方法 copy 自 org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil** @param scan* @return* @throws IOException*/static String convertScanToString(Scan scan) throws IOException { ClientProtos.Scan proto = ProtobufUtil.toScan(scan); return Base64.encodeBytes(proto.toByteArray());}运行结果如下:0:2852811: 7084:286562:363156:238488:1980210: 69139:159883:319501:388727:216005:2719012: 17。

    点击阅读更多内容
    卖家[上传人]:suijia
    资质:实名认证