Skip to main content
Ctrl+K
如果你觉得内容对你有帮助,请在 GitHub 上点个 star 吧!
Flink 原理与实践 - Home
  • GitHub
  • Flink 原理与实践
  • 1. 大数据技术概述
    • 1.1. 什么是大数据
    • 1.2. 从批处理到流处理
    • 1.3. 代表性大数据技术
    • 1.4. 从 Lambda 到 Kappa:大数据处理平台的演进
    • 1.5. 流处理基础概念
    • 1.6. 编程语言的选择
    • 1.7. 案例:使用 Kafka 构建文本数据流
  • 2. 大数据必备编程知识
    • 2.1. 继承和多态
    • 2.2. 泛型
    • 2.3. 函数式编程
    • 2.4. 案例实战:Flink 开发环境搭建
  • 3. Flink的设计与运行原理
    • 3.1. Flink 数据流图
    • 3.2. 架构与核心组件
    • 3.3. 任务执行与资源划分
    • 3.4. 案例实战:WordCount
  • 4. DataStream API
    • 4.1. Flink 程序的骨架结构
    • 4.2. Transformations
    • 4.3. 数据类型和序列化
    • 4.4. 用户自定义函数
    • 4.5. 案例实战:股票数据流处理
  • 5. 时间和窗口
    • 5.1. Flink 的时间语义
    • 5.2. ProcessFunction
    • 5.3. 窗口算子
    • 5.4. 双流关联
    • 5.5. 迟到数据
    • 5.6. 案例实战:股票价格
  • 6. 状态与检查点
    • 6.1. 实现有状态的计算
    • 6.2. Checkpoint
    • 6.3. Savepoint
    • 6.4. 案例实战:电商用户行为
  • 7. Flink 连接器
    • 7.1. Flink 端到端的 Exactly-Once 保障
    • 7.2. 自定义 Source 和 Sink
    • 7.3. Flink 中常用的 Connector
    • 7.4. 案例实战:读取并输出股票价格数据流
  • 8. Table API & SQL
    • 8.1. Table API & SQL 综述
    • 8.2. 动态表和持续查询
    • 8.3. 时间和窗口
    • 8.4. Join
    • 8.5. SQL DDL
    • 8.6. 系统函数
    • 8.7. 用户自定义函数
    • 8.8. 案例实战:SQL on IoT
  • 9. Flink 的部署和配置
    • 9.1. Flink 集群部署模式
    • 9.2. 配置文件
    • 9.3. 算子链与槽位共享
    • 9.4. 命令行工具
    • 9.5. 与 Hadoop 集成
    • 9.6. 案例实战:作业编码、打包与提交
  • .md

案例实战:Flink 开发环境搭建

本节目录

  • 准备所需软件
  • 下载并安装 Flink
  • 创建 Flink 工程
  • 调试和运行 Flink 程序
  • 运行程序
  • 本章小结

2.4. 案例实战:Flink 开发环境搭建#

本案例实战主要带领读者完成对 Flink 开发环境的搭建。

准备所需软件#

