
Using Flink: SQL Gateway

Flink usage series: related documentation index

Background

Flink 1.16.0 integrates SQL Gateway, which lets multiple clients execute SQL remotely and concurrently. Flink finally has a capability similar to the Spark Thrift Server.

This post covers deploying, configuring and using Flink SQL Gateway.

The author's environment:

  • Flink 1.16.0
  • Hadoop 3.1.1
  • Hive 3.1.2

The official documentation on SQL Gateway is at https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql-gateway/overview/.

Deploying the Service

SQL Gateway can submit jobs to either a Flink standalone cluster or a Yarn cluster as its execution backend.

Standalone Cluster

For deploying a standalone cluster, see the official documentation at https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/resource-providers/standalone/overview/.

In short, the steps are:

  1. Set up passwordless SSH from the cluster's master node to each of the worker nodes.
  2. Extract the Flink 1.16.0 distribution on the master node.
  3. Edit the $FLINK_HOME/conf/masters and $FLINK_HOME/conf/workers files, listing the IPs or hostnames of the job managers and task managers respectively, one per line. This is how the Flink roles are manually laid out across the cluster; see the example below.
  4. Switch to the user that will run the Flink cluster and execute $FLINK_HOME/bin/start-cluster.sh on the master node to start the cluster.
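
A minimal sketch of these two files, assuming hypothetical hostnames (replace them with your own nodes; 8081 is the default JobManager web UI port):

# $FLINK_HOME/conf/masters
node-master:8081

# $FLINK_HOME/conf/workers
node-worker1
node-worker2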

To shut down the standalone cluster, execute $FLINK_HOME/bin/stop-cluster.sh.

Once the cluster is up, you can start the SQL Gateway. Execute:

$FLINK_HOME/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=xxx.xxx.xxx.xxx

The -Dsql-gateway.endpoint.rest.address option specifies the address the SQL Gateway service binds to. Note that if it is set to localhost, the SQL Gateway is only reachable from the local machine and cannot serve external clients. The SQL Gateway log files are in the $FLINK_HOME/log directory.
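
Once it is running, a quick liveness check against the REST endpoint looks like this (a sketch; 8083 is the default REST port, and the /v1/info endpoint should answer with the product name and Flink version):

curl http://xxx.xxx.xxx.xxx:8083/v1/info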

Run $FLINK_HOME/bin/sql-gateway.sh -h to see more ways to use the sql-gateway.sh command:

Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]
  commands:
    start               - Run a SQL Gateway as a daemon
    start-foreground    - Run a SQL Gateway as a console application
    stop                - Stop the SQL Gateway daemon
    stop-all            - Stop all the SQL Gateway daemons
    -h | --help         - Show this help message

For debugging, it is recommended to run with start-foreground so the service stays in the foreground, which makes it easier to watch the logs and to restart the service after a failure.
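
For example, a foreground run for debugging might look like this (same -D option as above):

$FLINK_HOME/bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=xxx.xxx.xxx.xxx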

Yarn Cluster

Extract the Flink 1.16.0 distribution on any node of the Yarn cluster, then switch to the Flink user and execute:

export HADOOP_CLASSPATH=`hadoop classpath`
$FLINK_HOME/bin/yarn-session.sh -d -s 2 -jm 2048 -tm 2048

This starts a Flink Yarn session cluster. Adjust the yarn-session.sh arguments to your situation: -d runs the session detached, -s sets the number of slots per TaskManager, and -jm and -tm set the JobManager and TaskManager memory (in MB here). Finally, check in the RUNNING Applications page of the Yarn web UI that the Flink Yarn cluster started correctly.
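
The same check can also be done from the command line with the YARN CLI (a sketch; "Flink session cluster" is the default application name used by yarn-session.sh, so adjust the grep if you renamed it with -nm):

yarn application -list | grep "Flink session cluster"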

The Flink user must have permission to submit Yarn applications. If it does not, switch to another user or grant the permission with Ranger.

Once Yarn is up, start the SQL Gateway. Be sure to start the SQL Gateway as the same user that started the yarn-session; otherwise the SQL Gateway cannot find the yarn application id. It will still start, but submitting jobs when executing SQL will fail.

After the SQL Gateway starts correctly, you should see a log line similar to:

INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-flink

The Yarn properties file is named .yarn-properties-{username}. The author uses the flink user, so the file is .yarn-properties-flink. If this log line appears, the SQL Gateway has found the Flink Yarn cluster.
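
You can also look at the file directly to confirm which session cluster will be used (a sketch; the exact keys may vary, but the file records the YARN application id of the session):

cat /tmp/.yarn-properties-flink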

Later on, after a job has been submitted successfully, the log contains entries like:

INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface xxx.xxx.xxx.xxx:40494 of application 'application_1670204805747_0006'.
INFO  org.apache.flink.client.program.rest.RestClusterClient       [] - Submitting job 'collect' (8bbea014547408c4716a483a701af8ab).
INFO  org.apache.flink.client.program.rest.RestClusterClient       [] - Successfully submitted job 'collect' (8bbea014547408c4716a483a701af8ab) to 'http://ip:40494'.

The SQL Gateway finds the application id of the Flink Yarn cluster and submits jobs to that cluster.

Configuration Options

SQL Gateway configuration options can be set dynamically as follows:

$FLINK_HOME/bin/sql-gateway.sh -Dkey=value

The option list from the official documentation:

Key | Default | Type | Description
sql-gateway.session.check-interval | 1 min | Duration | The check interval for idle session timeout, which can be disabled by setting to zero or negative value.
sql-gateway.session.idle-timeout | 10 min | Duration | Timeout interval for closing the session when the session hasn't been accessed during the interval. If setting to zero or negative value, the session will not be closed.
sql-gateway.session.max-num | 1000000 | Integer | The maximum number of the active session for sql gateway service.
sql-gateway.worker.keepalive-time | 5 min | Duration | Keepalive time for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.
sql-gateway.worker.threads.max | 500 | Integer | The maximum number of worker threads for sql gateway service.
sql-gateway.worker.threads.min | 5 | Integer | The minimum number of worker threads for sql gateway service.
  • sql-gateway.session.check-interval: how often to check whether sessions have timed out. Setting it to zero or a negative value disables the check.
  • sql-gateway.session.idle-timeout: the session idle timeout; sessions that exceed it are closed automatically. Again, zero or a negative value disables this behavior.
  • sql-gateway.session.max-num: the maximum number of active sessions.
  • sql-gateway.worker.keepalive-time: how long an idle worker thread is kept alive. When the number of workers exceeds the minimum, the excess threads are killed after this interval.
  • sql-gateway.worker.threads.max: the maximum number of worker threads.
  • sql-gateway.worker.threads.min: the minimum number of worker threads.
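
For example, a hypothetical start command that tightens the idle timeout and caps the worker pool could look like this (the values are illustrative only):

$FLINK_HOME/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=xxx.xxx.xxx.xxx -Dsql-gateway.session.idle-timeout=30min -Dsql-gateway.worker.threads.max=100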

Usage

Flink SQL Gateway supports a Rest API mode and a hiveserver2 mode. Their usage is described below.

Rest API

The deployment above runs the SQL Gateway with the Rest API by default, so we go straight to how to use it. Assume the SQL Gateway in our test environment runs at sql-gateway-ip:8083.

First execute:

curl --request POST http://sql-gateway-ip:8083/v1/sessions

This creates a session and returns a sessionHandle. Example response:

{"sessionHandle":"2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef"}

Next, take executing the SQL statement SELECT 1 as an example. The request format is:

curl --request POST http://sql-gateway-ip:8083/v1/sessions/${sessionHandle}/statements/ --data '{"statement": "SELECT 1"}'

Replacing sessionHandle with the value returned above, the actual command is:

curl --request POST http://sql-gateway-ip:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/statements/ --data '{"statement": "SELECT 1"}'

The response contains an operationHandle, as shown below:

{"operationHandle":"7dcb0266-ed64-423d-a984-310dc6398e5e"}

Finally we use the sessionHandle and operationHandle to fetch the result. The format is:

curl --request GET http://sql-gateway-ip:8083/v1/sessions/${sessionHandle}/operations/${operationHandle}/result/0

The trailing 0 is the token. You can think of the result as being returned in pages (batches); the token is the page number.

Replacing sessionHandle and operationHandle with the real values obtained above, the actual command is:

curl --request GET http://localhost:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/0

The result is:

{"results":{"columns":[{"name":"EXPR$0","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[{"kind":"INSERT","fields":[1]}]},"resultType":"PAYLOAD","nextResultUri":"/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/1"}

From results -> data -> fields we can see that the result of SELECT 1 is 1.

As mentioned, the token works like a page number. The nextResultUri field in the JSON above gives the URL for fetching the next batch of results; note that the token has changed from 0 to 1. We request that URL:

curl --request GET http://localhost:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/1

which returns:

{"results":{"columns":[{"name":"EXPR$0","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[]},"resultType":"EOS","nextResultUri":null}

Here resultType is EOS and nextResultUri is null, meaning all results have been fetched and there is no next page.
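
To make the whole flow easy to reproduce, here is a small sketch that chains the three calls together. It assumes the same gateway address as above and that jq is available for JSON parsing; everything else is a placeholder to adapt.

#!/bin/bash
# Sketch: open a session, run SELECT 1, then page through the results
# by following nextResultUri until it becomes null.
GATEWAY="http://sql-gateway-ip:8083"

SESSION=$(curl -s --request POST "$GATEWAY/v1/sessions" | jq -r '.sessionHandle')

OPERATION=$(curl -s --request POST "$GATEWAY/v1/sessions/$SESSION/statements/" \
  --data '{"statement": "SELECT 1"}' | jq -r '.operationHandle')

URI="/v1/sessions/$SESSION/operations/$OPERATION/result/0"
while [ "$URI" != "null" ]; do
  PAGE=$(curl -s --request GET "$GATEWAY$URI")
  echo "$PAGE" | jq '.results.data'
  URI=$(echo "$PAGE" | jq -r '.nextResultUri')
  sleep 1   # give the operation time to produce the next batch
done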

hiveserver2


In addition to the Rest API described above, SQL Gateway also supports a hiveserver2 mode.

The official documentation for the SQL Gateway hiveserver2 mode is at https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/hive-compatibility/hiveserver2/.

hiveserver2 mode requires some extra dependencies. First, add flink-connector-hive_2.12-1.16.0.jar to Flink's lib directory. It can be downloaded from https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.16.0/flink-connector-hive_2.12-1.16.0.jar.

Besides that, the following Hive dependencies are needed:

  • hive-common.jar
  • hive-service-rpc.jar
  • hive-exec.jar
  • libthrift.jar
  • libfb303.jar
  • antlr-runtime.jar

The versions of these jars must match the Hive installed in your cluster; it is recommended to copy them directly from the lib directory of the cluster's Hive installation, as sketched below.
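
A hypothetical copy sequence (the paths are placeholders; adjust the download location and $HIVE_HOME to your environment):

cp /path/to/flink-connector-hive_2.12-1.16.0.jar $FLINK_HOME/lib/
cp $HIVE_HOME/lib/hive-common-*.jar \
   $HIVE_HOME/lib/hive-service-rpc-*.jar \
   $HIVE_HOME/lib/hive-exec-*.jar \
   $HIVE_HOME/lib/libthrift-*.jar \
   $HIVE_HOME/lib/libfb303-*.jar \
   $HIVE_HOME/lib/antlr-runtime-*.jar \
   $FLINK_HOME/lib/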

The command to start SQL Gateway in hiveserver2 mode is:

$FLINK_HOME/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=xxx.xxx.xxx.xxx -Dsql-gateway.endpoint.type=hiveserver2 -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/path/to/hive/conf -Dsql-gateway.endpoint.hiveserver2.thrift.port=10000

The parameters mean:

  • -Dsql-gateway.endpoint.rest.address: the address the SQL Gateway service binds to.
  • -Dsql-gateway.endpoint.type: the endpoint type. The default is rest, i.e. the Rest API; to use hiveserver2 it must be set explicitly.
  • -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir: the directory containing hive-site.xml, so the gateway can connect to the Hive metastore and read table metadata.
  • -Dsql-gateway.endpoint.hiveserver2.thrift.port: the port the SQL Gateway listens on in hiveserver2 mode, equivalent to the Hive thriftserver port.

hiveserver2 mode has many more options than those listed above; see https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/hive-compatibility/hiveserver2/#endpoint-options. They are not repeated here.

Starting SQL Gateway now may fail with the following error:

2022-12-08 17:42:03,007 INFO  org.apache.flink.table.catalog.hive.HiveCatalog              [] - Created HiveCatalog 'hive'
2022-12-08 17:42:03,008 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Trying to connect to metastore with URI thrift://xxx.xxx.xxx.xxx:9083
2022-12-08 17:42:03,008 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Opened a connection to metastore, current connections: 3
2022-12-08 17:42:03,009 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Connected to metastore.
2022-12-08 17:42:03,010 INFO  org.apache.hadoop.hive.metastore.RetryingMetaStoreClient     [] - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=yarn (auth:SIMPLE) retries=24 delay=5 lifetime=0
2022-12-08 17:42:03,010 INFO  org.apache.flink.table.catalog.hive.HiveCatalog              [] - Connected to Hive metastore
2022-12-08 17:42:03,026 INFO  org.apache.flink.table.module.ModuleManager                  [] - Loaded module 'hive' from class org.apache.flink.table.module.hive.HiveModule
2022-12-08 17:42:03,030 INFO  org.apache.flink.table.gateway.service.session.SessionManager [] - Session f3f6f339-f5b0-425f-94ad-3e9ad11981c1 is opened, and the number of current sessions is 3.
2022-12-08 17:42:03,043 ERROR org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed to execute the operation 7922e186-8110-4bb8-b93d-db17d88eac48.
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.planner.delegation.DialectFactory' in the classpath.

Available factory identifiers are:

Note: if you want to use Hive dialect, please first move the jar `flink-table-planner_2.12` located in `FLINK_HOME/opt` to `FLINK_HOME/lib` and then move out the jar `flink-table-planner-loader` from `FLINK_HOME/lib`.
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
        at org.apache.flink.table.planner.delegation.PlannerBase.getDialectFactory(PlannerBase.scala:161) ~[?:?]
        at org.apache.flink.table.planner.delegation.PlannerBase.getParser(PlannerBase.scala:171) ~[?:?]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.getParser(TableEnvironmentImpl.java:1694) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.<init>(TableEnvironmentImpl.java:240) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.<init>(AbstractStreamTableEnvironmentImpl.java:89) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.<init>(StreamTableEnvironmentImpl.java:84) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.context.SessionContext.createStreamTableEnvironment(SessionContext.java:309) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.context.SessionContext.createTableEnvironment(SessionContext.java:269) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.getTableEnvironment(OperationExecutor.java:218) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:89) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:182) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:111) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:239) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

If you hit this error, Flink has not found the Hive dialect. Move flink-table-planner_2.12-1.16.0.jar from Flink's opt directory into the lib directory, and move flink-table-planner-loader-1.16.0.jar out of lib, for example as follows.
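
A sketch of the two moves (the backup directory is arbitrary; you could also simply delete the loader jar):

mkdir -p /tmp/flink-jar-backup
mv $FLINK_HOME/opt/flink-table-planner_2.12-1.16.0.jar $FLINK_HOME/lib/
mv $FLINK_HOME/lib/flink-table-planner-loader-1.16.0.jar /tmp/flink-jar-backup/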

At this point the contents of Flink's lib directory are:

antlr-runtime-3.5.2.jar
flink-cep-1.16.0.jar
flink-connector-files-1.16.0.jar
flink-connector-hive_2.12-1.16.0.jar
flink-csv-1.16.0.jar
flink-dist-1.16.0.jar
flink-json-1.16.0.jar
flink-scala_2.12-1.16.0.jar
flink-shaded-zookeeper-3.5.9.jar
flink-table-api-java-uber-1.16.0.jar
flink-table-planner_2.12-1.16.0.jar
flink-table-runtime-1.16.0.jar
hive-common-3.1.0.3.0.1.0-187.jar
hive-exec-3.1.0.3.0.1.0-187.jar
hive-service-rpc-3.1.0.3.0.1.0-187.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2-api-2.17.1.jar
log4j-api-2.17.1.jar
log4j-core-2.17.1.jar
log4j-slf4j-impl-2.17.1.jar

The SQL Gateway itself can now be used. However, querying Hive tables through Flink will still fail with missing dependencies, so the following Hadoop jars also need to be added:

  • hadoop-common.jar
  • hadoop-mapreduce-client-common.jar
  • hadoop-mapreduce-client-core.jar
  • hadoop-mapreduce-client-jobclient.jar

The final contents of the lib directory are:

antlr-runtime-3.5.2.jar
flink-cep-1.16.0.jar
flink-connector-files-1.16.0.jar
flink-connector-hive_2.12-1.16.0.jar
flink-csv-1.16.0.jar
flink-dist-1.16.0.jar
flink-json-1.16.0.jar
flink-scala_2.12-1.16.0.jar
flink-shaded-zookeeper-3.5.9.jar
flink-table-api-java-uber-1.16.0.jar
flink-table-planner_2.12-1.16.0.jar
flink-table-runtime-1.16.0.jar
hadoop-common-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-common-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-core-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-jobclient-3.1.1.3.0.1.0-187.jar
hive-common-3.1.0.3.0.1.0-187.jar
hive-exec-3.1.0.3.0.1.0-187.jar
hive-service-rpc-3.1.0.3.0.1.0-187.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2-api-2.17.1.jar
log4j-api-2.17.1.jar
log4j-core-2.17.1.jar
log4j-slf4j-impl-2.17.1.jar

Start the SQL Gateway again; in the author's test it now starts successfully.

The next step is to connect to the SQL Gateway over JDBC. Note that the connection URL must include the auth property set to noSasl, for example:

jdbc:hive2://sql-gateway-ip:10000/default;auth=noSasl

Otherwise the SQL Gateway reports the following error:

org.apache.thrift.protocol.TProtocolException: Missing version in readMessageBegin, old client?

The following sections show how to connect to Flink SQL Gateway with DBeaver, Java code, and Beeline.

DBeaver

Click New Connection -> Apache Hive (it can be found via search). In the Main -> General pane, fill in the host, the port, and optionally the database. Then, on the Driver properties tab, add a user property named auth with the value noSasl. Click Finish and the connection is created; you can then click the SQL button in the toolbar to open a SQL editor and write SQL.

Note: the last step of creating the connection downloads the Hive JDBC driver from GitHub. The download may time out because of network problems, and clicking retry in DBeaver does not help. In that case the driver can be downloaded manually: in the "Connect to a database" wizard click Edit Driver and open the Libraries tab, where the driver's download link is shown. Copy it into a browser and download it. Then go into the C:\Users\xxx\AppData\Roaming\DBeaverData\drivers\remote\ directory and walk down the subdirectories to where the driver is expected, for example C:\Users\xxx\AppData\Roaming\DBeaverData\drivers\remote\timveil\hive-jdbc-uber-jar\releases\download\v1.9-2.6.5. Put the driver downloaded with the browser into that directory (if it already contains a half-downloaded driver file left by DBeaver, delete it first). Finally, click Finish in the "Connect to a database" wizard to close it.

Using Java Code

Maven needs the following dependency:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>3.1.2</version>
</dependency>

Then write the Java code:

public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (
        // Please replace the JDBC URI with your actual host, port and database.
        Connection connection = DriverManager.getConnection("jdbc:hive2://sql-gateway-ip:10000/default;auth=noSasl");
        Statement statement = connection.createStatement()) {
        statement.execute("select * from some_table");
        ResultSet resultSet = statement.getResultSet();
        while (resultSet.next()) {
            System.out.println(resultSet.getString(1));
        }
    }
}

This is no different from ordinary JDBC usage. Note that the Hive driver class name is org.apache.hive.jdbc.HiveDriver.

Using Beeline

Start beeline and connect to the SQL Gateway with:

./beeline
!connect jdbc:hive2://sql-gateway-ip:10000/default;auth=noSasl

You will then be prompted for a username and password. Since the current version does not support authentication, just press Enter to skip them. Once connected you can run SQL statements as you would with Hive.

The above is the beeline usage described on the official site. However, while verifying it the author ran into the following error:

    2022-12-09 10:24:28,600 ERROR org.apache.flink.table.endpoint.hive.HiveServer2Endpoint     [] - Failed to GetInfo.
    java.lang.UnsupportedOperationException: Unrecognized TGetInfoType value: CLI_ODBC_KEYWORDS.
            at org.apache.flink.table.endpoint.hive.HiveServer2Endpoint.GetInfo(HiveServer2Endpoint.java:371) [flink-connector-hive_2.12-1.16.0.jar:1.16.0]
            at org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo.getResult(TCLIService.java:1537) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo.getResult(TCLIService.java:1522) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
            at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
    2022-12-09 10:24:28,600 ERROR org.apache.thrift.server.TThreadPoolServer                   [] - Thrift error occurred during processing of message.
    org.apache.thrift.protocol.TProtocolException: Required field 'infoValue' is unset! Struct:TGetInfoResp(status:TStatus(statusCode:ERROR_STATUS, infoMessages:[*java.lang.UnsupportedOperationException:Unrecognized TGetInfoType value: CLI_ODBC_KEYWORDS.:9:8, org.apache.flink.table.endpoint.hive.HiveServer2Endpoint:GetInfo:HiveServer2Endpoint.java:371, org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo:getResult:TCLIService.java:1537, org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo:getResult:TCLIService.java:1522, org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39, org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39, org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286, java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1142, java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:617, java.lang.Thread:run:Thread.java:745], errorMessage:Unrecognized TGetInfoType value: CLI_ODBC_KEYWORDS.), infoValue:null)
            at org.apache.hive.service.rpc.thrift.TGetInfoResp.validate(TGetInfoResp.java:379) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result.validate(TCLIService.java:5228) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result$GetInfo_resultStandardScheme.write(TCLIService.java:5285) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result$GetInfo_resultStandardScheme.write(TCLIService.java:5254) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result.write(TCLIService.java:5205) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:53) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
            at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
    2022-12-09 10:24:28,600 WARN  org.apache.thrift.transport.TIOStreamTransport               [] - Error closing output stream.
    java.net.SocketException: Socket closed
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118) ~[?:1.8.0_121]
            at java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[?:1.8.0_121]
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_121]
            at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_121]
            at java.io.FilterOutputStream.close(FilterOutputStream.java:158) ~[?:1.8.0_121]
            at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.thrift.transport.TSocket.close(TSocket.java:235) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:303) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
            at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
    

Investigating this error shows that it is a bug in Flink 1.16.0, tracked as FLINK-29839. The community fixed it in 1.16.1.

This post is the author's original work. Discussion and corrections are welcome. Please credit the source if you repost it.

