当前位置: 首页>数据库>正文

hive create table 时指定注释 hive create table as select * from

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create%2FDropTable


CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name 

 [(col_name data_type [COMMENT col_comment], ...)] 

 [COMMENT table_comment] 

 [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 

 [CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 

 [ 

 [ROW FORMAT row_format] [STORED AS file_format] 

 | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)] (Note: only available starting with 0.6.0) 

 ] 

 [LOCATION hdfs_path] 

 [TBLPROPERTIES (property_name=property_value, ...)] (Note: only available starting with 0.6.0) 

 [AS select_statement] (Note: this feature is only available starting with 0.5.0.) 



CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name 

 LIKE existing_table_or_view_name 

 [LOCATION hdfs_path] 



create table tablePartition(s string) partitioned by(pt string); 


alter table tablePartition add if not exists partition(pt='1'); 


CliDriver: 

CliDriver.main() { 

 ret = cli.processLine(line); 

} 

CliDriver: 

public int processLine(String line) { // line = create table tablePartition(s string) partitioned by(pt string); 

 ret = processCmd(command); 

} 


CliDriver 

 public int processCmd(String cmd) { // cmd = create table tablePartition(s string) partitioned by(pt string) 

 CommandProcessor proc = CommandProcessorFactory.get(tokens[0]); 

 Driver qp = (Driver) proc; 

 ret = qp.run(cmd).getResponseCode(); 

 } 


CommandProcessor proc = CommandProcessorFactory.get(tokens[0]); // tokens[0] = create 

创建return new Driver();来处理 


Driver 

CommandProcessorResponse run(String command) { 

// command = create table tablePartition(s string) partitioned by(pt string) 

 int ret = compile(command); 

 ret = execute(); 

} 


(1)compile 

public int compile(String command) { 

 return new SemanticAnalyzer(conf); 

 BaseSemanticAnalyzer sem.analyze(tree, ctx); 

 SemanticAnalyzer analyzeInternal(ASTNode ast) 

 SemanticAnalyzer ASTNode [b]analyzeCreateTable[/b](ASTNode ast, QB qb ) 

 plan = new QueryPlan(command, sem); 

} 


SemanticAnalyzer: 

 public void analyzeInternal(ASTNode ast) throws SemanticException { 


 // 带有create的hivesql会执行下面的代码 

 // analyze create table command 

 if (ast.getToken().getType() == HiveParser.TOK_CREATETABLE) { 

 isCreateTable = true; 

 // if it is not CTAS, we don't need to go further and just return 

 if ((child = analyzeCreateTable(ast, qb)) == null) { 

 return; 

 } 

 } 


 } 



 private ASTNode analyzeCreateTable(ASTNode ast, QB qb) 

 throws SemanticException { 

 String tableName = getUnescapedName((ASTNode)ast.getChild(0)); //第一个孩子节点是表名 

 final int CREATE_TABLE = 0; // regular CREATE TABLE 

 final int CTLT = 1; // CREATE TABLE LIKE ... (CTLT) 

 final int CTAS = 2; // CREATE TABLE AS SELECT ... (CTAS) 


Hive.g: 

tableFileFormat 

@init { msgs.push("table file format specification"); } 

@after { msgs.pop(); } 

 : 

 KW_STORED KW_AS KW_SEQUENCEFILE -> TOK_TBLSEQUENCEFILE 

 | KW_STORED KW_AS KW_TEXTFILE -> TOK_TBLTEXTFILE 

 | KW_STORED KW_AS KW_RCFILE -> TOK_TBLRCFILE 

 | KW_STORED KW_AS KW_INPUTFORMAT inFmt=StringLiteral KW_OUTPUTFORMAT outFmt=StringLiteral (KW_INPUTDRIVER inDriver=StringLiteral KW_OUTPUTDRIVER outDriver=StringLiteral)? 

 -> ^(TOK_TABLEFILEFORMAT $inFmt $outFmt $inDriver? $outDriver?) 

 | KW_STORED KW_BY storageHandler=StringLiteral 

 (KW_WITH KW_SERDEPROPERTIES serdeprops=tableProperties)? 

 -> ^(TOK_STORAGEHANDLER $storageHandler $serdeprops?) 

 | KW_STORED KW_AS genericSpec=Identifier 

 -> ^(TOK_FILEFORMAT_GENERIC $genericSpec) 

 ; 

Hive.g里面的词法分析,把STORED AS SEQUENCEFILE直接转化成了一个TOK_TBLSEQUENCEFILE节点。 



 (1)处理file format信息,[STORED AS file_format] 

 (2)有IF NOT EXISTS,那么ifNotExists = true; 

 (3)有EXTERNAL,外部表isExt = true; 

 (4)LIKE ,获得like的表名,command_type = CTLT; likeTableName存在需要处理一些不符合语法的情况。 

 (5)如果有查询语句,说明是[AS select_statement],command_type = CTAS;selectStmt = child; 

 (6)有columns,处理所有的非分区字段信息[(col_name data_type [COMMENT col_comment], ...)],cols = getColumns(child); 

 (7)表的注释[COMMENT table_comment],comment = unescapeSQLString(child.getChild(0).getText()); 

 (8)有分区字段信息,处理所有的分区字段信息,[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)],partCols = getColumns((ASTNode) child.getChild(0), false); 

 (9)处理BUCKETS信息,[CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 

 (10)处理ROW FORMAT信息,有4个fieldDelim、collItemDelim、mapKeyDelim、lineDelim,lineDelim如果不是"\n"或者"10"那么会抛出异常,[ROW FORMAT row_format] 

 (11)处理location信息,[LOCATION hdfs_path],location = unescapeSQLString(child.getChild(0).getText()); 

 (12)处理表属性信息,[TBLPROPERTIES (property_name=property_value, ...)] (Note: only available starting with 0.6.0),用一个map来存,Map<String, String> tblProps 

 (13)处理serde信息,STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)] (Note: only available starting with 0.6.0) 

 (14)如果不是SEQUENCEFILE、RCFILE、TEXTFILE三者之中的一个,那么抛出异常。 


 if (ifNotExists) { //如果有IF NOT EXISTS 

 根据tableName在数据库中获取这个表,如果能够获取到,表已存在则返回null. 

 } 


 CreateTableDesc crtTblDesc = null; 

 switch (command_type) { //根据3种不同类型的建表语句执行不同的操作 

 case CREATE_TABLE: // REGULAR CREATE TABLE DDL 

 crtTblDesc=new CreateTableDesc(***); 

 new DDLWork,new DDLTask加入rootTasks 

 case CTLT: // create table like <tbl_name> 

 CreateTableLikeDesc crtTblLikeDesc = new CreateTableLikeDesc 

 new DDLWork,new DDLTask加入rootTasks 

 case CTAS: // create table as select 

 先验证表tableName不存在,如果存在则抛出异常。 

 crtTblDesc = new CreateTableDesc(); 

 qb.setTableDesc(crtTblDesc); 

 return selectStmt; // CTAS返回查询子节点。 

 } 

 return null; //普通的CREATE_TABLE或者CTLT,返回null 

 } 



