统计手机上网的上行流量和下行流量
数据格式:
统计手机的上网流量只需要“手机号”、“上行流量”、“下行流量”三个字段,根据这三个字段创建bean对象,该对象要实现Writable接口,以便实现序列化,并且要有无参构造方法,hadoop会使用反射创建对象
public class PhoneBean implements Writable { private String phone; private Long upPayLoad; private Long downPayLoad; private Long totalPayLoad; public PhoneBean() { } public PhoneBean(String phone, Long upPayLoad, Long downPayLoad) { super(); this.phone = phone; this.upPayLoad = upPayLoad; this.downPayLoad = downPayLoad; this.totalPayLoad = upPayLoad + downPayLoad; } @Override public String toString() { return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(phone); out.writeLong(upPayLoad); out.writeLong(downPayLoad); } @Override public void readFields(DataInput in) throws IOException { this.phone = in.readUTF(); this.upPayLoad = in.readLong(); this.downPayLoad = in.readLong(); } setter/getter略 }
程序,注意不要引错包
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PhoneCount { public static class PCMapper extends Mapper<LongWritable, Text, Text, PhoneBean> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, PhoneBean>.Context context) throws IOException, InterruptedException { String val = value.toString(); String[] vals = val.split("\t"); String phone = vals[1]; Long upPayLoad = Long.parseLong(vals[8]); Long downPayLoad = Long.parseLong(vals[9]); PhoneBean bean = new PhoneBean(phone, upPayLoad, downPayLoad); context.write(new Text(phone), bean); } } public static class PCReducer extends Reducer<Text, PhoneBean, Text, PhoneBean> { @Override protected void reduce(Text key, Iterable<PhoneBean> iterable, Reducer<Text, PhoneBean, Text, PhoneBean>.Context context) throws IOException, InterruptedException { Long upTotal = 0L; Long downTotal = 0L; for (PhoneBean pb : iterable) { upTotal += pb.getUpPayLoad(); downTotal += pb.getDownPayLoad(); } context.write(key, new PhoneBean("", upTotal, downTotal)); } } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(PhoneCount.class); job.setMapperClass(PCMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(PhoneBean.class); FileInputFormat.setInputPaths(job, new Path(args[0])); job.setReducerClass(PCReducer.class); // reducer input key and value equals reduce output key and value ignore job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(PhoneBean.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }
把需要的数据上传到hdfs,程序打包后运行
hadoop jar phone2.jar /phone/phone.dat /phone/output
结果
13480253104 180 180 360 13502468823 7335 110349 117684 13560436666 1116 954 2070 13560439658 2034 5892 7926 略
其中手机号13560439658是两次上网,其它手机号都是一次上网,该手机号作为验证数据
通过partition对手机号进行划分,使用Map来模拟从数据库中查询出来的partition的规则
public static class PCPartitioner extends Partitioner<Text, PhoneBean> { private static Map<String, Integer> dataMap = new HashMap<String, Integer>(); static { dataMap.put("135", 1); dataMap.put("136", 1); dataMap.put("137", 1); dataMap.put("138", 2); dataMap.put("139", 2); dataMap.put("150", 3); } @Override public int getPartition(Text key, PhoneBean value, int numPartitions) { String phone = key.toString(); String code = phone.substring(0, 3); Integer partition = dataMap.get(code); return partition == null ? 0 : partition; } }
设置reduce的任务数,通过参数传入程序
// set partition job.setPartitionerClass(PCPartitioner.class); job.setNumReduceTasks(Integer.parseInt(args[2]));
partition分了0、1、2、3个区总共四个分区,但如果reduce的数量小于partition的会报一个IO的异常,因为每个reduce对应一个输出文件
#设置reduce的数量为3 hadoop jar phone3.jar /phone/phone.dat /phone/output1 3 #程序执行时的异常 15/09/21 16:51:34 INFO mapreduce.Job: Task Id : attempt_1442818713228_0003_m_000000_0, Status : FAILED Error: java.io.IOException: Illegal partition for 15013685858 (3)
如果设置的reduce的数量大于partition数量,写出的reduce文件将为空文件
#设置reduce数量为5 hadoop jar phone3.jar /phone/phone.dat /phone/output2 5 [root@centos1 sbin]# hadoop fs -ls /phone/output2 Found 6 items -rw-r--r-- 1 root supergroup 0 2015-09-21 16:53 /phone/output2/_SUCCESS -rw-r--r-- 1 root supergroup 156 2015-09-21 16:53 /phone/output2/part-r-00000 -rw-r--r-- 1 root supergroup 241 2015-09-21 16:53 /phone/output2/part-r-00001 -rw-r--r-- 1 root supergroup 127 2015-09-21 16:53 /phone/output2/part-r-00002 -rw-r--r-- 1 root supergroup 27 2015-09-21 16:53 /phone/output2/part-r-00003 -rw-r--r-- 1 root supergroup 0 2015-09-21 16:53 /phone/output2/part-r-00004
partiton的注意事项:
1、partition规则要清晰
2、reduce的数量要等于或大于partition数量
相关推荐
08.mapreduce编程案例--流量统计求和--自定义数据类型.mp4
为MapReduce框架对电话号码的上行流量和下行流量及总流量进行统计的模板数据
python实现mapreduce词频统计 执行方式:打开cmd命令,cd到代码所在文件夹,输入python wordcout_map.py > words.txt | sort | python wordcout_reduce.py执行
在hadoop平台上,用mapreduce编程实现大数据的词频统计
利用mapreduce编写的统计度分布程序
MapReduce字数统计案例,希望大家交流,最好自己写完后在对比交流,欢迎前来交流
mapreduce在hadoop实现词统计和列式统计,mrwordcount工程是统计hadoop文件中的词数,mrflowcount工程是统hadoop文件中的列表
网站流量数据分析 (MapReduce+Hive综合实验)
1)统计每一个手机号耗费的总上行流量、下行流量、总流量 2)将统计结果按照手机归属地不同号段(手机号前3位)输出到不同文件中 3)根据需求1)产生的结果再次对总流量进行排序。 4)按照要求2)每个手机号段输出的...
MapReduce 中PCA异常,以及检测实现系统等
利用MapReduce实现了求学生成绩的最大值,最小值,及成绩分布。结合我的博客“MapReduce之学生平均成绩”看,效果更好。
大数据小型项目源码之mapreduce英语单词频次统计,附带所需全部jar包,欢迎下载学习。
使用hadoop的eclipse插件开发的mapreduce程序,实现对数据的简单统计处理,包括可视化结果
mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...
MapReduce Java API实例-统计单次出现频率示例代码-MapReduceDemo.rar MapReduce Java API实例-统计单次出现频率示例代码-MapReduceDemo.rar MapReduce Java API实例-统计单次出现频率示例代码-MapReduceDemo.rar
一次很好的mapreduce框架学习。复习了框架大部分组件,代码都是一个一个敲上去的,借鉴了视频中老师的讲解,只包含了7个代码中关键的java文件,其他包都需要自己导入。
在面对运营商全网流量数据时,如何快速从海量的Netflow流量数据中生成统计矩阵供PCA检测分析,是PCA异常流量检测方法面临的最大困难之一。MapReduce是随着云计算兴起的一项分布式计算方法,它充分利用计算机集群的...
计算机后端-PHP视频教程. mongodb10 MapReduce 统计栏目下的商品.wmv
数据顺序依次为:时间戳、手机号、mark地址、ip地址、访问地址、网站类型、请求时间、响应时间、请求流量、响应流量、状态 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg....