This post walks through bulk writes to ClickHouse from Flink SQL in batch mode, implemented as a custom JDBC-based table sink. It covers the dynamic table factory, the ClickHouse JDBC dialect, the sink itself (including the deep copy that batch mode requires because row objects are reused), the SQL generation helper, the table enum, the SPI registration, and a small batch job that exercises the whole pipeline. The custom DynamicTableSinkFactory comes first:
public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory {

    public static final String IDENTIFIER = "clickhouse";
    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

    public static final ConfigOption<String> FORMAT = ConfigOptions
            .key("format")
            .stringType()
            .noDefaultValue()
            .withDescription("the format.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(URL);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // either implement your custom validation logic here ...
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();
        // validate all options
        helper.validate();
        // get the validated options
        JdbcOptions jdbcOptions = getJdbcOptions(config);
        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType dataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
        // table sink
        return new ClickHouseDynamicTableSink(jdbcOptions, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final String url = readableConfig.get(URL);
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                .setDriverName(DRIVER_NAME)
                .setDBUrl(url)
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new ClickHouseDialect());
        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }
}
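One thing to watch: helper.validate() only accepts option keys declared in requiredOptions() or optionalOptions(), so with the empty optionalOptions() above, putting 'username' or 'password' into the WITH clause would fail validation even though getJdbcOptions reads them. If you need those options, a sketch of declaring them as optional (replacing the method above):

@Override
public Set<ConfigOption<?>> optionalOptions() {
    // declare the options that may appear in the WITH clause but are not mandatory
    Set<ConfigOption<?>> optionalOptions = new HashSet<>();
    optionalOptions.add(USERNAME);
    optionalOptions.add(PASSWORD);
    optionalOptions.add(FORMAT);
    return optionalOptions;
}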
The JdbcDialect implementation for ClickHouse:

public class ClickHouseDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "ClickHouse";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new ClickHouseRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of(ClickHouseDriver.class.getName());
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }
}
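A quick, throwaway sanity check of the dialect (DialectCheck is my own sketch, not part of the original code):

public class DialectCheck {
    public static void main(String[] args) {
        ClickHouseDialect dialect = new ClickHouseDialect();
        // true: the dialect accepts ClickHouse JDBC URLs
        System.out.println(dialect.canHandle("jdbc:clickhouse://172.23.4.32:8123/test"));
        // `name`: identifiers are quoted with backticks
        System.out.println(dialect.quoteIdentifier("name"));
    }
}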
Sink output class (the key part)
public class ClickHouseDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    private final DataType dataType;

    private static final JdbcExecutionOptions DEFAULT_EXECUTION_OPTIONS = JdbcExecutionOptions.builder()
            // flush once this many records are buffered
            .withBatchSize(2000)
            // flush after this interval even if the batch is not full
            .withBatchIntervalMs(1000)
            // number of retries on failure
            .withMaxRetries(3)
            .build();

    public ClickHouseDynamicTableSink(JdbcOptions jdbcOptions, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @SneakyThrows
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        ClickHouseTableEnum tableEnum = ClickHouseTableEnum.valueOf(jdbcOptions.getTableName());
        TableServiceImpl tableService = new TableServiceImpl(dataType, tableEnum);
        return SinkFunctionProvider.of(new GenericJdbcSinkFunction<>(
                new JdbcBatchingOutputFormat<>(
                        new SimpleJdbcConnectionProvider(jdbcOptions),
                        DEFAULT_EXECUTION_OPTIONS,
                        thisContext -> JdbcBatchStatementExecutor.simple(
                                tableService.getInsertSql(),
                                tableService.getStatementBuilder(),
                                Function.identity()),
                        // In batch mode the RowData object is reused, so buffered records would be
                        // overwritten by later ones; a deep copy is required before buffering.
                        new RowDataConventFunction())));
    }

    @Override
    public DynamicTableSink copy() {
        return new ClickHouseDynamicTableSink(jdbcOptions, dataType);
    }

    @Override
    public String asSummaryString() {
        return "ClickHouse Table Sink";
    }

    @Slf4j
    static class RowDataConventFunction implements JdbcBatchingOutputFormat.RecordExtractor<RowData, RowData>, Serializable {

        @Override
        public RowData apply(RowData rowData) {
            BoxedWrapperRowData newRowData = null;
            try {
                newRowData = new BoxedWrapperRowData(rowData.getArity());
                // copy the field values of the reused row via reflection
                Field field = ReflectUtil.getField(BoxedWrapperRowData.class, "fields");
                Object[] fields = (Object[]) ReflectUtil.getFieldValue(rowData, field);
                Object[] newFields = new Object[fields.length];
                for (int i = 0; i < fields.length; i++) {
                    newFields[i] = Objects.isNull(fields[i]) ? null : ReflectUtil.invoke(fields[i], "copy");
                }
                ReflectUtil.setFieldValue(newRowData, "fields", newFields);
            } catch (Exception e) {
                log.error("convert error, data: {}", rowData, e);
            }
            return newRowData;
        }
    }
}
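The reflection-based copy above is tied to BoxedWrapperRowData being the concrete row class the sink receives. An alternative sketch (my own, not from the original post) copies fields through Flink's RowData.createFieldGetter into a fresh GenericRowData; the class name RowDataCopyFunction is made up, and for binary-backed rows the individual field objects may still need copying as in the reflection version:

import java.io.Serializable;
import java.util.List;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

// Sketch: copy each field of the (possibly reused) RowData into a new GenericRowData,
// using the field types that the sink already knows from its DataType.
public class RowDataCopyFunction implements Serializable {

    private final RowData.FieldGetter[] fieldGetters;

    public RowDataCopyFunction(List<LogicalType> fieldTypes) {
        this.fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
        for (int i = 0; i < fieldTypes.size(); i++) {
            fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
        }
    }

    public RowData copy(RowData rowData) {
        GenericRowData copy = new GenericRowData(rowData.getArity());
        copy.setRowKind(rowData.getRowKind());
        for (int i = 0; i < fieldGetters.length; i++) {
            copy.setField(i, fieldGetters[i].getFieldOrNull(rowData));
        }
        return copy;
    }
}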
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public ClickHouseRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "ClickHouse";
    }
}
A BiFunction that supports serialization
@FunctionalInterface
public interface MyBiFunction<T, U, R> extends Serializable {
    R apply(T t, U u);
}
The SQL generation class
public class TableServiceImpl {

    private final List<LogicalType> logicalTypeList;
    private final String insertSql;

    public TableServiceImpl(DataType dataType, ClickHouseTableEnum tableEnum) {
        this.logicalTypeList = dataType.getLogicalType().getChildren();
        this.insertSql = initInsertSql(tableEnum);
    }

    private static final Map<Class<? extends LogicalType>, MyBiFunction<RowData, Integer, Object>> FUNCTION_MAP = Maps.newHashMap();

    static {
        // only the types my tables use; extend the map for whatever types your business needs
        FUNCTION_MAP.put(IntType.class, RowData::getInt);
        FUNCTION_MAP.put(VarCharType.class, RowData::getString);
        FUNCTION_MAP.put(DoubleType.class, RowData::getDouble);
        FUNCTION_MAP.put(BigIntType.class, RowData::getLong);
        FUNCTION_MAP.put(CharType.class, RowData::getString);
    }

    public String getInsertSql() {
        return insertSql;
    }

    public JdbcStatementBuilder<RowData> getStatementBuilder() {
        return (statement, value) -> {
            for (int i = 0; i < logicalTypeList.size(); i++) {
                LogicalType logicalType = logicalTypeList.get(i);
                Object realValue = FUNCTION_MAP.get(logicalType.getClass()).apply(value, i);
                statement.setObject(i + 1, realValue);
            }
        };
    }

    // build the insert sql from the column configuration of the enum
    public static String initInsertSql(ClickHouseTableEnum tableEnum) {
        List<String> columns = tableEnum.getColumns().stream()
                .map(ClickHouseTableEnum.ColumnObj::getColumnName)
                .collect(Collectors.toList());
        return String.format("insert into %s (%s) values (%s)",
                tableEnum.name(),
                StrUtil.join(",", columns),
                StrUtil.repeatAndJoin("?", columns.size(), ","));
    }

    public static void main(String[] args) {
        System.out.println(initInsertSql(ClickHouseTableEnum.test));
    }
}
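For the test table defined in the enum below, running the main method prints the statement that backs the JDBC prepared statement:

insert into test (name,age) values (?,?)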
The ClickHouseTable enum class
@Getter
public enum ClickHouseTableEnum {

    // test table; ColumnObj exists for my own business needs, a plain string would also work
    test(Lists.newArrayList(
            ColumnObj.of("name"),
            ColumnObj.of("age")));

    private final List<ColumnObj> columns;

    ClickHouseTableEnum(List<ColumnObj> columns) {
        this.columns = columns;
    }

    @Getter
    @Setter
    @ToString
    public static class ColumnObj {
        // column name in ClickHouse
        private String columnName;
        // key used to read the field in Flink SQL
        private String sqlColumnKey;

        // use this constructor when the two values are identical
        private ColumnObj(String columnName) {
            this.columnName = columnName;
            this.sqlColumnKey = columnName;
        }

        // static factory used above (assumed; not shown in the original post)
        public static ColumnObj of(String columnName) {
            return new ColumnObj(columnName);
        }
    }
}
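Because getSinkRuntimeProvider resolves the target with ClickHouseTableEnum.valueOf(jdbcOptions.getTableName()), the 'table-name' value in the DDL must exactly match an enum constant name, otherwise the job fails with an IllegalArgumentException before any data is written. A throwaway check (TableEnumCheck is my own sketch, not part of the original code):

public class TableEnumCheck {
    public static void main(String[] args) {
        // "test" matches the DDL option 'table-name' = 'test' used in the job below
        ClickHouseTableEnum tableEnum = ClickHouseTableEnum.valueOf("test");
        // prints the configured columns thanks to @Getter / @ToString
        System.out.println(tableEnum.getColumns());
        // ClickHouseTableEnum.valueOf("not_configured") would throw IllegalArgumentException
    }
}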
SPI registration of the custom factory
Under the resources directory, create a META-INF/services directory.
In it, create a file named org.apache.flink.table.factories.Factory
whose content is the fully qualified class name of your factory:
com.xxx.xxx.xxx.ClickHouseDynamicTableFactory
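To confirm the registration is visible on the classpath, one quick way (SpiCheck is my own sketch, not part of the original code) is to let the JDK ServiceLoader list every registered Factory:

import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class SpiCheck {
    public static void main(String[] args) {
        // iterates every Factory registered via META-INF/services on the classpath;
        // the custom factory should appear with the identifier "clickhouse"
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier() + " -> " + factory.getClass().getName());
        }
    }
}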
public class Test {

    public static void main(String[] args) {
        // initialize a batch-mode environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        Configuration configuration = settings.toConfiguration();
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 5);
        TableEnvironment tableEnv = TableEnvironment.create(configuration);

        // create the ClickHouse output table
        // note: the 'table-name' option in the WITH clause must match a constant of ClickHouseTableEnum
        tableEnv.executeSql("CREATE TABLE out_table_test (\n" +
                "  `name` STRING,\n" +
                "  `age` INT\n" +
                ") WITH (\n" +
                "  'connector' = 'clickhouse',\n" +
                "  'url' = 'jdbc:clickhouse://172.23.4.32:8123/test',\n" +
                "  'table-name' = 'test'\n" +
                ")");

        Table table = tableEnv.sqlQuery("select 'alice', 18");
        table.executeInsert("out_table_test");

        // print the data and the row count to the console
        printLog(tableEnv, table, "test");
    }

    private static void printLog(TableEnvironment tableEnv, Table endTable, String outTableName) {
        String outPrint = "consolePrint_" + outTableName;
        tableEnv.executeSql("CREATE TABLE " + outPrint + " " + endTable.getResolvedSchema() + " WITH (\n" +
                "  'connector' = 'print'\n" +
                ")");
        endTable.executeInsert(outPrint);

        Table countTable = tableEnv.sqlQuery("select count(*) from " + endTable);
        tableEnv.executeSql("CREATE TABLE " + outPrint + "_count " + countTable.getResolvedSchema() + " WITH (\n" +
                "  'connector' = 'print'\n" +
                ")");
        countTable.executeInsert(outPrint + "_count");
    }
}
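Each executeInsert submits its own job and returns immediately with a TableResult; if the caller needs to block until the ClickHouse write has actually finished (for example in a scheduled batch task), it can wait on that result. A small sketch (insertAndWait is my own helper, not from the original post):

import java.util.concurrent.ExecutionException;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;

public class InsertAndWait {
    // submits the INSERT job and blocks the client until it completes
    public static void insertAndWait(Table table, String sinkTable)
            throws InterruptedException, ExecutionException {
        TableResult result = table.executeInsert(sinkTable);
        result.await();
    }
}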
That covers batch-mode bulk writes from Flink SQL into ClickHouse: a custom factory, dialect, sink with deep-copied rows, generated insert statements, and SPI registration.