Skip to content

Commit c3c8fb7

Browse files
committed
Create manage_revision fields as dates
1 parent 6377f54 commit c3c8fb7

File tree

5 files changed

+36
-5
lines changed

5 files changed

+36
-5
lines changed

datapackage_pipelines_budgetkey/generator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ def generate_pipeline(cls, source, base):
146146
}
147147
}
148148
}),
149+
('set-revisions', {}),
149150
('filter', {
150151
'resources': doc_type,
151152
'in': [

datapackage_pipelines_budgetkey/pipelines/budgetkey/elasticsearch/budgetkey.source-spec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ supports:
112112

113113
tenders:
114114
kind: indexer
115-
revision: 15
115+
revision: 16
116116
dependent_pipeline: ./procurement/tenders/processed
117117
source_datapackage: /var/datapackages/procurement/tenders/processed/datapackage.json
118118
page-title-pattern: '{snippet}'

datapackage_pipelines_budgetkey/processors/manage-revisions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ def process_resource(res, key_fields, hash_fields, existing_ids, prefix):
8787
logging.info('#%d: KEY: %r HASH: %r', i, key, hash)
8888
try:
8989
existing_id = existing_ids.get(key)
90+
row.update(existing_id)
9091
days_since_last_update = (now - existing_id[prefix+'__last_updated_at']).days
9192
days_since_last_update = max(0, days_since_last_update)
9293
next_update_days = existing_id[prefix+'__next_update_days']

datapackage_pipelines_budgetkey/processors/set-revision.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ def process_resources(res_iter_):
1818
yield from res_iter_
1919

2020

21-
dp['resources'][0]['schema']['fields'].append({
22-
'name': '__revision',
23-
'type': 'integer'
24-
})
21+
dp['resources'][0]['schema']['fields'].extend([
22+
{
23+
'name': '__revision',
24+
'type': 'integer'
25+
},
26+
])
2527

2628
spew(dp, process_resources(res_iter))
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from datapackage_pipelines.utilities.resources import PROP_STREAMING
2+
from datapackage_pipelines.wrapper import ingest, spew
3+
4+
parameters, dp, res_iter = ingest()
5+
6+
7+
def process_resource(res_):
8+
for row in res_:
9+
for k in ['__last_updated_at', '__last_modified_at', '__created_at']:
10+
if k in row and row[k]:
11+
row['rev'+k[1:]] = row[k].date()
12+
yield row
13+
14+
15+
def process_resources(res_iter_):
16+
first = next(res_iter_)
17+
yield process_resource(first)
18+
yield from res_iter_
19+
20+
21+
dp['resources'][0]['schema']['fields'].extend([
22+
{'name': 'rev_last_updated_at', 'type': 'date'},
23+
{'name': 'rev_last_modified_at', 'type': 'date'},
24+
{'name': 'rev_created_at', 'type': 'date'},
25+
])
26+
27+
spew(dp, process_resources(res_iter))

0 commit comments

Comments
 (0)