Dataset visualization issue #2915

Open
salamandra2508 opened this issue Oct 2, 2024 · 4 comments
Labels
question Further information is requested

Comments

@salamandra2508

After installing Marquez and setting it up in my environment (AWS EKS + Helm + Argo CD), I can see the Airflow DAGs in the Marquez UI, but I do not see any datasets.
[Screenshots attached: Marquez UI, 2024-10-02, 09:31 to 09:34]
![Datasets view, 2024-10-02 09:33 AM](https://github.com/user-attachments/assets/4bf50c17-4fe8-4764-925c-a488a14f80d5)

How can I achieve this? Or can someone provide an example DAG to check it with? I'm pretty new to this stuff.


boring-cyborg bot commented Oct 2, 2024

Thanks for opening your first issue in the Marquez project! Please be sure to follow the issue template!

@phixMe
Member

phixMe commented Oct 8, 2024

It looks like your configuration is good since you've got jobs in the system.

Are you using the correct Airflow operators to extract lineage metadata (i.e., not the Python or k8s operator)? Have you tried turning on the debug logs in Airflow so that you can see whether the inputs and outputs are indeed populated with schemas? Are there any namespaces created for your datasets?
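One way to check the namespace question is to ask the Marquez REST API directly. Below is a minimal sketch, not an official client. It assumes the API is reachable at `http://localhost:5000` (a placeholder for your own deployment) and that the `/api/v1/namespaces` and `/api/v1/namespaces/{namespace}/datasets` endpoints return their items under the `namespaces` and `datasets` keys. The helper names are hypothetical.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

# Assumption: base URL of the Marquez API; replace with your own deployment.
MARQUEZ_URL = "http://localhost:5000"


def namespaces_url(base_url):
    """Build the URL that lists all namespaces."""
    return f"{base_url.rstrip('/')}/api/v1/namespaces"


def datasets_url(base_url, namespace):
    """Build the URL that lists the datasets of one namespace."""
    # Namespace names such as "s3a://bucket" must be URL-encoded in the path.
    encoded = quote(namespace, safe="")
    return f"{base_url.rstrip('/')}/api/v1/namespaces/{encoded}/datasets"


def fetch_json(url):
    """GET a URL and decode the JSON body."""
    with urlopen(url, timeout=10) as response:
        return json.load(response)


def names(payload, key):
    """Extract the 'name' field of every item stored under `key`."""
    return [item["name"] for item in payload.get(key, [])]


def print_dataset_report(base_url):
    """Print every namespace with the datasets Marquez knows about."""
    for ns in names(fetch_json(namespaces_url(base_url)), "namespaces"):
        found = names(fetch_json(datasets_url(base_url, ns)), "datasets")
        print(f"{ns}: {len(found)} dataset(s) {found}")
```

Calling `print_dataset_report(MARQUEZ_URL)` lists each namespace with its datasets. If only the job namespace shows up and every dataset list is empty, the runs are most likely being reported without inputs and outputs.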

@salamandra2508
Author

> It looks like your configuration is good since you've got jobs in the system.
>
> Are you using the correct Airflow operators to extract lineage metadata (i.e., not the Python or k8s operator)? Have you tried turning on the debug logs in Airflow so that you can see whether the inputs and outputs are indeed populated with schemas? Are there any namespaces created for your datasets?

I tried to create a DAG based on the example (by the way, the example from the repo, https://github.com/MarquezProject/marquez/blob/main/examples/airflow/airflow.md, works fine with the Postgres operator), but when I tried to create the same DAG with the SparkSubmitOperator, I can see the jobs but no datasets.
My Spark script:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import random
import sys


def run_stage(stage):
    spark = SparkSession.builder.appName(f"CombinedDataJob_{stage}").getOrCreate()

    s3_output_table1 = "s3a://test-busket/marquez/table1.csv"
    s3_output_table2 = "s3a://test-busket/marquez/table2.csv"
    s3_output_merged = "s3a://test-busket/marquez/merged_table.csv"
    s3_output_final = "s3a://test-busket/marquez/final_dataset.csv"

    if stage == 'generate':
        # Stage 1: Generate 2 tables with some data and save them
        table1_data = [(1, 'A'), (2, 'B')]
        table2_data = [(1, 'First'), (2, 'Second')]

        df_table1 = spark.createDataFrame(table1_data, ['id', 'value'])
        df_table1.write.mode('overwrite').csv(s3_output_table1, header=True)

        df_table2 = spark.createDataFrame(table2_data, ['id', 'description'])
        df_table2.write.mode('overwrite').csv(s3_output_table2, header=True)

    elif stage == 'merge':
        # Stage 2: Merge these tables into a third table
        df_table1 = spark.read.csv(s3_output_table1, header=True, inferSchema=True)
        df_table2 = spark.read.csv(s3_output_table2, header=True, inferSchema=True)
        df_merged = df_table1.join(df_table2, 'id')
        df_merged.write.mode('overwrite').csv(s3_output_merged, header=True)

    elif stage == 'process':
        # Stage 3: Create a dataset for the output of the merged table
        df_merged = spark.read.csv(s3_output_merged, header=True, inferSchema=True)
        df_final = df_merged.withColumn('processed_flag', lit(True))
        df_final.write.mode('overwrite').csv(s3_output_final, header=True)

    spark.stop()


if __name__ == "__main__":
    stage = sys.argv[1]  # The stage to run is passed as the first argument
    run_stage(stage)
```

DAG:

```python
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago
from airflow.datasets import Dataset  # Importing the Dataset class
from datetime import datetime, timedelta

# Import necessary methods and variables
from draif_common.common_env import get_default_spark_conf, LIB_FOLDER

default_args = {
    'owner': 'datascience',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'email': ['[email protected]'],
    'execution_timeout': timedelta(minutes=300),
}

dag = DAG(
    dag_id='complex_data_pipeline_dag',
    schedule_interval=None,
    catchup=False,
    default_args=default_args,
    description='A complex data pipeline DAG'
)

conf = get_default_spark_conf()

generate_tables = SparkSubmitOperator(
    task_id='generate_tables',
    application=f'{LIB_FOLDER}/combined_data_job.py',
    conn_id='SparkConnection',
    total_executor_cores=2,
    executor_cores=1,
    executor_memory='1g',
    driver_memory='1g',
    conf=conf,
    application_args=['generate'],
    dag=dag
)

merge_tables = SparkSubmitOperator(
    task_id='merge_tables',
    application=f'{LIB_FOLDER}/combined_data_job.py',
    conn_id='SparkConnection',
    total_executor_cores=2,
    executor_cores=1,
    executor_memory='1g',
    driver_memory='1g',
    conf=conf,
    application_args=['merge'],
    dag=dag
)

process_final_dataset = SparkSubmitOperator(
    task_id='process_final_dataset',
    application=f'{LIB_FOLDER}/combined_data_job.py',
    conn_id='SparkConnection',
    total_executor_cores=2,
    executor_cores=1,
    executor_memory='1g',
    driver_memory='1g',
    conf=conf,
    application_args=['process'],
    outlets=[
        Dataset('s3a://TEST-BUCKET/marquez/final_dataset.csv')  # Track final dataset
    ],
    dag=dag
)

generate_tables >> merge_tables >> process_final_dataset
```

@pawel-big-lebowski
Collaborator

@salamandra2508 I see you're trying to define outlets manually in the operator. How about turning on the Spark OpenLineage listener and getting the lineage automatically from the Spark job? See https://openlineage.io/docs/integrations/spark/
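For illustration, turning the listener on usually comes down to a few extra Spark properties passed through the operator's `conf`. The sketch below is hedged: `with_openlineage` is a hypothetical helper, the Marquez URL, namespace and package version are placeholders, and the `openlineage-spark_2.12` artifact name assumes a recent OpenLineage release built for Scala 2.12. Check the linked docs for the coordinates that match your Spark build.

```python
LISTENER = "io.openlineage.spark.agent.OpenLineageSparkListener"


def _append(conf, key, value):
    """Append `value` to a comma-separated Spark property, keeping existing entries."""
    existing = conf.get(key)
    conf[key] = f"{existing},{value}" if existing else value


def with_openlineage(base_conf, marquez_url, namespace, package_version):
    """Return a copy of `base_conf` with the OpenLineage Spark listener enabled.

    All arguments are placeholders to be replaced with your own values.
    """
    conf = dict(base_conf or {})
    # Assumption: Scala 2.12 build of the OpenLineage Spark integration.
    _append(conf, "spark.jars.packages",
            f"io.openlineage:openlineage-spark_2.12:{package_version}")
    _append(conf, "spark.extraListeners", LISTENER)
    # Send OpenLineage events over HTTP to the Marquez API.
    conf["spark.openlineage.transport.type"] = "http"
    conf["spark.openlineage.transport.url"] = marquez_url
    conf["spark.openlineage.namespace"] = namespace
    return conf
```

In the DAG above, this would wrap the existing configuration, for example `conf = with_openlineage(get_default_spark_conf(), marquez_url, namespace, package_version)`, with the result passed as `conf=conf` to each `SparkSubmitOperator`. The listener then reports the S3 reads and writes it observes, so the manual `outlets` would not be needed for the datasets to show up.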

@wslulciuc added the `question` (Further information is requested) label on Oct 23, 2024