Hadoop-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作

章节内容

上一节我们完成了:

  • MapReduce的介绍
  • Hadoop序列化介绍
  • Mapper编写规范
  • Reducer编写规范
  • Driver编写规范
  • WordCount功能开发
  • WordCount本地测试

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。
之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。

注意,如果你和我一样,打算用公网部署,那一定要做好防火墙策略,避免不必要的麻烦!!!
请大家都以学习为目的,也请不要对我的服务进行嗅探或者攻击!!!

但是有一台公网服务器我还运行着别的服务,比如前几天发的:autodl-keeper 自己写的小工具,防止AutoDL机器过期的。还跑着别的Web服务,所以只能挤出一台 2C2G 的机器。那我的配置如下了:

  • 2C4G 编号 h121
  • 2C4G 编号 h122
  • 2C2G 编号 h123

在这里插入图片描述

业务需求

平常我们在业务上,有很多时候表都是分开的,通过一些 id 或者 code 来进行关联。
在大数据的情况下,也有很多这种情况,我们需要进行联表操作。

表1

项目编码projectCode 项目名projectName

表2

项目编码projectCode 项目类型projectType 项目分类projectFrom

SQL 中,可以通过 LEFT JOIN 来实现字段补齐。大数据下,也需要进行这样的操作,我们需要借助 MapReduce

表1测试

"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"	"社区项目1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8"	"社区项目2"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb"	"智慧社区"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393"	"公交站点"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"	"街道布建"
"2e556d83-bb56-45b1-8d6e-00510902c464"	"街道公交站点"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9"	"未命名项目1"
"5a5982d7-7257-422f-822a-a0c2f31c28d1"	"未命名项目2"

表2测试

"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"	"重要类型"	"种类1"
"02d9c090-e467-42b6-9c14-52cacd72a4a8"	"重要类型"	"种类1"
"244dcaca-0778-4eec-b3a2-403f8fac1dfb"	"重要类型"	"种类1"
"94befb97-d1af-43f2-b5d5-6df9ce5b9393"	"普通类型"	"种类1"
"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"	"普通类型"	"种类2"
"2e556d83-bb56-45b1-8d6e-00510902c464"	"普通类型"	"种类2"
"3ba00542-eac9-4399-9c2b-3b06e671f4c9"	"一般类型"	"种类2"
"5a5982d7-7257-422f-822a-a0c2f31c28d1"	"一般类型"	"种类2"

SQL连表

假设我们使用SQL的方式联表:

SELECT
  *
FROM
  t_project
LEFT JOIN
  t_project_info
ON
  t_project.projectCode=t_project_info.projectCode

Reduce JOIN

有时候,表可能过大,无法支持我们使用 SQL 进行连表查询。
这里我们编写一个程序来完成操作。

ProjectBean

这里是最终的Bean类,里边是两个表把字段补齐的结果,一会儿我们将使用这个类进行表的连接。

package icu.wzk.demo03;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ProjectBean implements Writable {

    private String projectCode;

    private String projectName;

    private String projectType;

    private String projectFrom;

    private String flag;

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(projectCode);
        dataOutput.writeUTF(projectName);
        dataOutput.writeUTF(projectType);
        dataOutput.writeUTF(projectFrom);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.projectCode = dataInput.readUTF();
        this.projectName = dataInput.readUTF();
        this.projectType = dataInput.readUTF();
        this.projectFrom = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    public ProjectBean(String projectCode, String projectName, String projectType, String projectFrom, String flag) {
        this.projectCode = projectCode;
        this.projectName = projectName;
        this.projectType = projectType;
        this.projectFrom = projectFrom;
        this.flag = flag;
    }

    public ProjectBean() {

    }

    @Override
    public String toString() {
        return "ProjectBean{" +
                "projectCode='" + projectCode + '\'' +
                ", projectName='" + projectName + '\'' +
                ", projectType='" + projectType + '\'' +
                ", projectFrom='" + projectFrom + '\'' +
                ", flag=" + flag + '\'' +
                '}';
    }

    public String getProjectCode() {
        return projectCode;
    }

    public void setProjectCode(String projectCode) {
        this.projectCode = projectCode;
    }

    public String getProjectName() {
        return projectName;
    }

    public void setProjectName(String projectName) {
        this.projectName = projectName;
    }

    public String getProjectType() {
        return projectType;
    }

    public void setProjectType(String projectType) {
        this.projectType = projectType;
    }

    public String getProjectFrom() {
        return projectFrom;
    }

    public void setProjectFrom(String projectFrom) {
        this.projectFrom = projectFrom;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }
}