create table tablePartition(s string) partitioned by(pt string); 


hive> explain create table tablePartition(s string) partitioned by(pt string); 

OK 

ABSTRACT SYNTAX TREE: 

 (TOK_CREATETABLE tablePartition TOK_LIKETABLE (TOK_TABCOLLIST (TOK_TABCOL s TOK_STRING)) (TOK_TABLEPARTCOLS (TOK_TABCOLLIST (TOK_TABCOL pt TOK_STRING)))) 


STAGE DEPENDENCIES: 

 Stage-0 is a root stage 


STAGE PLANS: 

 Stage: Stage-0 

 Create Table Operator: 

 Create Table 

 columns: s string 

 if not exists: false 

 input format: org.apache.hadoop.mapred.TextInputFormat 

 # buckets: -1 

 output format: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat 

 partition columns: pt string 

 name: tablePartition 

 isExternal: false 



 private ASTNode analyzeCreateTable(ASTNode ast, QB qb) 

 throws SemanticException { 

 String tableName = unescapeIdentifier(ast.getChild(0).getText()); // 表名tablePartition 

 String likeTableName = null; 

 List<FieldSchema> cols = new ArrayList<FieldSchema>(); 

 List<FieldSchema> partCols = new ArrayList<FieldSchema>(); 

 List<String> bucketCols = new ArrayList<String>(); 

 List<Order> sortCols = new ArrayList<Order>(); 

 int numBuckets = -1; 

 String fieldDelim = null; 

 String fieldEscape = null; 

 String collItemDelim = null; 

 String mapKeyDelim = null; 

 String lineDelim = null; 

 String comment = null; 

 String inputFormat = null; 

 String outputFormat = null; 

 String location = null; 

 String serde = null; 

 String storageHandler = null; 

 Map<String, String> serdeProps = new HashMap<String, String>(); 

 Map<String, String> tblProps = null; 

 boolean ifNotExists = false; 

 boolean isExt = false; 

 ASTNode selectStmt = null; 

 final int CREATE_TABLE = 0; // regular CREATE TABLE 

 final int CTLT = 1; // CREATE TABLE LIKE ... (CTLT) 

 final int CTAS = 2; // CREATE TABLE AS SELECT ... (CTAS) 

 int command_type = CREATE_TABLE; // 0 


 LOG.info("Creating table " + tableName + " position=" 

 + ast.getCharPositionInLine()); // 11/08/28 05:20:26 INFO parse.SemanticAnalyzer: Creating table tablePartition position=13 

 int numCh = ast.getChildCount(); // numCh = 4 


 /* 

 * Check the 1st-level children and do simple semantic checks: 1) CTLT and 

 * CTAS should not coexists. 2) CTLT or CTAS should not coexists with column 

 * list (target table schema). 3) CTAS does not support partitioning (for 

 * now). 

 */ 

 for (int num = 1; num < numCh; num++) { 

 ASTNode child = (ASTNode) ast.getChild(num); // num=1,child =TOK_LIKETABLE; num=2,child=TOK_TABCOLLIST; num=3,child=TOK_TABLEPARTCOLS 

 switch (child.getToken().getType()) { 

 case HiveParser.TOK_IFNOTEXISTS: 

 ifNotExists = true; 

 break; 

 case HiveParser.KW_EXTERNAL: 

 isExt = true; 

 break; 

 case HiveParser.TOK_LIKETABLE: 

 if (child.getChildCount() > 0) { // 

 likeTableName = unescapeIdentifier(child.getChild(0).getText()); 

 if (likeTableName != null) { 

 if (command_type == CTAS) { 

 throw new SemanticException(ErrorMsg.CTAS_CTLT_COEXISTENCE 

 .getMsg()); 

 } 

 if (cols.size() != 0) { 

 throw new SemanticException(ErrorMsg.CTLT_COLLST_COEXISTENCE 

 .getMsg()); 

 } 

 } 

 command_type = CTLT; 

 } 

 break; 

 case HiveParser.TOK_QUERY: // CTAS 

 if (command_type == CTLT) { 

 throw new SemanticException(ErrorMsg.CTAS_CTLT_COEXISTENCE.getMsg()); 

 } 

 if (cols.size() != 0) { 

 throw new SemanticException(ErrorMsg.CTAS_COLLST_COEXISTENCE.getMsg()); 

 } 

 if (partCols.size() != 0 || bucketCols.size() != 0) { 

 boolean dynPart = HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING); 

 if (dynPart == false) { 

 throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg()); 

 } else { 

 // TODO: support dynamic partition for CTAS 

 throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg()); 

 } 

 } 

 if (isExt) { 

 throw new SemanticException(ErrorMsg.CTAS_EXTTBL_COEXISTENCE.getMsg()); 

 } 

 command_type = CTAS; 

 selectStmt = child; 

 break; 

 case HiveParser.TOK_TABCOLLIST: 

 cols = getColumns(child); //见下面 [FieldSchema(name:s, type:string, comment:null)] 

 break; 

 case HiveParser.TOK_TABLECOMMENT: 

 comment = unescapeSQLString(child.getChild(0).getText()); 

 break; 

 case HiveParser.TOK_TABLEPARTCOLS: 

 partCols = getColumns((ASTNode) child.getChild(0), false); //[FieldSchema(name:pt, type:string, comment:null)] 

 break; 

 case HiveParser.TOK_TABLEBUCKETS: 

 bucketCols = getColumnNames((ASTNode) child.getChild(0)); 

 if (child.getChildCount() == 2) { 

 numBuckets = (Integer.valueOf(child.getChild(1).getText())) 

 .intValue(); 

 } else { 

 sortCols = getColumnNamesOrder((ASTNode) child.getChild(1)); 

 numBuckets = (Integer.valueOf(child.getChild(2).getText())) 

 .intValue(); 

 } 

 break; 

 case HiveParser.TOK_TABLEROWFORMAT: 


 child = (ASTNode) child.getChild(0); 

 int numChildRowFormat = child.getChildCount(); 

 for (int numC = 0; numC < numChildRowFormat; numC++) { 

 ASTNode rowChild = (ASTNode) child.getChild(numC); 

 switch (rowChild.getToken().getType()) { 

 case HiveParser.TOK_TABLEROWFORMATFIELD: 

 fieldDelim = unescapeSQLString(rowChild.getChild(0).getText()); 

 if (rowChild.getChildCount() >= 2) { 

 fieldEscape = unescapeSQLString(rowChild.getChild(1).getText()); 

 } 

 break; 

 case HiveParser.TOK_TABLEROWFORMATCOLLITEMS: 

 collItemDelim = unescapeSQLString(rowChild.getChild(0).getText()); 

 break; 

 case HiveParser.TOK_TABLEROWFORMATMAPKEYS: 

 mapKeyDelim = unescapeSQLString(rowChild.getChild(0).getText()); 

 break; 

 case HiveParser.TOK_TABLEROWFORMATLINES: 

 lineDelim = unescapeSQLString(rowChild.getChild(0).getText()); 

 if (!lineDelim.equals("\n") && !lineDelim.equals("10")) { 

 throw new SemanticException( 

 ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg()); 

 } 

 break; 

 default: 

 assert false; 

 } 

 } 

 break; 

 case HiveParser.TOK_TABLESERIALIZER: 

 child = (ASTNode) child.getChild(0); 

 serde = unescapeSQLString(child.getChild(0).getText()); 

 if (child.getChildCount() == 2) { 

 readProps( 

 (ASTNode) (child.getChild(1).getChild(0)), 

 serdeProps); 

 } 

 break; 

 case HiveParser.TOK_TBLSEQUENCEFILE: 

 inputFormat = SEQUENCEFILE_INPUT; 

 outputFormat = SEQUENCEFILE_OUTPUT; 

 break; 

 case HiveParser.TOK_TBLTEXTFILE: 

 inputFormat = TEXTFILE_INPUT; 

 outputFormat = TEXTFILE_OUTPUT; 

 break; 

 case HiveParser.TOK_TBLRCFILE: 

 inputFormat = RCFILE_INPUT; 

 outputFormat = RCFILE_OUTPUT; 

 serde = COLUMNAR_SERDE; 

 break; 

 case HiveParser.TOK_TABLEFILEFORMAT: 

 inputFormat = unescapeSQLString(child.getChild(0).getText()); 

 outputFormat = unescapeSQLString(child.getChild(1).getText()); 

 break; 

 case HiveParser.TOK_TABLELOCATION: 

 location = unescapeSQLString(child.getChild(0).getText()); 

 break; 

 case HiveParser.TOK_TABLEPROPERTIES: 

 tblProps = DDLSemanticAnalyzer.getProps((ASTNode) child.getChild(0)); 

 break; 

 case HiveParser.TOK_STORAGEHANDLER: 

 storageHandler = unescapeSQLString(child.getChild(0).getText()); 

 if (child.getChildCount() == 2) { 

 readProps( 

 (ASTNode) (child.getChild(1).getChild(0)), 

 serdeProps); 

 } 

 break; 

 default: 

 assert false; 

 } 

 } 


 if ((command_type == CTAS) && (storageHandler != null)) { // false 

 throw new SemanticException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg()); 

 } 


 if ((inputFormat == null) && (storageHandler == null)) { //true 

 assert outputFormat == null; 

 if ("SequenceFile".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) { 

 inputFormat = SEQUENCEFILE_INPUT; 

 outputFormat = SEQUENCEFILE_OUTPUT; 

 } else if ("RCFile".equalsIgnoreCase(conf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) { 

 inputFormat = RCFILE_INPUT; 

 outputFormat = RCFILE_OUTPUT; 

 serde = COLUMNAR_SERDE; 

 } else { 

 inputFormat = TEXTFILE_INPUT; // org.apache.hadoop.mapred.TextInputFormat 

 outputFormat = TEXTFILE_OUTPUT;//org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat 

 } 

 } 

默认的文件格式可以通过配置文件设置: 

// HiveConf里面 HIVEDEFAULTFILEFORMAT("hive.default.fileformat", "TextFile"), 

// hive-default.xml里面: 

<property> 

 <name>hive.default.fileformat</name> 

 <value>TextFile</value> 

 <description>Default file format for CREATE TABLE statement. Options are TextFile and SequenceFile. Users can explicitly say CREATE TABLE ... STORED AS <TEXTFILE|SEQUENCEFILE> to override</description> 

</property> 



 // check for existence of table 

 if (ifNotExists) { // false,create的时候没有加上IF NOT EXISTS 

 try { 

 if (null != db.getTable(db.getCurrentDatabase(), tableName, false)) { 

 return null; 

 } 

 } catch (HiveException e) { 

 e.printStackTrace(); 

 } 

 } 


 // Handle different types of CREATE TABLE command 

 CreateTableDesc crtTblDesc = null; 

 switch (command_type) { // 0 


 case CREATE_TABLE: // REGULAR CREATE TABLE DDL 

 crtTblDesc = new CreateTableDesc(tableName, isExt, cols, partCols, 

 bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape, 

 collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat, 

 outputFormat, location, serde, storageHandler, serdeProps, 

 tblProps, ifNotExists); 


 validateCreateTable(crtTblDesc); // 

 rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), 

 crtTblDesc), conf)); //创建DDLWork,createTblDesc不为空。 创建DDLTask,加入rootTasks。 

 break; 


 case CTLT: // create table like <tbl_name> 

 CreateTableLikeDesc crtTblLikeDesc = new CreateTableLikeDesc(tableName, 

 isExt, location, ifNotExists, likeTableName); 

 rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), 

 crtTblLikeDesc), conf)); 

 break; 


 case CTAS: // create table as select 


 // check for existence of table. Throw an exception if it exists. 

 try { 

 Table tab = db.getTable(db.getCurrentDatabase(), tableName, false); 

 // do not throw exception if table does not exist 


 if (tab != null) { 

 throw new SemanticException(ErrorMsg.TABLE_ALREADY_EXISTS 

 .getMsg(tableName)); 

 } 

 } catch (HiveException e) { // may be unable to get meta data 

 throw new SemanticException(e); 

 } 


 crtTblDesc = new CreateTableDesc(tableName, isExt, cols, partCols, 

 bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape, 

 collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat, 

 outputFormat, location, serde, storageHandler, serdeProps, 

 tblProps, ifNotExists); 

 qb.setTableDesc(crtTblDesc); 


 return selectStmt; 

 default: 

 assert false; // should never be unknown command type 

 } 

 return null; //返回null 

 } 



