public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory {


   public static final String IDENTIFIER = "clickhouse";

   private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";

   public static final ConfigOption String URL = ConfigOptions




   .withDescription("the jdbc database url.");

   public static final ConfigOption String TABLE_NAME = ConfigOptions




   .withDescription("the jdbc table name.");

   public static final ConfigOption String USERNAME = ConfigOptions




   .withDescription("the jdbc user name.");

   public static final ConfigOption String PASSWORD = ConfigOptions




   .withDescription("the jdbc password.");

   public static final ConfigOption String FORMAT = ConfigOptions




   .withDescription("the format.");


   public String factoryIdentifier() {

   return IDENTIFIER;


   public Set ConfigOption ? requiredOptions() {

   Set ConfigOption ? requiredOptions = new HashSet ();



   return requiredOptions;


   public Set ConfigOption ? optionalOptions() {

   return new HashSet ();


   public DynamicTableSink createDynamicTableSink(Context context) {

   // either implement your custom validation logic here ...

   final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

   final ReadableConfig config = helper.getOptions();

   // validate all options


   // get the validated options

   JdbcOptions jdbcOptions = getJdbcOptions(config);

   // derive the produced data type (excluding computed columns) from the catalog table

   final DataType dataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

   // table sink

   return new ClickHouseDynamicTableSink(jdbcOptions, dataType);

   private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {

   final String url = readableConfig.get(URL);

   final JdbcOptions.Builder builder = JdbcOptions.builder()




   .setDialect(new ClickHouseDialect());



   return builder.build();




public class ClickHouseDialect implements JdbcDialect {


   private static final long serialVersionUID = 1L;


   public String dialectName() {

   return "ClickHouse";


   public boolean canHandle(String url) {

   return url.startsWith("jdbc:clickhouse:");


   public JdbcRowConverter getRowConverter(RowType rowType) {

   return new ClickHouseRowConverter(rowType);


   public String getLimitClause(long l) {

   return "limit num : " + l;


   public Optional String defaultDriverName() {

   return Optional.of(ClickHouseDriver.class.getName());


   public String quoteIdentifier(String identifier) {

   return "`" + identifier + "`";





public class ClickHouseDynamicTableSink implements DynamicTableSink {


   private final JdbcOptions jdbcOptions;

   private final DataType dataType;

   private static final JdbcExecutionOptions DEFAULT_EXECUTION_OPTIONS = JdbcExecutionOptions.builder()

   // 写入触发数据量阈值


   // 写入触发时间阈值


   // 重试次数



   public ClickHouseDynamicTableSink(JdbcOptions jdbcOptions, DataType dataType) {

   this.jdbcOptions = jdbcOptions;

   this.dataType = dataType;


   public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {

   return requestedMode;



   public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {

   ClickHouseTableEnum tableEnum = ClickHouseTableEnum.valueOf(jdbcOptions.getTableName());

   TableService tableService = new TableServiceImpl(dataType, tableEnum);

   return SinkFunctionProvider.of(new GenericJdbcSinkFunction (

   new JdbcBatchingOutputFormat (

   new SimpleJdbcConnectionProvider(jdbcOptions),


   thisContext - JdbcBatchStatementExecutor.simple(




   // 批模式下,数据对象重复利用,会发生覆盖问题,需要深拷贝对象

   new RowDataConventFunction())));


   public DynamicTableSink copy() {

   return new ClickHouseDynamicTableSink(jdbcOptions, dataType);


   public String asSummaryString() {

   return "ClickHouse Table Sink";


   static class RowDataConventFunction implements JdbcBatchingOutputFormat.RecordExtractor RowData, RowData , Serializable {


   public RowData apply(RowData rowData) {

   BoxedWrapperRowData newRowData = null;

   try {

   newRowData = new BoxedWrapperRowData(rowData.getArity());

   // 利用反射拷贝旧对象的值

   Field field = ReflectUtil.getField(BoxedWrapperRowData.class, "fields");

   Object[] fields = (Object[]) ReflectUtil.getFieldValue(rowData, field);

   Object[] newFields = new Object[fields.length];

   for (int i = 0; i fields.length; i++) {

   newFields[i] = Objects.isNull(fields[i]) ? null : ReflectUtil.invoke(fields[i], "copy");

   ReflectUtil.setFieldValue(newRowData, "fields", newFields);

   } catch (Exception e) {

   log.error("convert error,data:{},", rowData, e);

   return newRowData;




public class ClickHouseRowConverter extends AbstractJdbcRowConverter {


   private static final long serialVersionUID = 1L;

   public ClickHouseRowConverter(RowType rowType) {



   public String converterName() {

   return "ClickHouse";







  public interface MyBiFunction T, U, R extends Serializable {

   R apply(T t, U u);



  // SQL generation class


public class TableServiceImpl {


   private final List LogicalType logicalTypeList;

   private final String insertSql;

   public TableServiceImpl(DataType dataType, ClickHouseTableEnum tableEnum) {

   this.logicalTypeList = dataType.getLogicalType().getChildren();

   this.insertSql = initInsertSql(tableEnum);

   private static final Map Class ? extends LogicalType , MyBiFunction RowData, Integer, Object FUNCTION_MAP = Maps.newHashMap();

   static {

   // 我的业务中用到的类型,可根据自己的业务,进行增加

   FUNCTION_MAP.put(IntType.class, RowData::getInt);

   FUNCTION_MAP.put(VarCharType.class, RowData::getString);

   FUNCTION_MAP.put(DoubleType.class, RowData::getDouble);

   FUNCTION_MAP.put(BigIntType.class, RowData::getLong);

   FUNCTION_MAP.put(CharType.class, RowData::getString);

   public String getInsertSql() {

   return insertSql;

   public JdbcStatementBuilder RowData getStatementBuilder() {

   return (statement, value) - {

   for (int i = 0; i logicalTypeList.size(); i++) {

   LogicalType logicalType = logicalTypeList.get(i);

   Object realValue = FUNCTION_MAP.get(logicalType.getClass()).apply(value, i);

   statement.setObject(i + 1, realValue);

   // 根据枚举字段配置,生成 insert sql

   public static String initInsertSql(ClickHouseTableEnum tableEnum) {

   List String columns = tableEnum.getColumns().stream().map(ClickHouseTableEnum.ColumnObj::getColumnName).collect(Collectors.toList());

   return String.format("insert into %s (%s) values (%s)"

   , tableEnum.name()

   , StrUtil.join(",", columns)

   , StrUtil.repeatAndJoin("?", columns.size(), ","));

   public static void main(String[] args) {




  // ClickHouse table enum class




  /**
   * ClickHouse tables the sink can write to. A constant's name is used verbatim as the table
   * name in the generated INSERT statement and is looked up from the {@code table-name}
   * connector option.
   *
   * <p>NOTE(review): the constant declaration and the accessors were stripped from the reviewed
   * listing. They are reconstructed from their call sites ({@code getColumns()},
   * {@code ColumnObj::getColumnName}, {@code ColumnObj.of(..)}) and from the demo table
   * ({@code table-name = test}, columns {@code name} and {@code age}); confirm against the
   * real definition.
   */
  public enum ClickHouseTableEnum {

      /**
       * Test table. The original author notes that {@link ColumnObj} exists for their own
       * business needs and that a plain string would be enough in practice.
       */
      test(java.util.Arrays.asList(
              ColumnObj.of("name")
              , ColumnObj.of("age")));

      private final List<ColumnObj> columns;

      ClickHouseTableEnum(List<ColumnObj> columns) {
          // Defensive, read-only copy: enum constants are shared across the whole JVM.
          this.columns = java.util.Collections.unmodifiableList(new java.util.ArrayList<>(columns));
      }

      /** Returns the table's columns in INSERT order (unmodifiable). */
      public List<ColumnObj> getColumns() {
          return columns;
      }

      /** One column of a ClickHouse table. */
      public static class ColumnObj {

          /** Column name in ClickHouse. */
          private final String columnName;

          /** Key used to fetch the field in Flink SQL. */
          private final String sqlColumnKey;

          /** Use this constructor when both values are identical. */
          private ColumnObj(String columnName) {
              this(columnName, columnName);
          }

          private ColumnObj(String columnName, String sqlColumnKey) {
              this.columnName = columnName;
              this.sqlColumnKey = sqlColumnKey;
          }

          /** Creates a column whose Flink SQL key equals its ClickHouse name. */
          public static ColumnObj of(String columnName) {
              return new ColumnObj(columnName);
          }

          /** Creates a column whose Flink SQL key differs from its ClickHouse name. */
          public static ColumnObj of(String columnName, String sqlColumnKey) {
              return new ColumnObj(columnName, sqlColumnKey);
          }

          public String getColumnName() {
              return columnName;
          }

          public String getSqlColumnKey() {
              return sqlColumnKey;
          }
      }
  }



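  // Register the custom factory through SPI:

  // under the resources directory, create the META-INF/services directory and add a file named
  // org.apache.flink.table.factories.Factory whose content is the fully-qualified class name of
  // ClickHouseDynamicTableFactory.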









public class Test {


   public static void main(String[] args) {

   // 初始化 批模式环境

   EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();

   Configuration configuration = settings.toConfiguration();

   configuration.set(CoreOptions.DEFAULT_PARALLELISM, 5);

   TableEnvironment tableEnv = TableEnvironment.create(configuration);

   // 创建 clickHouse 输出表

   // 注意,WITH 后面的参数,table-name 需要跟 clickHouseTable 枚举类中对应上

   tableEnv.executeSql("CREATE TABLE out_table_test (\n" +

   " `name` STRING,\n" +

   " `age` INT\n" +

   ") WITH (\n" +

   " connector = clickhouse,\n" +

   " url = jdbc:clickhouse://,\n" +

   " table-name = test\n" +


   Table table = tableEnv.sqlQuery("select alice,18 ");


   // 打印日志

   printLog(tableEnv, table, "test");

   private static void printLog(TableEnvironment tableEnv, Table endTable, String outTableName) {

   String outPrint = "consolePrint_" + outTableName;

   tableEnv.executeSql("CREATE TABLE " + outPrint + " " + endTable.getResolvedSchema() + " WITH (\n" +

   " connector = print\n" +



   Table countTable = tableEnv.sqlQuery("select count(*) from " + endTable);

   tableEnv.executeSql("CREATE TABLE " + outPrint + "_count " + countTable.getResolvedSchema() + " WITH (\n" +

   " connector = print\n" +


   countTable.executeInsert(outPrint + "_count");



