diff --git a/common-test/src/main/java/feast/common/it/DataGenerator.java b/common-test/src/main/java/feast/common/it/DataGenerator.java index 8a0dbb0..92de8bb 100644 --- a/common-test/src/main/java/feast/common/it/DataGenerator.java +++ b/common-test/src/main/java/feast/common/it/DataGenerator.java @@ -198,6 +198,34 @@ public static FeatureTableSpec createFeatureTableSpec( .build(); } + // Create a FeatureTableSpec with project + public static FeatureTableSpec createFeatureTableSpec( + String name, + List entities, + ImmutableMap features, + int maxAgeSecs, + Map labels, + String project) { + + return FeatureTableSpec.newBuilder() + .setName(name) + .addAllEntities(entities) + .addAllFeatures( + features.entrySet().stream() + .map( + entry -> + FeatureSpecV2.newBuilder() + .setName(entry.getKey()) + .setValueType(entry.getValue()) + .putAllLabels(labels) + .build()) + .collect(Collectors.toList())) + .setMaxAge(Duration.newBuilder().setSeconds(maxAgeSecs).build()) + .putAllLabels(labels) + .setProject(project) + .build(); + } + public static DataSource createFileDataSourceSpec( String fileURL, String timestampColumn, String datePartitionColumn) { return DataSource.newBuilder() diff --git a/serving/pom.xml b/serving/pom.xml index dfcc6e5..06d8456 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -109,6 +109,12 @@ test + + com.google.cloud + google-cloud-storage + 1.111.1 + + diff --git a/serving/src/main/java/feast/serving/config/FeastProperties.java b/serving/src/main/java/feast/serving/config/FeastProperties.java index 1b62d84..33b1d1f 100644 --- a/serving/src/main/java/feast/serving/config/FeastProperties.java +++ b/serving/src/main/java/feast/serving/config/FeastProperties.java @@ -38,7 +38,6 @@ import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import javax.validation.constraints.Positive; -import org.apache.logging.log4j.core.config.plugins.validation.constraints.ValidHost; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.info.BuildProperties; @@ -66,11 +65,11 @@ public FeastProperties() {} /* Feast Serving build version */ @NotBlank private String version = "unknown"; - /* Feast Core host to connect to. */ - @ValidHost @NotBlank private String coreHost; + /* Bucket name for Feast object registry. */ + @NotBlank private String bucketName; - /* Feast Core port to connect to. */ - @Positive private int coreGrpcPort; + /* Object name for Feast object registry. */ + @NotBlank private String objectName; private CoreAuthenticationProperties coreAuthentication; @@ -181,39 +180,39 @@ public void setVersion(String version) { } /** - * Gets Feast Core host. + * Gets Feast object registry bucket name. * - * @return Feast Core host + * @return Feast object registry bucket name */ - public String getCoreHost() { - return coreHost; + public String getBucketName() { + return bucketName; } /** - * Sets Feast Core host to connect to. + * Sets Feast object registry bucket name. * - * @param coreHost Feast Core host + * @param bucketName Feast object registry bucket name */ - public void setCoreHost(String coreHost) { - this.coreHost = coreHost; + public void setBucketName(String bucketName) { + this.bucketName = bucketName; } /** - * Gets Feast Core gRPC port. + * Gets Feast object registry object name. * - * @return Feast Core gRPC port + * @return Feast object registry object name */ - public int getCoreGrpcPort() { - return coreGrpcPort; + public String getObjectName() { + return objectName; } /** - * Sets Feast Core gRPC port. + * Sets Feast object registry object name. * - * @param coreGrpcPort gRPC port of Feast Core + * @param objectName object registry object name */ - public void setCoreGrpcPort(int coreGrpcPort) { - this.coreGrpcPort = coreGrpcPort; + public void setObjectName(String objectName) { + this.objectName = objectName; } /** diff --git a/serving/src/main/java/feast/serving/config/SpecServiceConfig.java b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java index 369d543..6f22499 100644 --- a/serving/src/main/java/feast/serving/config/SpecServiceConfig.java +++ b/serving/src/main/java/feast/serving/config/SpecServiceConfig.java @@ -19,13 +19,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.InvalidProtocolBufferException; import feast.serving.specs.CachedSpecService; -import feast.serving.specs.CoreSpecService; -import io.grpc.CallCredentials; +import feast.serving.specs.RegistrySpecService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; -import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -34,14 +32,14 @@ public class SpecServiceConfig { private static final Logger log = org.slf4j.LoggerFactory.getLogger(SpecServiceConfig.class); - private String feastCoreHost; - private int feastCorePort; + private String bucketName; + private String objectName; private int feastCachedSpecServiceRefreshInterval; @Autowired public SpecServiceConfig(FeastProperties feastProperties) { - this.feastCoreHost = feastProperties.getCoreHost(); - this.feastCorePort = feastProperties.getCoreGrpcPort(); + this.bucketName = feastProperties.getBucketName(); + this.objectName = feastProperties.getObjectName(); this.feastCachedSpecServiceRefreshInterval = feastProperties.getCoreCacheRefreshInterval(); } @@ -60,11 +58,10 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService( } @Bean - public CachedSpecService specService(ObjectProvider callCredentials) + public CachedSpecService specService() throws InvalidProtocolBufferException, JsonProcessingException { - CoreSpecService coreService = - new CoreSpecService(feastCoreHost, feastCorePort, callCredentials); - CachedSpecService cachedSpecStorage = new CachedSpecService(coreService); + RegistrySpecService registryService = new RegistrySpecService(bucketName, objectName); + CachedSpecService cachedSpecStorage = new CachedSpecService(registryService); try { cachedSpecStorage.populateCache(); } catch (Exception e) { diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java index 440b224..63bcb42 100644 --- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java @@ -19,16 +19,14 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import feast.proto.core.CoreServiceProto; +import feast.proto.core.CoreServiceProto.GetFeatureTableRequest; import feast.proto.core.CoreServiceProto.ListFeatureTablesRequest; import feast.proto.core.CoreServiceProto.ListFeatureTablesResponse; -import feast.proto.core.CoreServiceProto.ListProjectsRequest; import feast.proto.core.FeatureProto; import feast.proto.core.FeatureTableProto.FeatureTable; import feast.proto.core.FeatureTableProto.FeatureTableSpec; import feast.proto.serving.ServingAPIProto; import feast.serving.exception.SpecRetrievalException; -import io.grpc.StatusRuntimeException; import io.prometheus.client.Gauge; import java.util.HashMap; import java.util.List; @@ -37,14 +35,14 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; -/** In-memory cache of specs hosted in Feast Core. */ +/** In-memory cache of specs from the object store registry. */ public class CachedSpecService { private static final int MAX_SPEC_COUNT = 1000; private static final Logger log = org.slf4j.LoggerFactory.getLogger(CachedSpecService.class); private static final String DEFAULT_PROJECT_NAME = "default"; - private final CoreSpecService coreService; + private final RegistrySpecService registryService; private static Gauge cacheLastUpdated = Gauge.build() @@ -66,8 +64,8 @@ public class CachedSpecService { ImmutablePair, FeatureProto.FeatureSpecV2> featureCache; - public CachedSpecService(CoreSpecService coreService) { - this.coreService = coreService; + public CachedSpecService(RegistrySpecService registryService) { + this.registryService = registryService; CacheLoader, FeatureTableSpec> featureTableCacheLoader = CacheLoader.from(k -> retrieveSingleFeatureTable(k.getLeft(), k.getRight())); @@ -83,7 +81,7 @@ public CachedSpecService(CoreSpecService coreService) { /** * Reload the store configuration from the given config path, then retrieve the necessary specs - * from core to preload the cache. + * from the object store registry to preload the cache. */ public void populateCache() { ImmutablePair< @@ -127,37 +125,23 @@ public void scheduledPopulateCache() { HashMap, FeatureProto.FeatureSpecV2> features = new HashMap<>(); - List projects = - coreService.listProjects(ListProjectsRequest.newBuilder().build()).getProjectsList(); - - for (String project : projects) { - try { - ListFeatureTablesResponse featureTablesResponse = - coreService.listFeatureTables( - ListFeatureTablesRequest.newBuilder() - .setFilter(ListFeatureTablesRequest.Filter.newBuilder().setProject(project)) - .build()); - Map featureRefSpecMap = - new HashMap<>(); - for (FeatureTable featureTable : featureTablesResponse.getTablesList()) { - FeatureTableSpec spec = featureTable.getSpec(); - featureTables.put(ImmutablePair.of(project, spec.getName()), spec); - - String featureTableName = spec.getName(); - List featureSpecs = spec.getFeaturesList(); - for (FeatureProto.FeatureSpecV2 featureSpec : featureSpecs) { - ServingAPIProto.FeatureReferenceV2 featureReference = - ServingAPIProto.FeatureReferenceV2.newBuilder() - .setFeatureTable(featureTableName) - .setName(featureSpec.getName()) - .build(); - features.put(ImmutablePair.of(project, featureReference), featureSpec); - } - } - - } catch (StatusRuntimeException e) { - throw new RuntimeException( - String.format("Unable to retrieve specs matching project %s", project), e); + ListFeatureTablesResponse featureTablesResponse = + registryService.listFeatureTables(ListFeatureTablesRequest.newBuilder().build()); + Map featureRefSpecMap = + new HashMap<>(); + for (FeatureTable featureTable : featureTablesResponse.getTablesList()) { + FeatureTableSpec spec = featureTable.getSpec(); + featureTables.put(ImmutablePair.of(spec.getProject(), spec.getName()), spec); + + String featureTableName = spec.getName(); + List featureSpecs = spec.getFeaturesList(); + for (FeatureProto.FeatureSpecV2 featureSpec : featureSpecs) { + ServingAPIProto.FeatureReferenceV2 featureReference = + ServingAPIProto.FeatureReferenceV2.newBuilder() + .setFeatureTable(featureTableName) + .setName(featureSpec.getName()) + .build(); + features.put(ImmutablePair.of(spec.getProject(), featureReference), featureSpec); } } return ImmutablePair.of(featureTables, features); @@ -165,9 +149,9 @@ public void scheduledPopulateCache() { private FeatureTableSpec retrieveSingleFeatureTable(String projectName, String tableName) { FeatureTable table = - coreService + registryService .getFeatureTable( - CoreServiceProto.GetFeatureTableRequest.newBuilder() + GetFeatureTableRequest.newBuilder() .setProject(projectName) .setName(tableName) .build()) @@ -178,7 +162,7 @@ private FeatureTableSpec retrieveSingleFeatureTable(String projectName, String t private FeatureProto.FeatureSpecV2 retrieveSingleFeature( String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) { FeatureTableSpec featureTableSpec = - getFeatureTableSpec(projectName, featureReference); // don't stress core too much + getFeatureTableSpec(projectName, featureReference); // don't stress registry too much if (featureTableSpec == null) { return null; } diff --git a/serving/src/main/java/feast/serving/specs/CoreSpecService.java b/serving/src/main/java/feast/serving/specs/CoreSpecService.java deleted file mode 100644 index eee50d8..0000000 --- a/serving/src/main/java/feast/serving/specs/CoreSpecService.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2019 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.serving.specs; - -import feast.proto.core.CoreServiceGrpc; -import feast.proto.core.CoreServiceProto; -import feast.proto.core.CoreServiceProto.ListFeatureTablesRequest; -import feast.proto.core.CoreServiceProto.ListFeatureTablesResponse; -import feast.proto.core.CoreServiceProto.ListProjectsRequest; -import feast.proto.core.CoreServiceProto.ListProjectsResponse; -import feast.proto.core.CoreServiceProto.UpdateStoreRequest; -import feast.proto.core.CoreServiceProto.UpdateStoreResponse; -import io.grpc.CallCredentials; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import org.slf4j.Logger; -import org.springframework.beans.factory.ObjectProvider; - -/** Client for interfacing with specs in Feast Core. */ -public class CoreSpecService { - - private static final Logger log = org.slf4j.LoggerFactory.getLogger(CoreSpecService.class); - private final CoreServiceGrpc.CoreServiceBlockingStub blockingStub; - - public CoreSpecService( - String feastCoreHost, int feastCorePort, ObjectProvider callCredentials) { - ManagedChannel channel = - ManagedChannelBuilder.forAddress(feastCoreHost, feastCorePort).usePlaintext().build(); - CallCredentials creds = callCredentials.getIfAvailable(); - if (creds != null) { - blockingStub = CoreServiceGrpc.newBlockingStub(channel).withCallCredentials(creds); - } else { - blockingStub = CoreServiceGrpc.newBlockingStub(channel); - } - } - - public UpdateStoreResponse updateStore(UpdateStoreRequest updateStoreRequest) { - return blockingStub.updateStore(updateStoreRequest); - } - - public ListProjectsResponse listProjects(ListProjectsRequest listProjectsRequest) { - return blockingStub.listProjects(listProjectsRequest); - } - - public ListFeatureTablesResponse listFeatureTables( - ListFeatureTablesRequest listFeatureTablesRequest) { - return blockingStub.listFeatureTables(listFeatureTablesRequest); - } - - public CoreServiceProto.GetFeatureTableResponse getFeatureTable( - CoreServiceProto.GetFeatureTableRequest getFeatureTableRequest) { - return blockingStub.getFeatureTable(getFeatureTableRequest); - } -} diff --git a/serving/src/main/java/feast/serving/specs/RegistrySpecService.java b/serving/src/main/java/feast/serving/specs/RegistrySpecService.java new file mode 100644 index 0000000..a4f364d --- /dev/null +++ b/serving/src/main/java/feast/serving/specs/RegistrySpecService.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2019 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.specs; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import feast.proto.core.CoreServiceProto.GetFeatureTableRequest; +import feast.proto.core.CoreServiceProto.GetFeatureTableResponse; +import feast.proto.core.CoreServiceProto.ListFeatureTablesRequest; +import feast.proto.core.CoreServiceProto.ListFeatureTablesResponse; +import feast.proto.core.FeatureTableProto.FeatureTable; +import feast.proto.core.RegistryProto.Registry; +import feast.serving.exception.SpecRetrievalException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.slf4j.Logger; + +/** Client for interfacing with specs in Feast Core. */ +public class RegistrySpecService { + + private static final Logger log = org.slf4j.LoggerFactory.getLogger(RegistrySpecService.class); + private Blob blob; + private Storage storage; + private String bucketName; + private String objectName; + + public RegistrySpecService(String bucket, String object) { + bucketName = bucket; + objectName = object; + storage = StorageOptions.getDefaultInstance().getService(); + } + + public ListFeatureTablesResponse listFeatureTables( + ListFeatureTablesRequest listFeatureTablesRequest) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + blob = storage.get(BlobId.of(bucketName, objectName)); + blob.downloadTo(outputStream); + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + Registry registry = null; + try { + registry = Registry.parseFrom(inputStream); + } catch (IOException e) { + throw new RuntimeException("Unable to retrieve registry", e); + } + ListFeatureTablesResponse.Builder response = ListFeatureTablesResponse.newBuilder(); + response.addAllTables(registry.getFeatureTablesList()); + return response.build(); + } + + public GetFeatureTableResponse getFeatureTable(GetFeatureTableRequest getFeatureTableRequest) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + blob = storage.get(BlobId.of(bucketName, objectName)); + blob.downloadTo(outputStream); + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + Registry registry = null; + try { + registry = Registry.parseFrom(inputStream); + } catch (IOException e) { + throw new RuntimeException("Unable to retrieve registry", e); + } + GetFeatureTableResponse.Builder response = GetFeatureTableResponse.newBuilder(); + for (FeatureTable table : registry.getFeatureTablesList()) { + if (table.getSpec().getProject().equals(getFeatureTableRequest.getProject()) + && table.getSpec().getName().equals(getFeatureTableRequest.getName())) { + return response.setTable(table).build(); + } + } + throw new SpecRetrievalException( + String.format( + "Unable to find FeatureTable %s/%s", + getFeatureTableRequest.getProject(), getFeatureTableRequest.getName())); + } +} diff --git a/serving/src/main/resources/application.yml b/serving/src/main/resources/application.yml index f8187e9..7dd3d2f 100644 --- a/serving/src/main/resources/application.yml +++ b/serving/src/main/resources/application.yml @@ -1,8 +1,8 @@ feast: - # GRPC service address for Feast Core - # Feast Serving requires connection to Feast Core to retrieve and reload Feast metadata (e.g. FeatureSpecs, Store information) - core-host: ${FEAST_CORE_HOST:localhost} - core-grpc-port: ${FEAST_CORE_GRPC_PORT:6565} + # Bucket and object name for Feast object store registry + # Feast Serving requires connection to object store registry to retrieve and reload Feast metadata (e.g. FeatureSpecs, Store information) + bucket-name: ${FEAST_BUCKET_NAME} + object-name: ${FEAST_OBJECT_NAME} core-authentication: enabled: false # should be set to true if authentication is enabled on core. diff --git a/serving/src/test/java/feast/serving/it/AuthTestUtils.java b/serving/src/test/java/feast/serving/it/AuthTestUtils.java index a4c3db7..4bcc423 100644 --- a/serving/src/test/java/feast/serving/it/AuthTestUtils.java +++ b/serving/src/test/java/feast/serving/it/AuthTestUtils.java @@ -20,7 +20,6 @@ import com.google.gson.JsonObject; import com.google.protobuf.Timestamp; import feast.common.auth.credentials.OAuthCredentials; -import feast.proto.core.CoreServiceGrpc; import feast.proto.serving.ServingAPIProto; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2; import feast.proto.serving.ServingServiceGrpc; @@ -69,17 +68,9 @@ public static GetOnlineFeaturesRequestV2 createOnlineFeatureRequest( .build(); } - public static CoreSimpleAPIClient getSecureApiClientForCore( - int feastCorePort, Map options) { - CallCredentials callCredentials = null; - callCredentials = new OAuthCredentials(options); - Channel secureChannel = - ManagedChannelBuilder.forAddress("localhost", feastCorePort).usePlaintext().build(); - - CoreServiceGrpc.CoreServiceBlockingStub secureCoreService = - CoreServiceGrpc.newBlockingStub(secureChannel).withCallCredentials(callCredentials); - - return new CoreSimpleAPIClient(secureCoreService); + public static RegistrySimpleAPIClient getSecureApiClientForRegistry( + String bucketName, String objectName) { + return new RegistrySimpleAPIClient(bucketName, objectName); } public static ServingServiceGrpc.ServingServiceBlockingStub getServingServiceStub( diff --git a/serving/src/test/java/feast/serving/it/BaseAuthIT.java b/serving/src/test/java/feast/serving/it/BaseAuthIT.java index d49ac41..f93815a 100644 --- a/serving/src/test/java/feast/serving/it/BaseAuthIT.java +++ b/serving/src/test/java/feast/serving/it/BaseAuthIT.java @@ -46,7 +46,7 @@ public class BaseAuthIT { static final String HYDRA = "hydra_1"; static final int HYDRA_PORT = 4445; - static CoreSimpleAPIClient insecureApiClient; + static RegistrySimpleAPIClient insecureApiClient; static final String REDIS = "redis_1"; static final int REDIS_PORT = 6379; diff --git a/serving/src/test/java/feast/serving/it/CoreSimpleAPIClient.java b/serving/src/test/java/feast/serving/it/CoreSimpleAPIClient.java deleted file mode 100644 index f7bc12f..0000000 --- a/serving/src/test/java/feast/serving/it/CoreSimpleAPIClient.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * Copyright 2018-2020 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package feast.serving.it; - -import feast.proto.core.CoreServiceGrpc; -import feast.proto.core.CoreServiceProto; -import feast.proto.core.EntityProto; -import feast.proto.core.FeatureTableProto; - -public class CoreSimpleAPIClient { - private CoreServiceGrpc.CoreServiceBlockingStub stub; - - public CoreSimpleAPIClient(CoreServiceGrpc.CoreServiceBlockingStub stub) { - this.stub = stub; - } - - public void simpleApplyEntity(String projectName, EntityProto.EntitySpecV2 entitySpec) { - stub.applyEntity( - CoreServiceProto.ApplyEntityRequest.newBuilder() - .setProject(projectName) - .setSpec(entitySpec) - .build()); - } - - public EntityProto.Entity getEntity(String projectName, String name) { - return stub.getEntity( - CoreServiceProto.GetEntityRequest.newBuilder() - .setProject(projectName) - .setName(name) - .build()) - .getEntity(); - } - - public void simpleApplyFeatureTable( - String projectName, FeatureTableProto.FeatureTableSpec featureTable) { - stub.applyFeatureTable( - CoreServiceProto.ApplyFeatureTableRequest.newBuilder() - .setProject(projectName) - .setTableSpec(featureTable) - .build()); - } - - public FeatureTableProto.FeatureTable simpleGetFeatureTable(String projectName, String name) { - return stub.getFeatureTable( - CoreServiceProto.GetFeatureTableRequest.newBuilder() - .setName(name) - .setProject(projectName) - .build()) - .getTable(); - } -} diff --git a/serving/src/test/java/feast/serving/it/RegistrySimpleAPIClient.java b/serving/src/test/java/feast/serving/it/RegistrySimpleAPIClient.java new file mode 100644 index 0000000..a6d7cf2 --- /dev/null +++ b/serving/src/test/java/feast/serving/it/RegistrySimpleAPIClient.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.it; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import feast.proto.core.EntityProto; +import feast.proto.core.EntityProto.Entity; +import feast.proto.core.FeatureTableProto; +import feast.proto.core.FeatureTableProto.FeatureTable; +import feast.proto.core.RegistryProto.Registry; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +public class RegistrySimpleAPIClient { + private Blob blob; + private BlobInfo blobInfo; + private Storage storage; + private String bucketName; + private String objectName; + + public RegistrySimpleAPIClient(String bucket, String object) { + storage = StorageOptions.getDefaultInstance().getService(); + bucketName = bucket; + objectName = object; + blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, objectName)).build(); + } + + public void simpleApplyEntity(String projectName, EntityProto.EntitySpecV2 entitySpec) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + blob = storage.get(bucketName, objectName); + blob.downloadTo(outputStream); + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + Registry registry = null; + try { + registry = Registry.parseFrom(inputStream); + } catch (IOException e) { + throw new RuntimeException("Unable to retrieve registry", e); + } + entitySpec = entitySpec.toBuilder().setProject(projectName).build(); + int idx = 0; + for (Entity entity : registry.getEntitiesList()) { + if (entity.getSpec().getProject().equals(projectName) + && entity.getSpec().getName().equals(entitySpec.getName())) { + registry = + registry.toBuilder().setEntities(idx, entity.toBuilder().setSpec(entitySpec)).build(); + blob = storage.create(blobInfo, registry.toByteArray()); + return; + } + idx++; + } + registry = registry.toBuilder().addEntities(Entity.newBuilder().setSpec(entitySpec)).build(); + blob = storage.create(blobInfo, registry.toByteArray()); + return; + } + + public Entity getEntity(String projectName, String name) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + blob = storage.get(bucketName, objectName); + blob.downloadTo(outputStream); + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + Registry registry = null; + try { + registry = Registry.parseFrom(inputStream); + } catch (IOException e) { + throw new RuntimeException("Unable to retrieve registry", e); + } + for (Entity entity : registry.getEntitiesList()) { + if (entity.getSpec().getProject().equals(projectName) + && entity.getSpec().getName().equals(name)) { + return entity; + } else { + } + } + throw new RuntimeException( + String.format("Entity with name %s and project %s not found", name, projectName)); + } + + public void simpleApplyFeatureTable( + String projectName, FeatureTableProto.FeatureTableSpec featureTableSpec) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + blob = storage.get(bucketName, objectName); + blob.downloadTo(outputStream); + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + Registry registry = null; + try { + registry = Registry.parseFrom(inputStream); + } catch (IOException e) { + throw new RuntimeException("Unable to retrieve registry", e); + } + featureTableSpec = featureTableSpec.toBuilder().setProject(projectName).build(); + int idx = 0; + for (FeatureTable featureTable : registry.getFeatureTablesList()) { + if (featureTable.getSpec().getProject().equals(projectName) + && featureTable.getSpec().getName().equals(featureTableSpec.getName())) { + registry = + registry + .toBuilder() + .setFeatureTables(idx, featureTable.toBuilder().setSpec(featureTableSpec)) + .build(); + blob = storage.create(blobInfo, registry.toByteArray()); + return; + } + idx++; + } + registry = + registry + .toBuilder() + .addFeatureTables(FeatureTable.newBuilder().setSpec(featureTableSpec)) + .build(); + blob = storage.create(blobInfo, registry.toByteArray()); + return; + } + + public FeatureTable simpleGetFeatureTable(String projectName, String name) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + blob = storage.get(bucketName, objectName); + blob.downloadTo(outputStream); + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + Registry registry = null; + try { + registry = Registry.parseFrom(inputStream); + } catch (IOException e) { + throw new RuntimeException("Unable to retrieve registry", e); + } + for (FeatureTable featureTable : registry.getFeatureTablesList()) { + if (featureTable.getSpec().getProject().equals(projectName) + && featureTable.getSpec().getName().equals(name)) { + return featureTable; + } + } + throw new RuntimeException( + String.format("FeatureTable with name %s and project %s not found", name, projectName)); + } +} diff --git a/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java b/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java index 21bddb5..4072ff9 100644 --- a/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java +++ b/serving/src/test/java/feast/serving/it/ServingServiceBigTableIT.java @@ -30,6 +30,13 @@ import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.BucketInfo.LifecycleRule; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.hash.Hashing; @@ -37,6 +44,7 @@ import feast.common.it.DataGenerator; import feast.common.models.FeatureV2; import feast.proto.core.EntityProto; +import feast.proto.core.RegistryProto.Registry; import feast.proto.serving.ServingAPIProto.FeatureReferenceV2; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse; @@ -88,7 +96,7 @@ public class ServingServiceBigTableIT extends BaseAuthIT { static final Map options = new HashMap<>(); - static CoreSimpleAPIClient coreClient; + static RegistrySimpleAPIClient registryClient; static ServingServiceGrpc.ServingServiceBlockingStub servingStub; static BigtableDataClient client; @@ -107,6 +115,8 @@ public class ServingServiceBigTableIT extends BaseAuthIT { DataGenerator.createFeatureReference("rides", "trip_empty"); static final FeatureReferenceV2 feature4Reference = DataGenerator.createFeatureReference("rides", "trip_wrong_type"); + private static String bucketName; + private static String objectName; @ClassRule @Container public static DockerComposeContainer environment = @@ -122,11 +132,28 @@ public class ServingServiceBigTableIT extends BaseAuthIT { @DynamicPropertySource static void initialize(DynamicPropertyRegistry registry) { registry.add("grpc.server.port", () -> FEAST_SERVING_PORT); + + registry.add("feast.bucket-name", () -> bucketName); + registry.add("feast.object-name", () -> objectName); } @BeforeAll static void globalSetup() throws IOException { - coreClient = TestUtils.getApiClientForCore(FEAST_CORE_PORT); + Registry registryProto = Registry.newBuilder().build(); + Storage storage = StorageOptions.getDefaultInstance().getService(); + bucketName = String.format("feast-serving-registry-test-%d", System.currentTimeMillis()); + objectName = "registry.db"; + storage.create( + BucketInfo.newBuilder(bucketName) + .setLifecycleRules( + ImmutableList.of( + new LifecycleRule( + LifecycleRule.LifecycleAction.newDeleteAction(), + LifecycleRule.LifecycleCondition.newBuilder().setAge(14).build()))) + .build()); + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, objectName)).build(); + Blob blob = storage.create(blobInfo, registryProto.toByteArray()); + registryClient = TestUtils.getApiClientForRegistry(bucketName, objectName); servingStub = TestUtils.getServingServiceStub(false, FEAST_SERVING_PORT, null); // Initialize BigTable Client @@ -157,7 +184,7 @@ static void globalSetup() throws IOException { .setDescription(driverEntityDescription) .setValueType(driverEntityType) .build(); - TestUtils.applyEntity(coreClient, projectName, driverEntitySpec); + TestUtils.applyEntity(registryClient, projectName, driverEntitySpec); // Apply Entity (this_is_a_long_long_long_long_long_long_entity_id) String superLongEntityName = "this_is_a_long_long_long_long_long_long_entity_id"; @@ -169,7 +196,7 @@ static void globalSetup() throws IOException { .setDescription(superLongEntityDescription) .setValueType(superLongEntityType) .build(); - TestUtils.applyEntity(coreClient, projectName, superLongEntitySpec); + TestUtils.applyEntity(registryClient, projectName, superLongEntitySpec); // Apply Entity (merchant_id) String merchantEntityName = "merchant_id"; @@ -181,7 +208,7 @@ static void globalSetup() throws IOException { .setDescription(merchantEntityDescription) .setValueType(merchantEntityType) .build(); - TestUtils.applyEntity(coreClient, projectName, merchantEntitySpec); + TestUtils.applyEntity(registryClient, projectName, merchantEntitySpec); // Apply FeatureTable (rides) String ridesFeatureTableName = "rides"; @@ -197,7 +224,7 @@ static void globalSetup() throws IOException { "trip_wrong_type", ValueProto.ValueType.Enum.STRING); TestUtils.applyFeatureTable( - coreClient, projectName, ridesFeatureTableName, ridesEntities, ridesFeatures, 7200); + registryClient, projectName, ridesFeatureTableName, ridesEntities, ridesFeatures, 7200); // Apply FeatureTable (superLong) String superLongFeatureTableName = "superlong"; @@ -213,7 +240,7 @@ static void globalSetup() throws IOException { "trip_wrong_type", ValueProto.ValueType.Enum.STRING); TestUtils.applyFeatureTable( - coreClient, + registryClient, projectName, superLongFeatureTableName, superLongEntities, @@ -225,7 +252,7 @@ static void globalSetup() throws IOException { ImmutableList ridesMerchantEntities = ImmutableList.of(driverEntityName, merchantEntityName); TestUtils.applyFeatureTable( - coreClient, + registryClient, projectName, rideMerchantFeatureTableName, ridesMerchantEntities, @@ -696,7 +723,7 @@ public void shouldSupportAllFeastTypes() throws IOException { .setDescription("") .setValueType(ValueProto.ValueType.Enum.STRING) .build(); - TestUtils.applyEntity(coreClient, "default", entitySpec); + TestUtils.applyEntity(registryClient, "default", entitySpec); ImmutableMap allTypesFeatures = new ImmutableMap.Builder() @@ -717,7 +744,7 @@ public void shouldSupportAllFeastTypes() throws IOException { .build(); TestUtils.applyFeatureTable( - coreClient, "default", "all_types", ImmutableList.of("entity"), allTypesFeatures, 7200); + registryClient, "default", "all_types", ImmutableList.of("entity"), allTypesFeatures, 7200); Schema schema = SchemaBuilder.record("AllTypesRecord") diff --git a/serving/src/test/java/feast/serving/it/ServingServiceCassandraIT.java b/serving/src/test/java/feast/serving/it/ServingServiceCassandraIT.java index 93ee5f5..c80b916 100644 --- a/serving/src/test/java/feast/serving/it/ServingServiceCassandraIT.java +++ b/serving/src/test/java/feast/serving/it/ServingServiceCassandraIT.java @@ -20,12 +20,20 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.BucketInfo.LifecycleRule; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.hash.Hashing; import feast.common.it.DataGenerator; import feast.common.models.FeatureV2; import feast.proto.core.EntityProto; +import feast.proto.core.RegistryProto.Registry; import feast.proto.serving.ServingAPIProto.FeatureReferenceV2; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse; @@ -74,7 +82,7 @@ public class ServingServiceCassandraIT extends BaseAuthIT { static final Map options = new HashMap<>(); - static CoreSimpleAPIClient coreClient; + static RegistrySimpleAPIClient registryClient; static ServingServiceGrpc.ServingServiceBlockingStub servingStub; static CqlSession cqlSession; @@ -88,6 +96,8 @@ public class ServingServiceCassandraIT extends BaseAuthIT { DataGenerator.createFeatureReference("rides", "trip_empty"); static final FeatureReferenceV2 feature4Reference = DataGenerator.createFeatureReference("rides", "trip_wrong_type"); + private static String bucketName; + private static String objectName; @ClassRule @Container public static DockerComposeContainer environment = @@ -103,11 +113,28 @@ public class ServingServiceCassandraIT extends BaseAuthIT { @DynamicPropertySource static void initialize(DynamicPropertyRegistry registry) { registry.add("grpc.server.port", () -> FEAST_SERVING_PORT); + + registry.add("feast.bucket-name", () -> bucketName); + registry.add("feast.object-name", () -> objectName); } @BeforeAll static void globalSetup() throws IOException { - coreClient = TestUtils.getApiClientForCore(FEAST_CORE_PORT); + Registry registryProto = Registry.newBuilder().build(); + Storage storage = StorageOptions.getDefaultInstance().getService(); + bucketName = String.format("feast-serving-registry-test-%d", System.currentTimeMillis()); + objectName = "registry.db"; + storage.create( + BucketInfo.newBuilder(bucketName) + .setLifecycleRules( + ImmutableList.of( + new LifecycleRule( + LifecycleRule.LifecycleAction.newDeleteAction(), + LifecycleRule.LifecycleCondition.newBuilder().setAge(14).build()))) + .build()); + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, objectName)).build(); + Blob blob = storage.create(blobInfo, registryProto.toByteArray()); + registryClient = TestUtils.getApiClientForRegistry(bucketName, objectName); servingStub = TestUtils.getServingServiceStub(false, FEAST_SERVING_PORT, null); cqlSession = @@ -131,7 +158,7 @@ static void globalSetup() throws IOException { .setDescription(driverEntityDescription) .setValueType(driverEntityType) .build(); - TestUtils.applyEntity(coreClient, projectName, driverEntitySpec); + TestUtils.applyEntity(registryClient, projectName, driverEntitySpec); // Apply Entity (merchant_id) String merchantEntityName = "merchant_id"; @@ -143,7 +170,7 @@ static void globalSetup() throws IOException { .setDescription(merchantEntityDescription) .setValueType(merchantEntityType) .build(); - TestUtils.applyEntity(coreClient, projectName, merchantEntitySpec); + TestUtils.applyEntity(registryClient, projectName, merchantEntitySpec); // Apply FeatureTable (rides) String ridesFeatureTableName = "rides"; @@ -159,7 +186,7 @@ static void globalSetup() throws IOException { "trip_wrong_type", ValueProto.ValueType.Enum.STRING); TestUtils.applyFeatureTable( - coreClient, projectName, ridesFeatureTableName, ridesEntities, ridesFeatures, 7200); + registryClient, projectName, ridesFeatureTableName, ridesEntities, ridesFeatures, 7200); // Apply FeatureTable (food) String foodFeatureTableName = "food"; @@ -171,14 +198,14 @@ static void globalSetup() throws IOException { "trip_distance", ValueProto.ValueType.Enum.DOUBLE); TestUtils.applyFeatureTable( - coreClient, projectName, foodFeatureTableName, foodEntities, foodFeatures, 7200); + registryClient, projectName, foodFeatureTableName, foodEntities, foodFeatures, 7200); // Apply FeatureTable (rides_merchant) String rideMerchantFeatureTableName = "rides_merchant"; ImmutableList ridesMerchantEntities = ImmutableList.of(driverEntityName, merchantEntityName); TestUtils.applyFeatureTable( - coreClient, + registryClient, projectName, rideMerchantFeatureTableName, ridesMerchantEntities, diff --git a/serving/src/test/java/feast/serving/it/ServingServiceIT.java b/serving/src/test/java/feast/serving/it/ServingServiceIT.java index 8e0a82e..54461e7 100644 --- a/serving/src/test/java/feast/serving/it/ServingServiceIT.java +++ b/serving/src/test/java/feast/serving/it/ServingServiceIT.java @@ -18,6 +18,13 @@ import static org.junit.jupiter.api.Assertions.*; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.BucketInfo.LifecycleRule; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.hash.Hashing; @@ -28,6 +35,7 @@ import feast.common.it.DataGenerator; import feast.common.models.FeatureV2; import feast.proto.core.EntityProto; +import feast.proto.core.RegistryProto.Registry; import feast.proto.serving.ServingAPIProto; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse; @@ -70,12 +78,14 @@ public class ServingServiceIT extends BaseAuthIT { static final Map options = new HashMap<>(); static final String timestampPrefix = "_ts"; - static CoreSimpleAPIClient coreClient; + static RegistrySimpleAPIClient registryClient; static ServingServiceGrpc.ServingServiceBlockingStub servingStub; static RedisCommands syncCommands; static final int FEAST_SERVING_PORT = 6568; @LocalServerPort private int metricsPort; + private static String bucketName; + private static String objectName; @ClassRule @Container public static DockerComposeContainer environment = @@ -91,11 +101,28 @@ public class ServingServiceIT extends BaseAuthIT { @DynamicPropertySource static void initialize(DynamicPropertyRegistry registry) { registry.add("grpc.server.port", () -> FEAST_SERVING_PORT); + + registry.add("feast.bucket-name", () -> bucketName); + registry.add("feast.object-name", () -> objectName); } @BeforeAll static void globalSetup() { - coreClient = TestUtils.getApiClientForCore(FEAST_CORE_PORT); + Registry registryProto = Registry.newBuilder().build(); + Storage storage = StorageOptions.getDefaultInstance().getService(); + bucketName = String.format("feast-serving-registry-test-%d", System.currentTimeMillis()); + objectName = "registry.db"; + storage.create( + BucketInfo.newBuilder(bucketName) + .setLifecycleRules( + ImmutableList.of( + new LifecycleRule( + LifecycleRule.LifecycleAction.newDeleteAction(), + LifecycleRule.LifecycleCondition.newBuilder().setAge(14).build()))) + .build()); + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, objectName)).build(); + Blob blob = storage.create(blobInfo, registryProto.toByteArray()); + registryClient = TestUtils.getApiClientForRegistry(bucketName, objectName); servingStub = TestUtils.getServingServiceStub(false, FEAST_SERVING_PORT, null); RedisClient redisClient = @@ -119,7 +146,7 @@ static void globalSetup() { .setDescription(description) .setValueType(entityType) .build(); - TestUtils.applyEntity(coreClient, projectName, entitySpec); + TestUtils.applyEntity(registryClient, projectName, entitySpec); // Apply FeatureTable String featureTableName = "rides"; @@ -150,7 +177,7 @@ static void globalSetup() { ValueProto.ValueType.Enum.STRING); TestUtils.applyFeatureTable( - coreClient, projectName, featureTableName, entities, features, 7200); + registryClient, projectName, featureTableName, entities, features, 7200); // Serialize Redis Key with Entity i.e RedisProto.RedisKeyV2 redisKey = @@ -443,7 +470,7 @@ public void shouldReturnNotFoundForUpdatedType() { ValueProto.ValueType.Enum.STRING); TestUtils.applyFeatureTable( - coreClient, projectName, featureTableName, entities, features, 7200); + registryClient, projectName, featureTableName, entities, features, 7200); // Sleep is necessary to ensure caching (every 1s) of updated FeatureTable is done try { diff --git a/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthenticationIT.java b/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthenticationIT.java index 8f2440d..b05793b 100644 --- a/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthenticationIT.java +++ b/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthenticationIT.java @@ -20,6 +20,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.testcontainers.containers.wait.strategy.Wait.forHttp; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.BucketInfo.LifecycleRule; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.squareup.okhttp.OkHttpClient; import com.squareup.okhttp.Request; @@ -27,11 +35,11 @@ import feast.common.it.DataGenerator; import feast.proto.core.EntityProto; import feast.proto.core.FeatureTableProto; +import feast.proto.core.RegistryProto.Registry; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse; import feast.proto.serving.ServingServiceGrpc.ServingServiceBlockingStub; import feast.proto.types.ValueProto; -import feast.proto.types.ValueProto.Value; import io.grpc.ManagedChannel; import java.io.File; import java.io.IOException; @@ -48,6 +56,8 @@ import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.web.server.LocalServerPort; import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Container; @@ -65,12 +75,14 @@ @Testcontainers public class ServingServiceOauthAuthenticationIT extends BaseAuthIT { - CoreSimpleAPIClient coreClient; + RegistrySimpleAPIClient registryClient; FeatureTableProto.FeatureTableSpec expectedFeatureTableSpec; static final Map options = new HashMap<>(); static final int FEAST_SERVING_PORT = 6566; @LocalServerPort private int metricsPort; + private static String bucketName; + private static String objectName; @ClassRule @Container public static DockerComposeContainer environment = @@ -84,8 +96,28 @@ public class ServingServiceOauthAuthenticationIT extends BaseAuthIT { Wait.forLogMessage(".*gRPC Server started.*\\n", 1) .withStartupTimeout(Duration.ofMinutes(SERVICE_START_MAX_WAIT_TIME_IN_MINUTES))); + @DynamicPropertySource + static void initialize(DynamicPropertyRegistry registry) { + registry.add("feast.bucket-name", () -> bucketName); + registry.add("feast.object-name", () -> objectName); + } + @BeforeAll static void globalSetup() throws IOException, InitializationError, InterruptedException { + Registry registryProto = Registry.newBuilder().build(); + Storage storage = StorageOptions.getDefaultInstance().getService(); + bucketName = String.format("feast-serving-registry-test-%d", System.currentTimeMillis()); + objectName = "registry.db"; + storage.create( + BucketInfo.newBuilder(bucketName) + .setLifecycleRules( + ImmutableList.of( + new LifecycleRule( + LifecycleRule.LifecycleAction.newDeleteAction(), + LifecycleRule.LifecycleCondition.newBuilder().setAge(14).build()))) + .build()); + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, objectName)).build(); + Blob blob = storage.create(blobInfo, registryProto.toByteArray()); String hydraExternalHost = environment.getServiceHost(HYDRA, HYDRA_PORT); Integer hydraExternalPort = environment.getServicePort(HYDRA, HYDRA_PORT); String hydraExternalUrl = String.format("http://%s:%s", hydraExternalHost, hydraExternalPort); @@ -102,14 +134,14 @@ static void globalSetup() throws IOException, InitializationError, InterruptedEx @BeforeEach public void initState() { - coreClient = AuthTestUtils.getSecureApiClientForCore(FEAST_CORE_PORT, options); + registryClient = AuthTestUtils.getSecureApiClientForRegistry(bucketName, objectName); EntityProto.EntitySpecV2 entitySpec = DataGenerator.createEntitySpecV2( ENTITY_ID, "Entity 1 description", ValueProto.ValueType.Enum.STRING, ImmutableMap.of("label_key", "label_value")); - coreClient.simpleApplyEntity(PROJECT_NAME, entitySpec); + registryClient.simpleApplyEntity(PROJECT_NAME, entitySpec); expectedFeatureTableSpec = DataGenerator.createFeatureTableSpec( @@ -126,7 +158,7 @@ public void initState() { .setBatchSource( DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", "")) .build(); - coreClient.simpleApplyFeatureTable(PROJECT_NAME, expectedFeatureTableSpec); + registryClient.simpleApplyFeatureTable(PROJECT_NAME, expectedFeatureTableSpec); } /** Test that Feast Serving metrics endpoint can be accessed with authentication enabled */ @@ -145,7 +177,7 @@ public void shouldAllowUnauthenticatedAccessToMetricsEndpoint() throws IOExcepti @Test public void shouldAllowUnauthenticatedGetOnlineFeatures() { FeatureTableProto.FeatureTable actualFeatureTable = - coreClient.simpleGetFeatureTable(PROJECT_NAME, FEATURE_TABLE_NAME); + registryClient.simpleGetFeatureTable(PROJECT_NAME, FEATURE_TABLE_NAME); assertEquals(expectedFeatureTableSpec.getName(), actualFeatureTable.getSpec().getName()); assertEquals( expectedFeatureTableSpec.getBatchSource(), actualFeatureTable.getSpec().getBatchSource()); @@ -159,7 +191,7 @@ public void shouldAllowUnauthenticatedGetOnlineFeatures() { servingStub.getOnlineFeaturesV2(onlineFeatureRequestV2); assertEquals(1, featureResponse.getFieldValuesCount()); - Map fieldsMap = featureResponse.getFieldValues(0).getFieldsMap(); + Map fieldsMap = featureResponse.getFieldValues(0).getFieldsMap(); assertTrue(fieldsMap.containsKey(ENTITY_ID)); assertTrue(fieldsMap.containsKey(FEATURE_TABLE_NAME + ":" + FEATURE_NAME)); ((ManagedChannel) servingStub.getChannel()).shutdown(); @@ -168,7 +200,7 @@ public void shouldAllowUnauthenticatedGetOnlineFeatures() { @Test void canGetOnlineFeaturesIfAuthenticated() { FeatureTableProto.FeatureTable actualFeatureTable = - coreClient.simpleGetFeatureTable(PROJECT_NAME, FEATURE_TABLE_NAME); + registryClient.simpleGetFeatureTable(PROJECT_NAME, FEATURE_TABLE_NAME); assertEquals(expectedFeatureTableSpec.getName(), actualFeatureTable.getSpec().getName()); assertEquals( expectedFeatureTableSpec.getBatchSource(), actualFeatureTable.getSpec().getBatchSource()); @@ -182,7 +214,7 @@ void canGetOnlineFeaturesIfAuthenticated() { GetOnlineFeaturesResponse featureResponse = servingStub.getOnlineFeaturesV2(onlineFeatureRequest); assertEquals(1, featureResponse.getFieldValuesCount()); - Map fieldsMap = featureResponse.getFieldValues(0).getFieldsMap(); + Map fieldsMap = featureResponse.getFieldValues(0).getFieldsMap(); assertTrue(fieldsMap.containsKey(ENTITY_ID)); assertTrue(fieldsMap.containsKey(FEATURE_TABLE_NAME + ":" + FEATURE_NAME)); ((ManagedChannel) servingStub.getChannel()).shutdown(); diff --git a/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthorizationIT.java b/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthorizationIT.java index 64fe44b..733cbb9 100644 --- a/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthorizationIT.java +++ b/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthorizationIT.java @@ -21,12 +21,19 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.testcontainers.containers.wait.strategy.Wait.forHttp; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.BucketInfo.LifecycleRule; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; import feast.common.it.DataGenerator; +import feast.proto.core.RegistryProto.Registry; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2; import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse; import feast.proto.serving.ServingServiceGrpc.ServingServiceBlockingStub; import feast.proto.types.ValueProto; -import feast.proto.types.ValueProto.Value; import io.grpc.ManagedChannel; import io.grpc.StatusRuntimeException; import java.io.File; @@ -69,8 +76,10 @@ public class ServingServiceOauthAuthorizationIT extends BaseAuthIT { private static int KETO_PORT = 4466; private static int KETO_ADAPTOR_PORT = 8080; static String subjectClaim = "sub"; - static CoreSimpleAPIClient coreClient; + static RegistrySimpleAPIClient registryClient; static final int FEAST_SERVING_PORT = 6766; + private static String bucketName; + private static String objectName; @ClassRule @Container public static DockerComposeContainer environment = @@ -110,10 +119,27 @@ static void initialize(DynamicPropertyRegistry registry) { registry.add("feast.security.authentication.options.jwkEndpointURI", () -> JWK_URI); registry.add("feast.security.authorization.options.authorizationUrl", () -> ketoAdaptorUrl); registry.add("grpc.server.port", () -> FEAST_SERVING_PORT); + + registry.add("feast.bucket-name", () -> bucketName); + registry.add("feast.object-name", () -> objectName); } @BeforeAll static void globalSetup() throws IOException, InitializationError, InterruptedException { + Registry registryProto = Registry.newBuilder().build(); + Storage storage = StorageOptions.getDefaultInstance().getService(); + bucketName = String.format("feast-serving-registry-test-%d", System.currentTimeMillis()); + objectName = "registry.db"; + storage.create( + BucketInfo.newBuilder(bucketName) + .setLifecycleRules( + com.google.common.collect.ImmutableList.of( + new LifecycleRule( + LifecycleRule.LifecycleAction.newDeleteAction(), + LifecycleRule.LifecycleCondition.newBuilder().setAge(14).build()))) + .build()); + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucketName, objectName)).build(); + Blob blob = storage.create(blobInfo, registryProto.toByteArray()); String hydraExternalHost = environment.getServiceHost(HYDRA, HYDRA_PORT); Integer hydraExternalPort = environment.getServicePort(HYDRA, HYDRA_PORT); String hydraExternalUrl = String.format("http://%s:%s", hydraExternalHost, hydraExternalPort); @@ -130,12 +156,12 @@ static void globalSetup() throws IOException, InitializationError, InterruptedEx adminCredentials.put("audience", AUDIENCE); adminCredentials.put("grant_type", GRANT_TYPE); - coreClient = AuthTestUtils.getSecureApiClientForCore(FEAST_CORE_PORT, adminCredentials); - coreClient.simpleApplyEntity( + registryClient = AuthTestUtils.getSecureApiClientForRegistry(bucketName, objectName); + registryClient.simpleApplyEntity( PROJECT_NAME, DataGenerator.createEntitySpecV2( ENTITY_ID, "", ValueProto.ValueType.Enum.STRING, Collections.emptyMap())); - coreClient.simpleApplyFeatureTable( + registryClient.simpleApplyFeatureTable( PROJECT_NAME, DataGenerator.createFeatureTableSpec( FEATURE_TABLE_NAME, @@ -176,7 +202,7 @@ void canGetOnlineFeaturesIfAdmin() { GetOnlineFeaturesResponse featureResponse = servingStub.getOnlineFeaturesV2(onlineFeatureRequest); assertEquals(1, featureResponse.getFieldValuesCount()); - Map fieldsMap = featureResponse.getFieldValues(0).getFieldsMap(); + Map fieldsMap = featureResponse.getFieldValues(0).getFieldsMap(); assertTrue(fieldsMap.containsKey(ENTITY_ID)); assertTrue(fieldsMap.containsKey(FEATURE_TABLE_NAME + ":" + FEATURE_NAME)); ((ManagedChannel) servingStub.getChannel()).shutdown(); @@ -195,7 +221,7 @@ void canGetOnlineFeaturesIfProjectMember() { GetOnlineFeaturesResponse featureResponse = servingStub.getOnlineFeaturesV2(onlineFeatureRequest); assertEquals(1, featureResponse.getFieldValuesCount()); - Map fieldsMap = featureResponse.getFieldValues(0).getFieldsMap(); + Map fieldsMap = featureResponse.getFieldValues(0).getFieldsMap(); assertTrue(fieldsMap.containsKey(ENTITY_ID)); assertTrue(fieldsMap.containsKey(FEATURE_TABLE_NAME + ":" + FEATURE_NAME)); ((ManagedChannel) servingStub.getChannel()).shutdown(); diff --git a/serving/src/test/java/feast/serving/it/TestUtils.java b/serving/src/test/java/feast/serving/it/TestUtils.java index 6772dad..c33ec85 100644 --- a/serving/src/test/java/feast/serving/it/TestUtils.java +++ b/serving/src/test/java/feast/serving/it/TestUtils.java @@ -21,8 +21,6 @@ import com.google.common.collect.ImmutableMap; import feast.common.auth.credentials.OAuthCredentials; import feast.common.it.DataGenerator; -import feast.proto.core.CoreServiceGrpc; -import feast.proto.core.CoreServiceGrpc.CoreServiceBlockingStub; import feast.proto.core.EntityProto.Entity; import feast.proto.core.EntityProto.EntitySpecV2; import feast.proto.core.FeatureTableProto.FeatureTable; @@ -52,13 +50,9 @@ public static ServingServiceGrpc.ServingServiceBlockingStub getServingServiceStu } } - public static CoreSimpleAPIClient getApiClientForCore(int feastCorePort) { - Channel channel = - ManagedChannelBuilder.forAddress("localhost", feastCorePort).usePlaintext().build(); - - CoreServiceBlockingStub coreService = CoreServiceGrpc.newBlockingStub(channel); - - return new CoreSimpleAPIClient(coreService); + public static RegistrySimpleAPIClient getApiClientForRegistry( + String bucketName, String objectName) { + return new RegistrySimpleAPIClient(bucketName, objectName); } public static GetOnlineFeaturesRequestV2 createOnlineFeatureRequest( @@ -73,7 +67,7 @@ public static GetOnlineFeaturesRequestV2 createOnlineFeatureRequest( } public static void applyFeatureTable( - CoreSimpleAPIClient secureApiClient, + RegistrySimpleAPIClient secureApiClient, String projectName, String featureTableName, List entities, @@ -97,10 +91,10 @@ public static void applyFeatureTable( } public static void applyEntity( - CoreSimpleAPIClient coreApiClient, String projectName, EntitySpecV2 entitySpec) { - coreApiClient.simpleApplyEntity(projectName, entitySpec); + RegistrySimpleAPIClient registryClient, String projectName, EntitySpecV2 entitySpec) { + registryClient.simpleApplyEntity(projectName, entitySpec); String entityName = entitySpec.getName(); - Entity actualEntity = coreApiClient.getEntity(projectName, entityName); + Entity actualEntity = registryClient.getEntity(projectName, entityName); assertEquals(entitySpec.getName(), actualEntity.getSpec().getName()); } } diff --git a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java index 4e48b64..1f04a4c 100644 --- a/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java +++ b/serving/src/test/java/feast/serving/service/CachedSpecServiceTest.java @@ -26,14 +26,12 @@ import feast.common.it.DataGenerator; import feast.proto.core.CoreServiceProto.ListFeatureTablesRequest; import feast.proto.core.CoreServiceProto.ListFeatureTablesResponse; -import feast.proto.core.CoreServiceProto.ListProjectsRequest; -import feast.proto.core.CoreServiceProto.ListProjectsResponse; import feast.proto.core.FeatureTableProto; import feast.proto.core.FeatureTableProto.FeatureTableSpec; import feast.proto.serving.ServingAPIProto.FeatureReferenceV2; import feast.proto.types.ValueProto; import feast.serving.specs.CachedSpecService; -import feast.serving.specs.CoreSpecService; +import feast.serving.specs.RegistrySpecService; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -44,7 +42,7 @@ public class CachedSpecServiceTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); - @Mock CoreSpecService coreService; + @Mock RegistrySpecService registryService; private CachedSpecService cachedSpecService; @@ -58,7 +56,6 @@ public class CachedSpecServiceTest { public void setUp() { initMocks(this); - this.setupProject("default"); this.featureTableEntities = ImmutableList.of("entity1"); this.featureTable1Features = ImmutableMap.of( @@ -76,35 +73,29 @@ public void setUp() { this.featureTableEntities, featureTable1Features, 7200, - ImmutableMap.of()); + ImmutableMap.of(), + "default"); this.featureTable2Spec = DataGenerator.createFeatureTableSpec( "featuretable2", this.featureTableEntities, featureTable2Features, 7200, - ImmutableMap.of()); + ImmutableMap.of(), + "default"); - this.setupFeatureTableAndProject("default"); + this.setupFeatureTableAndProject(); - cachedSpecService = new CachedSpecService(this.coreService); + cachedSpecService = new CachedSpecService(this.registryService); } - private void setupProject(String project) { - when(coreService.listProjects(ListProjectsRequest.newBuilder().build())) - .thenReturn(ListProjectsResponse.newBuilder().addProjects(project).build()); - } - - private void setupFeatureTableAndProject(String project) { + private void setupFeatureTableAndProject() { FeatureTableProto.FeatureTable featureTable1 = FeatureTableProto.FeatureTable.newBuilder().setSpec(this.featureTable1Spec).build(); FeatureTableProto.FeatureTable featureTable2 = FeatureTableProto.FeatureTable.newBuilder().setSpec(this.featureTable2Spec).build(); - when(coreService.listFeatureTables( - ListFeatureTablesRequest.newBuilder() - .setFilter(ListFeatureTablesRequest.Filter.newBuilder().setProject(project).build()) - .build())) + when(registryService.listFeatureTables(ListFeatureTablesRequest.newBuilder().build())) .thenReturn( ListFeatureTablesResponse.newBuilder() .addTables(featureTable1)