BaseSemanticAnalyzer: 

 protected List<FieldSchema> getColumns(ASTNode ast) throws SemanticException { //获得columns的信息 

//ast = TOK_TABCOLLIST 

 return getColumns(ast, true); 

 } 


 protected List<FieldSchema> getColumns(ASTNode ast, boolean lowerCase) throws SemanticException { 

 List<FieldSchema> colList = new ArrayList<FieldSchema>(); // [] 

 int numCh = ast.getChildCount(); // 1 

 for (int i = 0; i < numCh; i++) { 

 FieldSchema col = new FieldSchema(); 

 ASTNode child = (ASTNode) ast.getChild(i); // TOK_TABCOL 


 String name = child.getChild(0).getText(); // s 

 if(lowerCase) { // true 

 name = name.toLowerCase(); // s 

 } 

 // child 0 is the name of the column 

 col.setName(unescapeIdentifier(name)); //FieldSchema(name:s, type:null, comment:null) 

 // child 1 is the type of the column 

 ASTNode typeChild = (ASTNode) (child.getChild(1)); // TOK_STRING 

 col.setType(getTypeStringFromAST(typeChild)); //FieldSchema(name:s, type:string, comment:null) 


 // child 2 is the optional comment of the column 

 if (child.getChildCount() == 3) { 

 col.setComment(unescapeSQLString(child.getChild(2).getText())); 

 } 

 colList.add(col); //[FieldSchema(name:s, type:string, comment:null)] 

 } 

 return colList; //[FieldSchema(name:s, type:string, comment:null)] 

 } 


