1、请描述HDFS的写流程。
答:
2、请描述HDFS的读流程。
答:
3、请描述HDFS副本选择策略(3副本)。
答:
4、请描述HDFS的缓存机制。
答:
HDFS提供了一个高效的缓存加速机制—— Centralized Cache Management ,它允许用户指定要缓存的HDFS路径。NameNode会和保存着所需Block数据的所有DataNode通信,并指导它们把块数据缓存在堆外内存(off-heap)中进行缓存。DataNode会通过心跳机制向NameNode汇报缓存状态。
NameNode查询自身的缓存指令集来确定应该缓存哪个路径。缓存指令持久化存储在fsimage和edit日志中,可以通过Java和命令行API被添加、移除或修改。
在使用HDFS完成数据缓存时,首先要创建一个缓存池。缓存池是一个管理实体,用于管理缓存指令组。缓存池拥有类UNIX的权限,可以限制哪个用户和组可以访问该缓存池。写权限允许用户向缓存池添加、删除缓存指令 。读权限允许用户列出缓存池内的缓存指令,还有其他元数据。
缓存池也可以用于资源管理,可以设置一个最大限制值,用于限制缓存的数据量。
缓存池创建成功后,可以通过命令,将HDFS某个目录、文件缓存到缓存池中,从而完成数据缓存功能。
#创建缓存组,默认为cache_data
hdfs cacheadmin -addPool cache_data -mode 0777
#生成一个1GB大小的文件
dd if=/dev/zero of=/tmp/test.zero bs=1M count=1024
#将文件上传到HDFS
hdfs dfs -put /tmp/test.zero /data
#生成缓存指令
hdfs cacheadmin -addDirective -path /data -pool cache_data -ttl 1d
#显示缓存池的信息
hdfs cacheadmin -listPools -stats cache_data
#统计信息,显示EXP Date
hdfs cacheadmin -listDirectives -path /data
#删除缓存指令
hdfs cacheadmin -removeDirectives -path /data
#删除缓存池
hdfs cacheadmin -removePool cache_data
4、请说出几个常用的HDFS Shell命令。
答:
HDFS命令有两种风格,①hadoop fs ②hdfs dfs 。在使用上没有区别,hadoop fs内部会转换为hdfs dfs来使用。
# HDFS Shell帮助命令,help更加详细,usage简略些
hadoop fs -help [option]
hadoop fs -usage [option]
# 查看文件目录
hadoop fs -ls /
hadoop fs -ls -R /
# 创建文件
hadoop fs -touchz /edits.txt
# 创建目录
hadoop fs -mkdir /tmp_data
# 追加文件
hadoop fs -appendToFile edits.txt /edits.txt
# 查看文件内容
hadoop fs -cat /edits.txt
# 上传文件
hadoop fs -put /LocalPath /HDFSPath
hadoop fs -copyFromLocal /LocalPath /HDFSPath
hadoop fs -moveFromLocal /LocalPath /HDFSPath
# 下载文件
hadoop fs -get /HDFSPath /LocalPath
hadoop fs -copyToLocal /HDFSPath /LocalPath
# 删除文件
hadoop fs -rm /edits.txt
hadoop fs -rm -skipTrash /edits.txt
# 递归删除
hadoop fs -rm -r /shell
# 移动文件
hadoop fs -mv /originFile /newFile
# 拷贝文件
hadoop fs -copy /FilePath /dictPath
# 文件查找
hadoop fs -find / -name part-r-00000
5、HDFS文件的x权限是指?
答:
HDFS文件的x权限没有实际意义,在使用时可以忽略。目录的x权限表示是否可以访问这个目录。
6、HDFS企业级调优
答:
调整namenode处理客户端的线程数为dfs.namenode.handler.count=20 * log2(Cluster Size)。比如集群规模为8台时,此参数设置为60。
NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20logN,N为集群大小。
1、如何开启JobHistoryServer。
答:
先配置mapred-site.xml,开启JobHistoryServer。
<!-- MapReduce HistoryServer地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>node01:10020</value>
</property>
<!-- HistoryServer Web地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>node01:19888</value>
</property>
<!-- MapReduce日志保存位置 -->
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/mr-history/log</value>
</property>
<!-- HistoryServer日志保存位置 -->
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>/mr-history/done</value>
</property>
然后配置yarn-site.xml,开启日志聚合。
<!-- 聚合日志开启后,保存到HDFS中 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
对于yarn-site.xml,还可以增加以下额外参数。
<!-- 聚合日志保存时间 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>2592000</value><!--30 day-->
</property>
<!-- 聚合日志压缩类型 -->
<property>
<name>yarn.nodemanager.log-aggregation.compression-type</name>
<value>gz</value>
</property>
<!-- 不启用聚合日志,日志在本地保存的时间 -->
<property>
<name>yarn.nodemanager.log.retain-seconds</name>
<value>604800</value><!-- 7 day -->
</property>
<!-- nodemanager 本地文件存储目录 -->
<property>
<name>yarn.nodemanager.local-dirs</name>
<value>/opt/app/hadoop-2.7.7/hadoopDatas/yarn/local</value>
</property>
<!-- resourceManager 最多保存已完成任务个数 -->
<property>
<name>yarn.resourcemanager.max-completed-applications</name>
<value>1000</value>
</property>
配置文件分发到各个节点后,重启yarn。在HisotryServer节点,启动服务。
$HADOOP_HOME/sbin/mr-jobhistory-daemon.sh start historyserver
2、如何配置Yarn公平调度。
答:
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
<description>开启公平调度策略</description>
</property>
<property>
<name>yarn.scheduler.fair.allocation.file</name>
<value>/opt/app/hadoop-2.7.7/etc/hadoop/fair-scheduler.xml</value>
<description>公平调度策略配置文件目录</description>
</property>
<property>
<name>yarn.scheduler.fair.preemption</name>
<value>true</value>
<description>开启资源抢占</description>
</property>
<property>
<name>yarn.scheduler.fair.preemption.cluster-utilization-threshold</name>
<value>0.8f</value>
<description>当集群的整体资源利用率超过80%,则开始抢占</description>
</property>
<property>
<name>yarn.scheduler.fair.user-as-default-queue</name>
<value>true</value>
<description>默认提交到default队列</description>
</property>
<property>
<name>yarn.scheduler.fair.allow-undeclared-pools</name>
<value>false</value>
<description>任务无法提交到现有队列,是否允许新建一个队列</description>
</property>
<?xml version="1.0"?>
<allocations>
<!-- 每个队列中的默认调度策略,默认值是fair -->
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<user name="hadoop">
<!-- 用户hadoop最多运行的任务数 -->
<maxRunningApps>30</maxRunningApps>
</user>
<!-- 默认运行任务数 -->
<userMaxAppsDefault>10</userMaxAppsDefault>
<!-- 队列划分 -->
<!-- 共有4个队列:default、hadoop、develop、test -->
<!-- 权重weight分别是:1、2、1、1.5 -->
<queue name="root">
<minResources>512mb,4vcores</minResources>
<maxResources>102400mb,100vcores</maxResources>
<maxRunningApps>100</maxRunningApps>
<weight>1.0</weight>
<schedulingMode>fair</schedulingMode>
<aclSubmitApps> </aclSubmitApps>
<aclAdministerApps> </aclAdministerApps>
<queue name="default">
<minResources>512mb,4vcores</minResources>
<maxResources>30720mb,30vcores</maxResources>
<maxRunningApps>100</maxRunningApps>
<schedulingMode>fair</schedulingMode>
<weight>1.0</weight>
<!-- 允许提交任务的用户和组 -->
<!-- 格式为:用户名1,用户名2 用户名1所属组,用户名2所属组 -->
<!-- * 表示接收所有用户的任务 -->
<!-- 默认情况下,当任务没有指定队列时,会提交到default中 -->
<aclSubmitApps>*</aclSubmitApps>
</queue>
<queue name="hadoop">
<minResources>512mb,4vcores</minResources>
<maxResources>20480mb,20vcores</maxResources>
<maxRunningApps>100</maxRunningApps>
<schedulingMode>fair</schedulingMode>
<weight>2.0</weight>
<!-- 允许提交任务的用户和组 -->
<aclSubmitApps>hadoop hadoop</aclSubmitApps>
<!-- 允许管理任务的用户和组 -->
<aclAdministerApps>hadoop hadoop</aclAdministerApps>
</queue>
<queue name="develop">
<minResources>512mb,4vcores</minResources>
<maxResources>20480mb,20vcores</maxResources>
<maxRunningApps>100</maxRunningApps>
<schedulingMode>fair</schedulingMode>
<weight>1</weight>
<aclSubmitApps>develop develop</aclSubmitApps>
<aclAdministerApps>develop develop</aclAdministerApps>
</queue>
<queue name="test">
<minResources>512mb,4vcores</minResources>
<maxResources>20480mb,20vcores</maxResources>
<maxRunningApps>100</maxRunningApps>
<schedulingMode>fair</schedulingMode>
<weight>1.5</weight>
<aclSubmitApps>test,hadoop,develop test</aclSubmitApps>
<aclAdministerApps>test group_businessC,supergroup</aclAdministerApps>
</queue>
</queue>
<!-- 任务提交规则,由scheduler调度器决定提交的任务进入指定队列 -->
<!-- 包含多个rule标签,rule标签默认情况下create参数=true,表示当前rule可以创建一个新队列来存放任务 -->
<queuePlacementPolicy>
<!-- 任务被提交到指定的队列;如果队列不存在,则不创建 -->
<rule name="specified" create="false"/>
<!-- 任务被提交到,以提交用户所属组名称所命名的队列;如果队列不存在,则不创建 -->
<rule name="primaryGroup" create="false" />
<!-- 当不满足以上规则,则提交到root.default队列(默认值) -->
<rule name="default" queue="root.default"/>
</queuePlacementPolicy>
</allocations>
3、如何配置Yarn容量调度。
答:
root
├── prod
└── dev
├── spark
└── hdp
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>prod,dev</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.dev.queues</name>
<value>hdp,spark</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.prod.capacity</name>
<value>40</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.dev.capacity</name>
<value>60</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
<value>75</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.dev.hdp.capacity</name>
<value>50</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.dev.spark.capacity</name>
<value>50</value>
</property>
</configuration>
4、可以动态更新调度器策略吗?
答:
可以,在集群不重启的情况下,在主节点执行以下命令即可。
yarn rmadmin -refreshQueues
但动态更新,只能新增、关闭队列,不能删除队列。
5、作业提交时,如何指定到特定队列。
答:
在MR中,使用mapreduce.job.queuename参数来指定。
Configuration conf = new Configuration();
conf.set("mapreduce.job.queuename", "hadoop");
6、Yarn参数调优。
答:
根据实际调整每个节点和单个任务申请内存值
1、MapReduce调优。
答:
2、常见调优参数。
配置参数 | 参数说明 |
---|---|
mapreduce.map.memory.mb | 一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。 |
mapreduce.reduce.memory.mb | 一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。 |
mapreduce.map.cpu.vcores | 每个MapTask可使用的最多cpu core数目,默认值: 1 |
mapreduce.reduce.cpu.vcores | 每个ReduceTask可使用的最多cpu core数目,默认值: 1 |
mapreduce.reduce.shuffle.parallelcopies | 每个Reduce去Map中取数据的并行数。默认值是5 |
mapreduce.reduce.shuffle.merge.percent | Buffer中的数据达到多少比例开始写入磁盘。默认值0.66 |
mapreduce.reduce.merge.inmem.threshold | Buffer中的数据文件达到多少后开始写入磁盘,默认1000 |
mapreduce.reduce.shuffle.input.buffer.percent | Buffer大小占Reduce可用内存的比例。默认值0.7 |
mapreduce.reduce.input.buffer.percent | 指定多少比例的内存用来存放Buffer中的数据,默认值是0.0 |
配置参数 | 参数说明 |
---|---|
yarn.scheduler.minimum-allocation-mb | 给应用程序Container分配的最小内存,默认值:1024 |
yarn.scheduler.maximum-allocation-mb | 给应用程序Container分配的最大内存,默认值:8192 |
yarn.scheduler.minimum-allocation-vcores | 每个Container申请的最小CPU核数,默认值:1 |
yarn.scheduler.maximum-allocation-vcores | 每个Container申请的最大CPU核数,默认值:32 |
yarn.nodemanager.resource.memory-mb | 给Containers分配的最大物理内存,默认值:8192 |
配置参数 | 参数说明 |
---|---|
mapreduce.task.io.sort.mb | Shuffle的环形缓冲区大小,默认100m |
mapreduce.map.sort.spill.percent | 环形缓冲区溢出的阈值,默认80% |
配置参数 | 参数说明 |
---|---|
mapreduce.map.maxattempts | 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 |
mapreduce.reduce.maxattempts | 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 |
mapreduce.task.timeout | Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。 |
3、MapReduce过程中,如何解决数据倾斜问题
答:
4、MapReduce过程中,如何避免小文件问题
答:
5、请描述Reduce过程。
答:
6、MapReduce在进行HashPartitoner时,会获取key的hashCode,之后为什么要与Integer.MAX_VALUE进行逻辑与计算?
答:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
7、如何实现自定义Partitioner?
答:
public class MyPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
// TODO Auto-generated method stub
return 0;
}
}
job.setNumReduceTasks(5);
job.setPartitionerClass(MyPartitioner.class);
8、Hadoop为什么不用Java内置的序列化方法
答:
Java的序列化框架(Serializable)比较重,对象被序列化后,会携带很多额外信息(校验值、Header、继承体系等),不便于在网络中高效传输。所以Hadoop开发了自己的序列化机制。
9、Hadoop数据类型Writable,与Java基本数据类型有什么区别?
答:
public interface WritableComparable<T> extends Writable, Comparable<T>
Writable实现了WritableComparable接口,间接继承了Writable, Comparable类,实现了序列化、排序的功能。而这两个功能,在MapReduce中非常重要,排序是MapTask、ReduceTask默认操作,在集群中进行数据传输时要进行序列化。
10、Hadoop中如何实现自定义序列化
答:
在自定义类中,实现org.apache.hadoop.io.Writable接口,并实现write、readFields方法,完成序列化操作。
public class Person implements Writable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "My name is " + this.name + ", and i'm " + this.age + " years old.";
}
// 序列化
@Override
public void write(DataOutput out) throws IOException {
out.write(this.name.getBytes());
out.writeInt(this.age);
}
// 反序列化
@Override
public void readFields(DataInput in) throws IOException {
this.setName(in.readLine());
this.setAge(in.readInt());
}
}
11、在MapReduce进行数据处理时,会进行split数据切片,它的默认拆分规则是?如果不按照默认规则进行拆分,会发生什么现象?
答:
Math.max(minSize, Math.min(maxSize, blockSize));
mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
blockSize为128M
mapreduce.input.fileinputformat.split.maxsize
mapreduce.input.fileinputformat.split.minsize
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
...
bytesRemaining -= splitSize;
}
12、在MapReduce中InputFormat做了什么事情?
答:
13、请描述MapReduce所有的FileInputFormat,并分别说明它们的用法(除CombineTextInputFormat)。
答:
14、请详细描述下CombineTextInputFormat。
答:
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
15、如何自定义InputFormat?
答:
// 初始化资源,一般用于打开IO流
// 常用IO流为FSDataInputStream,默认会定义在成员变量中:inputStream
public void initialize(InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException
{
FileSplit fs = (FileSplit) split;
Path path = fs.getPath();
FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
inputStream = fileSystem.open(path);
}
// 关闭资源,一般用于关闭IO流
public void close() throws IOException {
IOUtils.closeStream(inputStream);
}
// 类似于指针,如果要读取的数据存在,返回true,否则返回false
public boolean nextKeyValue() throws IOException, InterruptedException {
}
// 获取当前行的key
public KEYIN getCurrentKey() throws IOException, InterruptedException;
// 获取当前行的value
public VALUEIN getCurrentValue() throws IOException, InterruptedException;
// 返回数据读取进度,0-1
public float getProgress() throws IOException, InterruptedException;
job.setInputFormatClass();
16、自定义InputFormat时,如果不想让文件在读取时被切片,可以怎么做?
答:
17、如果没有自定义Map、Reduce,默认会执行什么操作?
答:
18、如何实现自定义排序?
答:
// 如果返回正数,o在前面;返回负数,表示o在后面;相等,返回0
// 意味着如果降序,则o>this时,返回正数;o<this时返回负数。升序时,this>o返回正数;this<o返回负数
public int compareTo(T o) {
return Long.compare(o.value,this.value);
};
19、什么是Combiner?如何设置?
答:
20、如果Map输出时value没有意义(为空),应该如何处理?
答:
21、如何实现自定义分组
答:
public class OrderGroupingComparator extends WritableComparator {
// 初始化空对象用于数据接收
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean aBean = (OrderBean) a;
OrderBean bBean = (OrderBean) b;
int result;
if (aBean.getOrder_id() > bBean.getOrder_id()) {
result = 1;
} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
result = -1;
} else {
result = 0;
}
return result;
}
}
22、Reduce如何实现分组操作?
答:
23、如何实现自定义输出?
答:
public class MyRecordWriter extends RecordWriter<Text, IntWritable> {
private FSDataOutputStream file;
// 自定义方法,开流
public void initialize(TaskAttemptContext job) throws IOException {
String outdir = job.getConfiguration().get(FileOutputFormat.OUTDIR);
FileSystem fileSystem = FileSystem.get(job.getConfiguration());
file = fileSystem.create(new Path(outdir));
}
// 将数据写出
@Override
public void write(Text key, IntWritable value) throws IOException, InterruptedException {
// TODO Auto-generated method stub
file.write(value.hashCode());
}
// 关闭资源
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(file);
}
}
public class MyOutputFormat extends FileOutputFormat<Text, IntWritable> {
@Override
public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
MyRecordWriter mRecordWriter = new MyRecordWriter();
mRecordWriter.initialize(job);
return mRecordWriter;
}
}
24、如何在MapReduce中实现MapJoin?
答:
job.addCacheFile(new URI("path"));
URI[] cacheFiles = context.getCacheFiles();
String path = cacheFiles[0].toString();
BufferReader bufferReader = new BufferReader(new FileReader(path));
String line;
while (StringUtils.isNotEmpty(line = bufferReader.readLine())){
//进行数据处理
}