Using the Hadoop Java API to operate a remote HDFS

A quick search turns up plenty of articles and code about basic HDFS operations, but they rarely spell out the details that matter most to a newcomer. Those are exactly the things I, as a newcomer, needed and could not find anywhere. So after some hands-on experimentation, I decided it was worth writing down the things a beginner actually needs to know.

Why remote?

If you follow the scenario in most online articles, installing Hadoop on a single machine and experimenting on that same machine, everything probably just works. I never tried it, though: it is hard for me to get a Linux machine with a UI where I could install both Hadoop and an IDE, since I work almost entirely on Windows. More importantly, in real work the HDFS you need is almost never running on your own machine, so sooner or later you have to connect to a remote HDFS anyway. Why not start that way from the beginning?

The simplest example

Enough rambling. Here is the simplest possible example, just enough to prove that we can connect to the remote HDFS and read data from it.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.revokey.demo</groupId>
    <artifactId>demo-hdfs</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.0</version>
        </dependency>
    </dependencies>
</project>

That's right: this single dependency is all you need.

HdfsDemo.java

package com.revokey.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

public class HdfsDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop-namenode:8020");
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] statuses = fs.listStatus(new Path("/"));
        for (FileStatus status : statuses) {
            System.out.println(status);
        }
    }
}

A quick explanation:

  • The first two lines of main tell the client where the remote HDFS is; obviously the program has to know the address before it can connect. The value normally matches fs.defaultFS in the cluster's core-site.xml.
  • FileSystem.get then uses that configuration to connect to HDFS.
  • Finally, the status of everything under the root directory is printed.

Output of a debug run in IDEA

Connected to the target VM, address: '127.0.0.1:9856', transport: 'socket'
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (org.apache.htrace.core.Tracer).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
HdfsLocatedFileStatus{path=hdfs://x.x.x.x:8020/apps; isDirectory=true; modification_time=1517557661298; access_time=0; owner=hadoop; group=supergroup; permission=rwxr-xr-x; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
HdfsLocatedFileStatus{path=hdfs://x.x.x.x:8020/emr; isDirectory=true; modification_time=1517557567354; access_time=0; owner=hadoop; group=supergroup; permission=rwxrwx---; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
HdfsLocatedFileStatus{path=hdfs://x.x.x.x:8020/hbase; isDirectory=true; modification_time=1518326887123; access_time=0; owner=hadoop; group=supergroup; permission=rwxr-xr-x; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
HdfsLocatedFileStatus{path=hdfs://x.x.x.x:8020/spark-history; isDirectory=true; modification_time=1524037867303; access_time=0; owner=hadoop; group=supergroup; permission=rwxr-xr-x; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
HdfsLocatedFileStatus{path=hdfs://x.x.x.x:8020/tmp; isDirectory=true; modification_time=1517557709416; access_time=0; owner=hadoop; group=supergroup; permission=rwx-wx-wx; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
HdfsLocatedFileStatus{path=hdfs://x.x.x.x:8020/user; isDirectory=true; modification_time=1517557611265; access_time=0; owner=hadoop; group=supergroup; permission=rwxr-xr-x; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
Disconnected from the target VM, address: '127.0.0.1:9856', transport: 'socket'
Process finished with exit code 0

Looking at the lines that start with HdfsLocatedFileStatus, you can see the directory listing of the remote HDFS root was retrieved successfully.
The slf4j and log4j warnings are most likely there because I did not add the corresponding logging dependencies; they can be ignored.
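
If the warnings bother you, adding an SLF4J binding should silence the first one, and a log4j.properties with a console appender on the classpath should take care of the second. A minimal sketch, assuming slf4j-simple is acceptable and that 1.7.25 matches the slf4j-api version pulled in by hadoop-client (check your dependency tree before copying the version):

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>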

Uploading files
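
Before looking at the problems, note that the upload itself is just one more call on the FileSystem from the example above; both paths below are placeholders:

// fs is the FileSystem from the previous example; both paths are placeholders
fs.copyFromLocalFile(new Path("C:/some/local/test.txt"), new Path("/test.txt"));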

User permission problems

A newcomer is almost certain to hit an error like this:

org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=WRITE, inode="hadoop": hadoop:supergroup:rwxr-xr-x

The message makes it fairly clear that this is a user permission problem. Just like on Linux, operating on files in HDFS requires the corresponding permissions. The reads earlier worked because "other" has read permission on those paths. So as long as the user you connect as has write permission on the target, this error should not appear.
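
If you do administer the cluster, the other way out is simply to grant write access on the directory you upload into. A rough sketch of doing that through the same API (the /uploads directory and the wide-open 777 mode are purely illustrative, and this call itself has to run as a user that already owns the path, for example the hadoop user):

import org.apache.hadoop.fs.permission.FsPermission;

// create an upload directory and open it up; in practice pick a tighter mode than 777
fs.mkdirs(new Path("/uploads"));
fs.setPermission(new Path("/uploads"), new FsPermission((short) 0777));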

Seeing user=Administrator tells you the client simply picked up my current Windows user name. I am not going to create a separate hadoop user on Windows just for development, and I am not going to rewrite the permissions of every file on the cluster either, so how do I change this user? Clearly it has to be configuration. Reading the implementation behind FileSystem.get, you will find code along these lines:

String envUser;
if (!UserGroupInformation.isSecurityEnabled() && user == null) {
    envUser = System.getenv("HADOOP_USER_NAME");
    if (envUser == null) {
        envUser = System.getProperty("HADOOP_USER_NAME");
    }
    user = envUser == null ? null : new User(envUser);
}

So as long as you set HADOOP_USER_NAME=hadoop in your system environment variables, the client will operate on HDFS files with the hadoop user's permissions.
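
If you would rather not touch the system environment, the same code path also reads a JVM system property, and FileSystem.get has an overload that takes the user name explicitly. A small sketch, assuming the remote user is called hadoop:

import java.net.URI;

// Option 1: set the property before the FileSystem is created,
// since the code above reads it while building the connection
System.setProperty("HADOOP_USER_NAME", "hadoop");
FileSystem fs = FileSystem.get(conf);

// Option 2: pass the user name directly (this overload also takes the URI
// and additionally throws InterruptedException)
FileSystem fs2 = FileSystem.get(URI.create("hdfs://hadoop-namenode:8020"), conf, "hadoop");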

Problems with Tencent Cloud EMR

217661 [Thread-7] INFO org.apache.hadoop.hdfs.DataStreamer - Exception in createBlockOutputStream blk_1073751824_11005
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:253)
at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1725)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1679)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:716)
217678 [Thread-7] WARN org.apache.hadoop.hdfs.DataStreamer - Abandoning BP-2129223914-192.168.1.15-1517557509338:blk_1073751824_11005
217763 [Thread-7] WARN org.apache.hadoop.hdfs.DataStreamer - Excluding datanode DatanodeInfoWithStorage[192.168.1.12:4001,DS-76f279b4-3a0b-4407-8d6a-dfb88e7d95ba,DISK]
238831 [Thread-7] INFO org.apache.hadoop.hdfs.DataStreamer - Exception in createBlockOutputStream blk_1073751825_11006
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:253)
at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1725)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1679)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:716)
238832 [Thread-7] WARN org.apache.hadoop.hdfs.DataStreamer - Abandoning BP-2129223914-192.168.1.15-1517557509338:blk_1073751825_11006
238905 [Thread-7] WARN org.apache.hadoop.hdfs.DataStreamer - Excluding datanode DatanodeInfoWithStorage[192.168.1.16:4001,DS-db5ce4d3-c3d6-4b7c-8e5d-8379acfce24c,DISK]
259968 [Thread-7] INFO org.apache.hadoop.hdfs.DataStreamer - Exception in createBlockOutputStream blk_1073751826_11007
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:253)
at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1725)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1679)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:716)
259969 [Thread-7] WARN org.apache.hadoop.hdfs.DataStreamer - Abandoning BP-2129223914-192.168.1.15-1517557509338:blk_1073751826_11007
260004 [Thread-7] WARN org.apache.hadoop.hdfs.DataStreamer - Excluding datanode DatanodeInfoWithStorage[192.168.1.11:4001,DS-729f5121-92c9-4d1b-b680-56f6a2d6f275,DISK]
260070 [Thread-7] WARN org.apache.hadoop.hdfs.DataStreamer - DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /test.txt could only be replicated to 0 nodes instead of minReplication (=1). There are 3 datanode(s) running and 3 node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1576)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3107)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3031)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:725)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1491)
at org.apache.hadoop.ipc.Client.call(Client.java:1437)
at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy11.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:504)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy12.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1078)
at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1865)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1668)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:716)

On Tencent Cloud EMR, only the namenode is open to the public network by default; all other nodes are reachable only over the private network. I did not pay attention to this at first, and only after seeing the error above during an upload did I realize something was off.
When uploading, the client first asks the namenode for datanode addresses and then streams the file to the datanodes. HDFS replicates data; the replica count depends on the configuration, and the cluster normally has at least as many datanodes as the replication factor. From the error above you can tell that this EMR cluster has minReplication set to 1 and three datanodes. Uploading test.txt, the client first tries the datanode at 192.168.1.12:4001 and the connection fails; it then tries 192.168.1.16:4001, which also fails, and finally 192.168.1.11:4001, which fails as well. Once every datanode has been tried and the number of successful replicas is still below minReplication, the upload is declared failed and the exception is thrown.
So, to upload files to a remote HDFS, you must make sure the client can actually reach the datanodes.

Even when the datanodes do allow public access, a newcomer may still find that the client cannot connect to them. Why? Because the Hadoop nodes normally talk to each other over the private network, so the datanode addresses the client receives from the namenode are private ones too. How do you make the namenode hand the client public datanode addresses instead? I have not found a configuration for that yet, but there is a workaround: tell the client to address datanodes by hostname, and then map those hostnames to the public IPs in your local hosts file.

conf.set("dfs.client.use.datanode.hostname", "true");

A complete example

Finally, here is a simple example that covers the basic file and directory operations.

package com.revokey.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;

public class HdfsDemo {
    private Logger log = LoggerFactory.getLogger(HdfsDemo.class);
    private FileSystem fs;

    HdfsDemo() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop-namenode:8020");
        conf.set("dfs.client.use.datanode.hostname", "true");
        fs = FileSystem.get(conf);
    }

    void Close() throws IOException {
        fs.close();
    }

    void ListStatus(String path) throws IOException {
        FileStatus[] statuses = fs.listStatus(new Path(path));
        for (FileStatus status : statuses) {
            log.info(status.toString());
        }
    }

    void ReadFile(String path, OutputStream output) throws IOException {
        FSDataInputStream input = fs.open(new Path(path));
        IOUtils.copyBytes(input, output, 4096);
        input.close();
    }

    void CreateDir(String dir) throws IOException {
        fs.mkdirs(new Path(dir));
    }

    void CreateFile(String src, String dst) throws IOException {
        // upload a local file: keep the source (delSrc=false), overwrite the destination
        fs.copyFromLocalFile(false, true, new Path(src), new Path(dst));
    }

    void Download(String src, String dst) throws IOException {
        fs.copyToLocalFile(new Path(src), new Path(dst));
    }

    void Delete(String path, final boolean recursive) throws IOException {
        fs.delete(new Path(path), recursive);
    }

    public static void main(String[] args) throws Exception {
        HdfsDemo demo = new HdfsDemo();
        demo.CreateDir("/test/123");
        demo.CreateFile("C:/Users/Administrator/Desktop/test.txt", "/test.txt");
        demo.CreateFile("C:/Users/Administrator/Desktop/test.txt", "/test/123/test.txt");
        demo.ListStatus("/");
        demo.ReadFile("/test.txt", System.out);
        demo.Download("/test", "C:/Users/Administrator/Desktop/");
        demo.Delete("/test.txt", false);
        demo.Delete("/test", true);
        demo.Close();
    }
}
Time for a cup of tea O(∩_∩)O~