[TOC]
Mac OS 安装 flink
```bash
brew install apache-flink
```
启动 flink
```bash
/usr/local/opt/apache-flink/libexec/bin/start-cluster.sh
```
查看 flink 的 Web 界面 http://localhost:8081/

新建一个 maven 工程
依赖
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.11.0</version>
</dependency>
```
SocketTextStreamWordCount
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package org.fengxuechao.example.flink.quickstart;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * Streaming word count over lines of text read from a socket.
 *
 * <p>Usage: {@code SocketTextStreamWordCount <hostname> <port>}. Every received line is
 * echoed to standard output, split into lower-case words, and a running count per word is
 * printed by the job.
 */
public class SocketTextStreamWordCount {

    /**
     * Builds and executes the word-count job.
     *
     * @param args exactly two arguments: the host name and the port of the text socket
     * @throws NumberFormatException if the port argument is not a valid integer
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }

        String hostname = args[0];
        // Primitive int: socketTextStream takes an int, so boxing to Integer is unnecessary.
        int port = Integer.parseInt(args[1]);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream
                .map(value -> {
                    // Debug trace of every raw line before it is tokenized.
                    System.out.println("map:" + value);
                    return value;
                })
                .flatMap(new LineSplitter())
                // Key by the word itself (field f0) through a KeySelector; the positional
                // keyBy(int...) overload is deprecated.
                .keyBy(wordCount -> wordCount.f0)
                // Running sum over the count (tuple position 1).
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    /**
     * Splits a line into lower-case tokens on runs of non-word characters and emits a
     * {@code (token, 1)} pair for every non-empty token.
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * Tokenizes one input line and forwards a count of one per non-empty token.
         *
         * @param s the input line
         * @param collector receives one {@code (token, 1)} tuple per non-empty token
         */
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            System.out.println("LineSplitter.value:"+s);
            String[] tokens = s.toLowerCase().split("\\W+");
            System.out.println("LineSplitter.value.splits:"+ Arrays.toString(tokens));
            for (String token : tokens) {
                // split() can yield a leading empty token when the line starts with a
                // non-word character; such tokens are traced below but not emitted.
                if (token.length() > 0) {
                    collector.collect(new Tuple2<>(token, 1));
                }
                System.out.println("LineSplitter.value.split:"+ token);
            }
        }
    }
}
|
运行
使用 maven 打包项目（例如在工程根目录执行 `mvn clean package`）
监听 9000 端口（需在提交作业之前启动，因为作业会主动连接该端口；例如执行 `nc -l 9000`）

使用 flink 运行程序
```bash
flink run -c org.fengxuechao.example.flink.quickstart.SocketTextStreamWordCount flink/quickstart/target/quickstart-1.0-SNAPSHOT.jar 127.0.0.1 9000
```
查看 flink 界面的日志