Reduce Driver

package icu.wzk.demo03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReducerJoinDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        // String inputPath = args[0];
        // String outputPath = args[1];

        // === 测试环境 ===
        String inputPath = "project_test";
        String outputPath = "project_test_output";
        // === ===

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "ReducerJoinDriver");
        job.setJarByClass(ReducerJoinDriver.class);

        job.setMapperClass(ReducerJoinMapper.class);
        job.setReducerClass(ReducerJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ProjectBean.class);

        job.setOutputKeyClass(ProjectBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

ReduceMapper

package icu.wzk.demo03;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReducerJoinMapper extends Mapper<LongWritable, Text, Text, ProjectBean> {

    String name;
    ProjectBean projectBean = new ProjectBean();
    Text k = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException {
        // 获取路径信息
        name = context.getInputSplit().toString();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        if (name.contains("layout_project")) {
            // layout_project
            String[] fields = line.split("\t");
            projectBean.setProjectCode(fields[0]);
            projectBean.setProjectName(fields[1]);
            projectBean.setProjectType("");
            projectBean.setProjectFrom("");
            projectBean.setFlag("layout_project");
            // projectCode 关联
            k.set(fields[0]);
        } else {
            // project_info
            String[] fields = line.split("\t");
            projectBean.setProjectCode(fields[0]);
            projectBean.setProjectName("");
            projectBean.setProjectType(fields[1]);
            projectBean.setProjectFrom(fields[2]);
            projectBean.setFlag("project_info");
            // projectCode 关联
            k.set(fields[0]);
        }
        context.write(k, projectBean);
    }
}

ReduceReducer

package icu.wzk.demo03;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReducerJoinReducer extends Reducer<Text, ProjectBean, ProjectBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<ProjectBean> values, Reducer<Text, ProjectBean, ProjectBean, NullWritable>.Context context) throws IOException, InterruptedException {
        List<ProjectBean> dataList = new ArrayList<>();
        ProjectBean deviceProjectBean = new ProjectBean();
        for (ProjectBean pb : values) {
            if ("layout_project".equals(pb.getFlag())) {
                // layout_project
                ProjectBean projectProjectBean = new ProjectBean(
                        pb.getProjectCode(),
                        pb.getProjectName(),
                        pb.getProjectType(),
                        pb.getProjectFrom(),
                        pb.getFlag()
                );
                dataList.add(projectProjectBean);
            } else {
                // project_info
                deviceProjectBean = new ProjectBean(
                        pb.getProjectCode(),
                        pb.getProjectName(),
                        pb.getProjectType(),
                        pb.getProjectFrom(),
                        pb.getFlag()
                );
            }
        }

        for (ProjectBean pb : dataList) {
            pb.setProjectType(deviceProjectBean.getProjectType());
            pb.setProjectFrom(deviceProjectBean.getProjectFrom());
            context.write(pb, NullWritable.get());
        }
    }
}

运行结果

ProjectBean{projectCode='"02d9c090-e467-42b6-9c14-52cacd72a4a8"', projectName='"社区项目2"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"244dcaca-0778-4eec-b3a2-403f8fac1dfb"', projectName='"智慧社区"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"2e556d83-bb56-45b1-8d6e-00510902c464"', projectName='"街道公交站点"', projectType='"普通类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"3ba00542-eac9-4399-9c2b-3b06e671f4c9"', projectName='"未命名项目1"', projectType='"一般类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"5a5982d7-7257-422f-822a-a0c2f31c28d1"', projectName='"未命名项目2"', projectType='"一般类型"', projectFrom='"种类2"', flag=layout_project'}
ProjectBean{projectCode='"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"', projectName='"社区项目1"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"94befb97-d1af-43f2-b5d5-6df9ce5b9393"', projectName='"公交站点"', projectType='"普通类型"', projectFrom='"种类1"', flag=layout_project'}
ProjectBean{projectCode='"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"', projectName='"街道布建"', projectType='"普通类型"', projectFrom='"种类2"', flag=layout_project'}

在这里插入图片描述

方案缺点

JOIN 操作是在 reduce 阶段完成的,reduce端处理压力过大map节点的运算负载很低,资源利用不高

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/772830.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

利用YOLOv8识别自定义模型

一、背景介绍 最近项目需要识别自定义物品&#xff0c;于是学习利用YOLOv8算法&#xff0c;实现物品识别。由于物体类别不再常规模型中&#xff0c;因此需要自己训练相应的模型&#xff0c;特此记录模型训练的过程。 二、训练模型的步骤 1.拍照获取训练图片&#xff08;训练图…

【前端CSS3】CSS显示模式(黑马程序员)

文章目录 一、前言&#x1f680;&#x1f680;&#x1f680;二、CSS元素显示模式&#xff1a;☀️☀️☀️2.1 什么是元素显示模式2.2 块元素2.3 行内元素2.4 行块元素2.5 元素显示模式的转换 三、总结&#x1f680;&#x1f680;&#x1f680; 一、前言&#x1f680;&#x1f…

操作符“->“

title: 操作符"->" date: 2024-01-16 00:00:00 categories: C箭头 tags: 箭头操作以及偏移量计算 #嘎嘎 操作符"->" ->是一个成员访问的操作&#xff0c;它的作用是通过一个指针来访问它所指向的对象的成员变量或成员函数。它的左边必须是一个指…

(vue)eslint-plugin-vue版本问题 安装axios时npm ERR! code ERESOLVE

(vue)eslint-plugin-vue版本问题 安装axios时npm ERR! code ERESOLVE 解决方法&#xff1a;在命令后面加上 -legacy-peer-deps结果&#xff1a; 解决参考&#xff1a;https://blog.csdn.net/qq_43799531/article/details/131403987

全网最适合入门的面向对象编程教程:09 类和对象的Python实现-类之间的关系,你知道多少?

全网最适合入门的面向对象编程教程&#xff1a;09 类和对象的 Python 实现-类之间的关系&#xff0c;你知道多少&#xff1f; 摘要&#xff1a; 本文主要对类之间的关系进行了基本介绍&#xff0c;包括继承、组合、依赖关系&#xff0c;并辅以现实中的例子加以讲解&#xff0…

Java的进程和线程

一Java的进程 二Java的线程 多线程 ◆如果在一个进程中同时运行了多个线程&#xff0c;用来完成不同的工作&#xff0c;则称之为“多线程”。 ◆多个线程交替占用CPU资源&#xff0c;而非真正的并行执行。 ◆多线程好处。 ◆充分利用CPU的资源。 ◆简化编程模型。 ◆良好的用…

js 递归调用 相同对象--数组递归调用

<div class="save-cl"> <a-button @click="saveCl" >保存为常用策略</a-button> </div> saveCl(){ console.log(this.form.filterList[0],--------常用策略)// 此对象为上图对象 console.log(this.allElementsHaveValue(thi…

AGI系列(7)Reflection 在 AI agent 中的应用实例

斯坦福大学教授吴恩达一直非常推崇AI Agent,之前他提出过AI Agent的四种工作模式,分别是Reflection(反思)、Tool use(工具使用)、Planning(规划)和Multi-agent collaboration(多智能体协同)。 近日,他又开源了一个翻译 AI Agent, 他认为 AI 智能体机器翻译对改进传…

阿里巴巴Arthas分析调优JVM实战及常量池详解

目录 一、阿里巴巴Arthas详解 Arthas使用场景 Arthas命令 Arthas使用 二、GC日志详解 如何分析GC日志 CMS G1 GC日志分析工具 三、JVM参数汇总查看命令 四、Class常量池与运行时常量池 字面量 符号引用 五、字符串常量池 字符串常量池的设计思想 三种字符串操作…

掌握电路交换与分组交换:计算机网络的核心技术

计算机网络是现代信息社会的基石&#xff0c;而交换技术是实现网络通信的核心。本文将详细介绍两种典型的交换方式&#xff1a;电路交换和分组交换&#xff0c;帮助基础小白快速掌握这两种技术的基本概念和区别。 什么是电路交换&#xff1f; 电路交换&#xff08;Circuit Swi…

技术革新引领钢材质量智能化检测新纪元,基于YOLOv5全系列【n/s/m/l/x】参数模型开发构建钢材工业生产场景下钢材缺陷智能检测识别系统

随着人工智能&#xff08;AI&#xff09;技术的迅猛发展&#xff0c;其应用领域不断拓宽&#xff0c;正深刻改变着传统产业的运作模式。在钢材生产这一基础工业领域&#xff0c;AI的引入正为钢材的质量检测带来革命性的变革。 在传统的钢材生产流程中&#xff0c;质量检测是确…

【C++】二叉搜索树的模拟实现

前言&#xff1a;今天我们学习相对来说比前面轻松一点的内容&#xff0c;二叉搜索树&#xff0c;在之前我们学习过二叉树今天的内容对于我们就会比较简单一点了&#xff0c;一起加油。 &#x1f496; 博主CSDN主页:卫卫卫的个人主页 &#x1f49e; &#x1f449; 专栏分类:高质…

从零开始学量化~Ptrade使用教程——安装与登录

PTrade交易系统是一款高净值和机构投资者专业投资软件&#xff0c;为用户提供普通交易、篮子交易、日内回转交易、算法交易、量化投研/回测/实盘等各种交易工具&#xff0c;满足用户的各种交易需求和交易场景&#xff0c;帮助用户提高交易效率。 运行环境及安装 操作系统&…

计算机的错误计算(二十一)

摘要 两个不相等数相减&#xff0c;差为0&#xff1a; ? 在计算机的错误计算&#xff08;十九&#xff09;中&#xff0c;高中生小明发现本应为0的算式结果不为0. 今天他又发现对本不为0的算式&#xff0c;计算机的输出为0. 在 Python 中计算 &#xff1a; 则输出为0. 若用 C…

必备的 Adobe XD 辅助工具

想要高效便捷的使用 Adobe XD&#xff0c; Adobe XD 插件是必不可少的&#xff0c; Adobe XD 的插件非常多&#xff0c;但 90%都是英文&#xff0c;并且良莠不齐。在这儿挑选 9 个好用的 Adobe XD 插件给大家&#xff0c;这里是我整理的一些实用 Adobe XD 插件&#xff0c;让你…

3-2 梯度与反向传播

3-2 梯度与反向传播 主目录点这里 梯度的含义 可以看到红色区域的变化率较大&#xff0c;梯度较大&#xff1b;绿色区域的变化率较小&#xff0c;梯度较小。 在二维情况下&#xff0c;梯度向量的方向指向函数增长最快的方向&#xff0c;而其大小表示增长的速率。 梯度的计算 …

UE5 07-给物体添加一个拖尾粒子

添加一个(旧版粒子系统)cascade粒子系统组件 ,在模板中选择一个开发学习初始包里的粒子

Elasticsearch:Ingest architectures - 摄取架构

我们提供各种采集架构&#xff0c;以满足各种用例和网络配置的需求。 要将数据采集到 Elasticsearch&#xff0c;请使用最符合你的需求和用例的选项。对于许多用户和用例来说&#xff0c;最简单的方法是使用 Elastic Agent 采集数据并将其发送到 Elasticsearch。Elastic Agent…

Mybatis框架的集成使用

1_框架概述 框架是一个半成品&#xff0c;已经对基础的代码进行了封装并提供相应的API&#xff0c;开发者在使用框架时直接调用封装好的api可以省去很多代码编写&#xff0c;从而提高工作效率和开发速度,框架是一种经过校验、具有一定功能的半成品软件. 经过校验&#xff1a;指…