伙伴云客服论坛»论坛 S区 S软件开发 查看内容

0 评论

0 收藏

分享

云计算实验:Java MapReduce编程

目录

    【实验作业】简单流量统计【实验作业】索引倒排输出行号

实验题目:
MapReduce:编程
实验内容:
本实验利用 Hadoop 提供的 Java API 停止编程停止 MapReduce 编程。
实验目的:
    掌握MapReduce编程。理解MapReduce原理

【实验作业】简单流量统计

有如下这样的日志文件:
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230513 00-FD-07-A4-72-B8:CMCC 120.196.40.8 i02.c.aliimg.com 248 0 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230533 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230543 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Video website 1527 2106 200
13926230553 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13826230563 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13926230573 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 Integrated portal 1938 2910 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 i02.c.aliimg.com 3333 21321 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Search Engines 9531 9531 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
该日志文件记录了每个手机用户在一段时间内的网络流量信息,详细字段含义为:
手机号码 MAC地址 IP地址 域名 上行流量(字节数) 下行流量(字节数) 套餐类型
根据以上日志,统计出每个手机用户在该时间段内的总流量(上行流量+下行流量),统计结果的格式为:
手机号码 字节数量
实验结果:
云计算实验:Java MapReduce编程-1.jpg

实验代码:
WcMap.java
  1. import java.io.IOException;
  2. import org.apache.commons.lang.StringUtils;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6.         public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable>{
  7.         @Override
  8.         protected void map(LongWritable key, Text value, Context context)
  9.             throws IOException, InterruptedException {
  10.                     String str = value.toString();
  11.                     String[] words = StringUtils.split(str," ",10);
  12.                     int i=0;
  13.                     for(String word : words){
  14.                         if(i==words.length-2||i==words.length-3)
  15.                         context.write(new Text(words[0]), new LongWritable(Integer.parseInt(word)));
  16.                         i++;
  17.                     }
  18.             }
  19.         }
复制代码
WcReduce.java
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
  6.     @Override
  7.     protected void reduce(Text key, Iterable<LongWritable> values,Context context)
  8.             throws IOException, InterruptedException {
  9.         long count = 0;
  10.         for(LongWritable value : values){
  11.             count += value.get();
  12.         }
  13.         context.write(key, new LongWritable(count));
  14.     }
  15. }
复制代码
WcRunner.java
  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.LongWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.util.Scanner;
  10. import org.apache.hadoop.fs.FSDataInputStream;
  11. import org.apache.hadoop.fs.FileSystem;
  12. import java.net.URI;
  13. public class WcRunner{
  14.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  15.         Configuration conf = new Configuration();
  16.         Job job = Job.getInstance(conf);
  17.         job.setJarByClass(WcRunner.class);
  18.         job.setMapperClass(WcMap.class);
  19.         job.setReducerClass(WcReduce.class);
  20.         job.setOutputKeyClass(Text.class);
  21.         job.setOutputValueClass(LongWritable.class);
  22.         job.setMapOutputKeyClass(Text.class);
  23.         job.setMapOutputValueClass(LongWritable.class);
  24.         Scanner sc = new Scanner(System.in);
  25.         System.out.print("inputPath:");
  26.         String inputPath = sc.next();
  27.         System.out.print("outputPath:");
  28.         String outputPath = sc.next();
  29.         try {
  30.             FileSystem fs0 = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
  31.             Path hdfsPath = new Path(outputPath);
  32.             fs0.copyFromLocalFile(new Path("/headless/Desktop/workspace/mapreduce/WordCount/data/1.txt"),new Path("/mapreduce/WordCount/input/1.txt"));
  33.             if(fs0.delete(hdfsPath,true)){
  34.                 System.out.println("Directory "+ outputPath +" has been deleted successfully!");
  35.             }
  36.         }catch(Exception e) {
  37.             e.printStackTrace();
  38.         }
  39.         FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000"+inputPath));
  40.         FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000"+outputPath));
  41.         job.waitForCompletion(true);
  42.         try {
  43.             FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
  44.             Path srcPath = new Path(outputPath+"/part-r-00000");
  45.             FSDataInputStream is = fs.open(srcPath);
  46.             System.out.println("Results:");
  47.             while(true) {
  48.                 String line = is.readLine();
  49.                 if(line == null) {
  50.                     break;
  51.                 }
  52.                 System.out.println(line);
  53.             }
  54.             is.close();
  55.         }catch(Exception e) {
  56.             e.printStackTrace();
  57.         }
  58.     }
  59. }
复制代码
【实验作业】索引倒排输出行号