在 1.7 节中我们简单提到了 Kafka 的安装部署所需的软件环境,这里我们再次梳理一下 Flink 开发所需的软件环境。

  1. 操作系统

    • 目前,我们可以在 Linux、macOS 和 Windows 操作系统上开发和运行 Flink。类 UNIX 操作系统(Linux 或 macOS)是大数据首选的操作系统,它们对 Flink 的支持更好,适合进行 Flink 学习和开发。后文会假设读者已经拥有了一个类 UNIX 操作系统。Windows 用户为了构建一个类 UNIX 环境,可以使用专门为 Linux 操作系统打造的子系统(Windows subsystem for Linux,即 WSL)或者是 Cygwin,又或者创建一个虚拟机,在虚拟机中安装 Linux 操作系统。

  2. JDK

    • 和 Kafka 一样,Flink 开发基于 JDK,因此也需要提前安装好 JDK 1.8+ (Java 8 或更高的版本),配置好 Java 环境变量。

  3. 其他工具

    • 其他的工具因开发者习惯不同来安装,不是 Flink 开发所必需的,但这里仍然建议提前安装好以下工具。

      • Apache Maven 3.0+

        • Apache Maven 是一个项目管理工具,可以对 Java 或 Scala 项目进行构建及依赖管理,是进行大数据开发必备的工具。这里推荐使用 Maven 是因为 Flink 源码工程和本书的示例代码工程均使用 Maven 进行管理。

      • IntelliJ IDEA

        • IntelliJ IDEA 是一个非常强大的编辑器和开发工具,内置了 Maven 等一系列工具,是大数据开发必不可少的利器。Intellij IDEA 本来是一个商业软件,它提供了社区免费版本,免费版本已经基本能满足绝大多数的开发需求。

        • 除 IntelliJ IDEA 之外,还有 Eclipse IDE 或 NetBeans IDE 等开发工具,读者可以根据自己的使用习惯选择。由于 IntelliJ IDEA 对 Scala 的支持更好,本书建议读者使用 IntelliJ IDEA。

下载并安装 Flink#

从 Flink 官网下载编译好的 Flink 程序,把下载的.tgz 压缩包放在你想放置的目录。在下载时,Flink 提供了不同的选项,包括 Scala 2.11、Scala 2.12、源码版等。其中,前两个版本是 Flink 官方提供的可执行版,解压后可直接使用,无须从源码开始编译打包。Scala 不同版本间兼容性较差,对于 Scala 开发者来说,需要选择自己常用的版本,对于 Java 开发者来说,选择哪个 Scala 版本区别不大。本书写作时,使用的是 Flink 1.11 和 Scala 2.11,读者可以根据自身情况下载相应版本。

按照下面的方式,解压该压缩包,进入解压目录,并启动 Flink 集群。

$ tar -zxvf flink-1.11.2-bin-scala_2.11.tgz  # 解压
$ cd flink-1.11.2-bin-scala_2.11  # 进入解压目录
$ ./bin/start-cluster.sh  # 启动 Flink 集群

成功启动后,打开浏览器,输入 http://localhost:8081,可以进入 Flink 集群的仪表盘(WebUI),如 图 2.4 所示。Flink WebUI 可以对 Flink 集群进行管理和监控。

../_images/flink-WebUI.png

图 2.4 Flink WebUI#

创建 Flink 工程#

我们使用 Maven 从零开始创建一个 Flink 工程。

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.11.2 \
    -DgroupId=com.myflink \
    -DartifactId=flink-study-scala \
    -Dversion=0.1 \
    -Dpackage=quickstart \
    -DinteractiveMode=false

archetype 是 Maven 提供的一种项目模板,是别人提前准备好了的项目的结构框架,用户只需要使用 Maven 工具下载这个模板,在这个模板的基础上丰富并完善代码逻辑。主流框架一般都准备好了 archetype,如 Spring、Hadoop 等。

不熟悉 Maven 的读者可以先使用 IntelliJ IDEA 内置的 Maven 工具,熟悉 Maven 的读者可直接跳过这部分。

如 图 2.5 所示,在 IntelliJ IDEA 里依次单击“File”→“New”→“Project”,创建一个新工程。

../_images/new-project.png

图 2.5 在 IntelliJ IDEA 中创建新工程#

如 图 2.6 所示,选择左侧的“Maven”,并勾选“Create from archetype”,并单击右侧的“Add Archetype”按钮。

../_images/Maven.png

图 2.6 添加 Maven 项目#

如 图 2.7 所示,在弹出的窗口中填写 archetype 信息。其中 GroupId 为 org.apache.flink,ArtifactId 为 flink-quickstart-java,Version 为 1.11.2,然后单击“OK”。这里主要是告诉 Maven 去资源库中下载哪个版本的模板。随着 Flink 的迭代开发,Version 也在不断更新,读者可以在 Flink 的 Maven 资源库中查看最新的版本。GroupId、ArtifactId、Version 可以唯一表示一个发布出来的 Java 程序包。配置好后,单击 Next 按钮进入下一步。

