FileSystemManager.java
3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package com.dianping.cat.hadoop.hdfs;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
import com.dianping.cat.config.server.ServerConfigManager;
public class FileSystemManager implements Initializable {
@Inject
private ServerConfigManager m_configManager;
private String m_defaultBaseDir;
private Map<String, FileSystem> m_fileSystems = new HashMap<String, FileSystem>();
private Configuration m_config;
public long getFileMaxSize(String id) {
return m_configManager.getHdfsFileMaxSize(id);
}
public FileSystem getFileSystem(String id, StringBuilder basePath) throws IOException {
String serverUri = m_configManager.getHdfsServerUri(id);
String baseDir = m_configManager.getHdfsBaseDir(id);
FileSystem fs = m_fileSystems.get(id);
if (serverUri == null || !serverUri.startsWith("hdfs:")) {
// no config found, use local HDFS
if (fs == null) {
fs = FileSystem.getLocal(m_config);
m_fileSystems.put(id, fs);
}
basePath.append(m_defaultBaseDir).append("/");
if (baseDir == null) {
basePath.append(id);
} else {
basePath.append(baseDir);
}
} else {
if (fs == null) {
URI uri = URI.create(serverUri);
fs = FileSystem.get(uri, m_config);
m_fileSystems.put(id, fs);
}
if (baseDir == null) {
basePath.append(id);
} else {
basePath.append(baseDir);
}
}
return fs;
}
// prepare file /etc/krb5.conf
// prepare file /data/appdatas/cat/cat.keytab
// prepare mapping [host] => [ip] at /etc/hosts
// put core-site.xml at / of classpath
// use "hdfs://dev80.hadoop:9000/user/cat" as example. Notes: host name can't
// be an ip address
private Configuration getHdfsConfiguration() throws IOException {
Configuration config = new Configuration();
Map<String, String> properties = m_configManager.getHdfsProperties();
String authentication = properties.get("hadoop.security.authentication");
config.setInt("io.file.buffer.size", 8192);
config.setInt("dfs.replication", 3);
for (Map.Entry<String, String> property : properties.entrySet()) {
config.set(property.getKey(), property.getValue());
}
if ("kerberos".equals(authentication)) {
// For MAC OS X
// -Djava.security.krb5.realm=OX.AC.UK
// -Djava.security.krb5.kdc=kdc0.ox.ac.uk:kdc1.ox.ac.uk
System.setProperty("java.security.krb5.realm",
getValue(properties, "java.security.krb5.realm", "DIANPING.COM"));
System.setProperty("java.security.krb5.kdc", getValue(properties, "java.security.krb5.kdc", "192.168.7.80"));
UserGroupInformation.setConfiguration(config);
}
return config;
}
private String getValue(Map<String, String> properties, String name, String defaultValue) {
String value = properties.get(name);
if (value != null) {
return value;
} else {
return defaultValue;
}
}
@Override
public void initialize() throws InitializationException {
m_defaultBaseDir = m_configManager.getHdfsLocalBaseDir("hdfs");
if (m_configManager.isHdfsOn()) {
try {
m_config = getHdfsConfiguration();
SecurityUtil.login(m_config, "dfs.cat.keytab.file", "dfs.cat.kerberos.principal");
} catch (IOException e) {
Cat.logError(e);
}
} else {
m_config = new Configuration();
}
}
}