Skip to content

Commit 4cf8516

Browse files
authored
Merge pull request #1171 from data-integrations/fix/bq-sink-location-for-fqn
send correct location for BQ sink plugin when emitting lineage info
2 parents 1289664 + 25cba9b commit 4cf8516

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,11 @@ protected final void initOutput(BatchSinkContext context, BigQuery bigQuery, Str
163163
List<String> fieldNames = fields.stream()
164164
.map(BigQueryTableFieldSchema::getName)
165165
.collect(Collectors.toList());
166-
Asset asset = Asset.builder(outputName).setFqn(fqn).setLocation(getConfig().getLocation()).build();
166+
// Get the dataset location (if it exists)
167+
DatasetId datasetId = DatasetId.of(getConfig().getDatasetProject(), getConfig().getDataset());
168+
Dataset dataset = bigQuery.getDataset(datasetId);
169+
String location = dataset != null ? dataset.getLocation() : getConfig().getLocation();
170+
Asset asset = Asset.builder(outputName).setFqn(fqn).setLocation(location).build();
167171
BigQuerySinkUtils.recordLineage(context, asset, tableSchema, fieldNames);
168172
context.addOutput(Output.of(outputName, getOutputFormatProvider(configuration, tableName, tableSchema)));
169173
}

0 commit comments

Comments
 (0)