当前位置: 首页>数据库>正文

presto中处理json presto cli

SQL on Hadoop的计算引擎的第一步总是从提交查询开始,只有提交了查询,才有后面一系列的sql解析,优化,生成执行计划,调度之类的工作,因此,今天我们来分析一下presto的提交查询是如何进行的(基于prestosql 330版本进行分析)。

presto的连接方式可以有两种,分别是CLI形式的和JDBC形式的,分别对应源码中的presto-cli模块和presto-jdbc模块,后续真正提交到presto coordinator对应到presto-client模块。

CLI方式

CLI方式的提交也就是命令行的方式,我们可以直接通过这种方式和presto集群进行交互,大多时候是已这种方式来进行测试,或者简单的提交查询。
使用方式是时间执行presto-cli-xxx-executable.jar,在其后带上各种参数,比如将presto-cli-xxx-executable.jar重命名为presto进行执行。默认是不需要进行安全认证的,如果你的presto集群要求密码认证的话,可以加上- -user,- -password带上用户名和密码

./presto --server localhost:8080 --catalog hive --schema default

默认情况下可以在~/.presto_history找到以往你执行过的查询语句。

时序图

首先通过时序图的方式大体描述源码的流程,可以有一个全局的印象,然后我们再具体进行描诉分析:

presto中处理json presto cli,presto中处理json presto cli_presto,第1张

源码分析

执行上面的可执行jar的main class实际上是io.prestosql.cli.Presto这个类,大体上的意思就通过airline解析命令行的参数,然后运行console.run方法。

public final class Presto
{
    private Presto() {}

    public static void main(String[] args)
    {
        Console console = singleCommand(Console.class).parse(args);

        if (console.helpOption.showHelpIfRequested() ||
                console.versionOption.showVersionIfRequested()) {
            return;
        }

        System.exit(console.run() ? 0 : 1);
    }
}

接下来分析Console类的run方法的逻辑:
首先是将上述解析之后的命令行参数封装到ClientSession中,然后获取query,判断CLI是否是–execute 和–file提交的查询,当是file方式的时候,不能是execute方式,否则会抛出RuntimeException。与此同时注册了一个shutdownHook,在控制台退出的时候中止所有Running状态的query。随后开始构造一个QueryRunner对象,将上面的Clientsession和其他的命令参数作为构造函数变量,同时会构造一个OkHttpClient和presto集群的coordinator进行通信。
如果已经有了query的话会直接执行executeCommand提交查询,否则等待命令行进行输入sql进行查询,最后走到Console的process方法。以下为简化后的代码

public boolean run()
    {
       // 命令行参数封装到ClientSession中
        ClientSession session = clientOptions.toClientSession();
        boolean hasQuery = clientOptions.execute != null;
        boolean isFromFile = !isNullOrEmpty(clientOptions.file);
        // 判断是--execute 还是--file
        if (isFromFile) {
            if (hasQuery) {
                throw new RuntimeException("both --execute and --file specified");
            }
           //.....
        }

        // abort any running query if the CLI is terminated
        AtomicBoolean exiting = new AtomicBoolean();
        ThreadInterruptor interruptor = new ThreadInterruptor();
        CountDownLatch exited = new CountDownLatch(1);
        // 注册hutdownHook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            exiting.set(true);
            interruptor.interrupt();
            awaitUninterruptibly(exited, EXIT_DELAY.toMillis(), MILLISECONDS);
        }));

        try (QueryRunner queryRunner = new QueryRunner(
                session,
                // ...)) {
            if (hasQuery) {
                return executeCommand(
                        queryRunner,
                        exiting,
                        query,
                        clientOptions.outputFormat,
                        clientOptions.ignoreErrors,
                        clientOptions.progress);
            }
            // 持续接收输入并处理查询
            runConsole(queryRunner, exiting);
            return true;
        }
          // ....
    }

