仔细记录一下Java的Map-Reduce job使用distributed cache的方法,毕竟以前一直都是copy paste ~
✓ 适用的Hadoop版本
CDH 5.8.0(Hadoop 2.6.0)
别的版本没有测试过,但后面相近的版本应该也能用。
✓ 准备工作:上传本地文件到HDFS
为了在Java代码中把一个文件加入 distributed cache,需要先把它上传到HDFS,之后应使用 HDFS 路径来加入 distributed cache。
假设要加入 distributed cache 的文件为 file.txt:
hadoop fs -put file.txt /your/hdfs/dir/
✓ job 配置
String fileName = "file.txt";
String filePathHdfs = "/your/hdfs/dir/file.txt";
Job job = Job.getInstance(getConf());
Configuration conf = job.getConfiguration();
conf.set("fileName", fileName);
conf.set("filePathHdfs", filePathHdfs);
job.addCacheFile(new URI(String.format("%s#%s", filePathHdfs, fileName)));
文章来源:https://www.codelast.com/
这里往 conf 里塞了两个变量,一个是文件名 fileName,另一个是文件的HDFS路径 filePathHdfs,并且在 addCacheFile() 的时候,拼成了 /your/hdfs/dir/file.txt#file.txt 这样奇怪的形式,这种形式使得在 mapper 或 reducer 中读取 distributed cache 文件的时候,直接用文件名就能读出文件,特别方便!
✓ 在 mapper 或 reducer 的 setup() 方法中读取 distributed cache 文件
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
File myFile = new File(conf.get("file.txt"));
FileInputStream fis = new FileInputStream(myFile);
BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
String line;
while ((line = reader.readLine()) != null) {
//TODO: deal with each line
}
}
看到没?上面的 conf.get("file.txt") 就只使用了文件名就能找到 distributed cache 里的文件,可以做这样“神奇”的操作是因为背后有一种叫symbolic link的技术。
下面留空的 TODO 那里,需要你自己填写处理每一行数据的逻辑。
文章来源:https://www.codelast.com/
➤➤ 版权声明 ➤➤
转载需注明出处:codelast.com
感谢关注我的微信公众号(微信扫一扫):