在刚接触Flink时,踩了很多坑,接下来就把自己的遇到的问题和大家分享:
首先分享正确的过程:
1、在idea中新建maven项目,在pom.xml文件中导入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.4</version>
<scope>provided</scope>
</dependency>
2、编写java程序
package TestFlink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class flinkWordCount {
public static void main(String[] args) throws Exception{
//1、初始化flink的环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、DataSource
DataStreamSource<String> source = env.socketTextStream("localhost", 5001);
//3、DataProcess
DataStream<Tuple2<String, Integer>> wordones = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
/**
* @param line (输入字符串)
* @param out (输出)
* @throws Exception
*/
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
//将分割的字符串以二元数组的形式存储
Tuple2<String, Integer> wordone = new Tuple2<String, Integer>(word, 1);
//将单词组以二元数组输出
out.collect(wordone);
}
}
});
//3.1按照单词进行分组
KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = wordones.keyBy(0);
//3.2计算单词出现的总次数
DataStream<Tuple2<String, Integer>> sum = tuple2TupleKeyedStream.sum(1);
//4、DataSink(将结果直接打印 )
sum.print();
//5、启动并执行程序
env.execute("flink WordCount");
}
}
3、启动NC模拟实时数据(打开CMD ,然后输入nc -l - p 5001)

4、启动Java程序,并在CMD中输入字符串 this is a word

接下来分享遇到的问题:
1、报Flink java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction

解决方法,可参考https://blog.csdn.net/walykyy/article/details/105910155
2、报java.lang.IlleagalStateException: No ExecutorFactory found to execute the application

解决方法:修改pom.xml文件中的flink的引用版本,可直接复制上小节正确流程的pom.xml文件依赖。
|