Skip to content

Commit 25cba9b

Browse files
committed
send correct location for BQ sink plugin when emitting lineage info
1 parent 1289664 commit 25cba9b

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,11 @@ protected final void initOutput(BatchSinkContext context, BigQuery bigQuery, Str
163163
List<String> fieldNames = fields.stream()
164164
.map(BigQueryTableFieldSchema::getName)
165165
.collect(Collectors.toList());
166-
Asset asset = Asset.builder(outputName).setFqn(fqn).setLocation(getConfig().getLocation()).build();
166+
// Get the dataset location (if it exists)
167+
DatasetId datasetId = DatasetId.of(getConfig().getDatasetProject(), getConfig().getDataset());
168+
Dataset dataset = bigQuery.getDataset(datasetId);
169+
String location = dataset != null ? dataset.getLocation() : getConfig().getLocation();
170+
Asset asset = Asset.builder(outputName).setFqn(fqn).setLocation(location).build();
167171
BigQuerySinkUtils.recordLineage(context, asset, tableSchema, fieldNames);
168172
context.addOutput(Output.of(outputName, getOutputFormatProvider(configuration, tableName, tableSchema)));
169173
}

0 commit comments

Comments
 (0)