使用Python和Apache Flink结合的PyFlink的快速指南

在这篇文章中,我们将介绍PyFlink的架构,并提供一个使用PyFlink进行CDN日志分析的快速演示。

本博客是从英文版翻译而来的。您可以在此处查看原文。部分内容采用机器翻译。如果有翻译错误,请您指正,不胜感激。

为什么需要PyFlink?

Python上的Flink和Flink上的Python

那么,PyFlink到底是什么呢?顾名思义,PyFlink就是将Apache Flink与Python结合在一起,或者说是在Python上运行Flink的东西。但是,Python中的Flink到底意味着什么呢?首先,将这两者结合在一起可以让我们在Python中使用Flink的所有功能。更重要的是,PyFlink可以利用Python强大的生态系统在Flink上进行计算,并进一步推动该生态系统的发展。也就是说,对双方来说都是双赢局面。如果我们深入探讨一下这个话题,就会发现Flink框架与Python语言的整合并非偶然。

image.png

Python和大数据生态系统。

为了理解这一点,让我们看一下人们如何使用Python解决实际问题。根据用户调查,大多数人在数据分析和机器学习应用中使用Python。针对这种类型的场景,大数据领域也提供了一些理想的解决方案。除了扩大大数据产品的受众外,Python与大数据的集成通过将独立架构扩展为分布式架构,大大增强了Python生态系统的功能。同时,Python对于分析大量数据也有很高的需求。

image.png

为什么选择Flink和Python?

Python和大数据的整合与其他一些最近的趋势相一致。然而,再次强调,为什么Flink选择支持Python而不是Go或R或其他语言?此外,为什么大多数用户选择PyFlink而不是PySpark或PyHive?

为了理解这个原因,首先让我们考虑使用Flink框架的好处。

有利なアーキテクチャ:Flinkは、統一されたストリームとバッチ処理機能を備えたピュアストリームコンピューティングエンジンです。

爽やかなバイタリティ:ASFの客観的な統計によると、Flinkは2019年に最もアクティブなオープンソースプロジェクトです。

高い信頼性:オープンソースのプロジェクトとして、Flinkは長い間テストされ、ビッグデータ企業の生産環境に広く適用されてきました。

接下来,让我们来看一下Flink为什么选择支持Python而不是其他语言。根据统计数据显示,Python是继Java和C之后,人气第三的编程语言,并且自2018年以来得到了迅猛发展。虽然Java和Scala是Flink的默认语言,但支持Python似乎也是合理的选择。

image.png

PyFlink的出现是与相关技术的发展密不可分的产物。然而,最终目标是为Flink和Python用户带来利益,并解决实际问题,因此仅仅理解PyFlink的意义是不够的。因此,我们需要更深入地探讨如何实现PyFlink。

image.png

PyFlink架构

为了实现PyFlink,我们需要了解应该达到的主要目标和需要解决的核心问题。PyFlink的主要目标是什么?简而言之,PyFlink的主要目标如下所述。

1、使Python用户能够使用Flink的所有功能。
2、通过在Flink上执行Python的分析和计算功能,可以提升Python在大数据问题解决能力。

让我们分析一下为实现这些目标而解决的重要问题。

image.png

使Python用户能够使用Flink功能。

实现PyFlink需要在Flink上像现有的Java引擎一样开发Python引擎吗?答案是否定的。尝试在Flink的1.8版本之前进行过,但并不成功。基本设计原则是以最小的成本实现给定的目标。最简单的方法是提供一个Python API的层,并重新利用现有的计算引擎。

在Flink中,我们需要为Python提供哪些API呢?高级的表格API、SQL和有状态的DataStream API都是很常见的。由于我们深入了解了Flink的内部逻辑,接下来我们将提供Table API和专为Python开发的DataStream API。然而,我们留下了一个重要的问题,那就是还有什么其他的挑战需要解决呢?

image.png

关键问题

