- Spark本地运行的几个实例代码(Java实现)
- 实例一:词频数统计
- 问题描述
- 过程分析
- 代码
- 运行结果
- 实例二:统计平均年龄
- 问题描述
- 过程分析
- 代码
- 运行结果
- 案例三:统计身高最值
- 问题描述
- 过程分析
- 代码
- 运行结果
- 案例四:统计单词频率
- 问题描述
- 过程分析
- 代码
- 运行结果
- 一些总结:
- 运行环境
- 实例一:词频数统计
Spark本地运行的几个实例代码(Java实现)
初学spark,用Java写了几个本地运行的spark实例代码,来记录一下已学的spark常用的算子的使用和处理逻辑,不涉及分布式集群。相关内容仅为自己的个人理解,如有错误还请指出。
实例一:词频数统计
问题描述
统计一个文本文件中的每个单词的出现次数,数据格式:
过程分析
首先通过textFile()函数将文件读入JavaRDD,然后通过flatMap算子将每一行的数据进行分割,得到多个String,一行数据分割得到的多个String以Iterator
代码
public class SparkWordCount { public static void main(String[] args){ SparkConf conf = new SparkConf(); //添加这一行则在本地运行,不添加这一行则默认在集群执行 conf.setMaster("local"); conf.setAppName("WordCount"); //基本的初始化 JavaSparkContext sc=new JavaSparkContext(conf); //创建String类型的RDD,并从本地文件中读取数据 JavaRDD<String> fileRDD = sc.textFile("src/main/files/words.txt");//通过文件读入创建RDD //flatMap()算子用来分割操作,将原RDD中的数据分成一个个片段 //new FlatMapFunction<String, String>中的两个String分别表示输入和输出类型 JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String line) throws Exception { //通过Iterator迭代器可以将分割后的多个数据元素全部返回输出 return Arrays.asList(line.split("\\s+")).iterator(); } }); //mapToPair()算子是用来对分割后的一个个片段结果添加计数标志的,如出现次数1,该函数用来创建并返回pair类型的RDD. new PairFunction<String, String, Integer>中分别是输入类型String和输出类型<String, Integer>. JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<>(word, 1); //Tuple2是spark的二元数组类型,Java中没有 } }); //reduceByKey()算子是根据key来聚合,reduce阶段.new Function2<Integer, Integer, Integer>中分别是用来聚合的两个输入类型Integer,Integer和聚合后的输出类型Integer. JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } }); wordCountRDD.saveAsTextFile("E:\\result7"); }}
运行结果
实例二:统计平均年龄
问题描述
java实现spark统计100万人口的平均年龄以及每个年龄的出现次数,数据格式为"序号 年龄"
数据生成代码:
//生成年龄数据,格式"序号 年龄" private static void makeAgeData() throws IOException { File newFile = new File("src/main/files/peopleAges.txt"); if (newFile.exists()){ System.out.println("文件已存在!"); return; } newFile.createNewFile(); FileWriter fw = new FileWriter(newFile,true); Random rand = new Random(); for (int i=1;i<=1000000;i++){ fw.append(i+" "+(rand.nextInt(100)+1)+"\n"); fw.flush(); } fw.close(); }
过程分析
首先通过textFile()函数将文件数据读入RDD中。然后使用mapToPair()算子将每一行数据中的年龄作为key并对每一个年龄添加计数标记1作为value,接着使用reduceByKey算子对相同年龄值的数据进行聚合。
求平均年龄时首先要求出年龄和,这也是reduce聚合操作。但是要注意reduce算子只能接收单个数据元素组成的RDD作为输入,不能接收pair类型的RDD,所以对源文件读出的RDD先通过map算子输出只有年龄值数据的RDD,然后进行reduce()。
代码
public class AvgAge { public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("AvgAge"); JavaSparkContext sc = new JavaSparkContext(conf); //刚从文件读出来的RDD已经是一行一行的字符串,所以可以直接进行mapToPair JavaRDD<String> fileRDD = sc.textFile("src/main/files/peopleAges.txt"); JavaPairRDD<Integer, Integer> ageOneRdd = fileRDD.mapToPair(new PairFunction<String, Integer, Integer>() { @Override public Tuple2<Integer, Integer> call(String s) throws Exception { return new Tuple2<>(Integer.parseInt(s.split("\\s+")[1]),1); } }); //使用reduceByKey算子对具有相同年龄值的数据进行聚合,获取每个年龄值的人数 JavaPairRDD<Integer, Integer> ageCountRDD = ageOneRdd.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }); //求平均年龄 //先通过map算子取出每个年龄值作为一个RDD。 //reduce()函数的输入RDD不能是pair,只能是单个数据组成的RDD Integer ageSum = fileRDD.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return Integer.parseInt(s.split("\\s+")[1]); } }).reduce(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer s, Integer s2) throws Exception { return s+s2; } }); System.out.println("平均年龄:"+ageSum/fileRDD.count()); ageCountRDD.saveAsTextFile("src/main/files/ageAnalysis"); }
运行结果
案例三:统计身高最值
问题描述
统计男女人数,并分别计算出男性和女性的最高和最低身高,数据格式"序号 M/F 身高"
数据生成代码
//生成性别身高数据,格式"序号 性别(M/F) 身高" private static void makeHeightData() throws IOException { File newFile = new File("src/main/files/heightData.txt"); if (newFile.exists()){ System.out.println("文件已存在!"); return; } newFile.createNewFile(); FileWriter fw = new FileWriter(newFile,true); Random rand = new Random(); for (int i=1;i<=50000;i++){ fw.append(i+" M "+(rand.nextInt(100)+100)+"\n"); fw.append(i+" F "+(rand.nextInt(80)+100)+"\n"); fw.flush(); } fw.close(); }
过程分析
首先通过textFile()函数将文件数据读入RDD。然后使用filter算子分别过滤出男性和女性数据,接着用map算子分割出身高值并将其转化成Integer类型,这样才能用于数字排序,然后使用sortBy算子排序。sortBy算子可以直接对RDD中的数据排序,不用区分key还是value。
代码
public class HeightMaxMin { public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("HeightAnalysis"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> fileRDD = sc.textFile("src/main/files/heightData.txt"); //使用filter算子分别过滤出男性和女性数据 JavaRDD<String> maleRDD = fileRDD.filter(new Function<String, Boolean>() { @Override //如果这行数据符合过滤条件则返回true public Boolean call(String s) throws Exception { return s.contains("M"); } }); JavaRDD<String> femaleRDD = fileRDD.filter(new Function<String, Boolean>() { @Override //如果这行数据符合过滤条件则返回true public Boolean call(String s) throws Exception { return s.contains("F"); } }); //使用map算子分割出身高并转化为整数类型,这样才能用排序 JavaRDD<Integer> maleHeightRDD = maleRDD.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return Integer.parseInt(s.split("\\s+")[2]); } }); JavaRDD<Integer> femaleHeightRDD = femaleRDD.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return Integer.parseInt(s.split("\\s+")[2]); } }); //使用sortBy算子排序 JavaRDD<Integer> sortmaleHeightRDD = (JavaRDD<Integer>) maleHeightRDD.sortBy(new Function<Integer, Object>() { @Override //返回的是排序的内容,即对输入RDD中的哪一部分进行排序就输出哪一部分 public Object call(Integer integer) throws Exception { return integer; } }, false,10);//false降序true升序,10是partition分区数,因为没有用集群所以也不太明白这个分区数具体指什么 JavaRDD<Integer> sortfemaleHeightRDD = (JavaRDD<Integer>) femaleHeightRDD.sortBy(new Function<Integer, Object>() { @Override public Object call(Integer integer) throws Exception { return integer; } }, false,10);//第二个参数true/false是正序逆序,最后一个参数10是分区数 //first()函数返回排名第一的数据 System.out.println("男性: "+sortmaleHeightRDD.count()+" "+sortmaleHeightRDD.first()); System.out.println("女性: "+sortfemaleHeightRDD.count()+" "+sortfemaleHeightRDD.first()); }}
运行结果
案例四:统计单词频率
问题描述
统计一段文本里出现频率最高的前k个词,注意单词不分大小写。
过程分析
首先从文件读入数据到RDD,然后使用flatMap算子对每一行的数据按照空格进行分割,并将所有的字母都转为小写,接着使用mapToPair算子对每一个单词添加计数标记1,然后使用reduceByKey算子对单词进行reduce聚合,为了根据key来排序,聚合后再使用mapToPair算子将得到的pair里面的key和value调换一下位置。然后使用sortByKey算子根据key来进行排序,最后使用take算子取出排名前5的数据。
代码
public class wordTopK { public static void main(String[] args){ SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("wordTopK"); JavaSparkContext sc=new JavaSparkContext(conf); JavaRDD<String> fileRDD = sc.textFile("D:\\summer_study\\ppt\\hive.txt"); //使用flatmap算子对每一行数据按空格分隔,并将所有的字母都转为小写 //注意这里不用map是因为map只能输出一个数据元素,而flatMap可以在输入元素后添加任意多元素来输出, // 比如分割后的多个元素组成Iterator来输出,但是Iterator里的每一个元素依然是独立的RDD。 JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String line) throws Exception { return Arrays.asList(line.toLowerCase().split("\\s+")).iterator(); } }); //使用maoToPair算子给每一个word加上计数标记1 JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s,1); } }); //使用reduceByKey算子对word进行reduce聚合,为了根据key来排序,聚合后再将得到的pair里面的key和value调换一下 JavaPairRDD<Integer, String> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return new Tuple2<>(stringIntegerTuple2._2,stringIntegerTuple2._1); } }); //使用sortByKey算子排序,false降序,true升序 JavaPairRDD<Integer,String> sortWordCountRDD = wordCountRDD.sortByKey(false); //使用take算子取出前5 List<Tuple2<Integer,String>> result = sortWordCountRDD.take(5); System.out.println("结果:"+result.toString()); }}
运行结果
一些总结:
- RDD就是spark中专用的一种数据格式,代表一种抽象数据类型,spark中的数据都是存在不同类型的RDD中,如JavaRDD<String>,JavaPairRDD<String,Integer>等。
- textFile读文件生成的RDD可以理解成源文件中一行一行的数据,每一行的数据就是一个JavaRDD
. - flatMap算子和map算子的区别:map算子就是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD,注意call返回的数据只能是单个数据元素。 flatMap与map一样,是将RDD中的元素依次的传入call方法,他比map多的功能是能在任何一个传入call方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。比如分割后的多个元素组成Iterator来输出,但是Iterator里的每一个元素依然是独立的RDD,这个Iterator只能由flatMap算子输出,map算子不可以,因为它只能输出单个数据元素。
运行环境
ide:Idea
jdk:1.8.0_121
spark:2.1.1
附maven配置:
<?
注:以上仅为个人学习记录,spark初学菜鸟,如有错误,敬请提出。
参考:
Spark 入门实战之最好的实例
Spark学习之路
原文转载:http://www.shaoqun.com/a/618653.html
pat:https://www.ikjzd.com/w/1079
淘粉吧官网:https://www.ikjzd.com/w/1725.html
目录Spark本地运行的几个实例代码(Java实现)实例一:词频数统计问题描述过程分析代码运行结果实例二:统计平均年龄问题描述过程分析代码运行结果案例三:统计身高最值问题描述过程分析代码运行结果案例四:统计单词频率问题描述过程分析代码运行结果一些总结:运行环境Spark本地运行的几个实例代码(Java实现)初学spark,用Java写了几个本地运行的spark实例代码,来记录一下已学的spark常
递四方:https://www.ikjzd.com/w/1066
锦桥纺织网:https://www.ikjzd.com/w/2469
蜜芽宝贝官网:https://www.ikjzd.com/w/1320
离岸公司:https://www.ikjzd.com/w/585
亚马逊这些敏感词要小心了!:https://www.ikjzd.com/home/132125
livingsocial:https://www.ikjzd.com/w/714.html
No comments:
Post a Comment