
Commit a6ff9fa

Merge pull request #1075 from data-integrations/feat/support-updating-metadata-in-dataplex-for-newly-generated-data
add user managed schema option to dataplex sink plugin
2 parents 4f8af83 + 292552e commit a6ff9fa

6 files changed: +486 −20 lines changed
src/main/java/io/cdap/plugin/gcp/dataplex/common/util/DataplexConstants.java

Lines changed: 2 additions & 0 deletions
@@ -30,4 +30,6 @@ public final class DataplexConstants {
   public static final String NONE = "none";
   public static final String BIGQUERY_DATASET_ASSET_TYPE = "BIGQUERY_DATASET";
   public static final String STORAGE_BUCKET_ASSET_TYPE = "STORAGE_BUCKET";
+  public static final String STORAGE_BUCKET_PARTITION_KEY = "ts";
+  public static final String STORAGE_BUCKET_PATH_PREFIX = "gs://";
 }
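
For orientation, a minimal sketch of how the two new constants combine into a partition location, mirroring the logic in addPartitionInfo later in this diff. The bucket and table names are hypothetical, not part of this commit:

    // Hypothetical values for illustration only.
    String bucketName = "example-bucket";
    String tableName = "bq_source_2";
    String partitionDir = tableName + "/" + DataplexConstants.STORAGE_BUCKET_PARTITION_KEY + "=2022-08-22-21-52";
    String location = DataplexConstants.STORAGE_BUCKET_PATH_PREFIX + bucketName + "/" + partitionDir;
    // location is "gs://example-bucket/bq_source_2/ts=2022-08-22-21-52"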

src/main/java/io/cdap/plugin/gcp/dataplex/common/util/DataplexUtil.java

Lines changed: 257 additions & 8 deletions
@@ -17,22 +17,31 @@
 package io.cdap.plugin.gcp.dataplex.common.util;
 
 import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.paging.Page;
 import com.google.auth.oauth2.GoogleCredentials;
-import com.google.auth.oauth2.ServiceAccountCredentials;
 import com.google.cloud.dataplex.v1.DataplexServiceClient;
 import com.google.cloud.dataplex.v1.DataplexServiceSettings;
+import com.google.cloud.dataplex.v1.Entity;
 import com.google.cloud.dataplex.v1.Job;
 import com.google.cloud.dataplex.v1.JobName;
 import com.google.cloud.dataplex.v1.MetadataServiceClient;
 import com.google.cloud.dataplex.v1.MetadataServiceSettings;
+import com.google.cloud.dataplex.v1.Partition;
+import com.google.cloud.dataplex.v1.Schema.Mode;
+import com.google.cloud.dataplex.v1.Schema.PartitionField;
+import com.google.cloud.dataplex.v1.Schema.PartitionStyle;
+import com.google.cloud.dataplex.v1.Schema.SchemaField;
+import com.google.cloud.dataplex.v1.Schema.Type;
 import com.google.cloud.dataplex.v1.TaskName;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.data.schema.Schema.LogicalType;
 import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
 import io.cdap.plugin.gcp.common.GCPConnectorConfig;
 import io.cdap.plugin.gcp.common.GCPUtils;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.awaitility.Awaitility;
@@ -43,9 +52,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-
 import javax.annotation.Nullable;
 
 /**
@@ -230,8 +239,8 @@ private static Schema convertFieldBasedOnStandardType(com.google.cloud.dataplex.
     String error =
       String.format("Entity column '%s' is of unsupported type '%s'.", fieldName, standardType.name());
     String action = String.format("Supported column types are: %s.",
-      BigQueryUtil.BQ_TYPE_MAP.keySet().stream().map(t -> t.getStandardType().name())
-        .collect(Collectors.joining(", ")));
+                                  BigQueryUtil.BQ_TYPE_MAP.keySet().stream().map(t -> t.getStandardType().name())
+                                    .collect(Collectors.joining(", ")));
     if (collector != null) {
       collector.addFailure(error, action);
     } else {
@@ -292,7 +301,7 @@ public static void getJobCompletion(Configuration conf) throws IOException {
     try (DataplexServiceClient dataplexServiceClient = DataplexUtil.getDataplexServiceClient(googleCredentials)) {
       DataplexServiceClient.ListJobsPagedResponse
         jobList = dataplexServiceClient.listJobs(TaskName.newBuilder().setProject(projectID).setLake(lake).
-        setLocation(location).setTask(taskId).build());
+          setLocation(location).setTask(taskId).build());
       Job dataplexJob = jobList.iterateAll().iterator().next();
       try {
         Awaitility.await()
@@ -302,7 +311,7 @@ public static void getJobCompletion(Configuration conf) throws IOException {
           .until(() -> {
             Job currentJob =
               dataplexServiceClient.getJob(JobName.newBuilder().setProject(projectID).setLocation(location)
-                .setLake(lake).setTask(taskId).setJob(dataplexJob.getUid()).build());
+                                             .setLake(lake).setTask(taskId).setJob(dataplexJob.getUid()).build());
             LOG.debug("State of the Job is still " + currentJob.getState());
             return currentJob.getState() != null && !Job.State.RUNNING.equals(currentJob.getState()) &&
               !Job.State.STATE_UNSPECIFIED.equals(currentJob.getState());
@@ -311,7 +320,8 @@ public static void getJobCompletion(Configuration conf) throws IOException {
       throw new IOException("Job timed out.", e);
     }
     Job completedJob = dataplexServiceClient.getJob(JobName.newBuilder().setProject(projectID).setLocation(location)
-      .setLake(lake).setTask(taskId).setJob(dataplexJob.getUid()).build());
+                                                      .setLake(lake).setTask(taskId).setJob(dataplexJob.getUid())
+                                                      .build());
     if (!Job.State.SUCCEEDED.equals(completedJob.getState())) {
       throw new IOException("Job failed with message: " + completedJob.getMessage());
     }
@@ -392,4 +402,243 @@ public static MetadataServiceClient getMetadataServiceClient(GoogleCredentials c
     return metadataServiceClient;
   }
 
+  /**
+   * Get the storage format (MIME type) for entity data.
+   *
+   * @param format the file format of the data on GCS (e.g. avro, csv, json, orc, parquet)
+   * @return the corresponding MIME type, or "undefined" for unrecognized formats
+   */
+  public static String getStorageFormatForEntity(String format) {
+    switch (format) {
+      case "avro":
+        return "application/x-avro";
+      case "csv":
+        return "text/csv";
+      case "json":
+        return "application/json";
+      case "orc":
+        return "application/x-orc";
+      case "parquet":
+        return "application/x-parquet";
+      default:
+        return "undefined";
+    }
+  }
+
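A quick usage sketch, not part of this commit: the mapping is keyed on the lower-case format name, and anything unrecognized falls through to "undefined".

    String avroMime = DataplexUtil.getStorageFormatForEntity("avro"); // "application/x-avro"
    String unknown = DataplexUtil.getStorageFormatForEntity("xml");   // "undefined"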
+  /**
+   * Return a Dataplex Schema built from a CDAP Schema.
+   *
+   * @param schema input schema from CDAP
+   * @return the equivalent user-managed Dataplex schema
+   */
+  public static com.google.cloud.dataplex.v1.Schema getDataplexSchema(Schema schema) throws IOException {
+    com.google.cloud.dataplex.v1.Schema.Builder dataplexSchemaBuilder =
+      com.google.cloud.dataplex.v1.Schema.newBuilder();
+    dataplexSchemaBuilder.setUserManaged(true);
+    if (schema == null) {
+      return dataplexSchemaBuilder.build();
+    }
+    // Since "ts" is used by the Dataplex sink to create a time-partitioned layout on GCS, remove any
+    // existing column named "ts" to avoid a conflict.
+    dataplexSchemaBuilder.addAllFields(Objects.requireNonNull(schema.getFields()).stream()
+      .filter(avroField -> !avroField.getName().equals(DataplexConstants.STORAGE_BUCKET_PARTITION_KEY))
+      .map(DataplexUtil::toDataplexSchemaField).collect(Collectors.toList()));
+    // Add the partitioning scheme to the schema. Only the field name matters here;
+    // toDataplexPartitionField hardcodes the STRING type.
+    Schema.Field partitionField = Schema.Field.of(DataplexConstants.STORAGE_BUCKET_PARTITION_KEY, schema);
+    dataplexSchemaBuilder.setPartitionStyle(PartitionStyle.HIVE_COMPATIBLE);
+    dataplexSchemaBuilder.addPartitionFields(toDataplexPartitionField(partitionField));
+    return dataplexSchemaBuilder.build();
+  }
+
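A minimal usage sketch, assuming the CDAP Schema builders from io.cdap.cdap.api.data.schema; the record and field names are hypothetical:

    // Build a CDAP schema with one required and one nullable field.
    Schema cdapSchema = Schema.recordOf("purchase",
        Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
        Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))));
    // getDataplexSchema declares IOException, so call it where that can propagate.
    com.google.cloud.dataplex.v1.Schema dataplexSchema = DataplexUtil.getDataplexSchema(cdapSchema);
    // Expected result: a user-managed schema with fields id (INT64, REQUIRED) and
    // name (STRING, NULLABLE), plus a HIVE_COMPATIBLE partition field "ts" of type STRING.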
+  private static SchemaField toDataplexSchemaField(Schema.Field avroField) {
+    SchemaField.Builder fieldBuilder = SchemaField.newBuilder();
+    fieldBuilder.setName(avroField.getName());
+    fieldBuilder.setType(dataplexFieldType(avroField));
+    fieldBuilder.setMode(dataplexFieldMode(avroField));
+    if (avroField.getSchema().getType() == Schema.Type.RECORD) {
+      // Handle nested records, filtering the "ts" column for the same reason as in getDataplexSchema.
+      fieldBuilder.addAllFields(avroField.getSchema().getFields().stream()
+        .filter(schemaField -> !schemaField.getName().equals(DataplexConstants.STORAGE_BUCKET_PARTITION_KEY))
+        .map(DataplexUtil::toDataplexSchemaField).collect(Collectors.toList()));
+    }
+    return fieldBuilder.build();
+  }
+
+  private static PartitionField toDataplexPartitionField(Schema.Field avroField) {
+    PartitionField.Builder partitionFieldBuilder = PartitionField.newBuilder();
+    partitionFieldBuilder.setName(avroField.getName());
+    // Dataplex supports only a partition field type of STRING for files on GCS.
+    partitionFieldBuilder.setType(Type.STRING);
+    return partitionFieldBuilder.build();
+  }
+
+  private static Mode dataplexFieldMode(Schema.Field field) {
+    /*
+      Field modes supported by Dataplex:
+
+      MODE_UNSPECIFIED: Mode unspecified.
+      REQUIRED: The field has required semantics.
+      NULLABLE: The field has optional semantics, and may be null.
+      REPEATED: The field has repeated (0 or more) semantics, and is a list of values.
+    */
+    Schema.Type type = field.getSchema().getType();
+    if (type == Schema.Type.ARRAY) {
+      return Mode.REPEATED;
+    } else if (type == Schema.Type.UNION) {
+      for (Schema innerSchema : field.getSchema().getUnionSchemas()) {
+        if (innerSchema.getType() == Schema.Type.NULL) {
+          return Mode.NULLABLE;
+        }
+      }
+    }
+    return Mode.REQUIRED;
+  }
+
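The mode rules restated as a sketch (dataplexFieldMode is private, so this only illustrates the mapping; the field names are hypothetical):

    // ARRAY            -> Mode.REPEATED
    Schema.Field tags = Schema.Field.of("tags", Schema.arrayOf(Schema.of(Schema.Type.STRING)));
    // UNION with NULL  -> Mode.NULLABLE
    Schema.Field name = Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING)));
    // Everything else  -> Mode.REQUIRED
    Schema.Field id = Schema.Field.of("id", Schema.of(Schema.Type.LONG));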
+  private static Type dataplexFieldType(Schema.Field field) {
+    /*
+      Field types supported by Dataplex:
+
+      TYPE_UNSPECIFIED: SchemaType unspecified.
+      BOOLEAN: Boolean field.
+      BYTE: Single byte numeric field.
+      INT16: 16-bit numeric field.
+      INT32: 32-bit numeric field.
+      INT64: 64-bit numeric field.
+      FLOAT: Floating point numeric field.
+      DOUBLE: Double precision numeric field.
+      DECIMAL: Real value numeric field.
+      STRING: Sequence of characters field.
+      BINARY: Sequence of bytes field.
+      TIMESTAMP: Date and time field.
+      DATE: Date field.
+      TIME: Time field.
+      RECORD: Structured field. Nested fields that define the structure of the map.
+        If all nested fields are nullable, this field represents a union.
+      NULL: Null field that does not have values.
+    */
+    Schema schema = field.getSchema();
+
+    if (schema.getType() == Schema.Type.UNION) {
+      // Special case for UNION: a union of ["null", "non-null type"] means this is a
+      // nullable field. In Dataplex this will be a field with Mode = NULLABLE and Type = <non-null
+      // type>. So here we have to return the type of the other, non-NULL, element.
+      // A union of 3+ elements is not supported (it can't be represented as a Dataplex type).
+      if (schema.isNullable()) {
+        return dataplexPrimitiveFieldType(schema.getNonNullable());
+      }
+      return Type.TYPE_UNSPECIFIED;
+    }
+
+    if (schema.getType() == Schema.Type.ARRAY) {
+      // Special case for ARRAY: check the type of the underlying elements.
+      // In Dataplex this will be a field with Mode = REPEATED and Type = <array element type>.
+      return dataplexPrimitiveFieldType(schema.getComponentSchema());
+    }
+
+    return dataplexPrimitiveFieldType(schema);
+  }
+
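How the two special cases resolve, again as an illustrative sketch with hypothetical schemas:

    // Nullable union ["null", "long"] resolves to the non-null branch -> Type.INT64 (with Mode NULLABLE).
    Schema nullableLong = Schema.nullableOf(Schema.of(Schema.Type.LONG));
    // Array of strings: the element type is used -> Type.STRING (with Mode REPEATED).
    Schema stringArray = Schema.arrayOf(Schema.of(Schema.Type.STRING));
    // A union with more than one non-null branch -> Type.TYPE_UNSPECIFIED.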
+  private static Type dataplexPrimitiveFieldType(Schema schema) {
+    if (schema.getLogicalType() != null) {
+      Type type = dataplexLogicalFieldType(schema);
+      if (type != null) {
+        return type;
+      }
+    }
+
+    Schema.Type avroType = schema.getType();
+    switch (avroType) {
+      case RECORD:
+        return Type.RECORD;
+      case STRING:
+        return Type.STRING;
+      case FLOAT:
+        return Type.FLOAT;
+      case DOUBLE:
+        return Type.DOUBLE;
+      case BOOLEAN:
+        return Type.BOOLEAN;
+      case NULL:
+        return Type.NULL;
+      case BYTES: // BYTES is binary data with variable size.
+        return Type.BINARY;
+      case INT:
+        return Type.INT32;
+      case LONG:
+        return Type.INT64;
+      case UNION: // Shouldn't happen. Unions cannot contain other unions, as per the Avro spec.
+      case ARRAY: // Not supported as a primitive type (e.g. if this is an ARRAY of ARRAYs).
+      case MAP:
+      case ENUM:
+      default:
+        return Type.TYPE_UNSPECIFIED;
+    }
+  }
+
+  private static Type dataplexLogicalFieldType(Schema schema) {
+    LogicalType logicalType = schema.getLogicalType();
+
+    if (logicalType == LogicalType.DECIMAL) {
+      return Type.DECIMAL;
+    } else if (logicalType == LogicalType.DATE) {
+      return Type.DATE;
+    } else if (logicalType == LogicalType.TIME_MICROS
+      || logicalType == LogicalType.TIME_MILLIS) {
+      return Type.TIME;
+    } else if (logicalType == LogicalType.TIMESTAMP_MICROS
+      || logicalType == LogicalType.TIMESTAMP_MILLIS) {
+      return Type.TIMESTAMP;
+    }
+
+    return null;
+  }
+
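Logical types take precedence over the physical Avro type in dataplexPrimitiveFieldType. A sketch, assuming the CDAP Schema factory methods for logical types:

    // DECIMAL is physically BYTES but maps to Type.DECIMAL, not BINARY.
    Schema decimal = Schema.decimalOf(38, 9);
    // TIMESTAMP_MICROS is physically LONG but maps to Type.TIMESTAMP, not INT64.
    Schema timestamp = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);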
+  /**
+   * Add partition info to a Dataplex entity.
+   *
+   * @param entity dataplex entity
+   * @param credentials Google Credentials
+   * @param bucketName name of the GCS bucket the entity data was written to
+   * @param tableName name of the table, i.e. the top-level directory in the bucket
+   * @param project the GCP project ID
+   * @throws IOException
+   */
+  public static void addPartitionInfo(Entity entity, GoogleCredentials credentials,
+                                      String bucketName, String tableName, String project) throws IOException {
+    Storage storage = GCPUtils.getStorage(project, credentials);
+    String delimiter = "/";
+    String partitionPrefix = DataplexConstants.STORAGE_BUCKET_PARTITION_KEY + "=";
+    Page<Blob> blobs =
+      storage.list(
+        bucketName,
+        Storage.BlobListOption.prefix(tableName + delimiter + partitionPrefix),
+        Storage.BlobListOption.currentDirectory());
+    String lastPartition = null;
+    // Blob name example: bq_source_2/ts=2022-08-22-21-52/
+    // Take the last blob name in the iterator, as it is the one that corresponds to the entity that was just
+    // created or updated.
+    for (Blob blob : blobs.iterateAll()) {
+      lastPartition = blob.getName();
+    }
+    // Remove the trailing delimiter from the blob name when building the location string.
+    String location = DataplexConstants.STORAGE_BUCKET_PATH_PREFIX + bucketName +
+      delimiter + lastPartition.substring(0, lastPartition.length() - 1);
+    String[] lastPartitionParts = lastPartition.split(delimiter);
+    Partition.Builder dataplexPartitionBuilder = Partition.newBuilder();
+    try (MetadataServiceClient metadataServiceClient = getMetadataServiceClient(credentials)) {
+      Partition partition = dataplexPartitionBuilder
+        .setLocation(location)
+        // Extract the date value from the blob name (e.g. 2022-08-22-21-52) to pass as the partition value.
+        .addValues(lastPartitionParts[lastPartitionParts.length - 1].substring(partitionPrefix.length()))
+        .build();
+      metadataServiceClient.createPartition(entity.getName(), partition);
+    }
+  }
 }
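
A hedged end-to-end sketch of registering the latest partition after a run; the bucket, table, and project identifiers are hypothetical, and a real pipeline would obtain the entity and credentials from the sink's GCP configuration rather than Application Default Credentials:

    static void registerLatestPartition(Entity entity) throws IOException {
      // Application Default Credentials stand in for the plugin's configured service account.
      GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
      DataplexUtil.addPartitionInfo(entity, credentials, "example-bucket", "bq_source_2", "example-project");
      // This lists gs://example-bucket/bq_source_2/ts=*/ and registers the most recent
      // directory as a partition on the entity, with the timestamp as the partition value.
    }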
