当前位置: 首页>后端>正文

Flink 使用之动态Kerberos认证

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

背景

Flink Kerberos认证的配置位于YAML配置文件中,带来的便利是用户不用反复去配置认证信息。但这样使用有局限性。用户如果需要频繁切换不同的用户提交任务,是否可以像spark-submit那样通过--principal--keytab参数来临时指定用户呢?答案是可以的。

使用方式

Flink 1.17版本支持使用-D 配置项的方式在提交任务时指定认证配置。例如:

./flink run-application -t yarn-application -D security.kerberos.login.keytab=/etc/security/keytabs/hdfs.headless.keytab -D security.kerberos.login.principal=hdfs@PAUL.COM ../examples/batch/WordCount.jar

其中-D 配置项的内容和YAML配置文件中的参数名相同。

需要注意的是,经本人验证Flink 1.13.2 和Flink 1.15.4版本存在问题,无法使用此方式。

经查询社区,和修复此问题相关联的Issue为:

  • [FLINK-29435][client] SecurityConfiguration supports dynamic configuration
  • [FLINK-31321][Deployment/YARN] Yarn-session mode, securityConfiguration supports dynamic configuration

如果需要在1.13.x和1.15.x版本中修复,可以将这两个commit cherrypick过去重新编译解决。

除此之外还有一个备用方法:动态指定Kerberos Cache。该方法对于Flink 1.13.x和1.15.x可用。

kinit hdfs@PAUL.COM -kt /etc/security/keytabs/hdfs.headless.keytab -c /opt/paul/krb_cache
export KRB5CCNAME=/opt/paul/krb_cache && ./flink run -m yarn-cluster ../examples/batch/WordCount.jar

kinit命令可以通过-c参数指定自定义的cache文件路径。在Flink使用的时候将其配置到KRB5CCNAME环境变量对应起来。不同用户使用不同的cache文件可避免相互影响。

KRB5CCNAME环境变量仅在当前shell中生效,不会干扰其他的shell提交作业。

源代码分析

security config动态参数解析部分

CliFrontend::mainInternal方法try块内容:

final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
CommandLine commandLine =
        cli.getCommandLine(
                new Options(),
                Arrays.copyOfRange(args, min(args.length, 1), args.length),
                true);
Configuration securityConfig = new Configuration(cli.configuration);
// 获取命令行中-D key=value形式的动态参数,将其合并入securityConfig(Configuration类型)中
DynamicPropertiesUtil.encodeDynamicProperties(commandLine, securityConfig);
// 生成安全相关配置(Hadoop认证还是JAAS)
SecurityUtils.install(new SecurityConfiguration(securityConfig));
// 执行认证过程
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));

上面生成安全相关配置和执行认证过程参见Flink 源码之安全认证。

动态指定Kerberos Cache

相关源代码位于:KerberosUtils::static方法。相关代码片段为:

// 获取系统环境变量KRB5CCNAME
String ticketCache = System.getenv("KRB5CCNAME");
if (ticketCache != null) {
    if (IBM_JAVA) {
        System.setProperty("KRB5CCNAME", ticketCache);
    } else {
        // 添加到kerberosCacheOptions中
        kerberosCacheOptions.put("ticketCache", ticketCache);
    }
}

https://www.xamrdz.com/backend/33x1931356.html

相关文章: