Skip to content

Commit 7e7298a

Browse files
committed
Added custom retry with backoff for error not covered in BQ client (400, please retry with backoff)
1 parent 097fd84 commit 7e7298a

File tree

1 file changed

+53
-8
lines changed

1 file changed

+53
-8
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,11 @@
4848
import org.slf4j.LoggerFactory;
4949

5050
import java.io.IOException;
51+
import java.util.ArrayList;
5152
import java.util.Collections;
53+
import java.util.List;
5254
import java.util.Map;
55+
import java.util.function.Function;
5356
import javax.annotation.Nullable;
5457

5558
/**
@@ -100,9 +103,6 @@ public void run(ActionContext context) throws Exception {
100103
// Enable legacy SQL
101104
builder.setUseLegacySql(config.isLegacySQL());
102105

103-
// Location must match that of the dataset(s) referenced in the query.
104-
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
105-
106106
// API request - starts the query.
107107
Credentials credentials = config.getServiceAccount() == null ?
108108
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
@@ -126,13 +126,17 @@ public void run(ActionContext context) throws Exception {
126126

127127
QueryJobConfiguration queryConfig = builder.build();
128128

129-
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
130129

131-
LOG.info("Executing SQL as job {}.", jobId.getJob());
132-
LOG.debug("The BigQuery SQL is {}", config.getSql());
130+
// Use an external retry strategy around the BigQuery client, because the client does not retry when a job
131+
// clashes with another job: that failure is reported as a 400 error.
133132

134-
// Wait for the query to complete
135-
queryJob.waitFor();
133+
final String retryableStringPattern = "Retrying the job with back-off";
134+
List<Function<BigQueryException, Boolean>> retryRules = new ArrayList<>();
135+
retryRules.add(
136+
(BigQueryException e) -> e.getCode() == 400
137+
&& (e.getMessage().contains(retryableStringPattern) || e.getReason().contains(retryableStringPattern))
138+
);
139+
Job queryJob = executeQueryJobWithCustomRetry(bigQuery, queryConfig, retryRules);
136140

137141
// Check for errors
138142
if (queryJob.getStatus().getError() != null) {
@@ -169,6 +173,47 @@ public void run(ActionContext context) throws Exception {
169173
context.getMetrics().gauge(RECORDS_PROCESSED, rows);
170174
}
171175

176+
/**
177+
* Executes Query with added retry rules following:
178+
* https://cloud.google.com/bigquery/sla
179+
*/
180+
private Job executeQueryJobWithCustomRetry(BigQuery bigQuery, QueryJobConfiguration queryConfig,
181+
List<Function<BigQueryException, Boolean>> retryRules) throws Exception {
182+
// The longest amount of time to wait in-between retries.
183+
final int maximum_backoff = 32;
184+
185+
// The maximum number of retries.
186+
final int max_retries = 20;
187+
188+
int retries = 0;
189+
190+
while (true) {
191+
try {
192+
// Location must match that of the dataset(s) referenced in the query.
193+
JobId jobId = JobId.newBuilder().setRandomJob().setLocation(config.getLocation()).build();
194+
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
195+
LOG.info("Executing SQL as job {}.", jobId.getJob());
196+
LOG.debug("The BigQuery SQL is {}", config.getSql());
197+
198+
// Wait for the query to complete
199+
queryJob.waitFor();
200+
return queryJob;
201+
} catch (BigQueryException bigQueryException) {
202+
if (retries >= max_retries) {
203+
LOG.error("Run out of retries while executing query with backoff.");
204+
throw bigQueryException;
205+
}
206+
if (retryRules.stream().noneMatch((f -> f.apply(bigQueryException)))) {
207+
throw bigQueryException;
208+
}
209+
LOG.warn("Received {} error from BigQuery, retrying...", bigQueryException.getMessage());
210+
long sleep_time = Math.round((Math.min(Math.pow(2, retries), maximum_backoff) + Math.random()) * 1000);
211+
Thread.sleep(sleep_time);
212+
retries += 1;
213+
}
214+
}
215+
}
216+
172217
@Override
173218
public AbstractBigQueryActionConfig getConfig() {
174219
return config;

0 commit comments

Comments
 (0)