Skip to content

Emit per-statement OpenLineage events for BigQuery script jobs#69234

Open
nailo2c wants to merge 3 commits into
apache:mainfrom
nailo2c:bigquery-openlineage-child-query-events
Open

Emit per-statement OpenLineage events for BigQuery script jobs#69234
nailo2c wants to merge 3 commits into
apache:mainfrom
nailo2c:bigquery-openlineage-child-query-events

Conversation

@nailo2c

@nailo2c nailo2c commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

closes: #47579

Summary

BigQueryInsertJobOperator running a stored procedure / multi-statement query produced a single task event whose inputs and outputs were the aggregation of all child query jobs, so dataset-level lineage looked like every input fed every output.

This adds a separate OpenLineage QUERY event per BigQuery child job, giving accurate per-statement lineage.

  • Test Dag run successfully (with connect to BigQuery).
openlineage_af_ui
  • Open Lineage UI shows input_table1 -> query.1 -> output_table1
Screenshot 2026-07-02 at 3 42 04 PM

Before

One COMPLETE event, child datasets flattened together, looks many-to-many:

{"eventType": "COMPLETE",
 "job": {"name": "...call_stored_procedure"},
 "inputs":  ["...input_table1", "...input_table2"],
 "outputs": ["...output_table1", "...output_table2"]}   // i1,i2 -> o1,o2 (misleading)

After

The task event is unchanged (still the coarse, task-level summary, backward compatible), and each child query is additionally emitted as its own event with precise input -> output and a parent facet linking back to the task:

// child event 1
{"eventType": "COMPLETE",
 "job": {"name": "...call_stored_procedure.query.1"},
 "inputs": ["...input_table1"], "outputs": ["...output_table1"],
 "run": {"facets": {"parent": {"job": {"name": "...call_stored_procedure"}},
                    "externalQuery": {"externalQueryId": "script_job_..._0"}}}}
// child event 2  ->  query.2 : input_table2 -> output_table2

Test Dag I Used

import os
from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryInsertJobOperator,
)

PROJECT_ID = "{{ dag_run.conf.get('project_id', params.project_id) }}"
DATASET_ID = "{{ dag_run.conf.get('dataset_id', params.dataset_id) }}"
GCP_CONN_ID = "{{ dag_run.conf.get('gcp_conn_id', params.gcp_conn_id) }}"
LOCATION = os.environ.get("BQ_OPENLINEAGE_LOCATION", "US")

DAG_ID = "reproduce_issue_47579_bigquery_openlineage"
ROUTINE_ID = "issue_47579_proc"

INPUT_TABLE_1 = f"`{PROJECT_ID}.{DATASET_ID}.input_table1`"
INPUT_TABLE_2 = f"`{PROJECT_ID}.{DATASET_ID}.input_table2`"
OUTPUT_TABLE_1 = f"`{PROJECT_ID}.{DATASET_ID}.output_table1`"
OUTPUT_TABLE_2 = f"`{PROJECT_ID}.{DATASET_ID}.output_table2`"
ROUTINE = f"`{PROJECT_ID}.{DATASET_ID}.{ROUTINE_ID}`"


with DAG(
    dag_id=DAG_ID,
    schedule=None,
    start_date=datetime(2026, 7, 1),
    catchup=False,
    max_active_runs=1,
    tags=["repro", "bigquery", "openlineage"],
    params={
        "project_id": os.environ.get("BQ_OPENLINEAGE_PROJECT_ID", "SET_ME"),
        "dataset_id": os.environ.get("BQ_OPENLINEAGE_DATASET_ID", "issue_47579_openlineage_repro"),
        "gcp_conn_id": os.environ.get("BQ_OPENLINEAGE_CONN_ID", "google_cloud_default"),
    },
) as dag:
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        location=LOCATION,
        gcp_conn_id=GCP_CONN_ID,
        if_exists="ignore",
    )

    create_input_table1 = BigQueryInsertJobOperator(
        task_id="create_input_table1",
        project_id=PROJECT_ID,
        location=LOCATION,
        gcp_conn_id=GCP_CONN_ID,
        configuration={
            "query": {
                "query": f"""
                    CREATE OR REPLACE TABLE {INPUT_TABLE_1} AS
                    SELECT 1 AS id, 'only_from_input_table1' AS payload
                """,
                "useLegacySql": False,
            }
        },
    )

    create_input_table2 = BigQueryInsertJobOperator(
        task_id="create_input_table2",
        project_id=PROJECT_ID,
        location=LOCATION,
        gcp_conn_id=GCP_CONN_ID,
        configuration={
            "query": {
                "query": f"""
                    CREATE OR REPLACE TABLE {INPUT_TABLE_2} AS
                    SELECT 2 AS id, 'only_from_input_table2' AS payload
                """,
                "useLegacySql": False,
            }
        },
    )

    create_stored_procedure = BigQueryInsertJobOperator(
        task_id="create_stored_procedure",
        project_id=PROJECT_ID,
        location=LOCATION,
        gcp_conn_id=GCP_CONN_ID,
        configuration={
            "query": {
                "query": f"""
                    CREATE OR REPLACE PROCEDURE {ROUTINE}()
                    BEGIN
                      CREATE OR REPLACE TABLE {OUTPUT_TABLE_1} AS
                      SELECT id, payload, 'first_ctas' AS marker
                      FROM {INPUT_TABLE_1};

                      CREATE OR REPLACE TABLE {OUTPUT_TABLE_2} AS
                      SELECT id, payload, 'second_ctas' AS marker
                      FROM {INPUT_TABLE_2};
                    END
                """,
                "useLegacySql": False,
            }
        },
    )

    call_stored_procedure = BigQueryInsertJobOperator(
        task_id="call_stored_procedure",
        project_id=PROJECT_ID,
        location=LOCATION,
        gcp_conn_id=GCP_CONN_ID,
        configuration={
            "query": {
                "query": f"CALL {ROUTINE}();",
                "useLegacySql": False,
                "priority": "INTERACTIVE",
            }
        },
    )

    create_dataset >> [create_input_table1, create_input_table2] >> create_stored_procedure
    create_stored_procedure >> call_stored_procedure

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Opus 4.8 following the guidelines

@nailo2c nailo2c requested a review from shahar1 as a code owner July 2, 2026 07:44
@boring-cyborg boring-cyborg Bot added area:providers provider:google Google (including GCP) related issues labels Jul 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:google Google (including GCP) related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Calling a BigQuery Stored Procedure aggregates all inputs & outputs at the Airflow Task level

1 participant