明显重要的问题是在Python虚拟机(PyVM)和Java虚拟机(JVM)之间建立握手,这对于Flink来支持多种语言是必不可少的。要解决这个问题,需要选择合适的通信技术。来吧,让我们开始吧。

image.png

虚拟机通信技术的选择

在实现PyVM和JVM之间通信的解决方案中,有两个选项:Apache Beam和Py4J。前者是一个多语言、多引擎兼容的知名项目,后者是专门用于PyVM和JVM通信的解决方案。为了理解Apache Beam和Py4J的区别,可以从几个不同的角度进行比较。首先,考虑以下比喻:要越过一堵墙,Py4J像鼹鼠一样挖洞,而Apache Beam则像一只大熊一样打破整堵墙。从这个角度来看,使用Apache Beam实现VM通信稍微复杂一些。总之,Apache Beam注重通用性,而在极端情况下可能缺乏灵活性。

image.png

除此之外,Flink还需要像FLIP-36一样的交互式编程。此外,为确保Flink正常运行,需要在API设计方面,特别是多语言支持方面保证语义的一致性。由于Apache Beam的现有架构无法满足这些要求,因此Py4J成为支持PyVM和JVM之间通信的最佳选择显而易见。

image.png

技術架构

在建立PyVM和JVM之间的通信后,已经实现了使Python用户能够使用Flink功能的初始目标。这在Flink版本1.9中已经实现。现在让我们来看一下Flink版本1.9的PyFlink API架构。

在Flink1.9版本中,通过使用Py4J实现了虚拟机间通信。启用了PyVM的网关和JVM的网关服务器,使其能够接收Python请求。此外,Python API还提供了TableENV和Table等对象,与Java API提供的对象相同。因此,在编写Python API时的关键是如何调用Java API。在Flink1.9版本中,也解决了作业配置的问题。可以通过执行Python命令,使用Python shell和CLI等多种方式提交作业。

image.png

然而,這種架構有哪些優點呢?首先,這種架構非常簡單,確保了Python API和Java API之間的語義兼容性。其次,它提供了與Java任務相媲美的出色Python任務處理性能。例如,Flink的Java API在去年的雙11期間,每秒能處理25億5,100萬條數據記錄。

image.png

在Flink上执行Python的分析和计算功能

在之前的章节中,我们说明了如何让Python用户能够利用Flink的功能。在这里,我们将介绍在Flink上执行Python函数的方法。一般来说,要在Flink上执行Python函数有两种方法:

选择代表性的Python类库,并将其API添加到PyFlink中。由于Python拥有太多的类库,所以该方法可能需要一些时间。在将API整合之前,我们需要优化Python的执行效率。

2、根据现有的Flink Table API和Python类库的特性,可以将现有的Python类库的函数都作为用户自定义函数来处理,并集成到Flink中。这在Flink的1.10版本以后得到支持。功能集成的重要问题是什么?重复一遍,那就是Python用户自定义函数的执行。

image.png

为执行用户自定义功能而选择的技术

Python的用户自定义函数的执行实际上相当复杂。它涉及到了虚拟机之间的通信、Python执行环境的管理、分析在Java和Python之间交换的业务数据、将Flink的状态后端传递到Python中、监视执行情况等。由于这样的复杂性,Apache Beam登场了。作为一个支持多个引擎和语言的强大工具,Apache Beam能够做很多事情来帮助处理这种情况,因此我们来看看Apache Beam如何处理Python的用户自定义函数的执行。

以下是Portability Framework,为了支持多种语言和引擎而设计的高度抽象化架构,专为Apache Beam而设。目前,Apache Beam支持几种不同的语言,包括Java、Go和Python。图中下方的Beam Fn Runners和Execution表示引擎和用户定义的功能执行环境。Apache Beam使用称为Protobuf的协议缓冲区来抽象化数据结构,并在gRPC协议上进行通信,封装了gRPC的核心服务。在这方面,Apache Beam更像是在PyFlink中照亮用户定义函数执行路径的萤火虫。有趣的是,萤火虫恰好成为了Apache Beam的吉祥物,这可能不仅仅是巧合。

image.png

在下图中,跑步者表示为Flink的Java算子。跑步者将映射到Python执行环境的SDK工作器。Apache Beam具有抽象化服务,包括控制、数据、状态和日志等。实际上,这些服务已经长时间在Beam Flink Runner上稳定高效运行。这简化了PyFlink UDF的执行。此外,Apache Beam具有API调用和用户定义函数执行的解决方案。PyFlink使用Py4J在API级别上进行虚拟机之间的通信,并使用Apache Beam的可移植性框架来配置用户定义函数的执行环境。

这表明了PyFlink在技术选择上严格遵守以最小成本实现给定目标的原则,并始终使用最适合长期发展的技术架构。在与Apache Beam合作期间,我已向Beam社区提交了20多个优化补丁。

image.png

用户定义功能架构 (in simplified Chinese)

UDF架构不仅要实现PyVM与JVM之间的通信,还需在编译和执行阶段满足不同的要求。在以下的PyLink用户定义函数架构图中,JVM的操作以绿色表示,PyVM的操作以蓝色表示。让我们看一下编译中的本地设计。本地设计依赖于纯粹的API映射调用。我们使用Py4J进行VM通信。每次调用Python的API时,相应的Java的API也会被同步调用。

为了支持用户定义的函数,需要用户定义函数注册API(register_function)。当定义Python的用户定义函数时,还需要一些第三方库。因此,添加依赖关系需要一系列的附加方法,例如add_Python_file()。在编写Python作业时,在提交作业之前还需要调用Java的API来创建JobGraph。然后,可以通过CLI等多种不同的方式将作业提交到集群中。

image.png

让我们看一下在这种架构下Python API和Java API是如何工作的。在Java方面,JobMaster将任务分配给TaskManager,就像普通的Java作业一样,TaskManager在JVM和PyVM上执行涉及操作符的任务。对于Python的用户定义函数操作符,我们需要设计各种gRPC服务用于JVM和PyVM之间的通信,例如用于业务数据通信的DataService以及用于调用Java状态后端的Python UDF的StateService等。此外,还提供了许多其他服务,如日志记录和指标等。

这些服务是基于Beam的Fn API构建的。用户定义的函数最终在Python工作器上执行,并且相应的gRPC服务将结果返回给JVM内的Python用户定义的函数操作符。Python工作器可以作为进程,在Docker容器内甚至外部服务集群中运行。这个扩展机制为将PyFlink与其他Python框架整合提供了坚实的基础。现在,我们已经对PyFlink 1.10引入的Python用户定义函数架构有了基本的理解,让我们来看看它的好处。

首先,它是一個成熟的多語言支持框架。基於Beam的架構能夠輕鬆擴展以支援其他語言。其次,它支持有狀態的用戶定義函數。Beam對有狀態的服務進行了抽象化,並使PyFlink能夠輕鬆支持有狀態的用戶定義函數。第三,易於維護。 Apache Beam和Apache Flink這兩個活躍的社區共同維護和優化相同的框架。

image.png

PyFlink 的使用方法

让我们先理解PyFlink的架构和背后的思想,然后看看PyFlink的具体应用场景。

PyFlink的应用场景

PyFlink支持哪些业务场景?我们可以从两个角度分析其应用场景:Python和Java。请记住,PyFlink适用于所有适用于Java的场景。
1、事件驱动的场景,如点击农场或监控。
2、数据分析的场景,如库存管理或数据可视化。
3、也被称为ETL场景的日志分析和数据流水线。
4、机器学习的场景,如精准推荐等。
所有这些场景都可以使用PyFlink。PyFlink也适用于特定于Python的场景,例如科学计算。有这么多的应用场景,你可能会想知道可以使用哪些具体的PyFlink API。因此,让我们现在来调查这个问题。

image.png

PyFlink的安装

image.png

PyFlink API 可以用以下方式来翻译成中文:

PyFlink 应用程序接口

image.png

在 PyFlink 中定义用户自定义函数

ScalarFunction可以通过添加更多的辅助功能(例如,通过添加指标)来扩展。此外,PyFlink的用户函数支持Python支持的所有方法定义,如lambda函数、命名函数和可调用函数等。

当您定义了这些方法之后,可以使用PyFlink装饰器进行标记,并描述输入输出的数据类型。此外,为了推断类型,还可以利用Python的类型提示功能来进一步优化以后的版本。通过下面的示例,您可以更好地理解用户定义函数的定义方式。

image.png

一个例子是定义了一个Python用户自定义函数。

在这个例子中,我们将对两个数字进行相加。我们需要导入必要的类,并定义上述函数。这个例子非常简单明了,让我们继续进行实际的情况。

image.png

在PyFlink中:阿里巴巴云CDN实时日志分析。

image.png

必需要的条件

我们在便宜上将实际业务统计要求简化了。在这个例子中,我们按地区收集了页面浏览数、下载数和下载速度的统计数据。数据的格式只选择了核心字段。例如,UUID是唯一的日志ID,client_ip是访问来源,request_time是资源的下载时间,response_size表示资源的数据大小。尽管需要收集按地区统计的数据,但原始日志中没有包含地区字段。因此,我们需要定义一个Python UDF来根据client_ip查询每个数据点所属的地域。让我们分析一下用户定义函数的定义方法。

用户自定义功能定义

这里定义了一个名为ip_to_province()的用户自定义函数。输入是IP地址,输出是地区名称的字符串。在这里,输入类型和输出类型都被定义为字符串。这个查询服务是用于演示的。在生产环境中,需要替换为可靠的地区查询服务。

image.png
import re
import json
from pyFlink.table import DataTypes
from pyFlink.table.udf import udf
from urllib.parse import quote_plus
from urllib.request import urlopen

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
   """
   format:
       {
       'ip': '27.184.139.25',
       'pro': '河北省',
       'proCode': '130000',
       'city': '石家庄市',
       'cityCode': '130100',
       'region': '灵寿县',
       'regionCode': '130126',
       'addr': '河北省石家庄市灵寿县 电信',
       'regionNames': '',
       'err': ''
       }
   """
   try:
       urlobj = urlopen( \
        'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip))
       data = str(urlobj.read(), "gbk")
       pos = re.search("{[^{}]+\}", data).span()
       geo_data = json.loads(data[pos[0]:pos[1]])
       if geo_data['pro']:
           return geo_data['pro']
       else:
           return geo_data['err']
   except:
       return "UnKnow"

连结器的定义

我们已经分析了需求,并定义了用户自定义功能,所以下一步我们应继续进行作业开发。在一般的作业结构中,我们需要定义一个源连接器来读取Kafka数据,并且还需要定义一个汇连接器来将计算结果保存到MySQL数据库中。最后还需要编写统计逻辑。

PyFlink 支持 SQL DDL 语句,并且可以使用简单的 DDL 语句来定义源连接器。请务必将 connector.type 设置为 Kafka。此外,还可以使用 DDL 语句来定义 Sink 连接器,并将 connector.type 设置为 jdbc。正如您所见,连接器定义的逻辑非常简单。接下来,让我们来看一下统计学的核心逻辑。

image.png
kafka_source_ddl = """
CREATE TABLE cdn_access_log (
 uuid VARCHAR,
 client_ip VARCHAR,
 request_time BIGINT,
 response_size BIGINT,
 uri VARCHAR
) WITH (
 'connector.type' = 'kafka',
 'connector.version' = 'universal',
 'connector.topic' = 'access_log',
 'connector.properties.zookeeper.connect' = 'localhost:2181',
 'connector.properties.bootstrap.servers' = 'localhost:9092',
 'format.type' = 'csv',
 'format.ignore-parse-errors' = 'true'
)
"""

mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
 province VARCHAR,
 access_count BIGINT,
 total_download BIGINT,
 download_speed DOUBLE
) WITH (
 'connector.type' = 'jdbc',
 'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
 'connector.table' = 'access_statistic',
 'connector.username' = 'root',
 'connector.password' = 'root',
 'connector.write.flush.interval' = '1s'
)
"""

核心统计逻辑学

在这部分,首先需要从数据源中读取数据,然后使用ip_to_province(ip)函数将客户端ip转换为特定地区。接下来,收集按地区分类的页面浏览量、下载量和下载速度的统计数据。最后,将统计结果存储到结果表中。在这个统计逻辑中,除了使用Python的用户定义函数外,还使用了Flink内置的两个Java AGG函数,即sum和count。

image.png
# 核心的统计逻辑
t_env.from_path("cdn_access_log")\
   .select("uuid, "
           "ip_to_province(client_ip) as province, " # IP 转换为地区名称
           "response_size, request_time")\
   .group_by("province")\
   .select( # 计算访问量
           "province, count(uuid) as access_count, " 
           # 计算下载总量 
           "sum(response_size) as total_download,  " 
           # 计算下载速度
           "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
   .insert_into("cdn_access_statistic")

用于实时日志分析的完整代码。

现在,让我们再次检查一下代码。首先,导入核心依赖,然后创建ENV,最后设置计划器。目前,Flink支持Flink计划器和Blink计划器。推荐使用Blink计划器。

然后,执行DDL语句,在先前定义的Kafka源表和MySQL结果表中进行注册。接下来,注册Python的UDF。请注意,在API请求中指定UDF的其他依赖文件,并将它们与作业一起发送到集群。最后,编写核心统计逻辑,调用执行者并提交作业。到此为止,我们已经创建了阿里巴巴云CDN的实时日志分析作业。现在,让我们来查看实际的统计结果。

image.png
import os

from pyFlink.datastream import StreamExecutionEnvironment
from pyFlink.table import StreamTableEnvironment, EnvironmentSettings
from enjoyment.cdn.cdn_udf import ip_to_province
from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl

# 创建Table Environment, 并选择使用的Planner
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
   env,
   environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

# 创建Kafka数据源表
t_env.sql_update(kafka_source_ddl)
# 创建MySql结果表
t_env.sql_update(mysql_sink_ddl)

# 注册IP转换地区名称的UDF
t_env.register_function("ip_to_province", ip_to_province)

# 添加依赖的Python文件
t_env.add_Python_file(
    os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py")
t_env.add_Python_file(os.path.dirname(
    os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py")

# 核心的统计逻辑
t_env.from_path("cdn_access_log")\
   .select("uuid, "
           "ip_to_province(client_ip) as province, " # IP 转换为地区名称
           "response_size, request_time")\
   .group_by("province")\
   .select( # 计算访问量
           "province, count(uuid) as access_count, " 
           # 计算下载总量 
           "sum(response_size) as total_download,  " 
           # 计算下载速度
           "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
   .insert_into("cdn_access_statistic")

# 执行作业
t_env.execute("pyFlink_parse_cdn_log")

实时日志分析的输出结果

将模拟数据作为CDN日志数据发送到Kafka。图表右侧实时收集了按地区统计的页面访问量、下载量和下载速度数据。

image.png

PyFlink的未来展望是什么?

通常,使用PyFlink进行业务开发很简单。即使不理解基础实现,也可以通过SQL和Table API轻松地描述业务逻辑。让我们来看一下PyFlink的整体情况。

根据目标制定的路线图

PyFlink的开发旨在让Python用户能够使用Flink的功能,并将Python函数集成到Flink中。根据PyFlink路线图,首先建立了PyVM和JVM之间的通信。然后,在Flink 1.9中,提供了将现有的Flink Table API的功能开放给Python用户的Python Table API。在Flink 1.10中,进行了Apache Beam的集成、设置Python用户定义函数执行环境、管理Python与其他类库的依赖关系,以及定义用户定义函数API来支持Python函数与Flink的集成,为将Python函数集成到Flink中做好了准备。

为了扩展Python的功能,PyFlink支持Pandas Series和DataFrame,并且可以直接在PyFlink中使用Pandas的用户定义函数。此外,未来还计划在SQL客户端上启用Python的用户定义函数,使PyFlink更易于使用。此外,还提供了Python ML Pipeline API,以使Python用户可以在PyFlink中使用机器学习。对于执行Python用户定义功能的监视在实际的生产和业务中非常重要。因此,PyFlink还提供了管理Python用户定义函数的指标。这些功能将集成到Flink 1.11中。

然而,這些只是PyFlink未來開發計劃的一部分而已。未來還有很多工作需要完成,如PyFlink性能優化、提供圖計算API、支持Pandas on Flink的Pandas原生API等。我們將繼續使Python用戶能夠持續使用Flink的現有功能,並將Python的強大功能整合到Flink中,以實現擴展Python生態系統的初衷。

image.png

PyFlink 1.11 的预览版本

让我们简单地了解一下Flink 1.11版本中的PyFlink的一些要点。

功能性 – Functional

让我们详细了解基于 Flink 1.11 的 PyFlink 的核心功能。我们致力于提供 PyFlink 1.11 支持的功能性、性能和易用性,并计划提供对 Pandas 用户自定义函数的支持。这样,我们可以直接在 PyFlink 中使用像累积分布函数这样的实用 Pandas 类库功能。

image.png

此外,还将在PyFlink中整合ML Pipeline API,以满足机器学习场景中的业务需求。下面将以一个使用PyFlink实现KMeans算法的示例来介绍。

image.png

表演

此外,我們也希望致力於提升PyFlink的性能。我們將通過使用Codegen、CPython、優化序列化和反序列化等方式來優化Python UDF的執行性能。初步比較表明,PyFlink 1.11的性能相較於PyFlink 1.10提升了約15倍。

image.png

易于使用

为了使PyFlink更易于使用,我们将在SQL DDL和SQL客户端中支持 Python 用户自定义函数。这样一来,您可以在多个渠道上使用PyFlink。

image.png

PyFlink的路线图、使命和愿景。

已经介绍了PyFlink的定义、其意义、API架构、用户自定义函数架构以及架构背后的权衡和优势。我们还看到了Flink 1.11中CDN的案例、PyFlink的路线图和要点。但除此之外,还有什么是必要的呢?

让我们来看一下PyFlink的未来。PyFlink受到了让Python用户能够使用Flink功能并在Flink上运行Python函数的使命驱使,那么PyFlink有哪些展望呢?您可能已经知道,PyFlink是Apache Flink的一部分,包括了运行时和API层。

在这两个层面上,PyFlink会如何发展?在运行时,PyFlink会构建一个用于JVM和PyVM之间通信的gRPC常规服务(如Control,Data,State等)。在这个框架中,PyFlink抽象了Java和Python用户定义函数的操作符,并构建了Python的执行容器,以多种方式支持Python的执行。例如,PyFlink可以作为一个进程,在Docker容器内,甚至在外部服务集群中执行。特别是在执行外部服务集群时,无限扩展性将通过套接字实现。所有这些对于随后的Python整合起着重要的作用。

关于API方面,我们将使Python基础的API能够在Flink中使用,以实现我们的任务。这也依赖于Py4J VM通信框架。在PyFlink中,计划逐步支持Flink的Java API(包括Python Table API、UDX、ML Pipeline、DataStream、CEP、Gelly、State API等)以及最受Python用户欢迎的Pandas API等多个API。基于这些API,PyFlink将继续与其他生态系统进行整合,并与Notebook、Zeppelin、Jupyter、Alink、阿里巴巴的开源Flink等合作,以便于开发。目前,PyAlink的功能已完全集成在其中。PyFlink还计划与像TensorFlow等现有的AI系统平台集成。

为此,我们可以看出基于任务的力量使得PyFlink能够持续发展。重申一遍,PyFlink的任务是使得Python用户能够使用Flink的功能,并在Flink上执行Python的分析和计算功能。目前,PyFlink的核心贡献者正在努力追求这一任务在社区中的实现。

image.png

PyFlink 的核心贡献者

最后,我要介绍 PyFlink 的核心提交者。

    • Fu Dian: Flink と他の 2 つのトップレベルの Apache プロジェクトのコミッター。Fu は PyFlink に多大な貢献をしています。

 

    • Huang Xingbo: 専用の PyFlink UDF パフォーマンスオプティマイザ。Huang氏は、かつてアリババのセキュリティアルゴリズムチャレンジ大会で優勝し、AIやミドルウェアのパフォーマンス大会で数々の好成績を残しています。

 

    • Cheng Hequn: Flink コミュニティの有名なコミッター。チェンは何度も非常に有益な情報を共有してきました。多くのユーザーはまだ彼のFlink Knowledge Mapを覚えているかもしれません。

 

    Zhong Wei: PyFlink のユーザー定義関数の依存性管理と使いやすさの最適化に注力してきたコミッター。中さんは多くのコードを投稿しています。

我的身份是最后一位提交者。我的介绍在这篇文章的最后部分。如果对PyFlink有任何问题,请随时联系我们的提交团队。

image.png

建议您将一般问题发送给Flink用户列表中的成员,通过电子邮件进行共享。如果有紧急问题,请发送邮件给Flink的贡献者。但是,为了有效地积累和共享知识,您也可以在Stackoverflow上提问。在提问之前,请先搜索您的问题,看是否已有答案。如果没有,清楚地描述您的问题。最后,请不要忘记在问题中添加PyFlink标签。

image.png

简要概述

在本文中,我们对PyFlink进行了深入分析。在PyFlink API的架构中,我们使用Py4J来进行PyVM和JVM之间的通信,并且同时保持了Python和Java API之间的语义一致性。在Python用户定义函数的架构中,我们集成了Apache Beam的可移植性框架,提供了高效稳定的Python用户定义函数。此外,还解释了背后的思想、技术权衡以及现有建筑的优点等。

接下来,我们介绍了适用于PyFlink的业务场景,并以阿里云CDN实时日志分析为例,介绍了PyFlink的实际功能。

在查看了PyFlink的路线图之后,我们预览了Flink 1.11中PyFlink的要点。相比PyFlink 1.10,我们期望PyFlink 1.11能够提高15倍以上的性能。最后,我们对PyFlink的使命进行了分析,即”让Python用户能够使用PyFlink”和”在Flink上执行Python的解析和计算功能”。

关于作者

孫金城,這篇文章的作者,2011年加入了阿里巴巴。他在阿里巴巴工作了9年期間,領導了多個內部核心系統的開發,包括阿里巴巴集團的行動日誌管理系統、阿里蘭、雲端轉碼系統和文件轉換系統等。他在2016年初認識了Apache Flink社區,起初以開發者身份參與項目建設。之後,他主導了特定模塊的開發,負責構建Apache Flink的Python API(PyFlink)。目前他還是Apache Flink和ALC(北京)的PMC成員,並擔任Apache Flink、Apache Beam和Apache IoTDB的提交者。

阿里巴巴云拥有两个数据中心,并拥有60个以上的可用区,是2019年加特纳认定的亚太地区第一的云基础设施提供商。
请点击这里查看阿里巴巴云的详细信息。
阿里巴巴云日本官方页面。

bannerAds