Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions workshops/orchestrator_course/dlt_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import dlt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not relevant to this script itself, but I am wondering if there should also be a README file with instructions on running the script. We can give installation commands and also provide links to the dlt docs when relevant.

The script looks and works great, but it might be hard for users to navigate it without a guide.

from dlt.sources.rest_api import RESTAPIConfig, rest_api_source

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pendulum needs to be imported

# Declarative dlt REST API source config for the GitHub REST API.
# Auth: a PAT resolved from dlt secrets ("sources.access_token", i.e. the
# SOURCES__ACCESS_TOKEN env var or secrets.toml). Pagination follows
# GitHub's Link response headers ("header_link" paginator).
config: RESTAPIConfig = {
    "client": {
        "base_url": "https://api.github.com",
        "auth": {
            "token": dlt.secrets["sources.access_token"],
        },
        "headers": {
            # Media type and API version headers sent on every request.
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28"
        },
        "paginator": "header_link"
    },
    "resources": [
        # Full-refresh resources: repos, contributors, releases.
        {
            "name": "repos",
            "endpoint": {
                "path": "orgs/dlt-hub/repos"
            },
        },
        {
            "name": "contributors",
            "endpoint": {
                "path": "repos/dlt-hub/dlt/contributors",
            },
        },
        # Incremental resource: only issues updated since the stored cursor.
        {
            "name": "issues",
            "endpoint": {
                "path": "repos/dlt-hub/dlt/issues",
                "params": {
                    "state": "open",  # Only get open issues
                    "sort": "updated",
                    "direction": "desc",
                    # dlt substitutes the stored cursor value at request time.
                    "since": "{incremental.start_value}"  # For incremental loading
                },
                "incremental": {
                    "cursor_path": "updated_at",
                    "initial_value": "2025-07-01T00:00:00Z"
                }
            },
        },
        # Incremental backfill resource: forks in ascending creation order.
        {
            "name": "forks",
            "endpoint": {
                "path": "repos/dlt-hub/dlt/forks",
                "params": {
                    "sort": "oldest",  # Ensures ascending creation order
                    "per_page": 100
                },
                "incremental": {  # backfill: ordered scan from initial_value
                    "cursor_path": "created_at",
                    "initial_value": "2025-07-01T00:00:00Z",
                    "row_order": "asc"
                }
            },
        },
        {
            "name": "releases",
            "endpoint": {
                "path": "repos/dlt-hub/dlt/releases",
            },
        },
    ],
}

# Materialized source; downstream scripts can narrow it with
# github_source.with_resources(...).
github_source = rest_api_source(config)

pipeline = dlt.pipeline(
    pipeline_name="github_repos_issues",
    destination="bigquery",  # credentials resolved from dlt config/secrets
    dataset_name="github_data",
    progress="log"  # Add logging as per rule recommendation
)

# NOTE(review): this runs at import time, so merely importing this module
# triggers a full load — confirm that is intended for modules that only
# want github_source.
load_info = pipeline.run(github_source)
print(load_info)
14 changes: 14 additions & 0 deletions workshops/orchestrator_course/prefect_course/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Prefect Pipelines

A repository for building, testing, and deploying data pipelines using Prefect.

## Overview

This project demonstrates how to create and deploy data workflows using Prefect, a modern workflow orchestration tool. The pipelines in this repository showcase best practices for building reliable, observable, and maintainable data pipelines.

## Features

- Task-based workflow definitions
- Parallel task execution with mapping
- Remote deployment capabilities
- Monitoring and observability
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os
import dlt
from prefect import flow, task
from prefect_github import GitHubCredentials
from prefect_gcp import GcpCredentials

def set_github_pat_env():
    """Expose the GitHub PAT from the Prefect block to dlt via the environment.

    dlt resolves ``dlt.secrets["sources.access_token"]`` from the
    ``SOURCES__ACCESS_TOKEN`` environment variable.
    """
    credentials = GitHubCredentials.load("github-pat")
    # token is a SecretStr, so unwrap it explicitly.
    os.environ["SOURCES__ACCESS_TOKEN"] = credentials.token.get_secret_value()

def make_bq_destination():
    """Build a dlt BigQuery destination from the "gcp-creds" Prefect block."""
    gcp_block = GcpCredentials.load("gcp-creds")
    # Unwrap the service-account dict; fall back to {} when the secret is unset.
    service_account = gcp_block.service_account_info.get_secret_value() or {}
    return dlt.destinations.bigquery(
        credentials=service_account,
        project_id=service_account.get("project_id"),
    )

