diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py
new file mode 100644
index 0000000000..16da677ef5
--- /dev/null
+++ b/bigframes/streaming/__init__.py
@@ -0,0 +1,139 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module for BigQuery continuous queries."""
+
+import json
+from typing import Optional
+
+from google.cloud import bigquery
+
+import bigframes
+
+
+def to_bigtable(
+    query: str,
+    instance: str,
+    table: str,
+    bq_client: Optional[bigquery.Client] = None,
+    app_profile: Optional[str] = None,
+    truncate: bool = False,
+    overwrite: bool = False,
+    auto_create_column_families: bool = False,
+    bigtable_options: Optional[dict] = None,
+    job_id: Optional[str] = None,
+    job_id_prefix: Optional[str] = None,
+) -> bigquery.QueryJob:
+    """Launches a BigQuery continuous query and returns the
+    QueryJob object, which can be used to manage the running query.
+
+    This method requires an existing Bigtable table that is
+    preconfigured to accept the continuous query export statement.
+    For instructions on exporting to Bigtable, see
+    https://cloud.google.com/bigquery/docs/export-to-bigtable.
+
+    Args:
+        query (str):
+            The SQL statement to execute as a continuous query.
+            For example: "SELECT * FROM dataset.table"
+            This will be wrapped in an EXPORT DATA statement to
+            launch a continuous query writing to Bigtable.
+        instance (str):
+            The name of the Bigtable instance to export to.
+        table (str):
+            The name of the Bigtable table to export to.
+        bq_client (google.cloud.bigquery.Client, default None):
+            The Client object to use for the query. This determines
+            the project id and location of the query. If None, will
+            default to the bigframes global session default client.
+        app_profile (str, default None):
+            The Bigtable app profile to export to. If None, no app
+            profile will be used.
+        truncate (bool, default False):
+            The export truncate option, see
+            https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option
+        overwrite (bool, default False):
+            The export overwrite option, see
+            https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option
+        auto_create_column_families (bool, default False):
+            The auto_create_column_families option, see
+            https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option
+        bigtable_options (dict, default None):
+            The Bigtable options dict, which will be converted to JSON
+            using json.dumps, see
+            https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option
+            If None, no bigtable_options parameter will be passed.
+        job_id (str, default None):
+            If specified, replaces the default job id for the query,
+            see the job_id parameter of
+            https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+        job_id_prefix (str, default None):
+            If specified, a job id prefix for the query, see the
+            job_id_prefix parameter of
+            https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
+
+    Returns:
+        google.cloud.bigquery.QueryJob:
+            See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
+            The ongoing query job can be managed using this object.
+            For example, the job can be cancelled or its error status
+            can be examined.
+    """
+    # get the default client if none was passed
+    if bq_client is None:
+        bq_client = bigframes.get_global_session().bqclient
+
+    # build the export string from the parameters
+    project = bq_client.project
+
+    app_profile_url_string = ""
+    if app_profile is not None:
+        app_profile_url_string = f"appProfiles/{app_profile}/"
+
+    bigtable_options_parameter_string = ""
+    if bigtable_options is not None:
+        bigtable_options_parameter_string = (
+            'bigtable_options = """' + json.dumps(bigtable_options) + '""",\n'
+        )
+
+    sql = (
+        "EXPORT DATA\n"
+        "OPTIONS (\n"
+        "format = 'CLOUD_BIGTABLE',\n"
+        f"{bigtable_options_parameter_string}"
+        f"truncate = {str(truncate)},\n"
+        f"overwrite = {str(overwrite)},\n"
+        f"auto_create_column_families = {str(auto_create_column_families)},\n"
+        f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/{app_profile_url_string}tables/{table}"\n'
+        ")\n"
+        "AS (\n"
+        f"{query});"
+    )
+
+    # set the continuous query parameter in the job config
+    job_config = bigquery.job.QueryJobConfig()
+    job_config_filled = job_config.from_api_repr({"query": {"continuous": True}})
+
+    # begin the query job
+    query_job = bq_client.query(
+        sql,
+        job_config=job_config_filled,  # type:ignore
+        # the typing error above is in the bq client library
+        # (it should accept an abstract job_config, but only takes the concrete type)
+        job_id=job_id,
+        job_id_prefix=job_id_prefix,
+    )
+
+    # return the query job to the user for lifetime management
+    return query_job
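For illustration, with the defaults (bigtable_options=None, app_profile=None) and the three boolean options set to True, the EXPORT DATA statement assembled above comes out roughly as follows (the project, instance, and table placeholders are illustrative, and the query is the docstring's example):

    EXPORT DATA
    OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = True,
    overwrite = True,
    auto_create_column_families = True,
    uri = "https://bigtable.googleapis.com/projects/<project>/instances/<instance>/tables/<table>"
    )
    AS (
    SELECT * FROM dataset.table);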
diff --git a/scripts/create_bigtable.py b/scripts/create_bigtable.py
new file mode 100644
index 0000000000..655e4b31ab
--- /dev/null
+++ b/scripts/create_bigtable.py
@@ -0,0 +1,76 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script creates the Bigtable resources required for
+# bigframes.streaming testing if they don't already exist.
+
+import os
+import pathlib
+import sys
+
+import google.cloud.bigtable as bigtable
+
+REPO_ROOT = pathlib.Path(__file__).parent.parent
+
+PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
+
+if not PROJECT_ID:
+    print(
+        "Please set the GOOGLE_CLOUD_PROJECT environment variable before running.",
+        file=sys.stderr,
+    )
+    sys.exit(1)
+
+
+def create_instance(client):
+    instance_name = "streaming-testing-instance"
+    instance = bigtable.instance.Instance(
+        instance_name,
+        client,
+    )
+    cluster_id = "streaming-testing-instance-c1"
+    cluster = instance.cluster(
+        cluster_id,
+        location_id="us-west1-a",
+        serve_nodes=1,
+    )
+    if not instance.exists():
+        operation = instance.create(
+            clusters=[cluster],
+        )
+        operation.result(timeout=480)
+        print(f"Created instance {instance_name}")
+    return instance
+
+
+def create_table(instance):
+    table_id = "table-testing"
+    table = bigtable.table.Table(
+        table_id,
+        instance,
+    )
+    if not table.exists():
+        table.create()
+        print(f"Created table {table_id}")
+
+
+def main():
+    client = bigtable.Client(project=PROJECT_ID, admin=True)
+
+    instance = create_instance(client)
+    create_table(instance)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/setup.py b/setup.py
index d5d282d11a..dbd9ce5fc2 100644
--- a/setup.py
+++ b/setup.py
@@ -39,6 +39,7 @@
     "gcsfs >=2023.3.0",
     "geopandas >=0.12.2",
     "google-auth >=2.15.0,<3.0dev",
+    "google-cloud-bigtable >=2.24.0",
     "google-cloud-bigquery[bqstorage,pandas] >=3.16.0",
     "google-cloud-functions >=1.12.0",
     "google-cloud-bigquery-connection >=1.12.0",
diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt
index 3c51668655..bbd7bf0069 100644
--- a/testing/constraints-3.9.txt
+++ b/testing/constraints-3.9.txt
@@ -4,6 +4,7 @@ fsspec==2023.3.0
 gcsfs==2023.3.0
 geopandas==0.12.2
 google-auth==2.15.0
+google-cloud-bigtable==2.24.0
 google-cloud-bigquery==3.16.0
 google-cloud-functions==1.12.0
 google-cloud-bigquery-connection==1.12.0
diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py
new file mode 100644
index 0000000000..48db61e5bf
--- /dev/null
+++ b/tests/system/large/test_streaming.py
@@ -0,0 +1,48 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+import bigframes.streaming
+
+
+def test_streaming_to_bigtable():
+    # launch a continuous query
+    job_id_prefix = "test_streaming_"
+    sql = """SELECT
+    body_mass_g, island as rowkey
+    FROM birds.penguins"""
+    query_job = bigframes.streaming.to_bigtable(
+        sql,
+        "streaming-testing-instance",
+        "table-testing",
+        app_profile=None,
+        truncate=True,
+        overwrite=True,
+        auto_create_column_families=True,
+        bigtable_options={},
+        job_id=None,
+        job_id_prefix=job_id_prefix,
+    )
+
+    try:
+        # wait 100 seconds in order to ensure the query doesn't stop
+        # (i.e. it is continuous)
+        time.sleep(100)
+        assert query_job.error_result is None
+        assert query_job.errors is None
+        assert query_job.running()
+        assert str(query_job.job_id).startswith(job_id_prefix)
+    finally:
+        query_job.cancel()
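As a quick reference, a minimal sketch of how the new API is called end to end (the resource names and query are the ones used in the test above; the Bigtable instance and table must already exist, e.g. via scripts/create_bigtable.py):

    import bigframes.streaming

    # starts the continuous query and returns a google.cloud.bigquery.QueryJob
    job = bigframes.streaming.to_bigtable(
        "SELECT body_mass_g, island as rowkey FROM birds.penguins",
        instance="streaming-testing-instance",
        table="table-testing",
        truncate=True,
        overwrite=True,
        auto_create_column_families=True,
    )

    # the job keeps running until it is cancelled or hits an error
    print(job.job_id, job.running())
    job.cancel()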