openlineage-airflow

OpenLineage integration with Airflow

Rank: #1631 | Downloads: 5,674,706 (30 days) | Stars: 2,332 | Forks: 431

Description

Status: Package is no longer maintained!

For Airflow 2.7+ use the apache-airflow-providers-openlineage package.

This package can still be used with Airflow versions prior to 2.7, but it is no longer maintained. It will not receive ANY bug fixes, security updates or new features.

OpenLineage Airflow Integration

A library that integrates Airflow DAGs with OpenLineage for automatic metadata collection.

Native integration with Airflow

Starting from Airflow version 2.7.0, the OpenLineage integration is included in the Airflow repository as a provider. The apache-airflow-providers-openlineage package significantly eases lineage tracking in Airflow. It improves stability by embedding the functionality directly into each provider, and it simplifies the process of selecting and managing lineage collection consumers.

As a result, starting from Airflow 2.7.0 one should use the native Airflow OpenLineage provider package.

Ongoing development and enhancements are focused on the apache-airflow-providers-openlineage package. As stated in the status note above, openlineage-airflow itself is no longer maintained.

Features

Metadata

  • Task lifecycle
  • Task parameters
  • Task runs linked to versioned code
  • Task inputs / outputs

Lineage

  • Track inter-DAG dependencies

Built-in

  • SQL parser
  • Link to code builder (ex: GitHub)
  • Metadata extractors

Requirements

Installation

$ pip3 install openlineage-airflow

Note: You can also add openlineage-airflow to your requirements.txt for Airflow.

To install from source, run:

$ python3 setup.py install

Setup

Airflow 2.7+

This package should not be used starting with Airflow 2.7.0 and must not be used with Airflow 2.8+. It was designed as Airflow's external integration that works mainly for Airflow versions <2.7. For Airflow 2.7+ use the native Airflow OpenLineage provider package apache-airflow-providers-openlineage.

Airflow 2.3 - 2.6

Note: The last version of openlineage-airflow to support Airflow versions 2.3-2.4 is 1.33.0.

Starting from Airflow 2.3, the integration registers itself automatically if it is installed in the Airflow worker's Python environment. This means you don't have to do anything besides configuring it, which is described in the Configuration section.

Airflow 2.1 - 2.2

Note: The last version of openlineage-airflow to support Airflow versions 2.1-2.2 is 1.14.0.

This method has limited support: it does not support tracking failed jobs, and job starts are registered only when a job ends.

Set your LineageBackend in your airflow.cfg, or via the environment variable AIRFLOW__LINEAGE__BACKEND, to

openlineage.lineage_backend.OpenLineageBackend

In contrast to integration via subclassing a DAG, a LineageBackend-based approach collects all metadata for a task on each task's completion.

The OpenLineageBackend does not take into account manually configured inlets and outlets.

When enabled, the library will:

  1. On DAG start, collect metadata for each task using an Extractor if it exists for a given operator.
  2. Collect task input / output metadata (source, schema, etc.)
  3. Collect task run-level metadata (execution time, state, parameters, etc.)
  4. On DAG complete, also mark the task as complete in OpenLineage

Configuration

HTTP Backend Environment Variables

openlineage-airflow uses the OpenLineage client to push data to an OpenLineage backend.

The OpenLineage client depends on environment variables:

  • OPENLINEAGE_URL - points to the service that will consume OpenLineage events.
  • OPENLINEAGE_API_KEY - set if the consumer of OpenLineage events requires a Bearer authentication key.
  • OPENLINEAGE_NAMESPACE - set if you are using something other than the default namespace for the job namespace.
  • OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE - set to False if you want the source code of callables provided in the PythonOperator to be sent in OpenLineage events.

For backwards compatibility, openlineage-airflow also supports configuration via MARQUEZ_URL, MARQUEZ_NAMESPACE and MARQUEZ_API_KEY variables.

MARQUEZ_URL=http://my_hosted_marquez.example.com:5000
MARQUEZ_NAMESPACE=my_special_ns
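
The backwards-compatible lookup can be pictured with a small sketch. This is not the client's actual code: the helper name resolve_setting is hypothetical, and the precedence (the OPENLINEAGE_* variable wins when both are set) is an assumption made for illustration.

```python
import os
from typing import Mapping, Optional


def resolve_setting(name: str, env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return OPENLINEAGE_<name>, falling back to the legacy MARQUEZ_<name>."""
    # Assumption: the OPENLINEAGE_* variable takes precedence when both are set.
    return env.get(f"OPENLINEAGE_{name}") or env.get(f"MARQUEZ_{name}")


# Use an explicit mapping here instead of the real process environment.
legacy_env = {
    "MARQUEZ_URL": "http://my_hosted_marquez.example.com:5000",
    "MARQUEZ_NAMESPACE": "my_special_ns",
}
settings = {key: resolve_setting(key, legacy_env) for key in ("URL", "NAMESPACE", "API_KEY")}
print(settings)
```

With only the legacy variables set, the URL and namespace resolve to the MARQUEZ_* values and the API key stays unset.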

Extractors: Sending the correct data from your DAGs

If you do nothing, the OpenLineage backend will receive the Job and the Run from your DAGs, but, unless you use one of the few operators for which this integration provides an extractor, input and output metadata will not be sent.

openlineage-airflow allows you to do more than that by building "Extractors." An extractor is an object suited to extract metadata from a particular operator (or operators). The metadata it works with consists of:

  1. Name: The name of the task
  2. Inputs: A list of input datasets
  3. Outputs: A list of output datasets
  4. Context: The Airflow context for the task

Bundled Extractors

openlineage-airflow provides extractors for:

  • PostgresOperator
  • MySqlOperator
  • AthenaOperator
  • BigQueryOperator
  • SnowflakeOperator
  • TrinoOperator
  • GreatExpectationsOperator
  • SFTPOperator
  • FTPFileTransmitOperator
  • PythonOperator
  • RedshiftDataOperator, RedshiftSQLOperator
  • SageMakerProcessingOperator, SageMakerProcessingOperatorAsync
  • SageMakerTrainingOperator, SageMakerTrainingOperatorAsync
  • SageMakerTransformOperator, SageMakerTransformOperatorAsync
  • S3CopyObjectOperator, S3FileTransformOperator
  • GCSToGCSOperator
  • DbtCloudRunJobOperator

SQL operators use the SQL parser. An experimental SQL parser is activated if you install openlineage-sql on your Airflow worker.

Custom Extractors

If your DAGs contain additional operators from which you want to extract lineage data, fear not - you can always provide custom extractors. They should derive from BaseExtractor.

There are two ways to register them for use in openlineage-airflow.

One way is to add them to the OPENLINEAGE_EXTRACTORS environment variable, separated by a semicolon (;).

OPENLINEAGE_EXTRACTORS=full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
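
To make the value format concrete, here is a minimal sketch of how a semicolon-separated list of dotted class paths could be resolved into classes. The helper load_classes is hypothetical and is not the loader openlineage-airflow uses; standard-library classes stand in for real extractor classes.

```python
import importlib
from typing import List


def load_classes(spec: str) -> List[type]:
    """Import every class named in a semicolon-separated list of dotted paths."""
    classes = []
    for path in spec.split(";"):
        path = path.strip()
        if not path:
            continue  # tolerate a trailing semicolon or stray whitespace
        # Split "package.module.ClassName" into module path and class name.
        module_name, _, class_name = path.rpartition(".")
        module = importlib.import_module(module_name)
        classes.append(getattr(module, class_name))
    return classes


# Standard-library classes stand in for real extractor classes here.
loaded = load_classes("collections.OrderedDict;json.JSONDecoder")
print([cls.__name__ for cls in loaded])
```

Each entry must therefore be importable from the Airflow worker's Python environment, which is why the full dotted path is required.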

To ensure OpenLineage logging propagates to custom extractors, use self.log instead of creating a logger yourself.
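
A custom extractor might look roughly like the sketch below. The import paths, get_operator_classnames, extract and TaskMetadata reflect my understanding of the package and should be checked against the installed version. CopyFileOperator, its attributes and the "file" namespace are hypothetical. The stand-in classes in the except branch exist only so the sketch runs without the package installed.

```python
import logging
from dataclasses import dataclass, field
from types import SimpleNamespace
from typing import List, Optional

try:
    from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata
    from openlineage.client.run import Dataset
except ImportError:
    # Minimal stand-ins so the sketch runs without the package installed.
    @dataclass
    class Dataset:
        namespace: str
        name: str

    @dataclass
    class TaskMetadata:
        name: str
        inputs: List[Dataset] = field(default_factory=list)
        outputs: List[Dataset] = field(default_factory=list)

    class BaseExtractor:
        log = logging.getLogger("openlineage.sketch")

        def __init__(self, operator):
            self.operator = operator


class CopyFileExtractor(BaseExtractor):
    """Hypothetical extractor for a hypothetical CopyFileOperator."""

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        # Names of the operator classes this extractor handles.
        return ["CopyFileOperator"]

    def extract(self) -> Optional[TaskMetadata]:
        op = self.operator
        # Use self.log rather than a module-level logger.
        self.log.info("Extracting lineage for task %s", op.task_id)
        return TaskMetadata(
            name=f"{op.dag_id}.{op.task_id}",
            inputs=[Dataset(namespace="file", name=op.source_path)],
            outputs=[Dataset(namespace="file", name=op.target_path)],
        )


# A plain namespace stands in for a real operator instance.
operator = SimpleNamespace(
    dag_id="etl", task_id="copy", source_path="/in/a.csv", target_path="/out/a.csv"
)
metadata = CopyFileExtractor(operator).extract()
print(metadata.name, metadata.inputs[0].name, metadata.outputs[0].name)
```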

Default Extractor

When you own the operators' code, it is not necessary to provide custom extractors. You can use the Default Extractor's capability instead.

To do that, define at least one of the following two methods in the operator:

  • get_openlineage_facets_on_start()

Extracts metadata when the task starts.

  • get_openlineage_facets_on_complete(task_instance: TaskInstance)

Extracts metadata when the task completes. It should accept a task_instance argument, similar to the extract_on_complete method in base extractors.

If you don't define the get_openlineage_facets_on_complete method, the integration falls back to get_openlineage_facets_on_start.
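
The fallback rule can be sketched as follows. Only the two method names come from the text above. LineageSketch, ExportTableOperator and lineage_on_complete are hypothetical stand-ins, and the real return type of these methods is whatever the package defines, not this dataclass.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class LineageSketch:
    """Hypothetical stand-in for the lineage object the hooks return."""
    inputs: List[str] = field(default_factory=list)
    outputs: List[str] = field(default_factory=list)


class ExportTableOperator:
    """Hypothetical operator; a real one would subclass Airflow's BaseOperator."""

    def __init__(self, table: str, target: str) -> None:
        self.table = table
        self.target = target

    def get_openlineage_facets_on_start(self) -> LineageSketch:
        # Everything reported here is known before the task runs.
        return LineageSketch(inputs=[self.table], outputs=[self.target])


def lineage_on_complete(operator: Any, task_instance: Any) -> Optional[LineageSketch]:
    """Prefer the on-complete hook; otherwise fall back to the on-start hook."""
    on_complete = getattr(operator, "get_openlineage_facets_on_complete", None)
    if callable(on_complete):
        return on_complete(task_instance)
    on_start = getattr(operator, "get_openlineage_facets_on_start", None)
    return on_start() if callable(on_start) else None


result = lineage_on_complete(
    ExportTableOperator("public.orders", "s3://bucket/orders"), task_instance=None
)
print(result)
```

Because ExportTableOperator defines only the on-start method, the completion path reuses its result. An operator that learns its outputs only at runtime would define get_openlineage_facets_on_complete as well.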

Great Expectations

The Great Expectations integration works by providing an OpenLineageValidationAction. You need to include it in your action_list in great_expectations.yml.

The following example illustrates a way to change the default configuration:

validation_operators:
  action_list_operator:
    # To learn how to configure sending Slack notifications during evaluation
    # (and other customizations), read: https://docs.greatexpectations.io/en/latest/autoapi/great_expectations/validation_operators/index.html#great_expectations.validation_operators.ActionListValidationOperator
    class_name: ActionListValidationOperator
    action_list:
      - name: store_validation_result
        action:
          class_name: StoreValidationResultAction
      - name: store_evaluation_params
        action:
          class_name: StoreEvaluationParametersAction
      - name: update_data_docs
        action:
          class_name: UpdateDataDocsAction
+     - name: openlineage
+       action:
+         class_name: OpenLineageValidationAction
+         module_name: openlineage.common.provider.great_expectations.action
      # - name: send_slack_notification_on_validation_result
      #   action:
      #     class_name: SlackNotificationAction
      #     # put the actual webhook URL in the uncommitted/config_variables.yml file
      #     slack_webhook: ${validation_notification_slack_webhook}
      #     notify_on: all # possible values: "all", "failure", "success"
      #     renderer:
      #       module_name: great_expectations.render.renderer.slack_renderer
      #       class_name: SlackRenderer

If you're using GreatExpectationsOperator, you need to set validation_operator_name to an operator that includes OpenLineageValidationAction. Setting it in the great_expectations.yml file isn't enough: the operator overrides it with the default name if a different one is not provided.

To see an example of a working configuration, see DAG and Great Expectations configuration in the integration tests.

Logging

In addition to conventional logging approaches, the openlineage-airflow package provides an alternative way of configuring its logging behavior. By setting the `OPENLI