-
Notifications
You must be signed in to change notification settings - Fork 42
small improvements #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: orchestrator_course
Are you sure you want to change the base?
Conversation
from prefect.blocks.system import Secret | ||
import os | ||
import dlt | ||
import copy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its good practice to order imports from most general to specific and also group them
python standards, at the top, other libraries, imports from other files should be at the bottom
import dlt | ||
import copy | ||
from datetime import datetime, timedelta, timezone | ||
from prefect import flow, task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the user must install this prefect. so you should add this into the requireemtns.txt
file or into the readme
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, will do. Along with all the CLI commands the audience would need for the entire course.
import example_pipeline,copy | ||
# Copy pipeline config so modifications don’t leak between tasks | ||
cfg = copy.deepcopy(example_pipeline.config) | ||
config = copy.deepcopy(example_pipeline.config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are you copying the pipeline config and passing it to the source?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah i see. this is misleading name. the file example_pipeline.py
actually defines your source.
suggestions:
in the file: rename github_source -> source
rename file github_source.py
(or github_rest_api) then it would be from github_source import config
or github_source.config`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, this file should be named prefect_backfill_example.py
spell it out
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Yeah, these file names were just for testing. I will rename them as you suggested here now and for the course.
- dlt_pipeline.py was something I created to share the rest_api_source with Aashish and Alena to use in their course.
- The reason I'm doing a copy is more for safety reasons since we are sharing the same config across multiple pipelines running concurrently - so that any changes made in one place don’t affect the original configuration or other tasks using it. It's not necessary since all the other pipelines don't share the same resources. We can remove it. Let me know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
re3:
only deepcopy if you modify and use it in different pipelines. here you load it from the other file, modify it and use it. mind that you don't modify the original config. if another piece of code running on a different thread that runs in parallel is also importing this config it will have its own copy which is the same as the file
# pick just one resource from your dlt source | ||
src = rest_api_source(cfg).with_resources(resource_name) | ||
source = rest_api_source(config).with_resources(resource_name) | ||
# unique pipeline per resource avoids dlt state clashes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thats a useful comment. also add: "creates separate pipeline working directories, isolates task pipelines" or something like that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do - adds more context.
# Get today's date in UTC and subtract one day to get yesterday's date | ||
end_dt = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) | ||
start_dt = end_dt - timedelta(days=1) | ||
# Get today's datetime in UTC and subtract one day to get yesterday's date |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually its a datetime not a date. its always useful to be correct about these differences
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, will change it to datetime.
res["endpoint"]["incremental"]["end_value"] = end_date | ||
# pick just one resource from your dlt source | ||
src = rest_api_source(cfg).with_resources(resource_name) | ||
source = rest_api_source(config).with_resources(resource_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a bit all over the place:
pipeline_example.py
you define the github_source
dlt_pipeline.py
also defines it
you then use neither of the above and instead take config and instantiate the rest_api_source
with it
I think your files need to be named after what they define, you can include an example way to run the source either commented out, or what's cleaner in a main clause so that it wont be executed if users import the file
here you should then .with_resources, which returns a clone of the original source with just your resource selected:
source = github_source(config).with_resources(resource_name)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we addressed this issue earlier. Will rename and that should clear up the confusion.
few things to make this feel like the rest of our codebase and educational materials:
spell out the names: src -> source, cfg -> config
it makes things easier to read (code is more often read then written so that what its optimized for)