small improvements #18
base: orchestrator_course
@@ -1,11 +1,13 @@
-from prefect import flow, task
-from prefect.blocks.system import Secret
 import os
 import dlt
+import copy
+from datetime import datetime, timedelta, timezone
+from prefect import flow, task
Review comment: the user must install prefect, so you should add this into the setup instructions.
Reply: Yup, will do. Along with all the CLI commands the audience would need for the entire course.
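(For context: judging only from the imports in this file, the setup instructions would need something like pip install prefect prefect-github "dlt[motherduck]". The exact package list is an assumption inferred from the imports, not something this PR pins down.)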
+from prefect.blocks.system import Secret
 from prefect_github import GitHubCredentials
 from prefect.task_runners import ThreadPoolTaskRunner
-from datetime import datetime, timedelta, timezone
 from dlt.sources.rest_api import rest_api_source
 import example_pipeline


 def _set_env(md_db: str):
@@ -28,10 +30,11 @@ def _set_env(md_db: str):
     os.environ["DESTINATION__MOTHERDUCK__CREDENTIALS__DATABASE"] = md_db

 @task(retries=2, log_prints=True)
-def run_resource(resource_name: str, md_db: str, start_date: str = None, end_date: str = None):
+def run_resource_in_task_pipeline(resource_name: str, md_db: str, start_date: str = None, end_date: str = None):
     """
-    Runs a specific resource from the GitHub API using the example pipeline configuration.
-    Supports backfilling (for forks resource) by setting start_date and end_date.
+    Runs a specific resource from the GitHub API in a dedicated task pipeline, named after the
+    resource, so that each task pipeline runs in isolation.
+    If resource is "forks", you can run with backfilling by setting start_date and end_date.

     Args:
         resource_name (str): Name of the GitHub resource to run.
@@ -41,28 +44,27 @@ def run_resource(resource_name: str, md_db: str, start_date: str = None, end_date: str = None):
     """
     # Ensure environment vars for dlt are set for this task execution
     _set_env(md_db)
-    import example_pipeline,copy
     # Copy pipeline config so modifications don't leak between tasks
-    cfg = copy.deepcopy(example_pipeline.config)
+    config = copy.deepcopy(example_pipeline.config)
Review comment: why are you copying the pipeline config and passing it to the source?
Review comment: ah, I see. This is a misleading name. The file suggestions: …
Review comment: also, this file should be named …
Reply: re 3: …
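To make the point being discussed concrete: the copy exists so that the forks backfill mutation below cannot leak into the shared module-level config that other concurrent tasks read. A minimal sketch, using a hypothetical stand-in dict rather than the real example_pipeline.config:

import copy

# Stand-in for example_pipeline.config (hypothetical, for illustration only)
base_config = {
    "resources": [
        {"name": "forks", "endpoint": {"incremental": {"initial_value": None, "end_value": None}}}
    ]
}

# Each task mutates its own deep copy, never the shared original
config = copy.deepcopy(base_config)
config["resources"][0]["endpoint"]["incremental"]["initial_value"] = "2025-08-30T00:00:00+00:00"

# The shared dict is untouched, so concurrent tasks never see each other's backfill windows
assert base_config["resources"][0]["endpoint"]["incremental"]["initial_value"] is None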

     # If the resource is "forks" and a backfill window is provided, inject it dynamically
     if resource_name == "forks" and start_date and end_date:
-        for res in cfg["resources"]:
+        for res in config["resources"]:
             if res["name"] == "forks":
                 res["endpoint"]["incremental"]["initial_value"] = start_date
                 res["endpoint"]["incremental"]["end_value"] = end_date
     # pick just one resource from your dlt source
-    src = rest_api_source(cfg).with_resources(resource_name)
+    source = rest_api_source(config).with_resources(resource_name)
Review comment: this is a bit all over the place. I think your files need to be named after what they define. You can include an example way to run the source either commented out or, what's cleaner, in a main clause so that it won't be executed if users import the file. Here you should then call .with_resources, which returns a clone of the original source with just your resource selected.
Reply: I think we addressed this issue earlier. Will rename and that should clear up the confusion.
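A minimal sketch of the structure the reviewer is describing (the file name, function name and toy config are hypothetical, not the course's real ones):

# github_source.py (hypothetical name, after what the file defines)
import dlt
from dlt.sources.rest_api import rest_api_source

config = {
    "client": {"base_url": "https://api.github.com/repos/dlt-hub/dlt/"},
    "resources": ["forks", "issues"],
}

def selected_source(resource_name: str):
    # with_resources returns a clone of the source with only
    # the named resource selected for extraction
    return rest_api_source(config).with_resources(resource_name)

if __name__ == "__main__":
    # example run; skipped when other files import this module
    pipeline = dlt.pipeline(pipeline_name="github_demo", destination="duckdb")
    print(pipeline.run(selected_source("forks")))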

     # unique pipeline per resource avoids dlt state clashes
Review comment: that's a useful comment. Also add: "creates separate pipeline working directories, isolates task pipelines" or something like that.
Reply: Will do - adds more context.
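Roughly what the suggested comment is getting at, as a sketch (this assumes dlt's default layout of one working directory per pipeline_name under ~/.dlt/pipelines):

import dlt

# Distinct pipeline_name values get distinct working directories, so the
# schemas, state and load packages of the task pipelines never collide
for resource_name in ("repos", "forks"):
    pipeline = dlt.pipeline(
        pipeline_name=f"rest_api_github__{resource_name}",
        destination="motherduck",
    )
    print(pipeline.working_dir)  # e.g. ~/.dlt/pipelines/rest_api_github__repos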

-    pipe = dlt.pipeline(
+    pipeline = dlt.pipeline(
         pipeline_name=f"rest_api_github__{resource_name}",
         destination="motherduck",
         dataset_name="rest_api_data_mlt_backfill",
         progress="log",
     )
     # Run extraction + load into destination
-    info = pipe.run(src)
-    print(f"{resource_name} -> {info}")
-    return info
+    load_info = pipeline.run(source)
+    print(f"{resource_name} -> {load_info}")
+    return load_info

 @flow(task_runner=ThreadPoolTaskRunner(max_workers=5), log_prints=True)
 def main(md_db: str = "dlt_test"):

@@ -72,19 +74,21 @@ def main(md_db: str = "dlt_test"):
     Args:
         md_db (str, optional): MotherDuck database name. Defaults to "dlt_test".
     """
-    # Get today's date in UTC and subtract one day to get yesterday's date
-    end_dt = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
-    start_dt = end_dt - timedelta(days=1)
+    # Get today's datetime in UTC and subtract one day to get yesterday's date
Review comment: actually it's a datetime, not a date. It's always useful to be correct about these differences.
Reply: Yup, will change it to datetime.
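The distinction in one short example:

from datetime import datetime, timezone

beginning_of_today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
print(beginning_of_today.isoformat())          # datetime: 2025-08-30T00:00:00+00:00 (date + time + tz)
print(beginning_of_today.date().isoformat())   # date:     2025-08-30 (no time component)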
+    # Create interval from yesterday to today
+    beginning_of_today = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
+    start_of_yesterday = beginning_of_today - timedelta(days=1)

-    start_iso = start_dt.isoformat()  # Outputs: 2025-08-30T00:00:00+00:00
-    end_iso = end_dt.isoformat()
+    start_iso = start_of_yesterday.isoformat()  # Outputs: 2025-08-30T00:00:00+00:00
+    end_iso = beginning_of_today.isoformat()

     # Launch tasks concurrently (repos, contributors, issues, forks, releases)
-    a = run_resource.submit("repos", md_db)
-    b = run_resource.submit("contributors", md_db)
-    c = run_resource.submit("issues", md_db)
-    d = run_resource.submit("forks", md_db, start_date=start_iso, end_date=end_iso)
-    e = run_resource.submit("releases", md_db)
+    a = run_resource_in_task_pipeline.submit("repos", md_db)
+    b = run_resource_in_task_pipeline.submit("contributors", md_db)
+    c = run_resource_in_task_pipeline.submit("issues", md_db)
+    d = run_resource_in_task_pipeline.submit("forks", md_db, start_date=start_iso, end_date=end_iso)
+    e = run_resource_in_task_pipeline.submit("releases", md_db)

     # Wait for all tasks to complete and return results
     return a.result(), b.result(), c.result(), d.result(), e.result()
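For readers new to Prefect, the fan-out/fan-in pattern used in main, in miniature (toy task, same mechanism):

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def double(x: int) -> int:
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=2))
def demo():
    # .submit() schedules the task on the runner and returns a future immediately
    futures = [double.submit(n) for n in range(4)]
    # .result() blocks until the corresponding task finishes
    return [f.result() for f in futures]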
Review comment: it's good practice to order imports from most general to specific and also to group them: Python standard library at the top, other libraries next, and imports from other files at the bottom.
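Applied to this file, the grouping would look something like this (blank lines separate the groups):

# Standard library
import copy
import os
from datetime import datetime, timedelta, timezone

# Third-party libraries
import dlt
from dlt.sources.rest_api import rest_api_source
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect.task_runners import ThreadPoolTaskRunner
from prefect_github import GitHubCredentials

# Local modules
import example_pipeline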