Feat/theodw 2863 nsip document entity refactor#2413

Open
KalyaniNik wants to merge 14 commits into main from feat/THEODW-2863-nsip-document-entity-refactor

Conversation

@KalyaniNik
Contributor

Jira : https://pins-ds.atlassian.net/browse/THEODW-2863

Changes summary:
Refactored the harmonised and curated notebooks into Python classes for the nsip-document entity.

Note: unit and integration tests are yet to be done.

PR Template

Note: Run the correct ADO pipeline for this PR - check the list here:
ODW Repositories

  1. JIRA Ticket Reference:

    [Enter JIRA ticket number and Title here]

  2. Summary of the work:

    [Enter Summary here]

  3. New Source-to-Raw Datasets

    • New source data has been added
      • A trigger has been attached at the appropriate frequency
  4. New Tables in Standardised Layer

    • New standardised tables have been created
      • orchestration.json is updated and tested in Dev, and PR is open or merged to main
      • Schema exists in odw-config/standardised-table-definitions or is about to be PRd
  5. New Tables in Harmonised or Curated Layers

    • New harmonised or curated tables have been created
      • Script is configured in the pipeline pln_post_deployments
      • Schema exists in odw-config/harmonised-table-definitions or curated-table-definitions or is about to be PRd
  6. Schema or Column Changes
    (Only new columns or columns with changed data types are in scope)

    • Changes to table structure or columns
      • py_change_table is set to run in pln_post_deployments
      • A script has been created to backfill or populate new column(s) in Test and Prod
        • Avoid dropping and recreating tables unless strictly necessary
  7. Script Execution in Build

    • Scripts have run in isolation in Build
      • Script has been added to pln_post_deployments
      • Script is now part of a scheduled pipeline with correct triggers
    • No scripts have run or no action required in Test/Prod
  8. Table Creation and Schema Validation

    • All required tables have been created
    • Schema has been validated against the requirements
  9. Deployment and Schema Change Documentation

    • Deployment steps and rollback procedures are documented
    • Schema change handling is outlined and tested
  10. Archiving Process Review

    • Automatic archiving logic has been reviewed
    • Archiving schedules and retention policies are validated

Comment on lines +40 to +44
LoggingUtil().log_info(f"Loading harmonised NSIP Document data from {self.HARMONISED_TABLE}")
harmonised_docs = self.spark.sql(f"SELECT * FROM {self.HARMONISED_TABLE}")

LoggingUtil().log_info(f"Loading curated NSIP Project data from {self.CURATED_PROJECT_TABLE}")
curated_projects = self.spark.sql(f"SELECT * FROM {self.CURATED_PROJECT_TABLE}")
Collaborator

Do you need to load the data here? The nsip_document notebook doesn't seem to do this?

harmonised_docs.createOrReplaceTempView("harmonised_nsip_document")
curated_projects.createOrReplaceTempView("curated_nsip_project")

df = self.spark.sql("""
Collaborator

Could you select the data during load_data, but join here instead? Just to keep the reading/writing separate
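A minimal sketch of that separation, keeping all reads in `load_data` and all transforms in `process`. The class, method, and join-key names here are assumptions based on the comments, not the actual implementation:

```python
class NsipDocumentCurated:
    """Sketch: reads live in load_data, transforms live in process."""

    def __init__(self, spark, harmonised_table, curated_project_table):
        self.spark = spark
        self.harmonised_table = harmonised_table
        self.curated_project_table = curated_project_table

    def load_data(self):
        # All reading happens here; the IsActive filter is pushed
        # into the SQL so only active rows are loaded
        docs = self.spark.sql(
            f"SELECT * FROM {self.harmonised_table} WHERE IsActive = 'Y'"
        )
        projects = self.spark.sql(f"SELECT * FROM {self.curated_project_table}")
        return docs, projects

    def process(self, docs, projects):
        # All transformation happens here; joining via the DataFrame API
        # avoids the temp views entirely ("caseReference" is a guessed key)
        return docs.join(projects, on="caseReference", how="left")
```

With this shape, `process` can be unit-tested by passing in small DataFrames without touching the tables at all.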

Comment on lines +63 to +64
harmonised_docs.createOrReplaceTempView("harmonised_nsip_document")
curated_projects.createOrReplaceTempView("curated_nsip_project")
Collaborator

If you simplify the SELECT queries (and move the logic to load_data), then this could be removed, I think.

]

# Columns used for the final deduplication step
_DEDUP_COLUMNS = [
Collaborator

Typo

Contributor Author

Sorry, I'm unable to find the typo.

Collaborator

Sorry, I think it should be DEDUPE, although this isn't a big issue.

Comment on lines +210 to +216
FROM
{self.HORIZON_TABLE} AS Doc
LEFT JOIN {self.AIE_EXTRACTS_TABLE} AS Aie
ON Doc.dataid = Aie.DocumentId
AND Doc.version = Aie.version
AND Doc.dataSize = Aie.size
WHERE
Collaborator

I think the joins should be moved to process instead

Contributor Author

Join moved to process

Collaborator

@HarrisonBoyleThomas left a comment

You need to add some unit/integration tests
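One low-dependency way to start on the unit tests is to factor pure logic, such as the final deduplication step over `_DEDUP_COLUMNS`, out of the Spark code and test it directly. A sketch, where `dedupe_records` and the column names are hypothetical:

```python
def dedupe_records(rows, key_columns):
    """Keep the first occurrence of each key tuple; mirrors the final
    deduplication step (helper and column names are illustrative)."""
    seen = set()
    out = []
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out


def test_dedupe_keeps_first_occurrence():
    rows = [
        {"documentId": "1", "version": 1},
        {"documentId": "1", "version": 1},  # exact duplicate, dropped
        {"documentId": "1", "version": 2},  # new version, kept
    ]
    assert dedupe_records(rows, ["documentId", "version"]) == [rows[0], rows[2]]
```

Integration tests against a local SparkSession can then cover the SQL and join behaviour separately.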

curated_projects: DataFrame = self.load_parameter("curated_projects", source_data)

# Filter to active records
docs = harmonised_docs.filter(F.col("IsActive") == "Y")
Collaborator

I would move this filter into the SQL query in load_data to boost performance; otherwise it'll load all the data and filter afterwards.
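Concretely, the difference looks like this (table name is illustrative):

```python
# Filtering after a full load reads every row first:
#   docs = spark.sql(f"SELECT * FROM {table}")
#   docs = docs.filter(F.col("IsActive") == "Y")
#
# Pushing the predicate into the load query lets Spark prune rows
# (and partitions, where applicable) at read time instead:
def build_load_query(table: str) -> str:
    return f"SELECT * FROM {table} WHERE IsActive = 'Y'"
```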


# Filter to active records and select curated columns
df = (
harmonised_subscriptions.filter(F.col("IsActive") == "Y")
Collaborator

Same here, I'd move the filter to load_data.


3 participants