diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 29c1c68e7c..09a9d97869 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -130,6 +130,8 @@ def __init__( bq_connection_id, cloud_resource_manager_client, cloud_function_service_account, + cloud_function_kms_key_name, + cloud_function_docker_repository, ): self._gcp_project_id = gcp_project_id self._cloud_function_region = cloud_function_region @@ -142,6 +144,8 @@ def __init__( bq_connection_client, cloud_resource_manager_client ) self._cloud_function_service_account = cloud_function_service_account + self._cloud_function_kms_key_name = cloud_function_kms_key_name + self._cloud_function_docker_repository = cloud_function_docker_repository def create_bq_remote_function( self, input_args, input_types, output_type, endpoint, bq_function_name @@ -344,7 +348,9 @@ def create_cloud_function(self, def_, cf_name, package_requirements=None): ) # Determine an upload URL for user code - upload_url_request = functions_v2.GenerateUploadUrlRequest() + upload_url_request = functions_v2.GenerateUploadUrlRequest( + kms_key_name=self._cloud_function_kms_key_name + ) upload_url_request.parent = self.get_cloud_function_fully_qualified_parent() upload_url_response = self._cloud_functions_client.generate_upload_url( request=upload_url_request @@ -383,12 +389,16 @@ def create_cloud_function(self, def_, cf_name, package_requirements=None): function.build_config.source.storage_source.object_ = ( upload_url_response.storage_source.object_ ) + function.build_config.docker_repository = ( + self._cloud_function_docker_repository + ) function.service_config = functions_v2.ServiceConfig() function.service_config.available_memory = "1024M" function.service_config.timeout_seconds = 600 function.service_config.service_account_email = ( self._cloud_function_service_account ) + function.kms_key_name = self._cloud_function_kms_key_name 
create_function_request.function = function # Create the cloud function and wait for it to be ready to use @@ -597,6 +607,8 @@ def remote_function( name: Optional[str] = None, packages: Optional[Sequence[str]] = None, cloud_function_service_account: Optional[str] = None, + cloud_function_kms_key_name: Optional[str] = None, + cloud_function_docker_repository: Optional[str] = None, ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -699,6 +711,20 @@ def remote_function( for more details. Please make sure the service account has the necessary IAM permissions configured as described in https://cloud.google.com/functions/docs/reference/iam/roles#additional-configuration. + cloud_function_kms_key_name (str, Optional): + Customer managed encryption key to protect cloud functions and + related data at rest. This is of the format + projects/PROJECT_ID/locations/LOCATION/keyRings/KEYRING/cryptoKeys/KEY. + Read https://cloud.google.com/functions/docs/securing/cmek for + more details including granting necessary service accounts + access to the key. + cloud_function_docker_repository (str, Optional): + Docker repository created with the same encryption key as + `cloud_function_kms_key_name` to store encrypted artifacts + created to support the cloud function. This is of the format + projects/PROJECT_ID/locations/LOCATION/repositories/REPOSITORY_NAME. + For more details see + https://cloud.google.com/functions/docs/securing/cmek#before_you_begin. """ import bigframes.pandas as bpd @@ -780,6 +806,16 @@ def remote_function( f"{bq_location}." ) + # If any CMEK is intended then check that a docker repository is also specified + if ( + cloud_function_kms_key_name is not None + and cloud_function_docker_repository is None + ): + raise ValueError( + "cloud_function_docker_repository must be specified with cloud_function_kms_key_name." 
+ " For more details see https://cloud.google.com/functions/docs/securing/cmek#before_you_begin" + ) + def wrapper(f): if not callable(f): raise TypeError("f must be callable, got {}".format(f)) @@ -800,6 +836,8 @@ def wrapper(f): bq_connection_id, resource_manager_client, cloud_function_service_account, + cloud_function_kms_key_name, + cloud_function_docker_repository, ) rf_name, cf_name = remote_function_client.provision_bq_remote_function( diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 03c8412907..10caf17b79 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -620,6 +620,8 @@ def remote_function( name: Optional[str] = None, packages: Optional[Sequence[str]] = None, cloud_function_service_account: Optional[str] = None, + cloud_function_kms_key_name: Optional[str] = None, + cloud_function_docker_repository: Optional[str] = None, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -631,6 +633,8 @@ def remote_function( name=name, packages=packages, cloud_function_service_account=cloud_function_service_account, + cloud_function_kms_key_name=cloud_function_kms_key_name, + cloud_function_docker_repository=cloud_function_docker_repository, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 5266267a22..6781a9085e 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1364,6 +1364,8 @@ def remote_function( name: Optional[str] = None, packages: Optional[Sequence[str]] = None, cloud_function_service_account: Optional[str] = None, + cloud_function_kms_key_name: Optional[str] = None, + cloud_function_docker_repository: Optional[str] = None, ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1444,6 +1446,20 @@ def remote_function( for more details. 
Please make sure the service account has the necessary IAM permissions configured as described in https://cloud.google.com/functions/docs/reference/iam/roles#additional-configuration. + cloud_function_kms_key_name (str, Optional): + Customer managed encryption key to protect cloud functions and + related data at rest. This is of the format + projects/PROJECT_ID/locations/LOCATION/keyRings/KEYRING/cryptoKeys/KEY. + Read https://cloud.google.com/functions/docs/securing/cmek for + more details including granting necessary service accounts + access to the key. + cloud_function_docker_repository (str, Optional): + Docker repository created with the same encryption key as + `cloud_function_kms_key_name` to store encrypted artifacts + created to support the cloud function. This is of the format + projects/PROJECT_ID/locations/LOCATION/repositories/REPOSITORY_NAME. + For more details see + https://cloud.google.com/functions/docs/securing/cmek#before_you_begin. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. 
The cloud assets can be @@ -1463,6 +1479,8 @@ def remote_function( name=name, packages=packages, cloud_function_service_account=cloud_function_service_account, + cloud_function_kms_key_name=cloud_function_kms_key_name, + cloud_function_docker_repository=cloud_function_docker_repository, ) def read_gbq_function( diff --git a/setup.py b/setup.py index a626fd4b34..5258a7d6f9 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ "geopandas >=0.12.2", "google-auth >=2.15.0,<3.0dev", "google-cloud-bigquery[bqstorage,pandas] >=3.10.0", - "google-cloud-functions >=1.10.1", + "google-cloud-functions >=1.12.0", "google-cloud-bigquery-connection >=1.12.0", "google-cloud-iam >=2.12.1", "google-cloud-resource-manager >=1.10.3", diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index 07c8b763f3..0aeb15eab8 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -5,7 +5,7 @@ gcsfs==2023.3.0 geopandas==0.12.2 google-auth==2.15.0 google-cloud-bigquery==3.10.0 -google-cloud-functions==1.10.1 +google-cloud-functions==1.12.0 google-cloud-bigquery-connection==1.12.0 google-cloud-iam==2.12.1 google-cloud-resource-manager==1.10.3 diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 77aa3c7603..f8c5e98f1d 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -22,7 +22,7 @@ import textwrap from google.api_core.exceptions import BadRequest, NotFound, ResourceExhausted -from google.cloud import bigquery, functions_v2 +from google.cloud import bigquery, functions_v2, storage import pandas import pytest import test_utils.prefixer @@ -1322,3 +1322,68 @@ def square_num(x): cleanup_remote_function_assets( rf_session.bqclient, rf_session.cloudfunctionsclient, square_num ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_with_gcf_cmek(): + # TODO(shobs): Automate the following set-up during testing in the test project. 
+ # + # For upfront convenience, the following set up has been statically created + # in the project bigframes-dev-perf via cloud console: + # + # 1. Created an encryption key and granted the necessary service accounts + # the required IAM permissions as per https://cloud.google.com/kms/docs/create-key + # 2. Created a docker repository with CMEK (created in step 1) enabled as per + # https://cloud.google.com/artifact-registry/docs/repositories/create-repos#overview + # + project = "bigframes-dev-perf" + cmek = "projects/bigframes-dev-perf/locations/us-central1/keyRings/bigframesKeyRing/cryptoKeys/bigframesKey" + docker_repository = ( + "projects/bigframes-dev-perf/locations/us-central1/repositories/rf-artifacts" + ) + + session = bigframes.Session(context=bigframes.BigQueryOptions(project=project)) + try: + + @session.remote_function( + [int], + int, + reuse=False, + cloud_function_kms_key_name=cmek, + cloud_function_docker_repository=docker_repository, + ) + def square_num(x): + if x is None: + return x + return x * x + + df = pandas.DataFrame({"num": [-1, 0, None, 1]}, dtype="Int64") + bf = session.read_pandas(df) + + bf_result_col = bf["num"].apply(square_num) + bf_result = bf.assign(result=bf_result_col).to_pandas() + + pd_result_col = df["num"].apply(lambda x: x if x is None else x * x) + pd_result = df.assign(result=pd_result_col) + + assert_pandas_df_equal( + bf_result, pd_result, check_dtype=False, check_index_type=False + ) + + # Assert that the GCF is created with the intended CMEK + gcf = session.cloudfunctionsclient.get_function( + name=square_num.bigframes_cloud_function + ) + assert gcf.kms_key_name == cmek + + # Assert that GCS artifact has CMEK applied + storage_client = storage.Client() + bucket = storage_client.bucket(gcf.build_config.source.storage_source.bucket) + blob = bucket.get_blob(gcf.build_config.source.storage_source.object_) + assert blob.kms_key_name.startswith(cmek) + + finally: + # clean up the gcp assets created for the remote 
function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, square_num + )