BaseSemanticAnalyzer: 

 protected List<FieldSchema> getColumns(ASTNode ast, boolean lowerCase) throws SemanticException { 

 List<FieldSchema> colList = new ArrayList<FieldSchema>(); 

 int numCh = ast.getChildCount(); // 1 

 for (int i = 0; i < numCh; i++) { 

 FieldSchema col = new FieldSchema(); 

 ASTNode child = (ASTNode) ast.getChild(i); // TOK_TABCOL 


 String name = child.getChild(0).getText(); // pt 

 if(lowerCase) { //false 

 name = name.toLowerCase(); 

 } 

 // child 0 is the name of the column 

 col.setName(unescapeIdentifier(name)); // FieldSchema(name:pt, type:null, comment:null) 

 // child 1 is the type of the column 

 ASTNode typeChild = (ASTNode) (child.getChild(1)); // TOK_STRING 

 col.setType(getTypeStringFromAST(typeChild)); // FieldSchema(name:pt, type:string, comment:null) 


 // child 2 is the optional comment of the column 

 if (child.getChildCount() == 3) { 

 col.setComment(unescapeSQLString(child.getChild(2).getText())); 

 } 

 colList.add(col); // [FieldSchema(name:pt, type:string, comment:null)] 

 } 

 return colList; //[FieldSchema(name:pt, type:string, comment:null)] 

 } 



