import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from urllib.parse import urlparse
from feast.infra.passthrough_provider import PassthroughProvider
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig
from feast.usage import log_exceptions_and_usage
[docs]class GcpProvider(PassthroughProvider):
"""
This class only exists for backwards compatibility.
"""
pass
[docs]class GCSRegistryStore(RegistryStore):
def __init__(self, registry_config: RegistryConfig, repo_path: Path):
uri = registry_config.path
try:
import google.cloud.storage as storage
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError
raise FeastExtrasDependencyImportError("gcp", str(e))
self.gcs_client = storage.Client()
self._uri = urlparse(uri)
self._bucket = self._uri.hostname
self._blob = self._uri.path.lstrip("/")
[docs] @log_exceptions_and_usage(registry="gs")
def get_registry_proto(self):
import google.cloud.storage as storage
from google.cloud.exceptions import NotFound
file_obj = TemporaryFile()
registry_proto = RegistryProto()
try:
bucket = self.gcs_client.get_bucket(self._bucket)
except NotFound:
raise Exception(
f"No bucket named {self._bucket} exists; please create it first."
)
if storage.Blob(bucket=bucket, name=self._blob).exists(self.gcs_client):
self.gcs_client.download_blob_to_file(
self._uri.geturl(), file_obj, timeout=30
)
file_obj.seek(0)
registry_proto.ParseFromString(file_obj.read())
return registry_proto
raise FileNotFoundError(
f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
)
[docs] @log_exceptions_and_usage(registry="gs")
def update_registry_proto(self, registry_proto: RegistryProto):
self._write_registry(registry_proto)
[docs] def teardown(self):
from google.cloud.exceptions import NotFound
gs_bucket = self.gcs_client.get_bucket(self._bucket)
try:
gs_bucket.delete_blob(self._blob)
except NotFound:
# If the blob deletion fails with NotFound, it has already been deleted.
pass
def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
# we have already checked the bucket exists so no need to do it again
gs_bucket = self.gcs_client.get_bucket(self._bucket)
blob = gs_bucket.blob(self._blob)
file_obj = TemporaryFile()
file_obj.write(registry_proto.SerializeToString())
file_obj.seek(0)
blob.upload_from_file(file_obj)