(5)Flink SQL 将 socket 数据写入 MySQL(方式二)

public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);        DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 9999,"
");        SingleOutputStreamOperator waterDS = streamSource.map(new MapFunction() {            @Override            public WaterSensor map(String s) throws Exception {                String[] split = s.split(",");                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));            }        });        // 将流转化为表        Table table = tableEnv.fromDataStream(waterDS,                $("id"),                $("ts"),                $("vc"),                $("pt").proctime());        tableEnv.createTemporaryView("EventTable", table);        tableEnv.executeSql("CREATE TABLE flinksink (" +                "componentname STRING," +                "componentcount BIGINT NOT NULL," +                "componentsum BIGINT" +                ") WITH (" +                "'connector.type' = 'jdbc'," +                "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +                "'connector.table' = 'flinksink'," +                "'connector.driver' =  'com.mysql.cj.jdbc.Driver'," +                "'connector.username' = 'root'," +                "'connector.password' = 'root'," +                "'connector.write.flush.max-rows'='3'\r
" +                ")"        );        Table mysql_user = tableEnv.from("flinksink");        mysql_user.printSchema();        Table result = tableEnv.sqlQuery(                "SELECT " +                        "id as componentname, " +                //window_start, window_end,                        "COUNT(ts) as componentcount ,SUM(ts) as componentsum " +                        "FROM TABLE( " +                        "TUMBLE( TABLE EventTable , " +                        "DESCRIPTOR(pt), " +                        "INTERVAL '10' SECOND)) " +                        "GROUP BY id , window_start, window_end"        );        //方式一:写入数据库//        result.executeInsert("flinksink").print(); //;.insertInto("flinksink");        //方式二:写入数据库        tableEnv.createTemporaryView("ResultTable", result);        tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();//        tableEnv.toAppendStream(result, Row.class).print("toAppendStream");           //追加模式        env.execute();    }
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章