CreateTableDesc 


 public CreateTableDesc(String tableName, boolean isExternal, 

 List<FieldSchema> cols, List<FieldSchema> partCols, 

 List<String> bucketCols, List<Order> sortCols, int numBuckets, 

 String fieldDelim, String fieldEscape, String collItemDelim, 

 String mapKeyDelim, String lineDelim, String comment, String inputFormat, 

 String outputFormat, String location, String serName, 

 String storageHandler, 

 Map<String, String> serdeProps, 

 Map<String, String> tblProps, 

 boolean ifNotExists) { 

 this.tableName = tableName; // tablePartition 

 this.isExternal = isExternal; // false 

 this.bucketCols = new ArrayList<String>(bucketCols); // [] 

 this.sortCols = new ArrayList<Order>(sortCols); // [] 

 this.collItemDelim = collItemDelim; // null 

 this.cols = new ArrayList<FieldSchema>(cols); // [FieldSchema(name:s, type:string, comment:null)] 

 this.comment = comment; // null 

 this.fieldDelim = fieldDelim; // null 

 this.fieldEscape = fieldEscape; // null 

 this.inputFormat = inputFormat; // org.apache.hadoop.mapred.TextInputFormat 

 this.outputFormat = outputFormat; // org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat 

 this.lineDelim = lineDelim; // null 

 this.location = location; // null 

 this.mapKeyDelim = mapKeyDelim; // null 

 this.numBuckets = numBuckets; // -1 

 this.partCols = new ArrayList<FieldSchema>(partCols); // [FieldSchema(name:pt, type:string, comment:null)] 

 this.serName = serName; // null 

 this.storageHandler = storageHandler; // null 

 this.serdeProps = serdeProps; // {} 

 this.tblProps = tblProps; // null 

 this.ifNotExists = ifNotExists; // false 

 } 

CreateTableDesc包含的信息: 

(1)表名,tablePartition 

(2)是否为外部表,默认是false 

(3)非分区字段信息,cols 

(4)inputFormat 

(5)outputFormat 

(6)分区字段信息,partCols 

(7)ifNotExists,默认为false 



 private void validateCreateTable(CreateTableDesc crtTblDesc) 

 throws SemanticException { 


 if ((crtTblDesc.getCols() == null) || (crtTblDesc.getCols().size() == 0)) { // false 

 // for now make sure that serde exists 

 if (StringUtils.isEmpty(crtTblDesc.getSerName()) 

 || !SerDeUtils.shouldGetColsFromSerDe(crtTblDesc.getSerName())) { 

 throw new SemanticException(ErrorMsg.INVALID_TBL_DDL_SERDE.getMsg()); 

 } 

 return; 

 } 


 if (crtTblDesc.getStorageHandler() == null) { 

 try { 

 Class<?> origin = Class.forName(crtTblDesc.getOutputFormat(), true, 

 JavaUtils.getClassLoader()); //class org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat 

 Class<? extends HiveOutputFormat> replaced = HiveFileFormatUtils 

 .getOutputFormatSubstitute(origin); // org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat 

 if (replaced == null) { 

 throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE 

 .getMsg()); 

 } 

 } catch (ClassNotFoundException e) { 

 throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE.getMsg()); 

 } 

 } 


 List<String> colNames = validateColumnNameUniqueness(crtTblDesc.getCols());//验证没有两个column的名字是相同的。 [s] 


 if (crtTblDesc.getBucketCols() != null) { // 

 // all columns in cluster and sort are valid columns 

 Iterator<String> bucketCols = crtTblDesc.getBucketCols().iterator(); 

 while (bucketCols.hasNext()) { 

 String bucketCol = bucketCols.next(); 

 boolean found = false; 

 Iterator<String> colNamesIter = colNames.iterator(); 

 while (colNamesIter.hasNext()) { 

 String colName = colNamesIter.next(); 

 if (bucketCol.equalsIgnoreCase(colName)) { 

 found = true; 

 break; 

 } 

 } 

 if (!found) { 

 throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg()); 

 } 

 } 

 } 


 if (crtTblDesc.getSortCols() != null) { 

 // all columns in cluster and sort are valid columns 

 Iterator<Order> sortCols = crtTblDesc.getSortCols().iterator(); 

 while (sortCols.hasNext()) { 

 String sortCol = sortCols.next().getCol(); 

 boolean found = false; 

 Iterator<String> colNamesIter = colNames.iterator(); 

 while (colNamesIter.hasNext()) { 

 String colName = colNamesIter.next(); 

 if (sortCol.equalsIgnoreCase(colName)) { 

 found = true; 

 break; 

 } 

 } 

 if (!found) { 

 throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg()); 

 } 

 } 

 } 


 if (crtTblDesc.getPartCols() != null) { //分区的名字和columns的名字没有相同的。 

 // there is no overlap between columns and partitioning columns 

 Iterator<FieldSchema> partColsIter = crtTblDesc.getPartCols().iterator(); 

 while (partColsIter.hasNext()) { 

 String partCol = partColsIter.next().getName(); 

 Iterator<String> colNamesIter = colNames.iterator(); 

 while (colNamesIter.hasNext()) { 

 String colName = unescapeIdentifier(colNamesIter.next()); 

 if (partCol.equalsIgnoreCase(colName)) { 

 throw new SemanticException( 

 ErrorMsg.COLUMN_REPEATED_IN_PARTITIONING_COLS.getMsg()); 

 } 

 } 

 } 

 } 

 } 



DDLWork 他的createTblDesc 不为空 

生成的task是org.apache.hadoop.hive.ql.exec.DDLTask 



(2)execute 

Driver: 

public int execute() { 

 // Add root Tasks to runnable 

 for (Task<? extends Serializable> tsk : plan.getRootTasks()) { 

 driverCxt.addToRunnable(tsk); 

 } 

 while (running.size() != 0 || runnable.peek() != null) { 

 // Launch upto maxthreads tasks 

 while (runnable.peek() != null && running.size() < maxthreads) { 

 Task<? extends Serializable> tsk = runnable.remove(); 

 launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt); // Driver.launchTask 

 } 

 } 

} 


[b]Driver.launchTask 

TaskRunner.runSequential 

Task. executeTask 

DDLTask. Execute 

 CreateTableDesc crtTbl = work.getCreateTblDesc(); 

 if (crtTbl != null) { 

 return createTable(db, crtTbl); 

 } 

DDLTask. createTable 

