Flink1.15 MySQL Catalog 尝鲜

导读:Flink 在 1.15 版本推出了 JDBC Catalog for MySQL JDBC | Apache Flink 的实现。虽然 MysqlCatalogs 只实现了元数据读取,不像 HiveCatalogs 支持元数据获取同时可作元数据管理,但这也帮我们节省了编写大量 CREATE TABLE 的语句的精力。

基础用法

用法很简单,先传入配置参数并实例化 MySqlCatalog,然后在环境中注册该 Catalog 即可

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "my_catalog";
String defaultDatabase = "mydb";
String username        = "...";
String password        = "...";
String baseUrl         = "..."

tableEnv.registerCatalog("my_catalog", new MySqlCatalog(name, defaultDatabase, username, password, baseUrl));

编写 FlinkSQL 时搭配 Hints | Apache Flink 可实现动态 options 配置,这样我们就省去了每次都要编写冗长的 CREATE TABLE 语句,以及 MySQL元数据维护的问题。

select id,uid  from my_catalog.mydb.table1 /*+ OPTIONS(key=val [, key=val]*) */;

关于设置 JDBC URL 连接参数的问题

当 MySQL 5.0.0 以后的版本有一个名为 useInformationSchema 的数据库连接参数,在默认连接参数情况下,useInformationSchema=false,这会导致 Connection.getMetaData() 方法返回的DatabaseMetaData 对象是com.mysql.jdbc.DatabaseMetaData,而不是com.mysql.jdbc.DatabaseMetaDataUsingInfoSchema。父类DatabaseMetaData 并不一定能正常返回元数据。

笔者在实际使用 MysqlCatalog 时发现其构造方法中 baseUrl 参数只支持传入如 jdbc:mysql://xxxxx:3306,这是因为在构造方法中继承了父类 AbstractJdbcCatalog 的构造方法,而在父类构造方法中会把 baseUrl 直接拼上 defaultDatabase。这会导致我们一开始在传入 baseUrl 时拼上连接参数就会导致 defaultUrl 错误。

public AbstractJdbcCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
        super(catalogName, defaultDatabase);
        // .......
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        this.defaultUrl = this.baseUrl + defaultDatabase;
    }

解决方案

在一阵摸索源码后并没有发现可以传入连接参数的地方,因此需要对 MySqlCatalog 进行一些简单的改造。为了使改动最小笔者决定新写一个 AdbCatalog ,通过继承 MySqlCatalog 并覆盖其部分方法来达到支持连接参数的目的,改动如下:

public class AdbCatalog extends MySqlCatalog {

    /**
     *  用于存放连接参数
     */
    protected final Properties props;

    public AdbCatalog(String catalogName, String defaultDatabase, String baseUrl, Properties props) {
        super(catalogName, defaultDatabase, props.getProperty("user"), props.getProperty("password"), baseUrl);
        this.props = props;
    }

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath)
            throws TableNotExistException, CatalogException {

        if (!tableExists(tablePath)) {
            throw new TableNotExistException(getName(), tablePath);
        }

        String databaseName = tablePath.getDatabaseName();
        String dbUrl = baseUrl + databaseName;

         // 改为通过 getConnection(url,props) 的方式创建链接,才可以把 useInformationSchema=true 参数传入,访问元数据
        try (Connection conn = DriverManager.getConnection(dbUrl, props)) {
            DatabaseMetaData metaData = conn.getMetaData();
            Optional primaryKey =
                    getPrimaryKey(
                            metaData,
                            databaseName,
                            getSchemaName(tablePath),
                            getTableName(tablePath));

            PreparedStatement ps =
                    conn.prepareStatement(
                            String.format("SELECT * FROM %s;", getSchemaTableName(tablePath)));

            ResultSetMetaData resultSetMetaData = ps.getMetaData();

            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];

            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
                // TODO 由于 ADB 与 MySQL 字段类型有些不一致, colFlag 计算有问题从而出现主键但是允许 NULL 的情况,最终导致后面 SQL 解析不过。这里直接默认 ADB 所有字段都为 NOT NULL
                types[i - 1] = types[i - 1].notNull();
            }

            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
            primaryKey.ifPresent(
                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
            Schema tableSchema = schemaBuilder.build();

            Map props = new HashMap<>();
            props.put(CONNECTOR.key(), IDENTIFIER);
            props.put(URL.key(), dbUrl);
            props.put(USERNAME.key(), username);
            props.put(PASSWORD.key(), pwd);
            props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
        } catch (Exception e) {
            throw new CatalogException(
                    String.format("Failed getting table %s", tablePath.getFullName()), e);
        }
    }

    @Override
    protected List extractColumnValuesBySQL(
            String connUrl,
            String sql,
            int columnIndex,
            Predicate filterFunc,
            Object... params) {

        List columnValues = Lists.newArrayList();

      	// 改为使用 getConnection(url,props)
        try (Connection conn = DriverManager.getConnection(connUrl, props);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            if (Objects.nonNull(params) && params.length > 0) {
                for (int i = 0; i < params.length; i++) {
                    ps.setObject(i + 1, params[i]);
                }
            }
            ResultSet rs = ps.executeQuery();
            while (rs.next()) {
                String columnValue = rs.getString(columnIndex);
                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
                    columnValues.add(columnValue);
                }
            }
            return columnValues;
        } catch (Exception e) {
            throw new CatalogException(
                    String.format(
                            "The following SQL query could not be executed (%s): %s", connUrl, sql),
                    e);
        }
    }

    protected Optional getPrimaryKey(
            DatabaseMetaData metaData, String database, String schema, String table)
            throws SQLException {
        ResultSet rs = metaData.getPrimaryKeys(database, schema, table);

        Map keySeqColumnName = new HashMap<>();
        String pkName = null;
        while (rs.next()) {
            String columnName = rs.getString("COLUMN_NAME");
            pkName = rs.getString("PK_NAME");
            int keySeq = rs.getInt("KEY_SEQ");
            Preconditions.checkState(
                    !keySeqColumnName.containsKey(keySeq - 1),
                    "The field(s) of primary key must be from the same table.");
            keySeqColumnName.put(keySeq - 1, columnName);
        }
        List pkFields =
                Arrays.asList(new String[keySeqColumnName.size()]);
        keySeqColumnName.forEach(pkFields::set);
        if (!pkFields.isEmpty()) {
            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
        }
        return Optional.empty();
    }

}

