
Support lineage of Pandas.DataFrame #665

Open
wajda opened this issue Apr 24, 2023 · 11 comments

@wajda
Contributor

wajda commented Apr 24, 2023

I have used an XLSX file and 3 Parquet files as sources and performed some transformations. The code ran fine and I could see the lineage in Spline, but only the 3 Parquet files show up as sources; the XLSX source is not displayed in the lineage.

[screenshot of the code]

The above is the code I used, and I have attached screenshots of the Databricks cluster and the libraries installed at the cluster level. Can you get back to me on this? Thank you in advance.

Originally posted by @harishyadavdevops in #262 (comment)

# COMMAND ----------
#from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col,to_date,datediff,current_date

import pandas
import xlrd

#from pyspark.sql.types import *

#spark=SparkSession.builder\
#                  .appName("POC For Lineage")\
#                  .config("spark.jars.packages","com.crealytics:spark-excel_2.12:0.13.7")\
#                  .getOrCreate()


input_filepath='/dbfs/FileStore'
input_filepaths='dbfs:/FileStore'

pandas_df=pandas.read_excel(f'{input_filepath}/CMO_ERICA_AIM_SAP_Mapping_Master_Latest.xlsx',index_col=None)


df_cmo_master=spark.createDataFrame(pandas_df)\
    .select( \
    col("IDERICA"), \
    col("TargetDays").alias("target_days"), \
    col("PrimaryPlatformPlan").alias("plan_platfrom"), \
    col("sitename").alias("cmo_site"), \
    col("primaryplatform").alias("pes_platform"), \
    ) \
    .distinct()


df_cmo_master.show()
#df_cmo_master = spark.sql('Select 1 as IDERICA, 1 as  Account_Number')

df_qrta = spark.read.format('parquet').option("header",True).load(f'{input_filepaths}/qrta_intermediate.parquet')
df_external_supplier = spark.read.format('parquet').option("header",True).load(f'{input_filepaths}/external_supplier.parquet')
df_comet_supplier= spark.read.format('parquet').option("header",True).load(f'{input_filepaths}/comet_supplier.parquet')

# dbfs:/FileStore/comet_supplier.parquet
# dbfs:/FileStore/external_supplier.parquet
# dbfs:/FileStore/qrta_intermediate.parquet
# dbfs:/FileStore/CMO_ERICA_AIM_SAP_Mapping_Master_Latest.xlsx

df_qrta_intermediate = df_qrta.select(
    col("PO - Batch_ID").alias("po_batch_id"),
    col("Batch").alias("vendor_batch_id"),
    col("SKU").alias("sku"),
    col("SKU Description").alias("sku_description"),
    col("Brand Name").alias("sku_brand"),
    col("Brand Platform").alias("sku_brand_platform"),
    to_date(col("Janssen Release - Actual"),'MM-dd-yyyy').alias("janssen_release_date"),
    to_date(col("Batch Start - Actual"),'MM-dd-yyyy').alias("manufacturing_start_date"),
    to_date(col("Batch End - Actual"),'MM-dd-yyyy').alias("manufacturing_completion_date"),
    to_date(col("CMO Release - Actual"),'MM-dd-yyyy').alias("cmo_release_date"),
    # turnaround times are computed from the original source columns;
    # the aliases defined above are not yet in scope inside this select
    datediff(
        to_date(col("Janssen Release - Actual"),'MM-dd-yyyy'), to_date(col("Batch End - Actual"),'MM-dd-yyyy')
    ).alias("release_turn_around_time_overall"),
    datediff(
        to_date(col("CMO Release - Actual"),'MM-dd-yyyy'), to_date(col("Batch End - Actual"),'MM-dd-yyyy')
    ).alias("release_turn_around_time_cmo"),
    datediff(
        to_date(col("Janssen Release - Actual"),'MM-dd-yyyy'), to_date(col("CMO Release - Actual"),'MM-dd-yyyy')
    ).alias("release_turn_around_time_janssen"),
    col("# Returns to CMO").cast("int").alias("count_of_iteration_needed"),
    col("Status – Final Release").alias("final_release_status"),
    to_date(col("Status Date Final Releasee"),'MM-dd-yyyy').alias("final_release_status_date"),
    col("CMO Site ID").alias("cmo_id")
)

# COMMAND ----------

df_qrta_final = (
    df_qrta_intermediate.alias("qrta_intermediate")
    .join(
        df_cmo_master.alias("cmo_master"),
        col("qrta_intermediate.cmo_id") == col("cmo_master.IDERICA"),"left"
    )
    .join(
        df_external_supplier.alias("external_supplier"),
        col("qrta_intermediate.cmo_id") ==col("external_supplier.Site ID (Legacy ERICA ID)"),"left"
    )
    .join(
        df_comet_supplier.alias("comet_supplier"),
        col("external_supplier.COMET ID") == col("comet_supplier.Account Number"),"left"
    )
    .select(
        "qrta_intermediate.*",
        "cmo_master.cmo_site",
        "cmo_master.plan_platfrom",
        "cmo_master.pes_platform",
        col("cmo_master.target_days").cast("int").alias("target"),
        col("comet_supplier.Account Name").alias("quality_cmo_site"),
        col("comet_supplier.Responsible Owning Group").alias("eq_platform"),
        current_date().alias("last_refresh_date")
        
    )
)

df_qrta_final.show()

[screenshots of the Databricks cluster and installed libraries]

@wajda
Contributor Author

wajda commented Apr 24, 2023

@harishyadavdevops this problem is most likely caused by the fact that you are reading your Excel file through the Pandas API, which is not directly supported by Spline. If you click on the icon to open the detailed execution plan, you should see a 4th terminal node there that represents your Excel data; it just isn't recognised as a read command (that's why you don't see it on the high-level lineage overview).

Try reading the Excel file using the Spark Excel connector instead of Pandas.

@harishyadavdevops

harishyadavdevops commented Apr 24, 2023

df_cmo_master = spark.read.format("com.crealytics.spark.excel")\
                          .option('header','true')\
                          .option('inferSchema','true')\
                          .load(f"{input_filepath}/CMO_ERICA_AIM_SAP_Mapping_Master_Latest.xlsx")\
                          .select(\
                              col("IDERICA"),\
                              col("TargetDays").alias("target_days"),\
                              col("PrimaryPlatformPlan").alias("plan_platfrom"),\
                              col("sitename").alias("cmo_site"),\
                              col("primaryplatform").alias("pes_platform"),\
                            )\
                          .distinct()

@wajda I have used the above code for reading the XLSX file, and the code ran perfectly. But when I use it, no lineage appears in the Spline UI - an empty screen is displayed.

@wajda
Contributor Author

wajda commented Apr 24, 2023

By default the Spline agent only reacts to writing data to persistent storage, i.e. df.write(), never to df.read(), df.show(), etc.
You can enable capturing memory-only actions if you want; it can be useful for debugging purposes:

spline.plugins.za.co.absa.spline.harvester.plugin.embedded.NonPersistentActionsCapturePlugin.enabled=true
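If you attach the agent via codeless initialization, the same flag can be passed as a regular Spark configuration property with the spark. prefix. A minimal sketch in PySpark, assuming the agent JAR is already on the classpath; the producer URL is a placeholder:

from pyspark.sql import SparkSession

# Codeless init: register the Spline listener and enable capturing of
# non-persistent actions such as df.show(). Replace the producer URL
# with your own Spline gateway address.
spark = (SparkSession.builder
    .appName("Lineage POC")
    .config("spark.sql.queryExecutionListeners",
            "za.co.absa.spline.harvester.listener.SplineQueryExecutionListener")
    .config("spark.spline.producer.url", "http://localhost:8080/producer")
    .config("spark.spline.plugins.za.co.absa.spline.harvester.plugin.embedded."
            "NonPersistentActionsCapturePlugin.enabled", "true")
    .getOrCreate())

Without that plugin, only a persistent write (e.g. df.write.parquet(...)) produces a lineage event.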

@harishyadavdevops

harishyadavdevops commented May 9, 2023 via email

@cerveada
Contributor

cerveada commented May 9, 2023

Spline is a web application. HTTPS is managed by the web server, not by the application itself.

For example, if you use Tomcat to run Spline, you have to set up Tomcat to support HTTPS:
https://tomcat.apache.org/tomcat-9.0-doc/ssl-howto.html
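For reference, a minimal HTTPS connector sketch for Tomcat 9's conf/server.xml, following the how-to above; the keystore path and password are placeholders:

<Connector port="8443" protocol="org.apache.coyote.http11.Http11NioProtocol"
           maxThreads="150" SSLEnabled="true">
    <SSLHostConfig>
        <!-- Replace the keystore file and password with your own. -->
        <Certificate certificateKeystoreFile="conf/keystore.jks"
                     certificateKeystorePassword="changeit"
                     type="RSA"/>
    </SSLHostConfig>
</Connector>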

@harishyadavdevops

harishyadavdevops commented May 10, 2023 via email

@cerveada
Contributor

We try to help when possible, but we cannot spend time in meetings doing tech support.

I cannot tell what is wrong from the code you provided, but I put together a troubleshooting guide. You can try to go through it and find the issue yourself. I hope it will help: AbsaOSS/spline#1225

Another thing: all messages you post to this ticket are public on GitHub, so be sure not to share any sensitive data here.

@harishyadavdevops

I have jobs in AWS Glue. The job ran successfully in AWS Glue, but the lineage is not populated in Spline.
Is GlueContext not supported by Spline? If so, why? Can someone explain?

@wajda
Contributor Author

wajda commented Dec 22, 2023

It should be supported. But I think the discussion has already deviated far from the original topic.

Please look through this - https://github.com/search?q=repo%3AAbsaOSS%2Fspline-spark-agent+glue&type=issues
If it doesn't help, create a separate issue or a discussion. Help us keep things organised.
Thank you.

@harishyadavdevops

harishyadavdevops commented Dec 23, 2023 via email

@wajda
Contributor Author

wajda commented Dec 23, 2023

The short answer is - no, neither the UI nor the REST API has any auth mechanism built in. Likewise, there is no notion of a "user" in the system - no onboarding is required to start using it.

The longer answer is the following.
The intention for Spline was to create a simple core system that focuses on one thing only - lineage tracking. An authentication layer can be added on top of it, for example by putting a simple proxy in front of it that intercepts all HTTP calls and performs authentication. This would basically let you implement an all-or-nothing style of access control. If you need more granular access control, things start to get more complex and involved. Some simpler authorization use-cases could still be implemented at the proxy level by intercepting not only requests but also responses, and filtering the content returned to the user. More complex and sophisticated use-cases would definitely have to be implemented in the Spline core. It all depends on what exactly your requirements are.
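To illustrate the all-or-nothing style, here is a minimal sketch of an Nginx reverse proxy with HTTP Basic authentication in front of the Spline UI; the hostname, certificate paths, upstream address, and htpasswd file are all placeholders:

server {
    listen 443 ssl;
    server_name spline.example.com;                   # placeholder hostname
    ssl_certificate     /etc/nginx/ssl/spline.crt;    # placeholder certificate
    ssl_certificate_key /etc/nginx/ssl/spline.key;

    location / {
        # Every request must authenticate - all-or-nothing access control.
        auth_basic           "Spline";
        auth_basic_user_file /etc/nginx/.htpasswd;    # placeholder user file
        proxy_pass http://spline-ui:9090;             # placeholder Spline UI upstream
    }
}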
