Commit f3b04c2

fix: ETL job failure when input dataset is empty (#1490)
Parent: d22c3e2

File tree

3 files changed: +43, -10 lines

  src/data-pipeline/spark-etl/src/main/java/software/aws/solution/clickstream/TransformerV3.java
  src/data-pipeline/spark-etl/src/main/java/software/aws/solution/clickstream/transformer/BaseThirdPartyTransformer.java
  src/data-pipeline/spark-etl/src/test/java/software/aws/solution/clickstream/TransformerV3Test.java

src/data-pipeline/spark-etl/src/main/java/software/aws/solution/clickstream/TransformerV3.java

Lines changed: 5 additions & 5 deletions

@@ -261,11 +261,6 @@ public Dataset<Row> extractUser(final Dataset<Row> eventDataset, final Dataset<R
         Dataset<Row> userDataset = convertedDataset.select(expr("dataOut.user.*"))
                 .select(toColumnArray(ModelV2.getUserFields()));

-        if (userDataset.count() == 0) {
-            log.info("extractUser return empty dataset");
-            return userDataset;
-        }
-
         userDataset = userDataset.withColumn(USER_FIRST_EVENT_NAME, col(Constant.EVENT_NAME))
                 .withColumn(USER_LATEST_EVENT_NAME, col(Constant.EVENT_NAME))
                 .drop(Constant.EVENT_NAME);
@@ -279,6 +274,11 @@ public Dataset<Row> extractUser(final Dataset<Row> eventDataset, final Dataset<R
         log.info("tableName: {}", tableName);
         log.info("pathInfo - incremental: " + pathInfo.getIncremental() + ", full: " + pathInfo.getFull());

+        if (userDataset.count() == 0) {
+            log.info("extractUser return empty dataset");
+            return userDataset;
+        }
+
         // save new (append)
         String path = saveIncrementalDatasetToPath(pathInfo.getIncremental(), newUserAggDataset);

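Both production files get the same fix: the empty-input guard in extractUser moves from just after the user dataset is built to just before the incremental save, so the aggregation and path-resolution bookkeeping still runs when the input is empty. Below is a minimal standalone sketch of the resulting control flow, with an illustrative schema and output path rather than the project's own code:

// Standalone sketch of the relocated guard (illustrative names and paths,
// not the project's code): do the bookkeeping first, then return before
// any write when the input is empty.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class EmptyGuardSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("empty-guard-sketch")
                .getOrCreate();

        // Stand-in for the extracted user dataset: zero rows, but a real schema.
        Dataset<Row> userDataset = spark.range(0).toDF("user_id");

        // Bookkeeping (aggregation, path resolution) still runs on empty input.
        long newUserCount = userDataset.count();
        String incrementalPath = "/tmp/warehouse/user_incremental_sketch";

        if (newUserCount == 0) {
            // The patched code returns early here, before the save.
            System.out.println("extractUser-style early return: empty dataset");
        } else {
            userDataset.write().mode(SaveMode.Append).json(incrementalPath);
        }
        spark.stop();
    }
}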
src/data-pipeline/spark-etl/src/main/java/software/aws/solution/clickstream/transformer/BaseThirdPartyTransformer.java

Lines changed: 5 additions & 5 deletions

@@ -217,11 +217,6 @@ public Dataset<Row> extractUser(final Dataset<Row> eventDataset, final Dataset<R
         Dataset<Row> userDataset = convertedDataset.select(expr("dataOut.user.*"))
                 .select(toColumnArray(ModelV2.getUserFields()));

-        if (userDataset.count() == 0) {
-            log.info("extractUser return empty dataset");
-            return userDataset;
-        }
-
         // agg new
         Dataset<Row> newUserAggDataset = aggUserDataset(userDataset, "newUserAggDataset");
         log.info("newUserAggDataset count: {}", newUserAggDataset.count());
@@ -231,6 +226,11 @@ public Dataset<Row> extractUser(final Dataset<Row> eventDataset, final Dataset<R
         log.info("tableName: {}", tableName);
         log.info("pathInfo - incremental: " + pathInfo.getIncremental() + ", full: " + pathInfo.getFull());

+        if (userDataset.count() == 0) {
+            log.info("extractUser return empty dataset");
+            return userDataset;
+        }
+
         // save new (append)
         String path = saveIncrementalDatasetToPath(pathInfo.getIncremental(), newUserAggDataset);

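The guard checks count() == 0, which triggers a full scan just to test for emptiness. Spark also offers Dataset#isEmpty() (available since Spark 2.4), which only probes for a single row. A small self-contained sketch of the cheaper check, with illustrative session setup:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IsEmptySketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("is-empty-sketch")
                .getOrCreate();

        Dataset<Row> userDataset = spark.range(0).toDF("user_id");

        // isEmpty() stops after looking for one row instead of counting them all.
        System.out.println("empty? " + userDataset.isEmpty()); // prints: empty? true
        spark.stop();
    }
}

Either form works for the guard; count() is only a concern if the input can be large and non-empty on the common path.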
src/data-pipeline/spark-etl/src/test/java/software/aws/solution/clickstream/TransformerV3Test.java

Lines changed: 33 additions & 0 deletions

@@ -316,4 +316,37 @@ void test_extract_session_from_event2() throws IOException {
         String expectedJson1 = this.resourceFileAsString("/event_v2/expected/test_extract_session_from_event2.json");
         Assertions.assertEquals(expectedJson1, replaceDynData(sessionDataset.first().prettyJson()));
     }
+
+
+    @Test
+    public void should_transform_can_merge_state_table_when_dataset_is_empty_v2() throws IOException {
+        // DOWNLOAD_FILE=0 ./gradlew clean test --info --tests software.aws.solution.clickstream.TransformerV3Test.should_transform_can_merge_state_table_when_dataset_is_empty_v2
+        System.setProperty(APP_IDS_PROP, "uba-app");
+        System.setProperty(PROJECT_ID_PROP, "test_project_id_01");
+        System.setProperty("force.merge", "true");
+
+        String testWarehouseDir = "/tmp/warehouse/should_transform_can_merge_state_table_when_dataset_is_empty_v2/" + new Date().getTime();
+        System.setProperty(WAREHOUSE_DIR_PROP, testWarehouseDir);
+
+        Dataset<Row> dataset =
+                spark.read().json(requireNonNull(getClass().getResource("/original_data_with_user_profile_set2.json")).getPath());
+
+        Dataset<Row> emptyDataset = dataset.filter(col("appId").equalTo("empty"));
+        // make sure the input dataset is empty
+        Assertions.assertEquals(0, emptyDataset.count());
+
+        // create folders for the full and incremental tables
+        String tableDirFull = testWarehouseDir + "/" + getUserPropsTableName("clickstream") + FULL_SUFFIX + TABLE_VERSION_SUFFIX_V3;
+        emptyDataset.write().mode(SaveMode.Overwrite).json(tableDirFull);
+        String tableDirIncr = testWarehouseDir + "/" + getUserPropsTableName("clickstream") + INCREMENTAL_SUFFIX + TABLE_VERSION_SUFFIX_V3;
+        emptyDataset.write().mode(SaveMode.Overwrite).json(tableDirIncr);
+
+        Map<TableName, Dataset<Row>> transformedDatasets = transformer.transform(emptyDataset);
+        Dataset<Row> datasetEvent = transformedDatasets.get(TableName.EVENT_V2);
+
+        transformer.postTransform(datasetEvent); // should not raise an error
+    }
+
 }
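
The test obtains its empty input by filtering real data on an appId value that never occurs. For tests like this, an empty dataset can also be built directly from an explicit schema, which keeps downstream column references valid at zero rows. A sketch, with a hypothetical schema rather than the project's fixtures:

import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class EmptyInputSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("empty-input-sketch")
                .getOrCreate();

        // Hypothetical two-column schema standing in for the clickstream input.
        StructType schema = new StructType()
                .add("appId", DataTypes.StringType)
                .add("data", DataTypes.StringType);
        Dataset<Row> emptyDataset = spark.createDataFrame(Collections.emptyList(), schema);

        // Same precondition the test asserts before running the transformer.
        System.out.println("rows: " + emptyDataset.count()); // prints: rows: 0
        spark.stop();
    }
}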