../_images/archetype.png

图 2.7 填写 archetype 信息#

如 图 2.8 所示,这一步是建立你自己的 Maven 工程,以区别其他 Maven 工程,GroupId 是你的公司或部门名称(可以随意填写),ArtifactId 是工程发布时的 Java 归档(Java Archive,JAR)包名,Version 是工程的版本。这些配置主要用于区别不同公司所发布的不同包,这与 Maven 和版本控制相关,Maven 的教程中都会介绍这些概念,这里不赘述。

../_images/project-info.png

图 2.8 配置你的工程信息#

接下来可以继续单击“Next”按钮,注意最后一步选择你的工程所在的磁盘位置,单击“Finish”按钮,如 图 2.9 所示。至此,一个 Flink 模板就下载好了。

../_images/project-location.png

图 2.9 配置本工程的位置#

工程结构如 图 2.10 所示。左侧的“Project”栏是工程结构,其中 src/main/java 文件夹是 Java 代码文件存放位置,src/main/scala 是 Scala 代码文件存放位置。我们可以在 StreamingJob 这个文件上继续修改,也可以重新创建一个新文件。

../_images/project-structure.png

图 2.10 工程结构#

Note

开发前要单击右下角的“Import Changes”,让 Maven 导入所依赖的包。

调试和运行 Flink 程序#

我们创建一个新的文件,名为 WordCountKafkaInStdOut.java,开始编写第一个 Flink 程序—流式词频统计(WordCount)程序。这个程序接收一个 Kafka 文本数据流,进行词频统计,然后输出到标准输出上。这里先不对程序做深入分析,后文中将会做更详细的解释。

首先要设置 Flink 的运行环境。

// 设置 Flink 运行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

设置 Kafka 相关参数,连接对应的服务器和端口号,读取名为 Shakespeare 的 Topic 中的数据源,将数据源命名为 stream。

// Kafka 参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");
String inputTopic = "Shakespeare";
// Source
FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);

使用 Flink API 处理这个数据流。

// Transformation
// 使用 Flink API 对输入流的文本进行操作
// 切词转换、分组、设置时间窗口、聚合
DataStream<Tuple2<String, Integer>> wordCount = stream
    .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
      String[] tokens = line.split("\\s");
      // 输出结果
      for (String token : tokens) {
        if (token.length() > 0) {
          collector.collect(new Tuple2<>(token, 1));
        }
      }
    })
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);

将数据流输出。

// Sink
wordCount.print();

最后运行该程序。

// execute
env.execute("kafka streaming word count");

env.execute()是启动 Flink 作业所必需的,只有在 execute() 方法被调用时,之前调用的各个操作才会被提交到集群上或本地计算机上运行。

该程序的完整代码如代码清单 2-9 所示。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class WordCountKafkaInStdOut {

    public static void main(String[] args) throws Exception {

        // 设置 Flink 执行环境
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka 参数
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-group");
        String inputTopic = "Shakespeare";
        String outputTopic = "WordCount";

        // Source
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);

        // Transformation
        // 使用 Flink API 对输入流的文本进行操作
        // 按空格切词、计数、分区、设置时间窗口、聚合
        DataStream<Tuple2<String, Integer>> wordCount = stream
            .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
                String[] tokens = line.split("\\s");
                // 输出结果
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        // Sink
        wordCount.print();

        // execute
        env.execute("kafka streaming word count");

    }
}

代码写完后,我们还要在 Maven 的项目对象模型(Project Object Model,POM)文件中引入下面的依赖,让 Maven 可以引用 Kafka。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

其中,${scala.binary.version} 是所用的 Scala 版本号,可以是 2.11 或 2.12,${flink.version} 是所用的 Flink 的版本号,比如 1.11.2。

运行程序#

