hive创建目录时相关的几个hdfs中的类:

org.apache.hadoop.hdfs.DistributedFileSystem,FileSystem 的具体实现类 org.apache.hadoop.hdfs.DFSClient,client操作hdfs文件系统的类 org.apache.hadoop.fs.permission.FsPermission 文件权限相关类,主要的方法有getUMask和applyUMask方法

org.apache.hadoop.hdfs.DistributedFileSystem中需要注意的几个方法:
initialize,主要用来初始DFSClient的实例:

  @Override   public void initialize(URI uri, Configuration conf) throws IOException {     super.initialize(uri, conf);     setConf(conf);     String host = uri.getHost();     if (host == null) {       throw new IOException("Incomplete HDFS URI, no host: "+ uri);     }     this.dfs = new DFSClient(uri, conf, statistics);     this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());     this.workingDir = getHomeDirectory();   }

mkdir用来创建一个目录,mkdirs用来创建多个目录(类似于mkdir -p):

  public boolean mkdir(Path f, FsPermission permission) throws IOException {     statistics.incrementWriteOps(1);     return dfs.mkdirs(getPathName(f), permission, false);   }   public boolean mkdirs(Path f, FsPermission permission) throws IOException {     statistics.incrementWriteOps(1);     return dfs.mkdirs(getPathName(f), permission, true);   }

两者最终调用的都是DFSClient.mkdirs方法,org.apache.hadoop.hdfs.DFSClient的mkdirs方法:

final Conf dfsClientConf; ...   public boolean mkdirs(String src, FsPermission permission,       boolean createParent) throws IOException {     if (permission == null) { //如果传入的权限为null       permission = FsPermission.getDefault();     }     FsPermission masked = permission.applyUMask(dfsClientConf.uMask);     return primitiveMkdir(src, masked, createParent); //调用primitiveMkdir方法       }

这里需要注意 FsPermission.getDefault方法和Conf.uMask属性(Conf是DFSClient的内部类,主要用来设置默认配置)
Conf.uMask属性:

uMask = FsPermission.getUMask(conf); //由getUMask获取

getUMask方法:

 public static final String DEPRECATED_UMASK_LABEL = "dfs.umask";   public static final String UMASK_LABEL =                   CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY;  //fs.permissions.umask-mode   public static final int DEFAULT_UMASK =                   CommonConfigurationKeys.FS_PERMISSIONS_UMASK_DEFAULT; //0022                         public static FsPermission getUMask(Configuration conf) {     int umask = DEFAULT_UMASK;     if(conf != null) {       String confUmask = conf.get(UMASK_LABEL);       int oldUmask = conf.getInt(DEPRECATED_UMASK_LABEL, Integer.MIN_VALUE); //老的配置项:dfs.umask,默认值为Integer.MIN_VALUE(-2147483648)       try {         if(confUmask != null) { //如果设置了fs.permissions.umask-mode,则按这个umask,否则为默认的umask(0022)           umask = new UmaskParser(confUmask).getUMask();         }       } catch(IllegalArgumentException iae) {         // Provide more explanation for user-facing message         String type = iae instanceof NumberFormatException ? "decimal"             : "octal or symbolic";         String error = "Unable to parse configuration " + UMASK_LABEL             + " with value " + confUmask + " as " + type + " umask.";         LOG.warn(error);                 // If oldUmask is not set, then throw the exception         if (oldUmask == Integer.MIN_VALUE) {           throw new IllegalArgumentException(error);         }       }               if(oldUmask != Integer.MIN_VALUE) { //如果手动设置了老的配置项dfs.umask         if (umask != oldUmask) { //并且dfs.umask的值不等于0022           LOG.warn(DEPRECATED_UMASK_LABEL               + " configuration key is deprecated. " + "Convert to "               + UMASK_LABEL + ", using octal or symbolic umask "               + "specifications.");           // Old and new umask values do not match - Use old umask           umask = oldUmask; //umask为默认值0022         }       }     }         return new FsPermission((short)umask);   }

在hive中创建hdfs的目录有两种方法
1)通过Utilities的createDirsWithPermission方法,这种方法会重设fs.permissions.umask-mode
2)直接通过DistributedFileSystem的mkdirs方法创建
两者最终都是调用了DFSClient的mkdirs方法,不同的是调用Utilities.createDirsWithPermission创建的目录权限在proxy时权限有可能是777(因为手动设置了权限为777),
比如:
Context类的构造函数中创建临时文件目录通过Context.getMRScratchDir调getLocalScratchDir(local job)或getScratchDir(非local job),其中getScratchDir中调用Utilities.createDirsWithPermission方法调用目录

public static boolean createDirsWithPermission(Configuration conf, Path mkdirPath,     FsPermission fsPermission, boolean recursive) throws IOException {   String origUmask = null;   LOG.warn("Create dirs " + mkdirPath + " with permission " + fsPermission + " recursive " +       recursive);   if (recursive) {   //如果recursive为true,设置fs.permissions.umask-mode为000,   //默认情况下recursive = SessionState.get().isHiveServerQuery() &&conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname,HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal);   //即时来自hiveserver的请求,并且开启了doas,这里还会把权限设置为777(这里我增加了一个逻辑,如果设置了proxy,recursive也为true)   /**   boolean recursive = false;   if (SessionState.get() != null) {     recursive = (SessionState.get().isHiveServerQuery() &&         conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname,             HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal))||(HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_USE_CUSTOM_PROXY));     fsPermission = new FsPermission((short)00777);   }   */     origUmask = conf.get("fs.permissions.umask-mode");     conf.set("fs.permissions.umask-mode", "000");   }   FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf);   //这里是DFSClient的实例   boolean retval = false;   try {     retval = fs.mkdirs(mkdirPath, fsPermission);     resetConfAndCloseFS(conf, recursive, origUmask, fs);   } catch (IOException ioe) {     try {       resetConfAndCloseFS(conf, recursive, origUmask, fs); //调用resetConfAndCloseFS,reset fs.permissions.umask-mode的设置     }     catch (IOException e) {       // do nothing - double failure     }   }   return retval; }

resetConfAndCloseFS方法用来重设fs.permissions.umask-mode的设置,这样如果后面创建目录不是使用Utilities.createDirsWithPermission就会使用这个重设的配置

private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask,     String origUmask, FileSystem fs) throws IOException {   if (unsetUmask) { //unsetUmask为true,即recursive为true的话,需要重设fs.permissions.umask-mode     if (origUmask != null) { //如果有设置项的话,使用设置项       conf.set("fs.permissions.umask-mode", origUmask);     } else {       conf.unset("fs.permissions.umask-mode"); //这里虽然可以unset,后面会有默认值     }   }   fs.close(); }

通过查看DFSClient的源码,发现在DFSClient的构造函数中会初始化ugi的信息,默认为当前用户

final UserGroupInformation ugi; ... this.ugi = UserGroupInformation.getCurrentUser();  如果更改成proxy用户,通过运行hadoop fs -mkdir测试,发现生成的文件目录属主还是当前登录用户 更改DFSClient的构造方法: //this.ugi = UserGroupInformation.getCurrentUser(); if(conf.getBoolean("use.custom.proxy",false)){   this.ugi = UserGroupInformation.createRemoteUser(conf.get("custom.proxy.user")); }else{   this.ugi = UserGroupInformation.getCurrentUser(); }

在hdfs-site.xml配置中增加:
dfs配置中增加:

<property>     <name>use.custom.proxy</name>             <value>true</value> </property> <property>     <name>custom.proxy.user</name>            <value>ericni</value> </property>

使用hdfs创建目录后,目录的属主仍然是hdfs,而数据写入的用户为提交job的用户。
因为上面的原因,要想使创建的hdfs的目录属主为proxy的用户,可以采用创建完后设置owner的方法。
通过查看DistributedFileSystem类的api,发现有setOwner的方法。
以insert overwrite 语句为例,在mapred job提交之前,会根据job的上下文内容,创建map和reduce的临时目录,这个目录是最终数据落地的目录,落地之后,在job完成的finally阶段,会通过MoveTask移动到对应的目录下面临时数据写入目录在ExecDriver类的execute方法中生成:

public int execute(DriverContext driverContext) {   IOPrepareCache ioPrepareCache = IOPrepareCache.get();   ioPrepareCache.clear();   boolean success = true;   Context ctx = driverContext.getCtx();   boolean ctxCreated = false;   Path emptyScratchDir;   MapWork mWork = work.getMapWork();   ReduceWork rWork = work.getReduceWork();   try {     if (ctx == null) {       ctx = new Context(job);       ctxCreated = true;     }     emptyScratchDir = ctx.getMRTmpPath();     FileSystem fs = emptyScratchDir.getFileSystem(job);     fs.mkdirs(emptyScratchDir);   } catch (IOException e) {     e.printStackTrace();     console.printError("Error launching map-reduce job", "\n"         + org.apache.hadoop.util.StringUtils.stringifyException(e));     return 5;   } ....   List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx);  //获取输入目录   Utilities.setInputPaths(job, inputPaths);   Utilities.setMapRedWork(job, work, ctx.getMRTmpPath()); ....   Utilities.createTmpDirs(job, mWork); //创建map临时目录   Utilities.createTmpDirs(job, rWork); //创建reduce临时目录
一种思路,在外层创建目录后setOwner,可以在Utilities中增加一个方法调用setOwner: public static void setDirWithOwner(Configuration conf,Path mkdirPath,         String username,String groupname) throws  IOException {    LOG.warn("in Utilities setDirWithOwner path: " + mkdirPath + ",username: " + username + ",groupname: " + groupname);    FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf);    try {       fs.setOwner(mkdirPath, username, groupname); //调用DistributedFileSystem.setOwner方法    }catch (IOException ios) {      //no-op    }  }

同时更改createTmpDirs方法:

  private static void createTmpDirs(Configuration conf,       List<Operator<? extends OperatorDesc>> ops) throws IOException {     FsPermission fsPermission = new FsPermission((short)00777);     while (!ops.isEmpty()) {       Operator<? extends OperatorDesc> op = ops.remove(0);       if (op instanceof FileSinkOperator) {         FileSinkDesc fdesc = ((FileSinkOperator) op).getConf(); //org.apache.hadoop.hive.ql.plan.FileSinkDesc         Path tempDir = fdesc.getDirName(); //获取目录名         if (tempDir != null) {           Path tempPath = Utilities.toTempPath(tempDir);  //目录增加_tmp.前缀           createDirsWithPermission(conf, tempPath, fsPermission);             if (conf.getBoolean("use.custom.proxy",false)) { //如果设置了use.custom.proxy,则调用setDirWithOwner方法,设置目录权限           LOG.warn("set owner after create dirs");           String username = conf.get("custom.proxy.user");           setDirWithOwner(conf,tempPath,username,null);         }         }       }       if (op.getChildOperators() != null) {         ops.addAll(op.getChildOperators());       }     }   }

上面这种方法有一定的局限性,比如是使用了Utilities.createTmpDirs的方法创建的目录才有用(比如map或者reduce的临时数据目录)。
可以通过改下层的实现:
在DFSClient中增加一个setOwner方法:

public boolean setOwner(String src, String username) throws IOException {     boolean setResult = false;     checkOpen();     try {         namenode.setOwner(src, username, null);         setResult = true;     } catch(RemoteException re) {         throw re.unwrapRemoteException(AccessControlException.class,                                        FileNotFoundException.class,                                        SafeModeException.class,                                        UnresolvedPathException.class);    }finally{         return setResult;    } }

同时更改primitiveMkdir为如下内容:

public boolean primitiveMkdir(String src, FsPermission absPermission,   boolean createParent)   throws IOException {   checkOpen();   boolean MkRe;   boolean SetRe;   if (absPermission == null) {     absPermission =       FsPermission.getDefault().applyUMask(dfsClientConf.uMask);   }   if(LOG.isDebugEnabled()) {     LOG.debug(src + ": masked=" + absPermission);   }   try {       MkRe = namenode.mkdirs(src, absPermission, createParent); //namenode:org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB       if (this.conf.getBoolean("use.custom.proxy",false)){           LOG.warn("change primitiveMkdir add conf: " + this.conf.getBoolean("use.custom.proxy",false));           LOG.warn("change primitiveMkdir add conf: " + this.conf.get("custom.proxy.user"));           String username = this.conf.get("custom.proxy.user");             if (("").equals(username)||username == null||("hdfs").equals(username)){                 //no-op                  SetRe = true;             }else{                  SetRe = setOwner(src,username);             }             }else {           SetRe = true;       }       return MkRe&&SetRe;   } catch(RemoteException re) {     throw re.unwrapRemoteException(AccessControlException.class,                                    InvalidPathException.class,                                    FileAlreadyExistsException.class,                                    FileNotFoundException.class,                                    ParentNotDirectoryException.class,                                    SafeModeException.class,                                    NSQuotaExceededException.class,                                    DSQuotaExceededException.class,                                    UnresolvedPathException.class);   } }

这样,只要是调用了DFSClient的primitiveMkdir方法创建的目录(正常情况下创建目录都会调用primitiveMkdir方法),在proxy的情况下都可以更改目录。


到这里,hive的proxy算是开发完成了,为了实现proxy的功能,对hive和hadoop的代码更改如下:

1.HiveConf中增加两个配置项
2.重写HadoopDefaultAuthenticator的setConf方法
3.更改Context构造方法中关于scratch目录的项
4.更改Utilities中的createDirsWithPermission方法和createTmpDirs方法,并新增setDirWithOwner方法
5.更改HiveHistoryImpl构造方法中关于日志路径的项
6.更改JobClient的init方法
7.更改DFSClient的构造方法,增加一个setOwner方法,同时更改primitiveMkdir方法