Driver类是对

org.apache.hadoop.hive.ql.processors.CommandProcessor.java

接口的实现,重写了run方法,定义了常见sql的执行方式.

public class Driver implements CommandProcessor

具体的方法调用顺序:

run--->runInternal--->(createTxnManager+recordValidTxns)----->compileInternal---> compile--analyzer(BaseSemanticAnalyzer)--->execute

其中compile和execute是两个比较重要的方法:

compile用来完成语法和语义的分析,生成执行计划

execute执行物理计划,即提交相应的mapredjob

通过打印perflog可以看到Driver类的简单地时序图:

下面来看下Driver类的几个常用的方法实现:

1)createTxnManager 用来获取目前设置的用于实现lock的类,比如:

org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager

2)checkConcurrency 用来判断当前hive设置是否支持并发控制:

boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);

主要是通过判断hive.support.concurrency参数,默认是false

3)getClusterStatus 调用JobClient类的getClusterStatus方法来获取集群的状态:

public ClusterStatus getClusterStatus() throws Exception {     ClusterStatus cs;     try {       JobConf job = new JobConf(conf , ExecDriver.class);       JobClient jc = new JobClient(job);       cs = jc.getClusterStatus();     } catch (Exception e) {       e.printStackTrace();       throw e;     }     LOG.info( "Returning cluster status: " + cs.toString());     return cs;   }

4)getSchema   //返回表的schema信息

5)

doAuthorization/doAuthorizationV2/getHivePrivObjects

用来在开启权限验证情况下对sql的权限检测操作

6)

getLockObjects/acquireReadWriteLocks/releaseLocks

都是和锁相关的方法 ,其中getLockObjects用来获取锁的对象(锁的路径,锁的模式等),最终返回一个包含所有锁的list,acquireReadWriteLocks用来控制获取锁,releaseLocks用来释放锁:

getLockObjects:   private List<HiveLockObj> getLockObjects(Database d, Table t, Partition p, HiveLockMode mode)       throws SemanticException {     List<HiveLockObj> locks = new LinkedList<HiveLockObj>();     HiveLockObjectData lockData =       new HiveLockObjectData( plan.getQueryId(),                              String. valueOf(System.currentTimeMillis ()),                              "IMPLICIT",                              plan.getQueryStr());     if (d != null) {       locks.add( new HiveLockObj(new HiveLockObject(d.getName(), lockData), mode));  //数据库层面的锁       return locks;     }     if (t != null) {  // 表层面的锁       locks.add( new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));       locks.add( new HiveLockObj(new HiveLockObject(t, lockData), mode));       mode = HiveLockMode.SHARED;       locks.add( new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));       return locks;     }     if (p != null) { //分区层面的锁       locks.add( new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));       if (!(p instanceof DummyPartition)) {         locks.add( new HiveLockObj(new HiveLockObject(p, lockData), mode));       }       // All the parents are locked in shared mode       mode = HiveLockMode.SHARED;       // For dummy partitions, only partition name is needed       String name = p.getName();       if (p instanceof DummyPartition) {         name = p.getName().split( "@")[2];       }       String partialName = "";       String[] partns = name.split( "/");       int len = p instanceof DummyPartition ? partns.length : partns.length - 1;       Map<String, String> partialSpec = new LinkedHashMap<String, String>();       for ( int idx = 0; idx < len; idx++) {         String partn = partns[idx];         partialName += partn;         String[] nameValue = partn.split( "=");         assert(nameValue.length == 2);         partialSpec.put(nameValue[0], nameValue[1]);         try {           locks.add( new HiveLockObj(                       new HiveLockObject(new DummyPartition(p.getTable(), p.getTable().getDbName()                                                             + "/" + p.getTable().getTableName()                                                             + "/" + partialName,                                                               partialSpec), lockData), mode));           partialName += "/";         } catch (HiveException e) {           throw new SemanticException(e.getMessage());         }       }       locks.add( new HiveLockObj(new HiveLockObject(p.getTable(), lockData), mode));       locks.add( new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));     }     return locks;   }

acquireReadWriteLocks调用了锁具体实现类的acquireLocks方法

releaseLocks调用了锁具体实现类的releaseLocks方法

7)

run方法是Driver类的入口方法,调用了runInternal方法,我们主要来看runInternal的方法,大体步骤:

运行hive.exec.driver.run.hooks中设置的hook, 运行HiveDriverRunHook相关类的的preDriverRun方法---->检测是否支持并发,并获取并发实现的类 --->compileInternal---->运行锁相关的操作(判断是否只对mapred job进行锁,获取锁等) ---->调用execute---->释放锁--->运行HiveDriverRunHook相关类的的postDriverRun方法 ---->返回CommandProcessorResponse对象

相关代码:

private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)       throws CommandNeedRetryException {     errorMessage = null;     SQLState = null;     downstreamError = null;     if (!validateConfVariables()) {       return new CommandProcessorResponse(12, errorMessage , SQLState );     }     HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf , command);     // Get all the driver run hooks and pre-execute them.     List<HiveDriverRunHook> driverRunHooks;     try {               //运行hive.exec.driver.run.hooks中设置的hook       driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,            HiveDriverRunHook. class);                for (HiveDriverRunHook driverRunHook : driverRunHooks) {           driverRunHook.preDriverRun(hookContext); //运行HiveDriverRunHook相关类的的preDriverRun方法       }     } catch (Exception e) {       errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);       SQLState = ErrorMsg. findSQLState(e.getMessage());       downstreamError = e;       console.printError( errorMessage + "\n"           + org.apache.hadoop.util.StringUtils.stringifyException(e));       return new CommandProcessorResponse(12, errorMessage , SQLState );     }     // Reset the perf logger     PerfLogger perfLogger = PerfLogger.getPerfLogger( true);     perfLogger.PerfLogBegin( CLASS_NAME, PerfLogger.DRIVER_RUN);     perfLogger.PerfLogBegin( CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);     int ret;     boolean requireLock = false;     boolean ckLock = false;     try {       ckLock = checkConcurrency();  //检测是否支持并发,并获取并发实现的类,比如常用的 org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager       createTxnManager();     } catch (SemanticException e) {       errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();       SQLState = ErrorMsg. findSQLState(e.getMessage());       downstreamError = e;       console.printError( errorMessage, "\n"           + org.apache.hadoop.util.StringUtils.stringifyException(e));       ret = 10;       return new CommandProcessorResponse(ret, errorMessage , SQLState );     }     ret = recordValidTxns();     if (ret != 0) return new CommandProcessorResponse(ret, errorMessage, SQLState);     if (!alreadyCompiled) {       ret = compileInternal(command);  //调用compileInternal方法       if (ret != 0) {         return new CommandProcessorResponse(ret, errorMessage, SQLState);       }     }     // the reason that we set the txn manager for the cxt here is because each     // query has its own ctx object. The txn mgr is shared across the     // same instance of Driver, which can run multiple queries.     ctx.setHiveTxnManager( txnMgr);     if (ckLock) {  //断是否只对mapred job进行锁,参数hive.lock.mapred.only.operation,默认为false       boolean lockOnlyMapred = HiveConf.getBoolVar( conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY);       if(lockOnlyMapred) {         Queue<Task<? extends Serializable>> taskQueue = new LinkedList<Task<? extends Serializable>>();         taskQueue.addAll( plan.getRootTasks());         while (taskQueue.peek() != null) {           Task<? extends Serializable> tsk = taskQueue.remove();           requireLock = requireLock || tsk.requireLock();           if(requireLock) {             break;           }           if (tsk instanceof ConditionalTask) {             taskQueue.addAll(((ConditionalTask)tsk).getListTasks());           }           if(tsk.getChildTasks()!= null) {             taskQueue.addAll(tsk.getChildTasks());           }           // does not add back up task here, because back up task should be the same           // type of the original task.         }       } else {         requireLock = true;       }     }     if (requireLock) { //获取锁       ret = acquireReadWriteLocks();       if (ret != 0) {         try {           releaseLocks( ctx.getHiveLocks());         } catch (LockException e) {           // Not much to do here         }         return new CommandProcessorResponse(ret, errorMessage, SQLState);       }     }     ret = execute(); //job运行     if (ret != 0) {       //if needRequireLock is false, the release here will do nothing because there is no lock       try {         releaseLocks( ctx.getHiveLocks());       } catch (LockException e) {         // Nothing to do here       }       return new CommandProcessorResponse(ret, errorMessage , SQLState );     }     //if needRequireLock is false, the release here will do nothing because there is no lock     try {       releaseLocks( ctx.getHiveLocks());     } catch (LockException e) {       errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);       SQLState = ErrorMsg. findSQLState(e.getMessage());       downstreamError = e;       console.printError( errorMessage + "\n"           + org.apache.hadoop.util.StringUtils.stringifyException(e));       return new CommandProcessorResponse(12, errorMessage , SQLState );     }     perfLogger.PerfLogEnd( CLASS_NAME, PerfLogger.DRIVER_RUN);     perfLogger.close(LOG, plan);     // Take all the driver run hooks and post-execute them.     try {       for (HiveDriverRunHook driverRunHook : driverRunHooks) {  //运行HiveDriverRunHook相关类的的postDriverRun方法           driverRunHook.postDriverRun(hookContext);       }     } catch (Exception e) {       errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);       SQLState = ErrorMsg. findSQLState(e.getMessage());       downstreamError = e;       console.printError( errorMessage + "\n"           + org.apache.hadoop.util.StringUtils.stringifyException(e));       return new CommandProcessorResponse(12, errorMessage , SQLState );     }     return new CommandProcessorResponse(ret);   }

8)

再来看下compileInternal方法   private static final Object compileMonitor = new Object();   private int compileInternal(String command) {     int ret;     synchronized ( compileMonitor) {       ret = compile(command);  //调用compile方法     }     if (ret != 0) {       try {         releaseLocks( ctx.getHiveLocks());       } catch (LockException e) {         LOG.warn("Exception in releasing locks. "             + org.apache.hadoop.util.StringUtils.stringifyException(e));       }     }     return ret;   }

 调用了compile方法,compile方法分析命令,生成Task,关于compile的具体实现后面详细讲解

9.execute方法,提交task并等待task运行完毕,并打印task运行的信息,比如消耗的时间等

(这里信息也比较多,后面单独讲解