导读:Flink 在 1.15 版本推出了 JDBC Catalog for MySQL JDBC | Apache Flink 的实现。虽然 MysqlCatalogs 只实现了元数据读取,不像 HiveCatalogs 支持元数据获取同时可作元数据管理,但这也帮我们节省了编写大量 CREATE TABLE 的语句的精力。
用法很简单,先传入配置参数并实例化 MySqlCatalog,然后在环境中注册该 Catalog 即可
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "my_catalog";
String defaultDatabase = "mydb";
String username = "...";
String password = "...";
String baseUrl = "..."
tableEnv.registerCatalog("my_catalog", new MySqlCatalog(name, defaultDatabase, username, password, baseUrl));编写 FlinkSQL 时搭配 Hints | Apache Flink 可实现动态 options 配置,这样我们就省去了每次都要编写冗长的 CREATE TABLE 语句,以及 MySQL元数据维护的问题。
select id,uid from my_catalog.mydb.table1 /*+ OPTIONS(key=val [, key=val]*) */;当 MySQL 5.0.0 以后的版本有一个名为 useInformationSchema 的数据库连接参数,在默认连接参数情况下,useInformationSchema=false,这会导致 Connection.getMetaData() 方法返回的DatabaseMetaData 对象是com.mysql.jdbc.DatabaseMetaData,而不是com.mysql.jdbc.DatabaseMetaDataUsingInfoSchema。父类DatabaseMetaData 并不一定能正常返回元数据。
笔者在实际使用 MysqlCatalog 时发现其构造方法中 baseUrl 参数只支持传入如 jdbc:mysql://xxxxx:3306,这是因为在构造方法中继承了父类 AbstractJdbcCatalog 的构造方法,而在父类构造方法中会把 baseUrl 直接拼上 defaultDatabase。这会导致我们一开始在传入 baseUrl 时拼上连接参数就会导致 defaultUrl 错误。
public AbstractJdbcCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
super(catalogName, defaultDatabase);
// .......
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.defaultUrl = this.baseUrl + defaultDatabase;
}在一阵摸索源码后并没有发现可以传入连接参数的地方,因此需要对 MySqlCatalog 进行一些简单的改造。为了使改动最小笔者决定新写一个 AdbCatalog ,通过继承 MySqlCatalog 并覆盖其部分方法来达到支持连接参数的目的,改动如下:
public class AdbCatalog extends MySqlCatalog {
/**
* 用于存放连接参数
*/
protected final Properties props;
public AdbCatalog(String catalogName, String defaultDatabase, String baseUrl, Properties props) {
super(catalogName, defaultDatabase, props.getProperty("user"), props.getProperty("password"), baseUrl);
this.props = props;
}
@Override
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
if (!tableExists(tablePath)) {
throw new TableNotExistException(getName(), tablePath);
}
String databaseName = tablePath.getDatabaseName();
String dbUrl = baseUrl + databaseName;
// 改为通过 getConnection(url,props) 的方式创建链接,才可以把 useInformationSchema=true 参数传入,访问元数据
try (Connection conn = DriverManager.getConnection(dbUrl, props)) {
DatabaseMetaData metaData = conn.getMetaData();
Optional primaryKey =
getPrimaryKey(
metaData,
databaseName,
getSchemaName(tablePath),
getTableName(tablePath));
PreparedStatement ps =
conn.prepareStatement(
String.format("SELECT * FROM %s;", getSchemaTableName(tablePath)));
ResultSetMetaData resultSetMetaData = ps.getMetaData();
String[] columnNames = new String[resultSetMetaData.getColumnCount()];
DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
columnNames[i - 1] = resultSetMetaData.getColumnName(i);
types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
// TODO 由于 ADB 与 MySQL 字段类型有些不一致, colFlag 计算有问题从而出现主键但是允许 NULL 的情况,最终导致后面 SQL 解析不过。这里直接默认 ADB 所有字段都为 NOT NULL
types[i - 1] = types[i - 1].notNull();
}
Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
primaryKey.ifPresent(
pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
Schema tableSchema = schemaBuilder.build();
Map props = new HashMap<>();
props.put(CONNECTOR.key(), IDENTIFIER);
props.put(URL.key(), dbUrl);
props.put(USERNAME.key(), username);
props.put(PASSWORD.key(), pwd);
props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));
return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting table %s", tablePath.getFullName()), e);
}
}
@Override
protected List extractColumnValuesBySQL(
String connUrl,
String sql,
int columnIndex,
Predicate filterFunc,
Object... params) {
List columnValues = Lists.newArrayList();
// 改为使用 getConnection(url,props)
try (Connection conn = DriverManager.getConnection(connUrl, props);
PreparedStatement ps = conn.prepareStatement(sql)) {
if (Objects.nonNull(params) && params.length > 0) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
}
}
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String columnValue = rs.getString(columnIndex);
if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
columnValues.add(columnValue);
}
}
return columnValues;
} catch (Exception e) {
throw new CatalogException(
String.format(
"The following SQL query could not be executed (%s): %s", connUrl, sql),
e);
}
}
protected Optional getPrimaryKey(
DatabaseMetaData metaData, String database, String schema, String table)
throws SQLException {
ResultSet rs = metaData.getPrimaryKeys(database, schema, table);
Map keySeqColumnName = new HashMap<>();
String pkName = null;
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
pkName = rs.getString("PK_NAME");
int keySeq = rs.getInt("KEY_SEQ");
Preconditions.checkState(
!keySeqColumnName.containsKey(keySeq - 1),
"The field(s) of primary key must be from the same table.");
keySeqColumnName.put(keySeq - 1, columnName);
}
List pkFields =
Arrays.asList(new String[keySeqColumnName.size()]);
keySeqColumnName.forEach(pkFields::set);
if (!pkFields.isEmpty()) {
pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
}
return Optional.empty();
}
} 改动不大,思路大致就是构造方法变更为支持传入 props,在需要获取元数据时改用 DriverManager.getConnection(connUrl, props) 方式创建连接。
Properties adbProps = new Properties();
adbProps.setProperty("user", "xxxxxx");
adbProps.setProperty("password", "xxxxxxx");
adbProps.setProperty("useInformationSchema", "true");
tableEnv.registerCatalog("adb", new AdbCatalog("adb", "defaultDatabase", "baseUrl", adbProps));由于笔者使用阿里云的 ADB 数据库与传统的 MySQL 有些不同。在覆盖 getTable() 时获取到所有列属性都为允许 NULL 值(包括主键)。在后面 SQL 解析阶段虽然为主键但是却允许 NULL 值导致过不去验证。
String[] columnNames = new String[resultSetMetaData.getColumnCount()];
DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
columnNames[i - 1] = resultSetMetaData.getColumnName(i);
types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
// 这里所有字段都没有进到设置为 NotNull 的逻辑
if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
types[i - 1] = types[i - 1].notNull();
}
}问题是 ResultSetMetaData.isNullable 中
public int isNullable(int column) throws SQLException {
try {
return !this.getField(column).isNotNull() ? 1 : 0;
} catch (CJException var3) {
throw SQLExceptionsMapping.translateException(var3, this.exceptionInterceptor);
}
}该方法 isNotNull() 判断是通过 Field 的 colFlag 属性进行 & 计算得出结果的
public boolean isNotNull() {
return (this.colFlag & 1) > 0;
}colFlag 默认值为 0,目前只看到 adjustFlagsByMysqlType() 根据类型对 colFlag 的值造成影响。
private void adjustFlagsByMysqlType() {
switch(this.mysqlType) {
case BIT:
if (this.length > 1L) {
this.colFlag = (short)(this.colFlag | 128);
this.colFlag = (short)(this.colFlag | 16);
}
break;
case BINARY:
case VARBINARY:
this.colFlag = (short)(this.colFlag | 128);
this.colFlag = (short)(this.colFlag | 16);
break;
case DECIMAL_UNSIGNED:
case TINYINT_UNSIGNED:
case SMALLINT_UNSIGNED:
case INT_UNSIGNED:
case FLOAT_UNSIGNED:
case DOUBLE_UNSIGNED:
case BIGINT_UNSIGNED:
case MEDIUMINT_UNSIGNED:
this.colFlag = (short)(this.colFlag | 32);
}
}由于笔者 ADB 表字段均为 NOT NULL 类型,可以直接通过去除掉判断 colFlag 值让字段都置为 NOT NULL 来解决问题,因此没有去深入研究分析,感兴趣的朋友可以深入分析下原因,有结果欢迎告知。
感谢您的阅读,如果喜欢本文欢迎关注和转发,转载需注明出处,本头条号将持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。
| 留言与评论(共有 0 条评论) “” |