SQL on Hadoop的计算引擎的第一步总是从提交查询开始,只有提交了查询,才有后面一系列的sql解析,优化,生成执行计划,调度之类的工作,因此,今天我们来分析一下presto的提交查询是如何进行的(基于prestosql 330版本进行分析)。
presto的连接方式可以有两种,分别是CLI形式的和JDBC形式的,分别对应源码中的presto-cli模块和presto-jdbc模块,后续真正提交到presto coordinator对应到presto-client模块。
CLI方式
CLI方式的提交也就是命令行的方式,我们可以直接通过这种方式和presto集群进行交互,大多时候是已这种方式来进行测试,或者简单的提交查询。
使用方式是时间执行presto-cli-xxx-executable.jar,在其后带上各种参数,比如将presto-cli-xxx-executable.jar重命名为presto进行执行。默认是不需要进行安全认证的,如果你的presto集群要求密码认证的话,可以加上- -user,- -password带上用户名和密码
./presto --server localhost:8080 --catalog hive --schema default
默认情况下可以在~/.presto_history找到以往你执行过的查询语句。
时序图
首先通过时序图的方式大体描述源码的流程,可以有一个全局的印象,然后我们再具体进行描诉分析:
源码分析
执行上面的可执行jar的main class实际上是io.prestosql.cli.Presto这个类,大体上的意思就通过airline解析命令行的参数,然后运行console.run方法。
public final class Presto
{
private Presto() {}
public static void main(String[] args)
{
Console console = singleCommand(Console.class).parse(args);
if (console.helpOption.showHelpIfRequested() ||
console.versionOption.showVersionIfRequested()) {
return;
}
System.exit(console.run() ? 0 : 1);
}
}
接下来分析Console类的run方法的逻辑:
首先是将上述解析之后的命令行参数封装到ClientSession中,然后获取query,判断CLI是否是–execute 和–file提交的查询,当是file方式的时候,不能是execute方式,否则会抛出RuntimeException。与此同时注册了一个shutdownHook,在控制台退出的时候中止所有Running状态的query。随后开始构造一个QueryRunner对象,将上面的Clientsession和其他的命令参数作为构造函数变量,同时会构造一个OkHttpClient和presto集群的coordinator进行通信。
如果已经有了query的话会直接执行executeCommand提交查询,否则等待命令行进行输入sql进行查询,最后走到Console的process方法。以下为简化后的代码
public boolean run()
{
// 命令行参数封装到ClientSession中
ClientSession session = clientOptions.toClientSession();
boolean hasQuery = clientOptions.execute != null;
boolean isFromFile = !isNullOrEmpty(clientOptions.file);
// 判断是--execute 还是--file
if (isFromFile) {
if (hasQuery) {
throw new RuntimeException("both --execute and --file specified");
}
//.....
}
// abort any running query if the CLI is terminated
AtomicBoolean exiting = new AtomicBoolean();
ThreadInterruptor interruptor = new ThreadInterruptor();
CountDownLatch exited = new CountDownLatch(1);
// 注册hutdownHook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
exiting.set(true);
interruptor.interrupt();
awaitUninterruptibly(exited, EXIT_DELAY.toMillis(), MILLISECONDS);
}));
try (QueryRunner queryRunner = new QueryRunner(
session,
// ...)) {
if (hasQuery) {
return executeCommand(
queryRunner,
exiting,
query,
clientOptions.outputFormat,
clientOptions.ignoreErrors,
clientOptions.progress);
}
// 持续接收输入并处理查询
runConsole(queryRunner, exiting);
return true;
}
// ....
}
不管是executeCommand还是runConsole最后的逻辑都会走到process方法,方法的参数之一就是上面构造的QueryRunner对象,接下来我们分析process方法。
process方法通过preprocessQuery获取最后执行的finalSql,再调用QueryRunner的startQuery提交查询,最后通过renderOutput呈现最终结果以输出。
private static boolean process(
QueryRunner queryRunner,
String sql,
OutputFormat outputFormat,
Runnable schemaChanged,
boolean usePager,
boolean showProgress,
Terminal terminal,
PrintStream out,
PrintStream errorChannel)
{
String finalSql;
try {
// 预处理Query,得到finalSql
finalSql = preprocessQuery(
Optional.ofNullable(queryRunner.getSession().getCatalog()),
Optional.ofNullable(queryRunner.getSession().getSchema()),
sql);
}
catch (QueryPreprocessorException e) {
// ....
}
// 真正开始执行查询逻辑
try (Query query = queryRunner.startQuery(finalSql)) {
//输出结果
boolean success = query.renderOutput(terminal, out, errorChannel, outputFormat, usePager, showProgress);
ClientSession session = queryRunner.getSession();
// update catalog and schema if present
if (query.getSetCatalog().isPresent() || query.getSetSchema().isPresent()) {
session = ClientSession.builder(session)
.withCatalog(query.getSetCatalog().orElse(session.getCatalog()))
.withSchema(query.getSetSchema().orElse(session.getSchema()))
.build();
}
// ....
return success;
}
catch (RuntimeException e) {
System.err.println("Error running command: " + e.getMessage());
if (queryRunner.isDebug()) {
e.printStackTrace(System.err);
}
return false;
}
}
QueryRunner的startQuery方法new Query()调用startInternalQuery由StatementClientFactory构造了statementClient,最后将工作交给StatementClientV1,
// QueryRunner的处理处理
public class QueryRunner
implements Closeable {
public Query startQuery(String query)
{
return new Query(startInternalQuery(session.get(), query), debug);
}
public StatementClient startInternalQuery(String query)
{
return startInternalQuery(stripTransactionId(session.get()), query);
}
private StatementClient startInternalQuery(ClientSession session, String query)
{
OkHttpClient.Builder builder = httpClient.newBuilder();
sslSetup.accept(builder);
OkHttpClient client = builder.build();
// StatementClientFactory会new StatementClientV1
return newStatementClient(client, session, query);
}
}
StatementClientV1执行execute,由buildQueryRequest封装request,发送http请求给coodinator的/v1/statement接口进行处理,最后返回结果进行处理。
public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query)
{
// .....
// 封装request
Request request = buildQueryRequest(session, query);
// client发送请求
JsonResponse<QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);
if ((response.getStatusCode() != HTTP_OK) || !response.hasValue()) {
state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);
throw requestFailedException("starting query", request, response);
}
// 处理结果
processResponse(response.getHeaders(), response.getValue());
}
JDBC方式
实际生产更多的是开发人员通过数据平台这样的web门户提交给各类分析人员进行提交查询,一般是在sql引擎调度的项目中通过JDBC的方式连接presto集群,进而将用户的SQL查询提交给presto coordinator。
时序图
源码解析
JDBC方式提交查询大体上是和CLI大同小异的,后期都是通过StatementClientV1将查询发送到coordinator,区别在于前期对query以及各种参数的处理不太一样,使用方式就是和Mysql JDBC一样,因此实现也是差不多的,这里就不详细展开描述了,感兴趣的读者可以根据上面的时序图找到对应的源码进行阅读,实现均在presto-jdbc模块。
以上为presto 通过CLI方式和JDBC方式提交查询的源码分析,由于作者经验尚浅,如有错误欢迎指正。