上一篇文章说的ReduceSide Join的一个缺点就是,在map方法之中,只对数据加了tag、提取了groupkey,没有做任何的数据过滤,这样在map-reduce之中的shuffle过程会造成大量的 磁盘IO使得效率降低。
这次使用的是Replicated Join,完成的任务跟上次一样.
它有一个前提:需要关联在一起的两个文件,其中一个文件比较小,至少能放到内存之中。
其中一个关键的地方就是,在运行job之前,先将本地的小文件(简称为smallIn)上传到Hadoop集群的每一个服务器之中。在每个集群之中,大文件的split跟smallIn进行jion的操作。这样效率会比较高。
上传的代码:
Path smallIn = new Path("..."); DistributedCache.addCacheFile(smallIn.toUri(), conf);
除了这种方式,可以在命令行调用的时候自动上传:
bin/hadoop jar -files smallIn DataJoinDC.jar big_in.txt output
我们对比两个文件的原始大小:
u.user: 23KB
u.data: 1933KB
所以,显然我们应该选择u.user作为DistributedCache文件。
在Map类之中,如果使用新的API,则在setup之中进行,如果是old api 则在configure()方法之中进行分解动作。
获取Cache之中的文件的代码如下:
URI[] cachesFiles = DistributedCache.getCacheFiles(conf);
这跟<Hadoop in Action> 上的代码不太一样,书上是调用
Path[] cachesFiles = DistributedCache.getLocalCacheFiles(conf);
一般这是在Local模式下进行的。
Note:
以上一部分是猜测!因为这个程序在我的环境Eclipse + Ubunut (Pseodu-Distributed) 模式没有成功,无法正确的获取到Cache文件!
最后,贴上我的代码做个纪念,暂时保留这个问题:
import java.io.File; import java.io.IOException; import java.net.URI; import java.util.Hashtable; import java.util.Iterator; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class Main { private static final String U_DATA_SEPARATOR = "\t"; private static final String U_USER_SEPARATOR = "[|]"; public static class Map extends Mapper<Text, Text, Text, Text> { private Hashtable<String, String> table = new Hashtable<String, String>(); @Override protected void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { String dataInfo = value.toString(); if(dataInfo.trim().length() == 0) return; if(table.containsKey(key.toString())) { String userInfo = table.get(key.toString()); String[] userTokens = userInfo.split(U_USER_SEPARATOR); String userData = "age=" + userTokens[0]; String[] dataTokens = dataInfo.split(U_DATA_SEPARATOR); String ratingData = "ratings=" + dataTokens[1]; context.write(key, new Text(userData + "|" + ratingData)); } } @Override protected void setup(Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { try { Configuration conf = context.getConfiguration(); // Path[] cachesFiles = DistributedCache.getLocalCacheFiles(conf); URI[] cachesFiles = DistributedCache.getCacheFiles(conf); if(cachesFiles != null && cachesFiles.length > 0) { Iterator<String> it = FileUtils.lineIterator(new File(cachesFiles[0].toString())); while(it.hasNext()) { String line = it.next(); System.out.println("~~~~~~~ line=" + line); if(line.trim().length() == 0) continue; String[] tokens = line.split(U_USER_SEPARATOR, 2); table.put(tokens[0], tokens[1]); } } else { System.out.println("!!!!!!!!!!!!!!!! empty cache files"); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJobName("Replicated Join"); job.setJarByClass(Main.class); job.setMapperClass(Map.class); job.setNumReduceTasks(0); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); // job.setOutputKeyClass(Text.class); // job.setOutputValueClass(TextOutputFormat.class); FileSystem fs = FileSystem.get(conf); // Path localSmallIn = new Path("/home/hadoop/DataSet/movielens/u.user"); Path hdfsSmallIn = new Path("/data/u.user"); // fs.copyFromLocalFile(true, localSmallIn , hdfsSmallIn); // DistributedCache.addCacheFile( // hdfsSmallIn.toUri(), conf); Path bigIn = new Path("/home/hadoop/DataSet/movielens/u.data"); Path out = new Path("/home/hadoop/DataSet/movielens-Replicated-output"); // Path bigIn = new Path("/data/u.data"); // Path out = new Path("/data/movielens-Replicated-output"); if(fs.exists(out)) { System.out.println("输出目录已经存在,将其删除~"); fs.delete(out, true); } FileInputFormat.setInputPaths(job, bigIn); FileOutputFormat.setOutputPath(job, out); conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", U_DATA_SEPARATOR); // job.set("key.value.separator.in.input.line", U_DATA_SEPARATOR); // for u.data System.exit(job.waitForCompletion(true) ? 0 : 1); } }
相关推荐
Hadoop分布式系统:系统设计与架构,源自Apache网站,对Hadoop的HDFS系统做了简单的介绍。
Hadoop分布式文件系统:架构和设计要点.pdf
Hadoop分布式文件系统的模型分析,Hadoop 分布式文件系统是遵循Google 文件系统原理进行开发和实现的,受到了业界极大关注,并 已被广泛应用。 鉴于当前缺乏从系统设计理论的角度对其开展的相关研究,本文从 Hadoop ...
Hadoop分布式文件系统:架构和设计要点中文翻译
Hadoop分布式文件系统:架构和设计.pdf
hdfs官方文档 Hadoop分布式文件系统:结构与设计.pdf
Hadoop分布式文件系统:架构和设计.doc
Hadoop分布式文件系统翻译
Hadoop 分布式文件系统 (HDFS)是一个设计为用在普通硬件设备上的分布式文件系统。它与现有的分布式文件系统有很多近似的地方,但又和这些文件系统有很明显的不同。HDFS是高容错的,设计为部署在廉价硬件上的。HDFS对...
基于SpringMVC+Spring+HBase+Maven搭建的Hadoop分布式云盘系统。使用Hadoop HDFS作为文件存储系统、HBase作为数据存储仓库,采用SpringMVC+Spring框架实现,包括用户注册与登录、我的网盘、关注用户、我的分享、我...
工作中搭建的hadoop分布式文件系统和hive ,mysql等的搭建的具体步骤
hadoop分布式网络爬虫的实现, 采用mapreduce和java,能实现深度搜索
《高可用性的HDFS——Hadoop分布式文件系统深度实践》专注于Hadoop分布式文件系统(hdfs)的主流ha解决方案,内容包括:hdfs元数据解析、hadoop元数据备份方案、hadoop backup node方案、avatarnode解决方案以及最新...
关于hadoop的分布式缓存的源码,用于大家的学习,改进hadoop的分布式缓存
完整的Hadoop分布式文件系统架构,以及源码分析报告
第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件系统.pdf第四章(Hadoop大数据处理实战)Hadoop分布式文件...
Hadoop分布式搭建环境: 系统:centos 6.5 64位 软件:Hadoop 2.2.0 64位 jdk 1.7 64位 用户: hadoop 运行环境:虚拟机vm 10 64位
Hadoop分布式文件系统使用指南.pdf