不管是executeCommand还是runConsole最后的逻辑都会走到process方法,方法的参数之一就是上面构造的QueryRunner对象,接下来我们分析process方法。
process方法通过preprocessQuery获取最后执行的finalSql,再调用QueryRunner的startQuery提交查询,最后通过renderOutput呈现最终结果以输出。

private static boolean process(
            QueryRunner queryRunner,
            String sql,
            OutputFormat outputFormat,
            Runnable schemaChanged,
            boolean usePager,
            boolean showProgress,
            Terminal terminal,
            PrintStream out,
            PrintStream errorChannel)
    {
        String finalSql;
        try {
            // 预处理Query,得到finalSql
            finalSql = preprocessQuery(
                    Optional.ofNullable(queryRunner.getSession().getCatalog()),
                    Optional.ofNullable(queryRunner.getSession().getSchema()),
                    sql);
        }
        catch (QueryPreprocessorException e) {
           // ....
        }
               // 真正开始执行查询逻辑
        try (Query query = queryRunner.startQuery(finalSql)) {
              //输出结果
            boolean success = query.renderOutput(terminal, out, errorChannel, outputFormat, usePager, showProgress);

            ClientSession session = queryRunner.getSession();

            // update catalog and schema if present
            if (query.getSetCatalog().isPresent() || query.getSetSchema().isPresent()) {
                session = ClientSession.builder(session)
                        .withCatalog(query.getSetCatalog().orElse(session.getCatalog()))
                        .withSchema(query.getSetSchema().orElse(session.getSchema()))
                        .build();
            }
            //  ....
            return success;
        }
        catch (RuntimeException e) {
            System.err.println("Error running command: " + e.getMessage());
            if (queryRunner.isDebug()) {
                e.printStackTrace(System.err);
            }
            return false;
        }
    }

QueryRunner的startQuery方法new Query()调用startInternalQuery由StatementClientFactory构造了statementClient,最后将工作交给StatementClientV1,

// QueryRunner的处理处理
public class QueryRunner
        implements Closeable {
	 public Query startQuery(String query)
	    {
	        return new Query(startInternalQuery(session.get(), query), debug);
	    }
	
	    public StatementClient startInternalQuery(String query)
	    {
	        return startInternalQuery(stripTransactionId(session.get()), query);
	    }
	
	    private StatementClient startInternalQuery(ClientSession session, String query)
	    {
	        OkHttpClient.Builder builder = httpClient.newBuilder();
	        sslSetup.accept(builder);
	        OkHttpClient client = builder.build();
	         // StatementClientFactory会new StatementClientV1
	        return newStatementClient(client, session, query);
	    }
    }

StatementClientV1执行execute,由buildQueryRequest封装request,发送http请求给coodinator的/v1/statement接口进行处理,最后返回结果进行处理。

public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query)
    {
         // .....
         // 封装request
        Request request = buildQueryRequest(session, query);
        //  client发送请求
        JsonResponse<QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);
        if ((response.getStatusCode() != HTTP_OK) || !response.hasValue()) {
            state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);
            throw requestFailedException("starting query", request, response);
        }
        // 处理结果
        processResponse(response.getHeaders(), response.getValue());
    }

JDBC方式

实际生产更多的是开发人员通过数据平台这样的web门户提交给各类分析人员进行提交查询,一般是在sql引擎调度的项目中通过JDBC的方式连接presto集群,进而将用户的SQL查询提交给presto coordinator。

时序图

presto中处理json presto cli,presto中处理json presto cli_封装_02,第2张

源码解析

JDBC方式提交查询大体上是和CLI大同小异的,后期都是通过StatementClientV1将查询发送到coordinator,区别在于前期对query以及各种参数的处理不太一样,使用方式就是和Mysql JDBC一样,因此实现也是差不多的,这里就不详细展开描述了,感兴趣的读者可以根据上面的时序图找到对应的源码进行阅读,实现均在presto-jdbc模块。

以上为presto 通过CLI方式和JDBC方式提交查询的源码分析,由于作者经验尚浅,如有错误欢迎指正。



https://www.xamrdz.com/database/62u1962884.html

相关文章: