近日,调度系统迁移工具Air2phin宣布开源。借助Air2phin,用户可2步将调度系统从Airflow迁移至ApacheDolphinScheduler,为有调度系统迁移需要的用户带来极大便利。 Air2phin是什么? Air2phin是一个最近宣布开源的调度系统迁移工具,旨在将ApacheAirflowDAGs文件转换成ApacheDolphinSchedulerPythonSDK定义文件,从而实现用户将调度系统(Workfloworchestration)从Airflow迁移到DolphinScheduler的目的。它是一个基于多规则的AST转换器,使用LibCST来解析和转换Airflow的DAG代码,其全部规则使用Yaml文件定义,并提供了一定的自定义规则扩展能力。 近期,Air2phin已经发布了0。0。12版本,提供了丰富的功能,可以更好地帮助用户完成Airflow到ApacheDolphinScheduler的迁移。AST是AbstractSyntaxTree(抽象语法树)的缩写,它是一种以树状结构表示代码语法结构的数据结构。在编译器中,AST是由词法分析器和语法分析器生成的。词法分析器将源代码转换成标记流(tokenstream),语法分析器将标记流转换成抽象语法树。AST是一种树状结构,它由一系列节点组成,每个节点表示代码中的一个语法结构(如表达式、语句、函数、类等),节点之间的关系表示语法结构之间的嵌套关系。 为什么开源Air2phin? 可能有人会问,为什么我需要一个迁移工具?这是因为随着业务的发展,企业或组织原来使用的工作流编排系统已经无法满足当前的需求,需要将工作流编排系统迁移到新的平台或者更新到新的版本。经过调研,很多用户有了将调度系统从开源工作流编排系统Airflow迁移到ApacheDolphinScheduler上来的需求。 在迁移过程中,由于数据处理任务可能涉及多个系统之间的依赖关系,迁移过程需要确保在不影响业务运行的前提下完成。此时,调度系统迁移工具就可以发挥重要作用,它能减少人工干预,尽量自动化地完成两个调度系统间的迁移工作,并且能兼容多个系统间的多个版本,几乎可以做到用户无干预完成迁移。 为此,白鲸开源专门研发了开源迁移工具Air2phin,可以让用户2步将调度系统从Airflow迁移至ApacheDolphinScheduler,为用户带来极大的便利。 为了让大家更好地理解Air2phin的重要性,我们先从调度系统的相关背景知识开始,了解将调度系统从Airflow迁移至ApacheDolphinScheduler的好处。 为什么要从Airflow迁移至DolphinScheduler? 什么是工作流编排系统? 工作流编排系统,是以尊重编排规则和业务逻辑的方式管理数据流。工作流编排工具让用户可以将多个有关联的任务转换为可以安排、运行和观测的工作流,帮助企业更好地管理和控制业务流程,从而提高业务效率。工作流编排是数据处理流程中不可或缺的组件之一,负责根据预先定义的规则和逻辑执行数据处理任务,确保数据处理流程按照预期顺利执行,常见工作流编排系统包括ApacheDolphinScheduler、ApacheAirflow、ApacheOozie,Azkaban等。 Airflow是什么? 其中,ApacheAirflow是一个开源的工作流编排系统,它可以帮助用户创建、调度和监控复杂的工作流程。Airflow最初由Airbnb开发,并于2016年开源,现在由Apache软件基金会维护。Airflow使用Python语言编写,具有高度的可扩展性和灵活性,支持多种任务类型,如计算、数据处理、通知、交互等。Airflow的工作流程是通过编写Python脚本来定义的,可以使用Airflow提供的操作符和钩子,以及自定义操作符和钩子来扩展其功能。但其有着不可忽视的缺陷,比如需要需要深度二次开发,脱离社区版本,升级成本高;Python技术栈维护迭代成本高;schedulerloop扫描Dagfolder延迟降低性能的问题;以及在生产环境中使用稳定性差等。 在新数据时代业务需求下诞生的ApacheDolphinScheduler是一个开源的分布式工作流调度系统,弥补了以往调度系统的弱势,旨在为企业用户提供一种可靠、高效、易于使用的工作流调度平台,支持多种任务类型,如计算、数据处理、ETL等。与Airflow相比,DolphinScheduler采用了分布式架构,提供了多种任务类型,用户可以定义任务之间的依赖关系,设置任务的优先级和调度策略等,其使用可视化的界面来创建和管理工作流程的特性更是与Airflow形成鲜明对比,变得更加易于操作,对非编程人员来说更加友好。经过调研对比,对于很多用户来说,将调度系统迁移至ApacheDolphinScheduler是一个降本增效的更优选择。 Air2phin如何安装和使用 Air2phin是一个python的包,可以通过Python的包安装工具pip完成安装,详见air2phingettingstart。spanclasscodesnippetouterpythonmpipspanclasscodesnippetkeywordinstallspanspanclasscodesnippetcommentupgradeair2phinspanspan 一个简单的例子 我们通过一个简单的例子,来说明如何使用Air2phin的。我们截取了airflowtutorial。py中的部分代码作为Air2phin转化的例子,来说明Air2phin如何逐步完成转化成dolphinschedulerpythonsdk。 图1:airflowtutorial。py中的部分代码图2:Air2phin如何逐步完成转化成dolphinschedulerpythonsdk 假设将airflowtutorial。py部分内容保存至文件tutorialpart。py,想要将其转化成dolphinschedulerpythonsdk定义,只需要一行命令就能完成。结果如图2所示,因为命令增加了inplace参数,所以Air2phin会直接将原文件覆盖,如果不需要覆盖原问题,可以不使用inplace参数,Air2phin会新增一个tutorialpartair2phin。py文件来保存转化后的内容。spanclasscodesnippetouterspanclasscodesnippetselectortagair2phinspanspanclasscodesnippetselectortagmigratespanspanclasscodesnippetselectortaginplacespanspanclasscodesnippetselectortagtutorialpartspanspanclasscodesnippetselectorclass。pyspanspan通过观察,我们发现这次转化分别触发了多条转化规则,包括将airflow。DAG转换成 pydolphinscheduler。core。processdefinition。ProcessDefinition,这个规则在第三行(import语句)以及第六行DAGcontext 将airflow。operators。bash。BashOperator转换成 pydolphinscheduler。tasks。shell。Shell,这个规则在任务t1,t2中都被使用 除了对应的类转化之外,我们需要将类的属性进行转化,如将 airflow。DAG。scheduleinterval转换成了ProcessDefinition。schedule,同时修改了部分值的内容,如将timedelta(days1)转成000? 最后,我们只需要安装pydolphinscheduler,并且将转化后的文件通过python运行,就能完成工作流的迁移了,详见pydolphinscheduler使用(https:dolphinscheduler。apache。orgpythonmainstart。htmlinstallingpydolphinscheduler)。spanclasscodesnippetouterspanclasscodesnippetcomment安装apachedolphinschedulerspanspanspanclasscodesnippetouterspanclasscodesnippetattrpythonspanspanclasscodesnippetstringmpipinstallapachedolphinschedulerspanspanspanclasscodesnippetouterspanclasscodesnippetcomment将工作流提交到dolphinschedulerspanspanspanclasscodesnippetouterspanclasscodesnippetattrpythonspanspanclasscodesnippetstringtutorialpart。pyspanspan在运行pythontutorialpart。py时,需要保证dolphinschedulerAPI和pythongateway服务已经启动,并且开放了对应的端口,详见启动pythongatewayservice。至此,我们通过一个简单的例子,说明了Air2phin是如何完成迁移的。 工作原理 Airflow和dolphinschedulerpythonsdk如何工作?在了解Air2phin如果工作之前,先了解Airflow和dolphinschedulerpythonsdk如何工作是非常重要的前置条件,帮助我们更好地了解Air2phin的迁移步骤,当遇到问题的时候也能更加从容地应对。Airflow如何工作:Airflow工作流相关的信息都保存在DAG文件中,之后将DAG文件放置到Airflow的指定目录,Airflow的Scheduler会间隔一定时间去扫描和解析Airflow的DAG文件,所以DAG文件是被动被扫描和更新的。 dolphinschedulerpythonsdk:同Airflow类似,将全部工作流相关的信息都通过Python文件定义,但是dolphinschedulerpythonsdk是通过人为主动触发的方式,将工作流信息提交,运行命令python工作流文件名即可完成主动任务提交。 Air2phin工作流程 了解完两者是如何使用,如何提交发现工作流的,将更加利于我们对Air2phin的工作原理的理解。因为Airflow的DAG文件以及DolphinScheduler的Pythonsdk定义文件都是Python编写的,所以Air2phin的大部分代码都是处理两者间的差异,最后将Airflow的代码转化成dolphinschedulerpythonsdk和定义。Air2phin使用了LibCST(https:libcst。readthedocs。ioenlatest)来实现airflowpythonDAG代码的抽象语法树解析,然后通过LibCST的Transformer(https:libcst。readthedocs。ioenlatesttutorial。htmlBuildVisitororTransformer)结合转化规则最后转化成dolphinschedulerpythonsdk的定义。Air2phin整体工作流程如下:从标准输入或者文件中获取原本的AirflowDAG内容 从Yaml文件加载所有转换规则 将AirflowDAG内容通过LibCST解析成CST树 通过LibCSTTransformer转换dolphinschedulerpythonsdk定义内容 Air2phin最佳实践 迁移整个文件夹而不是单个文件 当用户想要迁移Airflow到DolphinScheduler的时候,都是想要整体做迁移而不是单个文件迁移的,Air2phin提供整体文件夹迁移的能力,只需要将路径从文件路径改成文件夹即可。spanclasscodesnippetouter迁移整个spanclasscodesnippetregexpairflowspandags文件夹spanspanclasscodesnippetouterair2phinmigrateinplacespanclasscodesnippetregexpairflowspandagsspan增加自定义的规则 部分使用Airflow的用户自定义Hook或者Operator,用户自定义的Operator无法通过Air2phin内置的转化规则完成转化,需要用户增加自定义的规则,并告诉Air2phin规则的位置。例如我们有一个叫MyCustomOperator的算子是继承PostgresOperator的大部分功能,只是命名不一样,其定义如下:spanclasscodesnippetouterspanclasscodesnippetattrfromairflow。providers。postgres。operators。postgresimportPostgresOperatorspanspanspanclasscodesnippetouterspanclasscodesnippetattrclassspanspanclasscodesnippetstringMyCustomOperator(PostgresOperator):spanspanspanclasscodesnippetouterspanclasscodesnippetattrdefspanspanclasscodesnippetstringinit(spanspanspanclasscodesnippetouterspanclasscodesnippetattrself,spanspanspanclasscodesnippetouterspanclasscodesnippetattr,spanspanspanclasscodesnippetouterspanclasscodesnippetattrsqlspan:spanclasscodesnippetstringstrIterable〔str〕,spanspanspanclasscodesnippetouterspanclasscodesnippetattrmycustomconnidspan:spanclasscodesnippetstringstrpostgresdefault,spanspanspanclasscodesnippetouterspanclasscodesnippetattrautocommitspan:spanclasscodesnippetstringboolFalse,spanspanspanclasscodesnippetouterspanclasscodesnippetattrparametersspan:spanclasscodesnippetstringIterableMappingNoneNone,spanspanspanclasscodesnippetouterspanclasscodesnippetattrdatabasespan:spanclasscodesnippetstringstrNoneNone,spanspanspanclasscodesnippetouterspanclasscodesnippetattrruntimeparametersspan:spanclasscodesnippetstringMappingNoneNone,spanspanspanclasscodesnippetouterspanclasscodesnippetattrkwargs,spanspanspanclasscodesnippetouterspanclasscodesnippetmeta)spanspanclasscodesnippetstringNone:spanspanspanclasscodesnippetouterspanclasscodesnippetattrsuper()。init(spanspanspanclasscodesnippetouterspanclasscodesnippetattrsqlspanspanclasscodesnippetstringsql,spanspanspanclasscodesnippetouterspanclasscodesnippetattrpostgresconnidspanspanclasscodesnippetstringmycustomconnid,spanspanspanclasscodesnippetouterspanclasscodesnippetattrautocommitspanspanclasscodesnippetstringautocommit,spanspanspanclasscodesnippetouterspanclasscodesnippetattrparametersspanspanclasscodesnippetstringparameters,spanspanspanclasscodesnippetouterspanclasscodesnippetattrdatabasespanspanclasscodesnippetstringdatabase,spanspanspanclasscodesnippetouterspanclasscodesnippetattrruntimeparametersspanspanclasscodesnippetstringruntimeparameters,spanspanspanclasscodesnippetouterspanclasscodesnippetattrkwargs,spanspanspanclasscodesnippetouterspanclasscodesnippetattr)spanspan 它在Airflow的多个DAG中被使用,使用的方式如下:spanclasscodesnippetouterspanclasscodesnippetattrfromcustom。mycustomoperatorimportMyCustomOperatorspanspanspanclasscodesnippetouterspanclasscodesnippetattrwithspanspanclasscodesnippetstringDAG(spanspanspanclasscodesnippetouterspanclasscodesnippetattrdagidspanspanclasscodesnippetstringmycustomdag,spanspanspanclasscodesnippetouterspanclasscodesnippetattrdefaultargsspanspanclasscodesnippetstringdefaultargs,spanspanspanclasscodesnippetouterspanclasscodesnippetattrscheduleintervalspanspanclasscodesnippetstringonce,spanspanspanclasscodesnippetouterspanclasscodesnippetattrstartdatespanspanclasscodesnippetstringdaysago(2),spanspanspanclasscodesnippetouterspanclasscodesnippetattrtagsspanspanclasscodesnippetstring〔example〕,spanspanspanclasscodesnippetouterspanclasscodesnippetmeta)spanspanclasscodesnippetstringasdag:spanspanspanclasscodesnippetouterspanclasscodesnippetattrt1spanspanclasscodesnippetstringMyCustomOperator(spanspanspanclasscodesnippetouterspanclasscodesnippetattrtaskidspanspanclasscodesnippetstringmycustomtask,spanspanspanclasscodesnippetouterspanclasscodesnippetattrsqlspanspanclasscodesnippetstringselectfromtable,spanspanspanclasscodesnippetouterspanclasscodesnippetattrmycustomconnidspanspanclasscodesnippetstringmycustomconnid,spanspanspanclasscodesnippetouterspanclasscodesnippetattr)spanspan 现在需要对这个Operator进行转化,我们可以自定义一个转化规则,并将其命名为MyCustomOperator。yaml,内容如下,最主要的内容是migration。module和migration。parameter的定义,其确定了转化规则:spanclasscodesnippetouterspanclasscodesnippetattrnamespan:spanclasscodesnippetstringMyCustomOperatorspanspanspanclasscodesnippetouterspanclasscodesnippetattrdescriptionspan:spanclasscodesnippetstringTheconfigurationformigratingairflowcustomoperatorMyCustomOperatortoDolphinSchedulerSQLtask。spanspanspanclasscodesnippetouterspanclasscodesnippetattrmigrationspan:spanclasscodesnippetstringspanspanspanclasscodesnippetouterspanclasscodesnippetattrmodulespan:spanclasscodesnippetstringspanspanspanclasscodesnippetouterspanclasscodesnippetmetaspanspanclasscodesnippetstringaction:replacespanspanspanclasscodesnippetouterspanclasscodesnippetattrsrcspan:spanclasscodesnippetstringcustom。mycustomoperator。MyCustomOperatorspanspanspanclasscodesnippetouterspanclasscodesnippetattrdestspan:spanclasscodesnippetstringpydolphinscheduler。tasks。sql。Sqlspanspanspanclasscodesnippetouterspanclasscodesnippetattrparameterspan:spanclasscodesnippetstringspanspanspanclasscodesnippetouterspanclasscodesnippetmetaspanspanclasscodesnippetstringaction:replacespanspanspanclasscodesnippetouterspanclasscodesnippetattrsrcspan:spanclasscodesnippetstringtaskidspanspanspanclasscodesnippetouterspanclasscodesnippetattrdestspan:spanclasscodesnippetstringnamespanspanspanclasscodesnippetouterspanclasscodesnippetmetaspanspanclasscodesnippetstringaction:replacespanspanspanclasscodesnippetouterspanclasscodesnippetattrsrcspan:spanclasscodesnippetstringmycustomconnidspanspanspanclasscodesnippetouterspanclasscodesnippetattrdestspan:spanclasscodesnippetstringdatasourcenamespanspan 再使用customrules参数指定转化自定义参数,就能应用自定义规则的转化:spanclasscodesnippetouterspanclasscodesnippetcomment指定自定义规则路径为pathtoMyCustomOperator。yamlspanspanspanclasscodesnippetouterspanclasscodesnippetattributeair2phinspanmigrateinplacecustomrulespathtoMyCustomOperator。yamlairflowdagsspan 让Air2phin运行地更快 Air2phin默认是一个进程运行DAG文件的转化的,当你有许多DAG文件时,Air2phin转化非常耗时,我们提供了一个启动多进程运行Air2phin转化的参数multiprocess,可以将其指定为用户机器的CPU数量来缩短转化时间:spanclasscodesnippetouterspanclasscodesnippetcomment指定air2phin启动12个进程同时进行转化spanspanspanclasscodesnippetouterspanclasscodesnippetattributeair2phinspanmigrateinplacecustomrulespathtoMyCustomOperator。yamlmultiprocessspanclasscodesnippetnumber12spanairflowdagsspan 存在的问题 目前,作为一个转化工具,Air2phin的使用方式已经算比较完善了,能够满足用户迁移调度系统的基本需求,但还有一些地方有待完善。内置规则还不够多转化规则还不够多,目前只有五个,分别是:airflow。DAGairflow。operators。bash。BashOperatorairflow。operators。dummyoperator。DummyOperatorairflow。operators。pythonoperator。PythonOperatorairflow。operators。sparksqloperator。SparkSqlOperator 如果有更多的规则,Air2phin将成为一个更加好用的转化工具,这里欢迎各位随时提交转化规则的PR(https:github。comWhaleOpsair2phinpulls)。部分Airflow的用法不能被迁移过来部分概念仅仅在Airflow中有,在DolphinScheduler中还没有,如任务的成功、失败、重试、触发callback,任务的owner,variable,工作流并发数,tag等,这部分AirflowDAG可以被迁移,但兼容的属性将会丢失,无法迁移到DolphinScheduler。 Air2phin常见问题解答 Q:为什么选择解析AirflowDAG文件而不是数据库?A:因为AirflowDAG文件中才有完成的工作流信息,Airflow的数据库中只有工作流基本信息,没有任务定义的信息,也没有任务的关系,我们选择通过解析Airflow的DAG文件而不是数据库来完成转化。Q:为什么要通过dolphinschedulerpythonsdk做中转不自己提交到DolphinScheduler?A:因为AirflowDAG就是Python定义的,在AirflowDAG中有很多Python的特性,我们不想将这部分特性转化成结构化的数据(转化可能存在信息丢失),恰好DolphinScheduler已经有了Python的sdk,所以直接通过LibCST转化是成本更加低的做法。Q:为什么使用LibCST而不是python内置的AST?A:因为LibCST更加符合我们,Python内置的AST库解析成AST的时候会丢失掉comment的信息,但是我们呢希望保留着部分信息。且LibCST提供更加多visitor保证我们更加方便的实现替换。参考链接:air2phin(https:github。comWhaleOpsair2phin)腾讯回应进军类ChatGPT;Meta新语言模型能运行在单张显卡上;OpenAI创始人提出新摩尔定律极客头条ChatGPT带火的提示工程师岗,不用写代码,也能获得年薪数百万? ChatGPT正在取代员工,最新ChatGPT调查报告发布!