改动不大,思路大致就是构造方法变更为支持传入 props,在需要获取元数据时改用 DriverManager.getConnection(connUrl, props) 方式创建连接。

Properties adbProps = new Properties();
adbProps.setProperty("user", "xxxxxx");
adbProps.setProperty("password", "xxxxxxx");
adbProps.setProperty("useInformationSchema", "true");
tableEnv.registerCatalog("adb", new AdbCatalog("adb", "defaultDatabase", "baseUrl", adbProps));

关于主键允许 NULL 的问题

由于笔者使用阿里云的 ADB 数据库与传统的 MySQL 有些不同。在覆盖 getTable() 时获取到所有列属性都为允许 NULL 值(包括主键)。在后面 SQL 解析阶段虽然为主键但是却允许 NULL 值导致过不去验证。

String[] columnNames = new String[resultSetMetaData.getColumnCount()];
            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
                // 这里所有字段都没有进到设置为 NotNull 的逻辑
              if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
                    types[i - 1] = types[i - 1].notNull();
                }
            }

问题是 ResultSetMetaData.isNullable 中

public int isNullable(int column) throws SQLException {
        try {
            return !this.getField(column).isNotNull() ? 1 : 0;
        } catch (CJException var3) {
            throw SQLExceptionsMapping.translateException(var3, this.exceptionInterceptor);
        }
    }

该方法 isNotNull() 判断是通过 Field 的 colFlag 属性进行 & 计算得出结果的

public boolean isNotNull() {
        return (this.colFlag & 1) > 0;
    }

colFlag 默认值为 0,目前只看到 adjustFlagsByMysqlType() 根据类型对 colFlag 的值造成影响。

private void adjustFlagsByMysqlType() {
        switch(this.mysqlType) {
        case BIT:
            if (this.length > 1L) {
                this.colFlag = (short)(this.colFlag | 128);
                this.colFlag = (short)(this.colFlag | 16);
            }
            break;
        case BINARY:
        case VARBINARY:
            this.colFlag = (short)(this.colFlag | 128);
            this.colFlag = (short)(this.colFlag | 16);
            break;
        case DECIMAL_UNSIGNED:
        case TINYINT_UNSIGNED:
        case SMALLINT_UNSIGNED:
        case INT_UNSIGNED:
        case FLOAT_UNSIGNED:
        case DOUBLE_UNSIGNED:
        case BIGINT_UNSIGNED:
        case MEDIUMINT_UNSIGNED:
            this.colFlag = (short)(this.colFlag | 32);
        }
    }

由于笔者 ADB 表字段均为 NOT NULL 类型,可以直接通过去除掉判断 colFlag 值让字段都置为 NOT NULL 来解决问题,因此没有去深入研究分析,感兴趣的朋友可以深入分析下原因,有结果欢迎告知。

最后

感谢您的阅读,如果喜欢本文欢迎关注和转发,转载需注明出处,本头条号将持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章