Flink的函数有哪些

这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

创新互联公司专业为企业提供沙洋网站建设、沙洋做网站、沙洋网站设计、沙洋网站制作等企业网站建设、网页设计与制作、沙洋企业网站模板建站服务,10年沙洋做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。

1. Map: 将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素

具体代码实现

package com.wudl.core;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @version v1.0
 * @ProjectName Flinklearning
 * @ClassName WordMap
 * @Description TODO map 算子实例
 * @Date 2020/10/29 10:15
 */

public class WordMap {

    /**
     * @param args
     * Map 函数的用法
     * 映射:将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素
     *参数: Lambda 表达式或者,new MapFunction实现类
     * 返回值:DataStream
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(1);
        env.socketTextStream("10.204.125.140", 8899)
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String s) throws Exception {
                        String[] split = s.split(",");
                        return split[0] + "---" + split[1];
                    }
                }).print();

        env.execute();


    }
}

2. FlatMap:

将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素
package com.wudl.core;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

/**
 * @version v1.0
 * @ProjectName Flinklearning
 * @ClassName TransformFlatMap
 * @Description TODO FlatMap
 *
 * FlatMap: 是一种扁平的映射,将数据流中的整体拆分成为一个个的个体使用, 消费后的元素产生零到多个元素
 *
 *
 *
 * @Author wudl
 * @Date 2020/10/29 10:46
 *
 *
 * 函数 FlatMap
 * 将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素
 * 参数: lambda 表达式或者是FlatFunction的实现类
 * 返回值:DataStream
 *
 *
 *
 */

public class TransformFlatMap {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//        DataStreamSource<List<Integer>> listDs = env.fromCollection(Arrays.asList(
//                Arrays.asList(1, 2, 3),
//                Arrays.asList(3, 4, 5),
//                Arrays.asList(8,9,0)
//        ));


//        listDs.flatMap(new FlatMapFunction<List<Integer>, Integer>() {
//            @Override
//            public void flatMap(List<Integer> list, Collector<Integer> collector) throws Exception {
//
//                for (Integer number : list) {
//                    collector.collect(number + 100);
//                }
//
//            }
//        }).print();

        DataStreamSource<String> strDs = env.socketTextStream("10.204.125.140", 8899);
        strDs.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split(",");
                collector.collect(split[0]+split[1]);
            }
        }).print();

        env.execute();

    }

}

第三种:Filter  对数据流的过滤根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃

package com.wudl.core;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @version v1.0
 * @ProjectName Flinklearning
 * @ClassName TransformFilter
 * @Description TODO 流的过滤
 * @Date 2020/11/5 10:26
 */

public class TransformFilter {


    /**
     * 函数中Filter 中过滤
     * 过滤:根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false)  将丢弃
     * 返回值:DataStream
     */
    public static void main(String[] args) throws Exception {

        //1.获取上下文的环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置并行度
        env.setParallelism(1);
        //3.获取数据流
        DataStreamSource<String> SourceDs = env.socketTextStream("10.204.125.140", 8899);
        //4. 过滤数据流
        DataStream<String> filter = SourceDs.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                String[] split = value.split(",");
                return split[1].length() > 3;
            }
        });
        filter.print();
        env.execute();

    }


}

感谢你能够认真阅读完这篇文章,希望小编分享的“Flink的函数有哪些”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!

本文标题:Flink的函数有哪些
文章源于:https://www.cdcxhl.com/article46/jjgihg.html

成都网站建设公司_创新互联,为您提供服务器托管做网站网页设计公司品牌网站设计小程序开发全网营销推广

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

手机网站建设