在索引倒排实验中,我们可以得到每个单词分布在哪些文件中,以及在每个文件中呈现的次数,修改以上实现,在输出的倒排索引结果中可以得到每个单词在每个文件中的详细行号信息。输出结果的格式如下:
单词 文件名:行号,文件名:行号,文件名:行号
实验结果:
MapReduce在3.txt的第一行呈现了两次所以有两个1
云计算实验:Java MapReduce编程-2.jpg
  1. import java.io.*;
  2. import java.util.StringTokenizer;
  3. import org.apache.hadoop.io.*;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  6. public class MyMapper extends Mapper<Object,Text,Text,Text>{
  7.     private Text keyInfo = new Text();
  8.     private Text valueInfo = new Text();
  9.     private FileSplit split;
  10.     int num=0;
  11.     public void map(Object key,Text value,Context context)
  12.             throws IOException,InterruptedException{
  13.         num++;
  14.         split = (FileSplit)context.getInputSplit();
  15.         StringTokenizer itr = new StringTokenizer(value.toString());
  16.         while(itr.hasMoreTokens()){
  17.             keyInfo.set(itr.nextToken()+" "+split.getPath().getName().toString());
  18.             valueInfo.set(num+"");
  19.             context.write(keyInfo,valueInfo);
  20.         }
  21.     }
  22. }
复制代码
  1. import java.io.*;
  2. import org.apache.hadoop.io.*;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. public class MyCombiner extends Reducer<Text,Text,Text,Text>{
  5.     private Text info = new Text();
  6.     public void reduce(Text key,Iterable<Text>values,Context context)
  7.             throws IOException, InterruptedException{
  8.         String  sum = "";
  9.         for(Text value:values){
  10.             sum += value.toString()+" ";
  11.         }
  12.                 String record = key.toString();
  13.         String[] str = record.split(" ");
  14.         key.set(str[0]);
  15.         info.set(str[1]+":"+sum);
  16.         context.write(key,info);
  17.     }
  18. }
复制代码
  1. import java.io.IOException;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. public class MyReducer extends Reducer<Text,Text,Text,Text>{
  5.     private Text result = new Text();
  6.     public void reduce(Text key,Iterable<Text>values,Context context) throws
  7.             IOException, InterruptedException{
  8.         String value =new String();
  9.         for(Text value1:values){
  10.             value += value1.toString()+" ; ";
  11.         }
  12.         result.set(value);
  13.         context.write(key,result);
  14.     }
  15. }
复制代码
  1. import java.io.IOException;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. import java.util.Scanner;
  9. import org.apache.hadoop.fs.FSDataInputStream;
  10. import org.apache.hadoop.fs.FileSystem;
  11. import java.net.URI;
  12. public class MyRunner {
  13.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  14.         Configuration conf = new Configuration();
  15.         Job job = Job.getInstance(conf);
  16.         job.setJarByClass(MyRunner.class);
  17.         job.setMapperClass(MyMapper.class);
  18.         job.setReducerClass(MyReducer.class);
  19.         job.setCombinerClass(MyCombiner.class);
  20.         job.setOutputKeyClass(Text.class);
  21.         job.setOutputValueClass(Text.class);
  22.         job.setMapOutputKeyClass(Text.class);
  23.         job.setMapOutputValueClass(Text.class);
  24.         Scanner sc = new Scanner(System.in);
  25.         System.out.print("inputPath:");
  26.         String inputPath = sc.next();
  27.         System.out.print("outputPath:");
  28.         String outputPath = sc.next();
  29.         try {
  30.             FileSystem fs0 = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
  31.             Path hdfsPath = new Path(outputPath);
  32.             if(fs0.delete(hdfsPath,true)){
  33.                 System.out.println("Directory "+ outputPath +" has been deleted successfully!");
  34.             }
  35.         }catch(Exception e) {
  36.             e.printStackTrace();
  37.         }
  38.         FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000"+inputPath));
  39.         FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000"+outputPath));
  40.         job.waitForCompletion(true);
  41.         try {
  42.             FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
  43.             Path srcPath = new Path(outputPath+"/part-r-00000");
  44.             FSDataInputStream is = fs.open(srcPath);
  45.             System.out.println("Results:");
  46.             while(true) {
  47.                 String line = is.readLine();
  48.                 if(line == null) {
  49.                     break;
  50.                 }
  51.                 System.out.println(line);
  52.             }
  53.             is.close();
  54.         }catch(Exception e) {
  55.             e.printStackTrace();
  56.         }
  57.     }
  58. }
复制代码
到此这篇关于云计算实验:Java MapReduce编程的文章就介绍到这了,更多相关Java MapReduce编程内容请搜索网站以前的文章或继续阅读下面的相关文章希望大家以后多多支持网站!

回复

举报 使用道具

全部回复
暂无回帖,快来参与回复吧
本版积分规则 高级模式
B Color Image Link Quote Code Smilies

萌晓
注册会员
主题 18
回复 22
粉丝 0
|网站地图
快速回复 返回顶部 返回列表