From 9f2f442b90ac56694eefc21d4afcebe183ca1164 Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Fri, 24 May 2024 15:55:03 -0700 Subject: [PATCH] feat: implement support for bigframes fix: fix a few issues chore: use optional import pattern WIP --- docs/concepts/models/python_models.md | 53 ++++++++++++++++++++++++- setup.cfg | 3 ++ setup.py | 1 + sqlmesh/core/context.py | 6 +++ sqlmesh/core/engine_adapter/_typing.py | 3 ++ sqlmesh/core/engine_adapter/base.py | 5 +++ sqlmesh/core/engine_adapter/bigquery.py | 25 +++++++++++- sqlmesh/core/snapshot/evaluator.py | 2 +- 8 files changed, 94 insertions(+), 4 deletions(-) diff --git a/docs/concepts/models/python_models.md b/docs/concepts/models/python_models.md index a5380ef6f6..5928f57b40 100644 --- a/docs/concepts/models/python_models.md +++ b/docs/concepts/models/python_models.md @@ -33,7 +33,7 @@ The `execute` function is wrapped with the `@model` [decorator](https://wiki.pyt Because SQLMesh creates tables before evaluating models, the schema of the output DataFrame is a required argument. The `@model` argument `columns` contains a dictionary of column names to types. -The function takes an `ExecutionContext` that is able to run queries and to retrieve the current time interval that is being processed, along with arbitrary key-value arguments passed in at runtime. The function can either return a Pandas, PySpark, or Snowpark Dataframe instance. +The function takes an `ExecutionContext` that is able to run queries and to retrieve the current time interval that is being processed, along with arbitrary key-value arguments passed in at runtime. The function can either return a Pandas, PySpark, Bigframe, or Snowpark Dataframe instance. If the function output is too large, it can also be returned in chunks using Python generators. @@ -441,6 +441,57 @@ def execute( return df ``` +### Bigframe +This example demonstrates using the [Bigframe](https://cloud.google.com/bigquery/docs/use-bigquery-dataframes#pandas-examples) DataFrame API. If you use Bigquery, the Bigframe API is preferred to Pandas as all computation is done in Bigquery. + +```python linenums="1" +import typing as t +from datetime import datetime + +from bigframes.pandas import DataFrame + +from sqlmesh import ExecutionContext, model + + +def get_bucket(num: int): + if not num: + return "NA" + boundary = 10 + return "at_or_above_10" if num >= boundary else "below_10" + + +@model( + "mart.wiki", + columns={ + "title": "text", + "views": "int", + "bucket": "text", + }, +) +def execute( + context: ExecutionContext, + start: datetime, + end: datetime, + execution_time: datetime, + **kwargs: t.Any, +) -> DataFrame: + # Create a remote function to be used in the Bigframe DataFrame + remote_get_bucket = context.bigframe.remote_function([int], str)(get_bucket) + + # Returns the Bigframe DataFrame handle, no data is computed locally + df = context.bigframe.read_gbq("bigquery-samples.wikipedia_pageviews.200809h") + + df = ( + # This runs entirely on the BigQuery engine lazily + df[df.title.str.contains(r"[Gg]oogle")] + .groupby(["title"], as_index=False)["views"] + .sum(numeric_only=True) + .sort_values("views", ascending=False) + ) + + return df.assign(bucket=df["views"].apply(remote_get_bucket)) +``` + ### Batching If the output of a Python model is very large and you cannot use Spark, it may be helpful to split the output into multiple batches. diff --git a/setup.cfg b/setup.cfg index 48749e9251..bbda584160 100644 --- a/setup.cfg +++ b/setup.cfg @@ -101,3 +101,6 @@ ignore_missing_imports = True [mypy-dlt.*] ignore_missing_imports = True + +[mypy-bigframes.*] +ignore_missing_imports = True diff --git a/setup.py b/setup.py index 1d1c275bba..4b3de46419 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,7 @@ "google-cloud-bigquery[pandas]", "google-cloud-bigquery-storage", ], + "bigframes": ["bigframes>=1.32.0"], "clickhouse": ["clickhouse-connect"], "databricks": ["databricks-sql-connector"], "dev": [ diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 0f7c218b55..95a97c8efe 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -126,6 +126,7 @@ from typing_extensions import Literal from sqlmesh.core.engine_adapter._typing import ( + BigframeSession, DF, PySparkDataFrame, PySparkSession, @@ -167,6 +168,11 @@ def snowpark(self) -> t.Optional[SnowparkSession]: """Returns the snowpark session if it exists.""" return self.engine_adapter.snowpark + @property + def bigframe(self) -> t.Optional[BigframeSession]: + """Returns the bigframe session if it exists.""" + return self.engine_adapter.bigframe + @property def default_catalog(self) -> t.Optional[str]: raise NotImplementedError diff --git a/sqlmesh/core/engine_adapter/_typing.py b/sqlmesh/core/engine_adapter/_typing.py index 1ce4268929..ba06887bd1 100644 --- a/sqlmesh/core/engine_adapter/_typing.py +++ b/sqlmesh/core/engine_adapter/_typing.py @@ -8,6 +8,8 @@ if t.TYPE_CHECKING: import pyspark import pyspark.sql.connect.dataframe + from bigframes.session import Session as BigframeSession # noqa + from bigframes.dataframe import DataFrame as BigframeDataFrame snowpark = optional_import("snowflake.snowpark") @@ -23,6 +25,7 @@ pd.DataFrame, pyspark.sql.DataFrame, pyspark.sql.connect.dataframe.DataFrame, + BigframeDataFrame, SnowparkDataFrame, ] diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 6aba65b5aa..49906933f9 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -49,6 +49,7 @@ if t.TYPE_CHECKING: from sqlmesh.core._typing import SchemaName, SessionProperties, TableName from sqlmesh.core.engine_adapter._typing import ( + BigframeSession, DF, PySparkDataFrame, PySparkSession, @@ -160,6 +161,10 @@ def spark(self) -> t.Optional[PySparkSession]: def snowpark(self) -> t.Optional[SnowparkSession]: return None + @property + def bigframe(self) -> t.Optional[BigframeSession]: + return None + @property def comments_enabled(self) -> bool: return self._register_comments and self.COMMENT_CREATION_TABLE.is_supported diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index 98256c0b2c..6d7336a1fa 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -23,6 +23,7 @@ ) from sqlmesh.core.node import IntervalUnit from sqlmesh.core.schema_diff import SchemaDiffer +from sqlmesh.utils import optional_import from sqlmesh.utils.date import to_datetime from sqlmesh.utils.errors import SQLMeshError @@ -35,11 +36,15 @@ from google.cloud.bigquery.table import Table as BigQueryTable from sqlmesh.core._typing import SchemaName, SessionProperties, TableName - from sqlmesh.core.engine_adapter._typing import DF, Query + from sqlmesh.core.engine_adapter._typing import BigframeSession, DF, Query from sqlmesh.core.engine_adapter.base import QueryOrDF + logger = logging.getLogger(__name__) +bigframes = optional_import("bigframes") +bigframes_pd = optional_import("bigframes.pandas") + NestedField = t.Tuple[str, str, t.List[str]] NestedFieldsDict = t.Dict[str, t.List[NestedField]] @@ -105,6 +110,17 @@ class BigQueryEngineAdapter(InsertOverwriteWithMergeMixin, ClusteredByMixin, Row def client(self) -> BigQueryClient: return self.connection._client + @property + def bigframe(self) -> t.Optional[BigframeSession]: + if bigframes: + options = bigframes.BigQueryOptions( + credentials=self.client._credentials, + project=self.client.project, + location=self.client.location, + ) + return bigframes.connect(context=options) + return None + @property def _job_params(self) -> t.Dict[str, t.Any]: from sqlmesh.core.config.connection import BigQueryPriority @@ -140,7 +156,12 @@ def _df_to_source_queries( ) def query_factory() -> Query: - if not self.table_exists(temp_table): + if bigframes_pd and isinstance(df, bigframes_pd.DataFrame): + df.to_gbq( + f"{temp_bq_table.project}.{temp_bq_table.dataset_id}.{temp_bq_table.table_id}", + if_exists="replace", + ) + elif not self.table_exists(temp_table): # Make mypy happy assert isinstance(df, pd.DataFrame) self._db_call(self.client.create_table, table=temp_bq_table, exists_ok=False) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 643c8a9f59..4e4cf53db8 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -673,7 +673,7 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None: if isinstance(query_or_df, pd.DataFrame): return query_or_df.head(limit) if not isinstance(query_or_df, exp.Expression): - # We assume that if this branch is reached, `query_or_df` is a pyspark / snowpark dataframe, + # We assume that if this branch is reached, `query_or_df` is a pyspark / snowpark / bigframe dataframe, # so we use `limit` instead of `head` to get back a dataframe instead of List[Row] # https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.head.html#pyspark.sql.DataFrame.head return query_or_df.limit(limit)