(7)FlinkSQL将kafka数据写入到mysql方式二

public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);        tableEnv.executeSql("CREATE TABLE WaterSensor (" +                "id STRING," +                "ts BIGINT," +                "vc BIGINT," +//                "`pt` TIMESTAMP(3),"+//                "WATERMARK FOR pt AS pt - INTERVAL '10' SECOND" +                "pt as PROCTIME() " +                ") WITH (" +                "'connector' = 'kafka'," +                "'topic' = 'kafka_data_waterSensor'," +                "'properties.bootstrap.servers' = '127.0.0.1:9092'," +                "'properties.group.id' =  'test'," +                "'scan.startup.mode' = 'earliest-offset'," +//                "'json.fail-on-missing-field' = 'false'," +//                "'json.ignore-parse-errors' = 'true'," +                "'format' = 'json'" +                ")"        );        tableEnv.executeSql("CREATE TABLE flinksink (" +                "componentname STRING," +                "componentcount BIGINT NOT NULL," +                "componentsum BIGINT" +                ") WITH (" +                "'connector.type' = 'jdbc'," +                "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +                "'connector.table' = 'flinksink'," +                "'connector.driver' =  'com.mysql.cj.jdbc.Driver'," +                "'connector.username' = 'root'," +                "'connector.password' = 'root'," +                "'connector.write.flush.max-rows'='3'\r
" +                ")"        );        Table result = tableEnv.sqlQuery(                "SELECT " +                        "id as componentname, " +                //window_start, window_end,                        "COUNT(ts) as componentcount ,SUM(ts) as componentsum " +                        "FROM TABLE( " +                        "TUMBLE( TABLE WaterSensor , " +                        "DESCRIPTOR(pt), " +                        "INTERVAL '10' SECOND)) " +                        "GROUP BY id , window_start, window_end"        );//        //方式一:写入数据库////        result.executeInsert("flinksink").print(); //;.insertInto("flinksink");//        //方式二:写入数据库        tableEnv.createTemporaryView("ResultTable", result);        tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();        env.execute();    }
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章