Hive.createTable 

HiveMetaStoreClient.createTable 

HiveMetaStore.createTable 

HiveMetaStore.create_table_core 

ObjectStore[/b] 



[b]DDLTask[/b]: 

 /** 

 * Create a new table. 

 * 

 * @param db 

 * The database in question. 

 * @param crtTbl 

 * This is the table we're creating. 

 * @return Returns 0 when execution succeeds and above 0 if it fails. 

 * @throws HiveException 

 * Throws this exception if an unexpected error occurs. 

 */ 

 private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException { 

 // create the table 

 Table tbl = new Table(db.getCurrentDatabase(), crtTbl.getTableName()); 

//Hive.java db是Hive的一个实例 

 public String getCurrentDatabase() { 

 if (null == currentDatabase) { 

 currentDatabase = DEFAULT_DATABASE_NAME; DEFAULT_DATABASE_NAME默认是"default" 

 } 

 return currentDatabase; 

 } 



 if (crtTbl.getPartCols() != null) { 

 tbl.setPartCols(crtTbl.getPartCols()); //[FieldSchema(name:pt, type:string, comment:null)] 

 } 

 if (crtTbl.getNumBuckets() != -1) { 

 tbl.setNumBuckets(crtTbl.getNumBuckets()); 

 } 


 if (crtTbl.getStorageHandler() != null) { 

 tbl.setProperty( 

 org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE, 

 crtTbl.getStorageHandler()); 

 } 

 HiveStorageHandler storageHandler = tbl.getStorageHandler(); // null 


 if (crtTbl.getSerName() != null) { // null 

 tbl.setSerializationLib(crtTbl.getSerName()); 

 } else { 

 if (crtTbl.getFieldDelim() != null) { // null false 

 tbl.setSerdeParam(Constants.FIELD_DELIM, crtTbl.getFieldDelim()); 

 tbl.setSerdeParam(Constants.SERIALIZATION_FORMAT, crtTbl 

 .getFieldDelim()); 

 } 

 if (crtTbl.getFieldEscape() != null) { // null false 

 tbl.setSerdeParam(Constants.ESCAPE_CHAR, crtTbl.getFieldEscape()); 

 } 


 if (crtTbl.getCollItemDelim() != null) { 

 tbl 

 .setSerdeParam(Constants.COLLECTION_DELIM, crtTbl 

 .getCollItemDelim()); 

 } 

 if (crtTbl.getMapKeyDelim() != null) { 

 tbl.setSerdeParam(Constants.MAPKEY_DELIM, crtTbl.getMapKeyDelim()); 

 } 

 if (crtTbl.getLineDelim() != null) { 

 tbl.setSerdeParam(Constants.LINE_DELIM, crtTbl.getLineDelim()); 

 } 

 } 


 if (crtTbl.getSerdeProps() != null) { //serde属性 

 Iterator<Entry<String, String>> iter = crtTbl.getSerdeProps().entrySet() 

 .iterator(); 

 while (iter.hasNext()) { 

 Entry<String, String> m = iter.next(); 

 tbl.setSerdeParam(m.getKey(), m.getValue()); 

 } 

 } 

 if (crtTbl.getTblProps() != null) { //表属性 

 tbl.getTTable().getParameters().putAll(crtTbl.getTblProps()); 

 } 


 /* 

 * We use LazySimpleSerDe by default. 

 * 

 * If the user didn't specify a SerDe, and any of the columns are not simple 

 * types, we will have to use DynamicSerDe instead. 

 */ 

 if (crtTbl.getSerName() == null) { //serdename为空 

 if (storageHandler == null) { 

 LOG.info("Default to LazySimpleSerDe for table " + crtTbl.getTableName()); //11/08/28 06:01:29 INFO exec.DDLTask: Default to LazySimpleSerDe for table tablePartition 

 tbl.setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); 

 // 默认使用serde是org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe 

 } else { 

 String serDeClassName = storageHandler.getSerDeClass().getName(); 

 LOG.info("Use StorageHandler-supplied " + serDeClassName 

 + " for table " + crtTbl.getTableName()); 

 tbl.setSerializationLib(serDeClassName); 

 } 

 } else { 

 // let's validate that the serde exists 

 validateSerDe(crtTbl.getSerName()); 

 } 


 if (crtTbl.getCols() != null) { //[FieldSchema(name:s, type:string, comment:null)] 

 tbl.setFields(crtTbl.getCols()); 

 } 

 if (crtTbl.getBucketCols() != null) { 

 tbl.setBucketCols(crtTbl.getBucketCols()); 

 } 

 if (crtTbl.getSortCols() != null) { 

 tbl.setSortCols(crtTbl.getSortCols()); 

 } 

 if (crtTbl.getComment() != null) { 

 tbl.setProperty("comment", crtTbl.getComment()); //表的注释 

 } 

 if (crtTbl.getLocation() != null) { 

 tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri()); 

 } 


 tbl.setInputFormatClass(crtTbl.getInputFormat()); 

 tbl.setOutputFormatClass(crtTbl.getOutputFormat()); 


 tbl.getTTable().getSd().setInputFormat( 

 tbl.getInputFormatClass().getName()); // org.apache.hadoop.mapred.TextInputFormat 

 tbl.getTTable().getSd().setOutputFormat( 

 tbl.getOutputFormatClass().getName());//org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat 


 if (crtTbl.isExternal()) { //如果是外部表 

 tbl.setProperty("EXTERNAL", "TRUE"); //添加一个表的属性是EXTERNAL=TRUE 

 tbl.setTableType(TableType.EXTERNAL_TABLE);//设置表的类型是EXTERNAL_TABLE 

 } 


 // If the sorted columns is a superset of bucketed columns, store this fact. 

 // It can be later used to 

 // optimize some group-by queries. Note that, the order does not matter as 

 // long as it in the first 

 // 'n' columns where 'n' is the length of the bucketed columns. 

 if ((tbl.getBucketCols() != null) && (tbl.getSortCols() != null)) { 

 List<String> bucketCols = tbl.getBucketCols(); 

 List<Order> sortCols = tbl.getSortCols(); 


 if ((sortCols.size() > 0) && (sortCols.size() >= bucketCols.size())) { 

 boolean found = true; 


 Iterator<String> iterBucketCols = bucketCols.iterator(); 

 while (iterBucketCols.hasNext()) { 

 String bucketCol = iterBucketCols.next(); 

 boolean colFound = false; 

 for (int i = 0; i < bucketCols.size(); i++) { 

 if (bucketCol.equals(sortCols.get(i).getCol())) { 

 colFound = true; 

 break; 

 } 

 } 

 if (colFound == false) { 

 found = false; 

 break; 

 } 

 } 

 if (found) { 

 tbl.setProperty("SORTBUCKETCOLSPREFIX", "TRUE"); 

 } 

 } 

 } 


 int rc = setGenericTableAttributes(tbl); //设置通用属性 

 if (rc != 0) { 

 return rc; 

 } 


 // create the table 

 db.createTable(tbl, crtTbl.getIfNotExists()); //Hive.createTable(Table tbl, boolean ifNotExists) 

 // add the table privilege to creator 

 if (SessionState.get() != null) { 

 String userName = SessionState.get().getUserName(); 

 Authorizer authorizer = SessionState.get().getAuthorizer(); 

 try { 

 if (userName != null) { 

 AuthorizeEntry entry = 

 new AuthorizeEntry(null, tbl, null, 

 PrivilegeLevel.TABLE_LEVEL.getPrivileges(), 

 PrivilegeLevel.TABLE_LEVEL); 

 authorizer.grant(userName, userName, entry); 

 } 

 } catch (AuthorizeException e) { 

 throw new HiveException(e); 

 } 

 } 

 work.getOutputs().add(new WriteEntity(tbl)); 

 return 0; //成功,返回0 

 } 