我们在 1.7 节中展示过如何启动一个 Kafka 集群,并向某个 Topic 内发送数据流。在本次 Flink 作业启动之前,我们还要按照 1.7 节提到的方式启动一个 Kafka 集群、创建对应的 Topic,并向 Topic 中写入数据。

  1. 在 IntelliJ IDEA 中运行程序

    • 在 IntelliJ IDEA 中,单击绿色运行按钮,运行这个程序。图 2.11 所示的两个绿色运行按钮中的任意一个都可以运行这个程序。

    • IntelliJ IDEA 下方的“Run”栏会显示程序的输出,包括本次需要输出的结果,图 2.12 所示。

../_images/run.png

图 2.11 在 IntelliJ IDEA 中运行 Flink 程序#

../_images/result.png

图 2.12 WordCount 程序运行结果#

恭喜你,你的第一个 Flink 程序运行成功!

Note

如果在 Intellij IDEA 中运行程序时遇到 java.lang.NoClassDefFoundError 报错,这是因为没有把依赖的类都加载进来。在 Intellij IDEA 中单击“Run”->“Edit configurations…”,在“Use classpath of module”选项上选择当前工程,并且勾选“Include dependencies with‘Provided’ Scope”

  1. 向集群提交作业

    • 目前,我们学会了先下载并启动本地集群,接着在模板的基础上添加代码,并在 IntelliJ IDEA 中运行程序。而在生产环境中,我们一般需要将代码编译打包,提交到集群上。我们将在第 9 章详细介绍如何向 Flink 集群提交作业。

    • 注意,这里涉及两个目录:一个是我们存放刚刚编写代码的工程目录,简称工程目录;另一个是从 Flink 官网下载解压的 Flink 主目录,主目录下的 bin 目录中有 Flink 提供的命令行工具。

    • 进入工程目录,使用 Maven 命令行将代码编译打包。

# 使用 Maven 命令行将代码编译打包
# 打好的包一般放在工程目录的 target 目录下
$ mvn clean package

回到 Flink 主目录,使用 Flink 提供的命令行工具 flink,将打包好的作业提交到集群上。命令行的参数 –class 用来指定哪个主类作为入口。我们之后会介绍命令行的具体使用方法。

$ bin/flink run --class
com.flink.tutorials.java.api.projects.wordcount.WordCountKafkaInStdOut
/Users/luweizheng/Projects/big-data/flink-tutorials/target/flink-tutorials-0.1.jar

如图 2.13示,这时,Flink WebUI 上就多了一个 Flink 作业。

../_images/flink-WebUI-job.png

图 2.13 Flink WebUI 中多了一个 Flink 作业#

程序的输出会保存到 Flink 主目录下面的 log 目录下的.out 文件中,可以使用下面的命令查看结果。

$ tail -f log/flink-*-taskexecutor-*.out

必要时,可以使用下面的命令关停本地集群。

$ ./bin/stop-cluster.sh

Flink 开发和调试过程中,一般有如下几种方式运行程序。

  • 使用 IntelliJ IDEA 内置的绿色运行按钮。这种方式主要在本地调试时使用。

  • 使用 Flink 提供的命令行工具向集群提交作业,包括 Java 和 Scala 程序。这种方式更适合生产环境。

  • 使用 Flink 提供的其他命令行工具,比如针对 Scala、Python 和 SQL 的交互式环境。

对于新手,可以先使用 IntelliJ IDEA 提供的内置运行按钮,熟练后再使用命令行工具。

本章小结#

本章中,我们回顾了 Flink 开发经常用到的继承和多态、泛型和函数式编程等概念,在本地搭建了一个 Flink 集群,创建了第一个 Flink 工程,并学会了如何运行 Flink 程序。

previous

2.3. 函数式编程

next

3. Flink的设计与运行原理

本节目录
  • 准备所需软件
  • 下载并安装 Flink
  • 创建 Flink 工程
  • 调试和运行 Flink 程序
  • 运行程序
  • 本章小结

By Weizheng Lu

© Copyright 2023-2024.