@task(log_prints=True)
def run_resource(resource_name: str, bq_dest: dlt.destinations.bigquery, start_date: str | None = None, end_date: str | None = None):
    """Load one GitHub resource into BigQuery via its own dlt pipeline.

    Args:
        resource_name: Name of a resource declared in github_pipeline's source.
        bq_dest: Pre-built dlt BigQuery destination.
        start_date / end_date: ISO-8601 bounds for the "forks" backfill
            window; ignored for every other resource.
    """
    # Imported inside the task so the module is resolved on the worker.
    import github_pipeline
    # The module-level source object is shared; hints applied here mutate it.
    base_source = github_pipeline.github_source
    # Apply a bounded incremental window to the forks resource (backfill).
    if resource_name == "forks" and start_date and end_date:
        base_source.forks.apply_hints(  # or: base_source.resources["forks"]
            incremental=dlt.sources.incremental(
                "created_at",
                initial_value=start_date,
                end_value=end_date,
                row_order="asc"
            )
        )

    selected_source = base_source.with_resources(resource_name)

    # unique pipeline per resource avoids dlt state clashes
    pipeline = dlt.pipeline(
        pipeline_name=f"rest_api_github__{resource_name}",
        destination=bq_dest,
        dataset_name="prefect_orc_demo_bf",
        progress="log",
    )
    info = pipeline.run(selected_source)
    print(f"{resource_name} -> {info}")
    return info

@flow(log_prints=True)
def main(start_date: str | None = None, end_date: str | None = None):
    """Backfill flow: load the forks resource over an explicit date window."""
    set_github_pat_env()  # make the PAT visible to dlt
    destination = make_bq_destination()
    return run_resource("forks", destination, start_date=start_date, end_date=end_date)

if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os
import dlt
from prefect import flow, task
from prefect_github import GitHubCredentials
from prefect_gcp import GcpCredentials

def set_github_pat_env():
    """Publish the GitHub PAT as SOURCES__ACCESS_TOKEN for dlt to pick up."""
    # GitHubCredentials.token is a SecretStr -> unwrap with get_secret_value().
    token = GitHubCredentials.load("github-pat").token
    os.environ["SOURCES__ACCESS_TOKEN"] = token.get_secret_value()

def make_bq_destination():
    """Create a dlt BigQuery destination from the "gcp-creds" Prefect block."""
    info = GcpCredentials.load("gcp-creds").service_account_info.get_secret_value() or {}
    # project_id comes straight out of the service-account JSON.
    return dlt.destinations.bigquery(credentials=info, project_id=info.get("project_id"))

@task(log_prints=True)
def run_resource(resource_name: str, bq_dest: dlt.destinations.bigquery, incremental_date: str | None = None):
    """Load one GitHub resource into BigQuery via its own dlt pipeline.

    Args:
        resource_name: Name of a resource declared in github_pipeline's source.
        bq_dest: Pre-built dlt BigQuery destination.
        incremental_date: ISO-8601 start value for the "issues" incremental
            cursor; ignored for all other resources.
    """
    # Imported inside the task so the module is resolved on the worker.
    import github_pipeline
    # The module-level source object is shared; hints applied here mutate it.
    base_source = github_pipeline.github_source
    # Override the issues incremental start when a date is supplied.
    if resource_name == "issues" and incremental_date:
        # NOTE(review): cursors on "created_at" here, while the source config's
        # issues incremental uses "updated_at" — confirm which is intended.
        base_source.issues.apply_hints(  # or: base_source.resources["issues"]
            incremental=dlt.sources.incremental(
                "created_at",
                initial_value=incremental_date  # e.g. "2024-04-01T00:00:00Z"
            )
        )

    selected_source = base_source.with_resources(resource_name)

    # unique pipeline per resource avoids dlt state clashes
    pipeline = dlt.pipeline(
        pipeline_name=f"rest_api_github__{resource_name}",
        destination=bq_dest,
        dataset_name="prefect_orc_demo_inc",
        progress="log",
    )
    info = pipeline.run(selected_source)
    print(f"{resource_name} -> {info}")
    return info

@flow(log_prints=True)
def main(incremental_date: str | None = None):
    """Load all GitHub resources sequentially; issues loads incrementally."""
    set_github_pat_env()  # make the PAT visible to dlt
    destination = make_bq_destination()
    results = (
        run_resource("repos", destination),
        run_resource("contributors", destination),
        run_resource("releases", destination),
        run_resource("issues", destination, incremental_date=incremental_date),
    )
    return results

if __name__ == "__main__":
    main()
30 changes: 30 additions & 0 deletions workshops/orchestrator_course/prefect_course/flow_local_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from prefect import flow, task
import dlt

@task(log_prints=True)
def run_resource(resource_name: str):
    """Load a single resource from the GitHub dlt source into BigQuery."""
    import github_pipeline

    # Restrict the shared source to just the requested resource.
    selected = github_pipeline.github_source.with_resources(resource_name)
    # One pipeline per resource keeps dlt state isolated between tasks.
    load_info = dlt.pipeline(
        pipeline_name=f"rest_api_github__{resource_name}",
        destination="bigquery",
        dataset_name="prefect_orc_demo",
        progress="log",
    ).run(selected)
    print(f"{resource_name} -> {load_info}")
    return load_info

@flow(log_prints=True)
def main():
    """Run the three full-refresh GitHub resources one after another."""
    results = tuple(run_resource(name) for name in ("repos", "contributors", "releases"))
    return results

if __name__ == "__main__":
    # Serve the flow as a long-running deployment named "bigquery_deployment".
    main.serve(name="bigquery_deployment")
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import os
import dlt
from prefect import flow, task
from prefect_github import GitHubCredentials
from prefect_gcp import GcpCredentials
from prefect.task_runners import ThreadPoolTaskRunner

def set_github_pat_env():
    """Make the GitHub PAT available to dlt through SOURCES__ACCESS_TOKEN."""
    block = GitHubCredentials.load("github-pat")
    # The token is stored as a SecretStr; unwrap before exporting.
    os.environ["SOURCES__ACCESS_TOKEN"] = block.token.get_secret_value()

def make_bq_destination():
    """Build the BigQuery destination from the "gcp-creds" Prefect block."""
    sa_info = GcpCredentials.load("gcp-creds").service_account_info.get_secret_value() or {}
    project = sa_info.get("project_id")  # taken from the service-account JSON
    return dlt.destinations.bigquery(credentials=sa_info, project_id=project)

@task(retries=2, retry_delay_seconds=10, log_prints=True)
def run_resource(resource_name: str, bq_dest: dlt.destinations.bigquery, incremental_date: str | None = None):
    """Load one GitHub resource into BigQuery; retried twice on failure.

    Args:
        resource_name: Name of a resource declared in github_pipeline's source.
        bq_dest: Pre-built dlt BigQuery destination.
        incremental_date: ISO-8601 start value for the "issues" incremental
            cursor; ignored for all other resources.
    """
    # Imported inside the task so the module is resolved on the worker.
    import github_pipeline
    # The module-level source object is shared across concurrently running
    # tasks; hints applied here mutate it in place.
    base_source = github_pipeline.github_source
    # Override the issues incremental start when a date is supplied.
    if resource_name == "issues" and incremental_date:
        # NOTE(review): cursors on "created_at" here, while the source config's
        # issues incremental uses "updated_at" — confirm which is intended.
        base_source.issues.apply_hints(  # or: base_source.resources["issues"]
            incremental=dlt.sources.incremental(
                "created_at",
                initial_value=incremental_date  # e.g. "2024-04-01T00:00:00Z"
            )
        )

    selected_source = base_source.with_resources(resource_name)

    # unique pipeline per resource avoids dlt state clashes
    pipeline = dlt.pipeline(
        pipeline_name=f"rest_api_github__{resource_name}",
        destination=bq_dest,
        dataset_name="prefect_orc_demo_pzl",
        progress="log",
    )
    info = pipeline.run(selected_source)
    print(f"{resource_name} -> {info}")
    return info

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5), log_prints=True)
def main(incremental_date: str | None = None):
    """Fan out all five GitHub resources as concurrent tasks and gather results."""
    set_github_pat_env()  # make the PAT visible to dlt
    destination = make_bq_destination()
    # Submit everything first so the thread pool runs the loads concurrently.
    futures = [
        run_resource.submit("repos", destination),
        run_resource.submit("contributors", destination),
        run_resource.submit("releases", destination),
        run_resource.submit("forks", destination),
        run_resource.submit("issues", destination, incremental_date=incremental_date),
    ]
    return tuple(future.result() for future in futures)

if __name__ == "__main__":
    main()
51 changes: 51 additions & 0 deletions workshops/orchestrator_course/prefect_course/flow_remote_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import os
import dlt
from prefect import flow, task
from prefect_github import GitHubCredentials
from prefect_gcp import GcpCredentials

def set_github_pat_env():
    """Export the GitHub PAT so dlt can resolve sources.access_token."""
    # SecretStr -> plain string via get_secret_value().
    pat_value = GitHubCredentials.load("github-pat").token.get_secret_value()
    os.environ["SOURCES__ACCESS_TOKEN"] = pat_value

def make_bq_destination():
    """Construct a dlt BigQuery destination from the "gcp-creds" block."""
    account_info = GcpCredentials.load("gcp-creds").service_account_info.get_secret_value() or {}
    return dlt.destinations.bigquery(
        credentials=account_info,
        project_id=account_info.get("project_id"),  # from service-account JSON
    )

@task(log_prints=True)
def run_resource(resource_name: str, bq_dest: dlt.destinations.bigquery):
    """Load a single GitHub resource into the supplied BigQuery destination."""
    import github_pipeline

    # Narrow the shared source down to the one requested resource.
    selected = github_pipeline.github_source.with_resources(resource_name)
    # Distinct pipeline name per resource keeps dlt state separate.
    pipeline = dlt.pipeline(
        pipeline_name=f"rest_api_github__{resource_name}",
        destination=bq_dest,
        dataset_name="prefect_orc_demo",
        progress="log",
    )
    load_info = pipeline.run(selected)
    print(f"{resource_name} -> {load_info}")
    return load_info

@flow(log_prints=True)
def main():
    """Run the three full-refresh GitHub resources sequentially."""
    set_github_pat_env()  # make the PAT visible to dlt
    destination = make_bq_destination()
    results = tuple(
        run_resource(name, destination)
        for name in ("repos", "contributors", "releases")
    )
    return results

if __name__ == "__main__":
    main()
Loading