Skip to content

Commit 0c4231d

Browse files
committed
Index documents as well
1 parent e24c1ca commit 0c4231d

File tree

3 files changed

+48
-0
lines changed

3 files changed

+48
-0
lines changed

datapackage_pipelines_budgetkey/generator.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,21 @@ def generate_pipeline(cls, source):
6464
'doc_type': doc_type}
6565
]
6666
}
67+
}),
68+
('convert_to_key_value', {
69+
'key-prefix': doc_type,
70+
'key-fields': key_fields
71+
}),
72+
('sample'
73+
74+
),
75+
('dump_to_es', {
76+
'indexes': {
77+
'budgetkey': [
78+
{'resource-name': 'document',
79+
'doc_type': 'document'}
80+
]
81+
}
6782
})
6883
]
6984

datapackage_pipelines_budgetkey/pipelines/budgetkey/pipeline-spec.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ budget-functional-aggregates:
2626
target:
2727
name: budget-functional-aggregates
2828
key: null
29+
2930
fields:
3031
func_cls_title_1: null
3132
func_cls_title_2: null
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from datapackage_pipelines.utilities.resources import PROP_STREAMING
2+
from datapackage_pipelines.wrapper import ingest, spew
3+
4+
# Obtain this step's parameters, the datapackage descriptor (dp) and the
# iterator over incoming resources from the pipeline wrapper.
parameters, dp, res_iter = ingest()
5+
6+
# 'key-prefix' becomes the first segment of every generated key;
# 'key-fields' lists the row fields whose values are appended after it.
prefix = parameters['key-prefix']
7+
key_fields = parameters['key-fields']
8+
9+
# Flatten every incoming resource into a single stream of key/value rows.
#
# For each row of each resource in res_iter_, yields a dict with:
#   'key'   - the module-level `prefix` followed by str() of the row's value
#             for each name in the module-level `key_fields`, joined with '/'
#   'value' - a dict copy of the whole original row
#
# Raises whatever row[k] raises when a key field is absent from a row
# (nothing here guards against that).
def process_resources(res_iter_):
10+
for res in res_iter_:
11+
for row in res:
12+
yield {
13+
'key': '/'.join([prefix] + [str(row[k]) for k in key_fields]),
14+
'value': dict(row)
15+
}
16+
17+
# Replace the descriptor's resource list with one 'document' resource whose
# rows match what process_resources yields: a string 'key' (declared as the
# primary key) and an object 'value'.
# NOTE(review): 'es:index': False presumably tells the Elasticsearch dump step
# not to index the 'value' field - confirm against dump_to_es.
dp['resources'] = [
18+
{
19+
'name': 'document',
20+
PROP_STREAMING: True,
21+
'path': 'data/documents.csv',
22+
'schema': {
23+
'fields': [
24+
{'name': 'key', 'type': 'string'},
25+
{'name': 'value', 'type': 'object', 'es:index': False}
26+
],
27+
'primaryKey': ['key']
28+
}
29+
}
30+
]
31+
32+
# Emit the updated descriptor together with exactly one row stream, which
# chains the rows of all incoming resources.
spew(dp, [process_resources(res_iter)])

0 commit comments

Comments
 (0)