52 changes: 28 additions & 24 deletions workshops/orchestrator_course/prefect_bckfil_flow.py
@@ -1,11 +1,13 @@
from prefect import flow, task
from prefect.blocks.system import Secret
import os
import dlt
import copy
It's good practice to order imports from most general to most specific and to group them: Python standard library at the top, then other libraries, and imports from other files in the project at the bottom.
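A minimal sketch of that grouping, using only the imports that appear in this file (assuming example_pipeline is the project-local module):

```python
# Standard library
import copy
import os
from datetime import datetime, timedelta, timezone

# Third-party libraries
import dlt
from dlt.sources.rest_api import rest_api_source
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect.task_runners import ThreadPoolTaskRunner
from prefect_github import GitHubCredentials

# Project-local imports
import example_pipeline
```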

from datetime import datetime, timedelta, timezone
from prefect import flow, task
The user must install Prefect, so you should add it to the requirements.txt file or to the README.

Yup, will do. Along with all the CLI commands the audience would need for the entire course.

from prefect.blocks.system import Secret
from prefect_github import GitHubCredentials
from prefect.task_runners import ThreadPoolTaskRunner
from datetime import datetime, timedelta, timezone
from dlt.sources.rest_api import rest_api_source
import example_pipeline


def _set_env(md_db: str):
@@ -28,10 +30,11 @@ def _set_env(md_db: str)
os.environ["DESTINATION__MOTHERDUCK__CREDENTIALS__DATABASE"] = md_db

@task(retries=2, log_prints=True)
def run_resource(resource_name: str, md_db: str, start_date: str = None, end_date: str = None):
def run_resource_in_task_pipeline(resource_name: str, md_db: str, start_date: str = None, end_date: str = None):
"""
Runs a specific resource from the GitHub API using the example pipeline configuration.
Supports backfilling (for forks resource) by setting start_date and end_date.
Runs a specific resource from the GitHub API in a dedicated task pipeline, named after the
resource, so that each task pipeline runs in isolation.
If resource is "forks", you can run with backfilling by setting start_date and end_date.

Args:
resource_name (str): Name of the GitHub resource to run.
@@ -41,28 +44,27 @@ def run_resource(resource_name: str, md_db: str, start_date: str = None, end_dat
"""
# Ensure environment vars for dlt are set for this task execution
_set_env(md_db)
import example_pipeline,copy
# Copy pipeline config so modifications don’t leak between tasks
cfg = copy.deepcopy(example_pipeline.config)
config = copy.deepcopy(example_pipeline.config)
why are you copying the pipeline config and passing it to the source?

Ah, I see. This is a misleading name: the file example_pipeline.py actually defines your source.

Suggestions:
in the file, rename github_source -> source
rename the file to github_source.py (or github_rest_api.py); then it would read as `from github_source import config` or `github_source.config`

Also, this file should be named prefect_backfill_example.py; spell it out.

  1. Yeah, these file names were just for testing. I will rename them as you suggested here now and for the course.
  2. dlt_pipeline.py was something I created to share the rest_api_source with Aashish and Alena to use in their course.
  3. The reason I'm doing a copy is mainly for safety: since we share the same config across multiple pipelines running concurrently, any changes made in one place won't affect the original configuration or other tasks using it. It's not strictly necessary, because the other pipelines don't share the same resources, so we can remove it. Let me know.

Re 3: only deepcopy if you modify the config and use it in different pipelines. Here you load it from the other file, modify it, and use it; just mind that you don't modify the original config. If another piece of code running on a different thread in parallel also imports this config, it will have its own copy, which is the same as the file.
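A minimal sketch of the pattern under discussion, with an illustrative config dict standing in for example_pipeline.config and a made-up helper name:

```python
import copy

# Shared config, e.g. imported from the module that defines the source
# (contents here are illustrative only).
config = {"resources": [{"name": "forks", "endpoint": {"incremental": {}}}]}


def inject_backfill_window(start_date: str, end_date: str) -> dict:
    # Deep-copy before mutating so the per-task backfill window never
    # leaks into the shared config that other concurrent tasks read.
    cfg = copy.deepcopy(config)
    for res in cfg["resources"]:
        if res["name"] == "forks":
            res["endpoint"]["incremental"]["initial_value"] = start_date
            res["endpoint"]["incremental"]["end_value"] = end_date
    return cfg
```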

# If the resource is "forks" and a backfill window is provided, inject it dynamically
if resource_name == "forks" and start_date and end_date:
for res in cfg["resources"]:
for res in config["resources"]:
if res["name"] == "forks":
res["endpoint"]["incremental"]["initial_value"] = start_date
res["endpoint"]["incremental"]["end_value"] = end_date
# pick just one resource from your dlt source
src = rest_api_source(cfg).with_resources(resource_name)
source = rest_api_source(config).with_resources(resource_name)
This is a bit all over the place:
in pipeline_example.py you define the github_source
dlt_pipeline.py also defines it
you then use neither of the above and instead take config and instantiate the rest_api_source with it

I think your files need to be named after what they define. You can include an example way to run the source either commented out, or (what's cleaner) in a main clause, so that it won't be executed when users import the file.

Here you should then call .with_resources, which returns a clone of the original source with just your resource selected:
source = github_source(config).with_resources(resource_name)
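A minimal sketch of the suggested layout, assuming the source-defining file is renamed to github_source.py; the config contents, pipeline name, and destination are illustrative, not the course's actual values:

```python
# github_source.py -- hypothetical file name from the review suggestion
import dlt
from dlt.sources.rest_api import rest_api_source

# Declarative REST API config (contents are illustrative only).
config = {
    "client": {"base_url": "https://api.github.com/"},
    "resources": ["repos", "forks"],
}


def github_source(cfg=None):
    # Build the dlt source from the shared config.
    return rest_api_source(cfg or config)


if __name__ == "__main__":
    # Example way to run the source; this block is skipped when other
    # modules do `from github_source import config` or import the source.
    pipeline = dlt.pipeline(pipeline_name="github_source_example", destination="duckdb")
    source = github_source(config).with_resources("repos")
    print(pipeline.run(source))
```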


I think we addressed this issue earlier. Will rename and that should clear up the confusion.

# unique pipeline per resource avoids dlt state clashes
That's a useful comment. Also add: "creates separate pipeline working directories, isolates task pipelines", or something like that.

Will do - adds more context.

pipe = dlt.pipeline(
pipeline = dlt.pipeline(
pipeline_name=f"rest_api_github__{resource_name}",
destination="motherduck",
dataset_name="rest_api_data_mlt_backfill",
progress="log",
)
# Run extraction + load into destination
info = pipe.run(src)
print(f"{resource_name} -> {info}")
return info
load_info = pipeline.run(source)
print(f"{resource_name} -> {load_info}")
return load_info

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5), log_prints=True)
def main(md_db: str = "dlt_test"):
@@ -72,19 +74,21 @@ def main(md_db: str = "dlt_test"):
Args:
md_db (str, optional): MotherDuck database name. Defaults to "dlt_test".
"""
# Get today's date in UTC and subtract one day to get yesterday's date
end_dt = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
start_dt = end_dt - timedelta(days=1)
# Get today's datetime in UTC and subtract one day to get yesterday's date
Actually, it's a datetime, not a date. It's always useful to be precise about these differences.

Yup, will change it to datetime.

# Create interval from yesterday to today
beginning_of_today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
start_of_yesterday = beginning_of_today - timedelta(days=1)

start_iso = start_dt.isoformat() # Outputs: 2025-08-30T00:00:00+00:00
end_iso = end_dt.isoformat()
start_iso = start_of_yesterday.isoformat() # Outputs: 2025-08-30T00:00:00+00:00
end_iso = beginning_of_today.isoformat()

# Launch tasks concurrently (repos, contributors, issues, forks, releases)
a = run_resource.submit("repos", md_db)
b = run_resource.submit("contributors", md_db)
c = run_resource.submit("issues", md_db)
d = run_resource.submit("forks", md_db, start_date=start_iso, end_date=end_iso)
e = run_resource.submit("releases", md_db)

a = run_resource_in_task_pipeline.submit("repos", md_db)
b = run_resource_in_task_pipeline.submit("contributors", md_db)
c = run_resource_in_task_pipeline.submit("issues", md_db)
d = run_resource_in_task_pipeline.submit("forks", md_db, start_date=start_iso, end_date=end_iso)
e = run_resource_in_task_pipeline.submit("releases", md_db)

# Wait for all tasks to complete and return results
return a.result(), b.result(), c.result(), d.result(), e.result()