flink 官方文档,flink 管理页面
目录
概述算子FlatMapKeyByReduce连接窝测试连接卡夫卡正式测试打包上传服务器
概述
最近做了一个小任务,要使用弗林克处理域名数据,在4GB的域名文档中求出每个域名的顶级域名,最后输出每个顶级域名下的前10个子级域名。一个比较简单的入门级弗林克应用,代码很容易写,主要用到的算子有平面图,关键点,减少。但是由于专家打包问题,总是提示找不到入口类,卡了好久,最后也是成功解决了。
主体代码如下:
公共类FlinkStreamingTopDomain { public static void main(String[]args)引发异常{ //获取流处理运行环境StreamExecutionEnvironment env=StreamExecutionEnvironment。getexecutionenvironment();//获取kafkaConsumer flinkkafconsumer string kafkaConsumer=flinkutil。getkafkaconsumer( ahl _ test1 ,控制台-消费者-72096 );//从当前消费组下标开始读取kafkaconsumer。setstartfromcearliest();数据流源文本=环境。添加源(kafkaConsumer);//算子DataStreamTuple2String,字符串窗口计数=文本。平面地图(新平面地图()).keyBy(0).Reduce(new Reduce());//把数据打印到控制台windowCount.print()。设置平行度(16);//使用16个并行度//注意:因为弗林克是懒加载的,所以必须调用执行方法,上面的代码才会执行env.execute(流式顶级域名计算’);}}
算子
FlatMap
平面图是对一行字符进行处理的,官网上的解释如下
flatmapdastreamdatastream接受一个元素并生成零个、一个或多个元素。将句子拆分成单词的平面地图函数:数据流。平面映射(new FlatMapFunctionString,String(){ @ Override public void平面映射(String value,CollectorString out)抛出异常{ for(字符串字:值。拆分(){ out。收藏(word);} }});其实和大数据的地图差不多,都是把一行字符串进行处理,得到我们想要的键,值,不同之处在于地图处理后得到的是键,值[].即大数据的地图操作会按键自动的将价值处理成数组的形式,而弗林克的平面地图算子只会把每行数据处理成键、值.
下面是我处理业务的平面地图代码
//平面图分割域名,并输出二元组顶级域名,域名公共静态类平面地图实现FlatMapFunctionString,Tuple2String,String { @ override public void flat map(String s,CollectorTuple2String,String out)抛出异常{ string[]values=s.split(\^);//按字符^分割
if(values.length - 1 < 2) { return; } String domain = values[2]; out.collect(new Tuple2<String,String>(ToolUtil.getTopDomain(domain),domain)); } }我这里把数据处理成了二元组形式,之后reduce也是对这个二元组进行处理。
KeyBy
先来看看官网的解释
KeyByDataStream → KeyedStream Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.This transformation returns a KeyedStream, which is, among other things, required to use keyed state.dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"dataStream.keyBy(value -> value.f0) // Key by the first element of a TupleAttention:A type cannot be a key if: 1.it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation. 2.it is an array of any type.
keyBy会按照一个keySelector定义的方式进行哈希分区,会将一个流分成多个Partition,相同key的会被分在同一个分区,经过keyBy的流变成KeyedStream。
需要注意的有两点:
1.pojo类型作为key,必须重写hashcode()方法
2.数组类型不能作为key
Reduce
官网的解释如下
ReduceKeyedStream → DataStreamA "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.A reduce function that creates a stream of partial sums:keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; }});
reduce是进行滚动处理的,即reduce方法的第一个参数是当前已经得到的结果记为currentResult,第二个参数是当前要处理的<key,value>。流式计算会一条一条的处理数据,每处理完一条数据就得到新的currentResult。
业务处理代码如下
// 拼接同一分区下的ip public static class Reduce implements ReduceFunction<Tuple2<String,String>>{ @Override public Tuple2<String,String> reduce(Tuple2 t1, Tuple2 t2) throws Exception { String[] domains = t1.f1.toString().split("\^"); if(domains.length == 10){ return t1; } t1.f1 = t1.f1.toString() + "^" + t2.f1.toString(); System.out.println(t1.f1 ); return t1; } }
连接socket测试
1.将主体代码里的kafka获取数据,改成socket获取数据
// int port;// try {// ParameterTool parameterTool = ParameterTool.fromArgs(args);// port = parameterTool.getInt("port");// } catch (Exception e){// System.out.println("没有指定port参数,使用默认值1112");// port = 1112;// } // 连接socket获取输入数据// DataStreamSource<String> text = env.socketTextStream("192.168.3.221",port);
2.在服务器开启一个端口号:nc -l -p 1112
3.运行代码
4.服务器输入测试数据就可以实时的获取处理结果
连接kafka
正式
使用kafka命令创建主题
kafka-topics.sh --create --zookeeper IP1:2181 IP2:2181... --replication-factor 2 --partitions 16 --topic ahl_test
kafka建立topic需要先开启zookeeper
运行生产者jar包,用生产者读取数据
java -jar $jar包路径 $topic $path
测试
另外,还可以使用测试生产者实现和socket测试相同的效果
/kafka-console-producer.sh --broker-list slave3:9092 --topic ahl_test1
打包上传服务器
打包上传服务器注意不要使用idea提供的build方式,反正我使用build会一直报错找不到主类,即便我反编译jar包发现主类在里面,并且MF文件也有配置主类信息。这个问题卡了我很久,最后我使用mvn pakage的方式打包并运行成功,把我的打包插件贴出来帮助遇到和我相同问题的人
<plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><!--<createDependencyReducedPom>false</createDependencyReducedPom>--><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.ncs.flink.streaming.FlinkStreamingTopDomain</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins>
Flink运行指令为:
/home/soft/flink-1.12.0//bin/flink run -c com.ncs.flink.streaming.FlinkStreamingDomainJob /home/ahl/flink/situation-mapred-flink-0.0.1-SNAPSHOT.jar
或者可以访问Flink集群的8081端口,在提供的UI页面上传运行
以上就是Flink入门级应用域名处理示例的详细内容,更多关于Flink域名处理的资料请关注盛行IT其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。