默认的一些信息: 

Default to LazySimpleSerDe for table tablePartition 

sd . serdeInfo. serializationLib 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe 


Table默认的inputFormatClass是 

class org.apache.hadoop.mapred.TextInputFormat 

Table默认的outputFormatClass是 

class org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat 



Table 

 public Table(String databaseName, String tableName) { // default tablePartition 

 this(getEmptyTable(databaseName, tableName)); 

 } 


static org.apache.hadoop.hive.metastore.api.Table 

 getEmptyTable(String databaseName, String tableName) { 

 StorageDescriptor sd = new StorageDescriptor(); 

 { 

 sd.setSerdeInfo(new SerDeInfo()); 

 sd.setNumBuckets(-1); 

 sd.setBucketCols(new ArrayList<String>()); 

 sd.setCols(new ArrayList<FieldSchema>()); 

 sd.setParameters(new HashMap<String, String>()); 

 sd.setSortCols(new ArrayList<Order>()); 

 sd.getSerdeInfo().setParameters(new HashMap<String, String>()); 

 // We have to use MetadataTypedColumnsetSerDe because LazySimpleSerDe does 

 // not support a table with no columns. 

 sd.getSerdeInfo().setSerializationLib(MetadataTypedColumnsetSerDe.class.getName()); 

 sd.getSerdeInfo().getParameters().put(Constants.SERIALIZATION_FORMAT, "1"); //这个是默认加入的 

 sd.setInputFormat(SequenceFileInputFormat.class.getName()); // 默认是sequencefile 

 sd.setOutputFormat(HiveSequenceFileOutputFormat.class.getName()); 

 } 


 org.apache.hadoop.hive.metastore.api.Table t = new org.apache.hadoop.hive.metastore.api.Table(); 

 { 

 t.setSd(sd); 

 t.setPartitionKeys(new ArrayList<FieldSchema>()); 

 t.setParameters(new HashMap<String, String>()); 

 t.setTableType(TableType.MANAGED_TABLE.toString()); //默认是MANAGED_TABLE类型的表 

 t.setDbName(databaseName); // default 

 t.setTableName(tableName); // tablePartition 

 t.setDbName(databaseName); //多余的 

 } 

 return t; 

 } 


Constants.SERIALIZATION_FORMAT 

public static final String SERIALIZATION_FORMAT = "serialization.format" 默认为1 


public enum TableType { 

 MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW 

} 


Hive: 

 public void createTable(Table tbl, boolean ifNotExists) throws HiveException { 

 try { 

 if (tbl.getDbName() == null || "".equals(tbl.getDbName().trim())) { 

 tbl.setDbName(getCurrentDatabase()); 

 } 

 if (tbl.getCols().size() == 0) { 

 tbl.setFields(MetaStoreUtils.getFieldsFromDeserializer(tbl.getTableName(), 

 tbl.getDeserializer())); 

 } 

 tbl.checkValidity(); //检查表的合法性,如至少需要有一个column、表名不为空,且名字是字母数字或者下划线的组合、没有两个column的名字相同、分区字段和非分区字段没有相同的字段名 

 getMSC().createTable(tbl.getTTable()); // HiveMetaStoreClient 

 } catch (AlreadyExistsException e) { 

 if (!ifNotExists) { 

 throw new HiveException(e); 

 } 

 } catch (Exception e) { 

 throw new HiveException(e); 

 } 

 } 


HiveMetaStoreClient 

 public void createTable(Table tbl) throws AlreadyExistsException, 

 InvalidObjectException, MetaException, NoSuchObjectException, TException { 

 HiveMetaHook hook = getHook(tbl); 

 if (hook != null) { 

 hook.preCreateTable(tbl); 

 } 

 boolean success = false; 

 try { 

 client.create_table(tbl); // org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler 

 if (hook != null) { 

 hook.commitCreateTable(tbl); 

 } 

 success = true; //成功 

 } finally { 

 if (!success && (hook != null)) { 

 hook.rollbackCreateTable(tbl); 

 } 

 } 

 } 


HiveMetaStore.HMSHandler: 

 public void create_table(final Table tbl) throws AlreadyExistsException, 

 MetaException, InvalidObjectException { 

 incrementCounter("create_table"); 

 logStartFunction("create_table: db=" + tbl.getDbName() + " tbl=" 

 + tbl.getTableName()); // 11/08/28 06:12:02 INFO metastore.HiveMetaStore: 0: create_table: db=default tbl=tablePartition 

 try { 

 executeWithRetry(new Command<Boolean>() { 

 @Override 

 Boolean run(RawStore ms) throws Exception { 

 create_table_core(ms, tbl); // 

 return Boolean.TRUE; //返回true 

 } 

 }); 

 } catch (AlreadyExistsException e) { 

 throw e; 

 } catch (MetaException e) { 

 throw e; 

 } catch (InvalidObjectException e) { 

 throw e; 

 } catch (Exception e) { 

 assert(e instanceof RuntimeException); 

 throw (RuntimeException)e; 

 } 

 } 


HiveMetaStore.HMSHandler: 

 private <T> T executeWithRetry(Command<T> cmd) throws Exception { 

 T ret = null; 


 boolean gotNewConnectUrl = false; 

 boolean reloadConf = HiveConf.getBoolVar(hiveConf, 

 HiveConf.ConfVars.METASTOREFORCERELOADCONF); // false 


 if (reloadConf) { 

 updateConnectionURL(getConf(), null); 

 } 


 int retryCount = 0; 

 Exception caughtException = null; 

 while(true) { 

 try { 

 RawStore ms = getMS(reloadConf || gotNewConnectUrl); // ObjectStore 

 ret = cmd.run(ms); 

 break; 

 } catch (javax.jdo.JDOFatalDataStoreException e) { 

 caughtException = e; 

 } catch (javax.jdo.JDODataStoreException e) { 

 caughtException = e; 

 } 


 if (retryCount >= retryLimit) { 

 throw caughtException; 

 } 


 assert(retryInterval >= 0); 

 retryCount++; 

 LOG.error( 

 String.format( 

 "JDO datastore error. Retrying metastore command " + 

 "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit)); 

 Thread.sleep(retryInterval); 

 // If we have a connection error, the JDO connection URL hook might 

 // provide us with a new URL to access the datastore. 

 String lastUrl = getConnectionURL(getConf()); 

 gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl); 

 } 

 return ret; // 返回true 

 } 



 private void create_table_core(final RawStore ms, final Table tbl) 

 throws AlreadyExistsException, MetaException, InvalidObjectException { 


 if (!MetaStoreUtils.validateName(tbl.getTableName()) 

 || !MetaStoreUtils.validateColNames(tbl.getSd().getCols()) 

 || (tbl.getPartitionKeys() != null && !MetaStoreUtils 

 .validateColNames(tbl.getPartitionKeys()))) { //验证表名、非分区字段名、分区字段名的合法性 

 throw new InvalidObjectException(tbl.getTableName() 

 + " is not a valid object name"); 

 } 


 Path tblPath = null; 

 boolean success = false, madeDir = false; 

 try { 

 ms.openTransaction(); 

 if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) { 

 if (tbl.getSd().getLocation() == null 

 || tbl.getSd().getLocation().isEmpty()) { 

 tblPath = wh.getDefaultTablePath( 

 tbl.getDbName(), tbl.getTableName()); // 默认的路径加上表名就是表的默认路径hdfs://localhost:54310/user/hive/warehouse/tablepartition 

 } else { 

 if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) { 

 LOG.warn("Location: " + tbl.getSd().getLocation() 

 + " specified for non-external table:" + tbl.getTableName()); 

 } 

 tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation())); 

 } 

 tbl.getSd().setLocation(tblPath.toString()); //设置表的路径信息hdfs://localhost:54310/user/hive/warehouse/tablepartition 

 } 


 // get_table checks whether database exists, it should be moved here 

 if (is_table_exists(tbl.getDbName(), tbl.getTableName())) { //如果表已经存在则判抛出异常。 

 throw new AlreadyExistsException("Table " + tbl.getTableName() 

 + " already exists"); 

 } 


 if (tblPath != null) { // 路径不为空 

 if (!wh.isDir(tblPath)) { // 路径是存在与否,若存在是否是目录 

 if (!wh.mkdirs(tblPath)) { //路径不存在,创建 

 throw new MetaException(tblPath 

 + " is not a directory or unable to create one"); 

 } 

 madeDir = true; //创建成功 

 } 

 } 


 // set create time 

 long time = System.currentTimeMillis() / 1000; 

 tbl.setCreateTime((int) time); //设置表的创建时间 

 tbl.putToParameters(Constants.DDL_TIME, Long.toString(time)); //最后操作时间 


 ms.createTable(tbl); //保存到数据库中 

 success = ms.commitTransaction(); //事务是否成功 


 } finally { 

 if (!success) { // 

 ms.rollbackTransaction(); 

 if (madeDir) { 

 wh.deleteDir(tblPath, true); 

 } 

 } 

 } 

 } 


ObjectStore: 

 public void createTable(Table tbl) throws InvalidObjectException, 

 MetaException { 

 boolean commited = false; 

 try { 

 openTransaction(); 

 MTable mtbl = convertToMTable(tbl); 

 pm.makePersistent(mtbl); //保存到数据库中,通过DataNucleus 

 commited = commitTransaction(); 

 } finally { 

 if (!commited) { 

 rollbackTransaction(); 

 } 

 } 

 }

https://www.xamrdz.com/database/6rd1959967.html

相关文章: