From 1f3976592146bf494de8cd2e779144bf8ea8397e Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Tue, 25 Feb 2025 22:28:01 +0000 Subject: [PATCH 01/18] feat: add allow_large_results option --- bigframes/_config/bigquery_options.py | 22 ++++++++++++++++ bigframes/core/blocks.py | 36 ++++++++++++++++++++++----- bigframes/core/indexes/base.py | 11 ++++++-- bigframes/dataframe.py | 17 +++++++++++-- bigframes/series.py | 5 ++++ 5 files changed, 81 insertions(+), 10 deletions(-) diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 8fec253b24..3968e98a69 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -87,6 +87,7 @@ def __init__( kms_key_name: Optional[str] = None, skip_bq_connection_check: bool = False, *, + allow_large_results: bool = True, ordering_mode: Literal["strict", "partial"] = "strict", client_endpoints_override: Optional[dict] = None, ): @@ -98,6 +99,7 @@ def __init__( self._application_name = application_name self._kms_key_name = kms_key_name self._skip_bq_connection_check = skip_bq_connection_check + self._allow_large_results = allow_large_results self._session_started = False # Determines the ordering strictness for the session. self._ordering_mode = _validate_ordering_mode(ordering_mode) @@ -232,6 +234,26 @@ def skip_bq_connection_check(self, value: bool): ) self._skip_bq_connection_check = value + @property + def allow_large_results(self) -> bool: + """ + Sets the flag to allow or disallow query results larger than 10 GB. + + The default setting for this flag is True, which allows queries to return results + exceeding 10 GB by creating an explicit destination table. If set to False, it + restricts the result size to 10 GB, and BigQuery will raise an error if this limit + is exceeded. + + Returns: + bool: True if large results are allowed with an explicit destination table, + False if results are limited to 10 GB and errors are raised when exceeded. 
+ """ + return self._allow_large_results + + @allow_large_results.setter + def allow_large_results(self, value: bool): + self._allow_large_results = value + @property def use_regional_endpoints(self) -> bool: """Flag to connect to regional API endpoints. diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 10970b24e8..78c4631e40 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -113,6 +113,7 @@ class MaterializationOptions: downsampling: sampling_options.SamplingOptions = dataclasses.field( default_factory=sampling_options.SamplingOptions ) + allow_large_results: bool = True ordered: bool = True @@ -511,6 +512,7 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, + allow_large_results: Optional[bool] = None, ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]: """Run query and download results as a pandas DataFrame. @@ -551,9 +553,14 @@ def to_pandas( else: sampling = sampling.with_disabled() + if allow_large_results is None: + allow_large_results = bigframes.options.bigquery.allow_large_results + df, query_job = self._materialize_local( materialize_options=MaterializationOptions( - downsampling=sampling, ordered=ordered + downsampling=sampling, + allow_large_results=allow_large_results, + ordered=ordered, ) ) df.set_axis(self.column_labels, axis=1, copy=False) @@ -571,16 +578,21 @@ def try_peek( return None def to_pandas_batches( - self, page_size: Optional[int] = None, max_results: Optional[int] = None + self, + page_size: Optional[int] = None, + max_results: Optional[int] = None, + allow_large_results: Optional[bool] = None, ): """Download results one message at a time. 
page_size and max_results determine the size and number of batches, see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result""" + if allow_large_results is None: + allow_large_results = bigframes.options.bigquery.allow_large_results execute_result = self.session._executor.execute( self.expr, ordered=True, - use_explicit_destination=True, + use_explicit_destination=allow_large_results, page_size=page_size, max_results=max_results, ) @@ -609,7 +621,10 @@ def _materialize_local( """Run query and download results as a pandas DataFrame. Return the total number of results as well.""" # TODO(swast): Allow for dry run and timeout. execute_result = self.session._executor.execute( - self.expr, ordered=materialize_options.ordered, get_size_bytes=True + self.expr, + ordered=materialize_options.ordered, + use_explicit_destination=materialize_options.allow_large_results, + get_size_bytes=True, ) assert execute_result.total_bytes is not None table_mb = execute_result.total_bytes / _BYTES_TO_MEGABYTES @@ -2665,14 +2680,23 @@ def column_ids(self) -> Sequence[str]: def is_null(self) -> bool: return len(self._block._index_columns) == 0 - def to_pandas(self, *, ordered: Optional[bool] = None) -> pd.Index: + def to_pandas( + self, + *, + ordered: Optional[bool] = None, + allow_large_results: Optional[bool] = None, + ) -> pd.Index: """Executes deferred operations and downloads the results.""" if len(self.column_ids) == 0: raise bigframes.exceptions.NullIndexError( "Cannot materialize index, as this object does not have an index. Set index column(s) using set_index." 
) ordered = ordered if ordered is not None else True - return self._block.select_columns([]).to_pandas(ordered=ordered)[0].index + return ( + self._block.select_columns([]) + .to_pandas(ordered=ordered, allow_large_results=allow_large_results)[0] + .index + ) def resolve_level(self, level: LevelsType) -> typing.Sequence[str]: if utils.is_list_like(level): diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index b3a07d33bc..87625d3960 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -490,14 +490,21 @@ def __getitem__(self, key: int) -> typing.Any: else: raise NotImplementedError(f"Index key not supported {key}") - def to_pandas(self) -> pandas.Index: + def to_pandas(self, allow_large_results: Optional[bool] = None) -> pandas.Index: """Gets the Index as a pandas Index. + Args: + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large query results + over the default size limit of 10 GB. + Returns: pandas.Index: A pandas Index with all of the labels from this Index. """ - return self._block.index.to_pandas(ordered=True) + return self._block.index.to_pandas( + ordered=True, allow_large_results=allow_large_results + ) def to_numpy(self, dtype=None, **kwargs) -> np.ndarray: return self.to_pandas().to_numpy(dtype, **kwargs) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index c02b182ee3..20f873c863 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1554,6 +1554,7 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, + allow_large_results: Optional[bool] = None, ) -> pandas.DataFrame: """Write DataFrame to pandas DataFrame. @@ -1576,6 +1577,9 @@ def to_pandas( ordered (bool, default True): Determines whether the resulting pandas dataframe will be ordered. In some cases, unordered may result in a faster-executing query. 
+ allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large query results + over the default size limit of 10 GB. Returns: pandas.DataFrame: A pandas DataFrame with all rows and columns of this DataFrame if the @@ -1588,12 +1592,16 @@ def to_pandas( sampling_method=sampling_method, random_state=random_state, ordered=ordered, + allow_large_results=allow_large_results, ) self._set_internal_query_job(query_job) return df.set_axis(self._block.column_labels, axis=1, copy=False) def to_pandas_batches( - self, page_size: Optional[int] = None, max_results: Optional[int] = None + self, + page_size: Optional[int] = None, + max_results: Optional[int] = None, + allow_large_results: Optional[bool] = None, ) -> Iterable[pandas.DataFrame]: """Stream DataFrame results to an iterable of pandas DataFrame. @@ -1605,6 +1613,9 @@ def to_pandas_batches( The size of each batch. max_results (int, default None): If given, only download this many rows at maximum. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large query results + over the default size limit of 10 GB. Returns: Iterable[pandas.DataFrame]: @@ -1613,7 +1624,9 @@ def to_pandas_batches( see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.table.RowIterator#google_cloud_bigquery_table_RowIterator_to_arrow_iterable """ return self._block.to_pandas_batches( - page_size=page_size, max_results=max_results + page_size=page_size, + max_results=max_results, + allow_large_results=allow_large_results, ) def _compute_dry_run(self) -> bigquery.QueryJob: diff --git a/bigframes/series.py b/bigframes/series.py index fe2d1aae0e..11221e15ce 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -381,6 +381,7 @@ def to_pandas( random_state: Optional[int] = None, *, ordered: bool = True, + allow_large_results: Optional[bool] = None, ) -> pandas.Series: """Writes Series to pandas Series. 
@@ -403,6 +404,9 @@ def to_pandas( ordered (bool, default True): Determines whether the resulting pandas series will be ordered. In some cases, unordered may result in a faster-executing query. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large query results + over the default size limit of 10 GB. Returns: @@ -414,6 +418,7 @@ def to_pandas( sampling_method=sampling_method, random_state=random_state, ordered=ordered, + allow_large_results=allow_large_results, ) self._set_internal_query_job(query_job) series = df.squeeze(axis=1) From f0b632e243baeb71c187ebc6c3bf38ad50d5cedb Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Tue, 25 Feb 2025 22:37:16 +0000 Subject: [PATCH 02/18] add to_arrow --- bigframes/core/blocks.py | 7 ++++++- bigframes/dataframe.py | 8 +++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 78c4631e40..fe62312168 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -488,9 +488,14 @@ def to_arrow( self, *, ordered: bool = True, + allow_large_results: Optional[bool] = None, ) -> Tuple[pa.Table, bigquery.QueryJob]: """Run query and download results as a pyarrow Table.""" - execute_result = self.session._executor.execute(self.expr, ordered=ordered) + if allow_large_results is None: + allow_large_results = bigframes.options.bigquery.allow_large_results + execute_result = self.session._executor.execute( + self.expr, ordered=ordered, use_explicit_destination=allow_large_results + ) pa_table = execute_result.to_arrow_table() pa_index_labels = [] diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 20f873c863..dd9299b265 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1529,6 +1529,7 @@ def to_arrow( self, *, ordered: bool = True, + allow_large_results: Optional[bool] = None, ) -> pyarrow.Table: """Write DataFrame to an Arrow table / record batch. 
@@ -1536,6 +1537,9 @@ def to_arrow( ordered (bool, default True): Determines whether the resulting Arrow table will be ordered. In some cases, unordered may result in a faster-executing query. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large query results + over the default size limit of 10 GB. Returns: pyarrow.Table: A pyarrow Table with all rows and columns of this DataFrame. @@ -1543,7 +1547,9 @@ def to_arrow( msg = "to_arrow is in preview. Types and unnamed / duplicate name columns may change in future." warnings.warn(msg, category=bfe.PreviewWarning) - pa_table, query_job = self._block.to_arrow(ordered=ordered) + pa_table, query_job = self._block.to_arrow( + ordered=ordered, allow_large_results=allow_large_results + ) self._set_internal_query_job(query_job) return pa_table From 9c1e9dbfb0440291406381c0a071723fc1080ea3 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Tue, 25 Feb 2025 23:21:22 +0000 Subject: [PATCH 03/18] add the ones that only uses to_pandas() --- bigframes/core/indexes/base.py | 6 +- bigframes/dataframe.py | 49 ++++++++++----- bigframes/series.py | 54 +++++++++++----- .../bigframes_vendored/pandas/core/frame.py | 63 ++++++++++++++++--- .../pandas/core/indexes/base.py | 3 + .../bigframes_vendored/pandas/core/series.py | 59 +++++++++++++++-- 6 files changed, 190 insertions(+), 44 deletions(-) diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 87625d3960..44cc8ba667 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -506,8 +506,10 @@ def to_pandas(self, allow_large_results: Optional[bool] = None) -> pandas.Index: ordered=True, allow_large_results=allow_large_results ) - def to_numpy(self, dtype=None, **kwargs) -> np.ndarray: - return self.to_pandas().to_numpy(dtype, **kwargs) + def to_numpy(self, dtype=None, allow_large_results=None, **kwargs) -> np.ndarray: + return 
self.to_pandas(allow_large_results=allow_large_results).to_numpy( + dtype, **kwargs + ) __array__ = to_numpy diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index dd9299b265..02877db803 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3720,9 +3720,11 @@ def to_gbq( return destination_table def to_numpy( - self, dtype=None, copy=False, na_value=None, **kwargs + self, dtype=None, copy=False, na_value=None, allow_large_results=None, **kwargs ) -> numpy.ndarray: - return self.to_pandas().to_numpy(dtype, copy, na_value, **kwargs) + return self.to_pandas(allow_large_results=allow_large_results).to_numpy( + dtype, copy, na_value, **kwargs + ) def __array__(self, dtype=None) -> numpy.ndarray: return self.to_numpy(dtype=dtype) @@ -3774,12 +3776,21 @@ def to_dict( "dict", "list", "series", "split", "tight", "records", "index" ] = "dict", into: type[dict] = dict, + allow_large_results: Optional[bool] = None, **kwargs, ) -> dict | list[dict]: - return self.to_pandas().to_dict(orient, into, **kwargs) # type: ignore + return self.to_pandas(allow_large_results=allow_large_results).to_dict(orient, into, **kwargs) # type: ignore - def to_excel(self, excel_writer, sheet_name: str = "Sheet1", **kwargs) -> None: - return self.to_pandas().to_excel(excel_writer, sheet_name, **kwargs) + def to_excel( + self, + excel_writer, + sheet_name: str = "Sheet1", + allow_large_results=None, + **kwargs, + ) -> None: + return self.to_pandas(allow_large_results=allow_large_results).to_excel( + excel_writer, sheet_name, **kwargs + ) def to_latex( self, @@ -3787,16 +3798,23 @@ def to_latex( columns: Sequence | None = None, header: bool | Sequence[str] = True, index: bool = True, + allow_large_results=None, **kwargs, ) -> str | None: - return self.to_pandas().to_latex( + return self.to_pandas(allow_large_results=allow_large_results).to_latex( buf, columns=columns, header=header, index=index, **kwargs # type: ignore ) def to_records( - self, index: bool = True, 
column_dtypes=None, index_dtypes=None + self, + index: bool = True, + column_dtypes=None, + index_dtypes=None, + allow_large_result=None, ) -> numpy.recarray: - return self.to_pandas().to_records(index, column_dtypes, index_dtypes) + return self.to_pandas(allow_large_results=allow_large_result).to_records( + index, column_dtypes, index_dtypes + ) def to_string( self, @@ -3819,8 +3837,9 @@ def to_string( min_rows: int | None = None, max_colwidth: int | None = None, encoding: str | None = None, + allow_large_result=None, ) -> str | None: - return self.to_pandas().to_string( + return self.to_pandas(allow_large_results=allow_large_result).to_string( buf, columns, # type: ignore col_space, @@ -3867,8 +3886,9 @@ def to_html( table_id: str | None = None, render_links: bool = False, encoding: str | None = None, + allow_large_results: bool | None = None, ) -> str: - return self.to_pandas().to_html( + return self.to_pandas(allow_large_results=allow_large_results).to_html( buf, columns, # type: ignore col_space, @@ -3899,15 +3919,16 @@ def to_markdown( buf=None, mode: str = "wt", index: bool = True, + allow_large_results=None, **kwargs, ) -> str | None: - return self.to_pandas().to_markdown(buf, mode, index, **kwargs) # type: ignore + return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode, index, **kwargs) # type: ignore - def to_pickle(self, path, **kwargs) -> None: + def to_pickle(self, path, allow_large_results=None, **kwargs) -> None: return self.to_pandas().to_pickle(path, **kwargs) - def to_orc(self, path=None, **kwargs) -> bytes | None: - as_pandas = self.to_pandas() + def to_orc(self, path=None, allow_large_results=None, **kwargs) -> bytes | None: + as_pandas = self.to_pandas(allow_large_results=allow_large_results) # to_orc only works with default index as_pandas_default_index = as_pandas.reset_index() return as_pandas_default_index.to_orc(path, **kwargs) diff --git a/bigframes/series.py b/bigframes/series.py index 11221e15ce..00d4a59346 
100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1764,11 +1764,19 @@ def to_csv( path_or_buf=path_or_buf, sep=sep, header=header, index=index ) - def to_dict(self, into: type[dict] = dict) -> typing.Mapping: - return typing.cast(dict, self.to_pandas().to_dict(into)) # type: ignore + def to_dict( + self, + into: type[dict] = dict, + allow_large_results: Optional[bool] = None, + ) -> typing.Mapping: + return typing.cast(dict, self.to_pandas(allow_large_results=allow_large_results).to_dict(into)) # type: ignore - def to_excel(self, excel_writer, sheet_name="Sheet1", **kwargs) -> None: - return self.to_pandas().to_excel(excel_writer, sheet_name, **kwargs) + def to_excel( + self, excel_writer, sheet_name="Sheet1", allow_large_results=None, **kwargs + ) -> None: + return self.to_pandas(allow_large_results=allow_large_results).to_excel( + excel_writer, sheet_name, **kwargs + ) def to_json( self, @@ -1791,14 +1799,23 @@ def to_json( ) def to_latex( - self, buf=None, columns=None, header=True, index=True, **kwargs + self, + buf=None, + columns=None, + header=True, + index=True, + allow_large_results=None, + **kwargs, ) -> typing.Optional[str]: - return self.to_pandas().to_latex( + return self.to_pandas(allow_large_results=allow_large_results).to_latex( buf, columns=columns, header=header, index=index, **kwargs ) - def tolist(self) -> _list: - return self.to_pandas().to_list() + def tolist( + self, + allow_large_results: Optional[bool] = None, + ) -> _list: + return self.to_pandas(allow_large_results=allow_large_results).to_list() to_list = tolist to_list.__doc__ = inspect.getdoc(vendored_pandas_series.Series.tolist) @@ -1808,21 +1825,24 @@ def to_markdown( buf: typing.IO[str] | None = None, mode: str = "wt", index: bool = True, + allow_large_results=None, **kwargs, ) -> typing.Optional[str]: - return self.to_pandas().to_markdown(buf, mode=mode, index=index, **kwargs) # type: ignore + return 
self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode=mode, index=index, **kwargs) # type: ignore def to_numpy( - self, dtype=None, copy=False, na_value=None, **kwargs + self, dtype=None, copy=False, na_value=None, allow_large_results=None, **kwargs ) -> numpy.ndarray: - return self.to_pandas().to_numpy(dtype, copy, na_value, **kwargs) + return self.to_pandas(allow_large_results=allow_large_results).to_numpy( + dtype, copy, na_value, **kwargs + ) def __array__(self, dtype=None) -> numpy.ndarray: return self.to_numpy(dtype=dtype) __array__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__array__) - def to_pickle(self, path, **kwargs) -> None: + def to_pickle(self, path, allow_large_results=None, **kwargs) -> None: return self.to_pandas().to_pickle(path, **kwargs) def to_string( @@ -1837,8 +1857,9 @@ def to_string( name=False, max_rows=None, min_rows=None, + allow_large_result=None, ) -> typing.Optional[str]: - return self.to_pandas().to_string( + return self.to_pandas(allow_large_results=allow_large_result).to_string( buf, na_rep, float_format, @@ -1851,8 +1872,11 @@ def to_string( min_rows, ) - def to_xarray(self): - return self.to_pandas().to_xarray() + def to_xarray( + self, + allow_large_results=None, + ): + return self.to_pandas(allow_large_results=allow_large_results).to_xarray() def _throw_if_index_contains_duplicates( self, error_message: typing.Optional[str] = None diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index f5aa23d00b..8aba2ad7af 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -365,7 +365,9 @@ def from_records( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_numpy(self, dtype=None, copy=False, na_value=None, **kwargs) -> np.ndarray: + def to_numpy( + self, dtype=None, copy=False, na_value=None, allow_large_results=None, **kwargs + ) -> 
np.ndarray: """ Convert the DataFrame to a NumPy array. @@ -388,7 +390,9 @@ def to_numpy(self, dtype=None, copy=False, na_value=None, **kwargs) -> np.ndarra na_value (Any, default None): The value to use for missing values. The default value depends on dtype and the dtypes of the DataFrame columns. - + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow + large query results over the default size limit of 10 GB. Returns: numpy.ndarray: The converted NumPy array. """ @@ -560,6 +564,7 @@ def to_dict( "dict", "list", "series", "split", "tight", "records", "index" ] = "dict", into: type[dict] = dict, + allow_large_results: Optional[bool] = None, **kwargs, ) -> dict | list[dict]: """ @@ -613,11 +618,13 @@ def to_dict( in the return value. Can be the actual class or an empty instance of the mapping type you want. If you want a collections.defaultdict, you must pass it initialized. - index (bool, default True): Whether to include the index item (and index_names item if `orient` is 'tight') in the returned dictionary. Can only be ``False`` when `orient` is 'split' or 'tight'. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. Returns: dict or list of dict: Return a collections.abc.Mapping object representing the DataFrame. @@ -625,7 +632,13 @@ def to_dict( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_excel(self, excel_writer, sheet_name: str = "Sheet1", **kwargs) -> None: + def to_excel( + self, + excel_writer, + sheet_name: str = "Sheet1", + allow_large_results=None, + **kwargs, + ) -> None: """ Write DataFrame to an Excel sheet. @@ -653,11 +666,20 @@ def to_excel(self, excel_writer, sheet_name: str = "Sheet1", **kwargs) -> None: File path or existing ExcelWriter. sheet_name (str, default 'Sheet1'): Name of sheet which will contain DataFrame. 
+ allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) def to_latex( - self, buf=None, columns=None, header=True, index=True, **kwargs + self, + buf=None, + columns=None, + header=True, + index=True, + allow_large_results=None, + **kwargs, ) -> str | None: r""" Render object to a LaTeX tabular, longtable, or nested table. @@ -693,6 +715,9 @@ def to_latex( it is assumed to be aliases for the column names. index (bool, default True): Write row names (index). + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. Returns: str or None: If buf is None, returns the result as a string. Otherwise returns @@ -701,7 +726,11 @@ def to_latex( raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) def to_records( - self, index: bool = True, column_dtypes=None, index_dtypes=None + self, + index: bool = True, + column_dtypes=None, + index_dtypes=None, + allow_large_result=None, ) -> np.recarray: """ Convert DataFrame to a NumPy record array. @@ -731,6 +760,9 @@ def to_records( If a string or type, the data type to store all index levels. If a dictionary, a mapping of index level names and indices (zero-indexed) to specific data types. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. This mapping is applied only if `index=True`. @@ -824,6 +856,9 @@ def to_string( Max width to truncate each column in characters. By default, no limit. encoding (str, default "utf-8"): Set character encoding. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. 
Returns: str or None: If buf is None, returns the result as a string. Otherwise returns @@ -856,6 +891,7 @@ def to_html( table_id: str | None = None, render_links: bool = False, encoding: str | None = None, + allow_large_results: bool | None = None, ): """Render a DataFrame as an HTML table. @@ -948,6 +984,9 @@ def to_html( Convert URLs to HTML links. encoding (str, default "utf-8"): Set character encoding. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow + large query results over the default size limit of 10 GB. Returns: str or None: If buf is None, returns the result as a string. Otherwise @@ -960,6 +999,7 @@ def to_markdown( buf=None, mode: str = "wt", index: bool = True, + allow_large_results=None, **kwargs, ): """Print DataFrame in Markdown-friendly format. @@ -983,6 +1023,9 @@ def to_markdown( Mode in which file is opened. index (bool, optional, default True): Add index (row) labels. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow + large query results over the default size limit of 10 GB. **kwargs These parameters will be passed to `tabulate `_. @@ -1007,10 +1050,13 @@ def to_pickle(self, path, **kwargs) -> None: Args: path (str): File path where the pickled object will be stored. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow + large query results over the default size limit of 10 GB. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_orc(self, path=None, **kwargs) -> bytes | None: + def to_orc(self, path=None, allow_large_results=None, **kwargs) -> bytes | None: """ Write a DataFrame to the ORC format. @@ -1030,6 +1076,9 @@ def to_orc(self, path=None, **kwargs) -> bytes | None: we refer to objects with a write() method, such as a file handle (e.g. via builtin open function). If path is None, a bytes object is returned. 
+ allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow + large query results over the default size limit of 10 GB. Returns: bytes or None: diff --git a/third_party/bigframes_vendored/pandas/core/indexes/base.py b/third_party/bigframes_vendored/pandas/core/indexes/base.py index 59504ee68c..e86f2223a5 100644 --- a/third_party/bigframes_vendored/pandas/core/indexes/base.py +++ b/third_party/bigframes_vendored/pandas/core/indexes/base.py @@ -1068,6 +1068,9 @@ def to_numpy(self, dtype): Args: dtype: The dtype to pass to :meth:`numpy.asarray`. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow + large query results over the default size limit of 10 GB. **kwargs: Additional keywords passed through to the ``to_numpy`` method of the underlying array (for extension arrays). diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index 57f7dfbb79..d037faf5ae 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -486,6 +486,9 @@ def to_string( min_rows (int, optional): The number of rows to display in a truncated repr (when number of rows is above `max_rows`). + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. Returns: str or None: @@ -498,6 +501,7 @@ def to_markdown( buf: IO[str] | None = None, mode: str = "wt", index: bool = True, + allow_large_results=None, **kwargs, ) -> str | None: """ @@ -537,6 +541,9 @@ def to_markdown( Buffer to write to. If None, the output is returned as a string. mode (str, optional): Mode in which file is opened, "wt" by default. 
+ allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow + large query results over the default size limit of 10 GB. index (bool, optional, default True): Add index (row) labels. @@ -546,7 +553,11 @@ def to_markdown( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_dict(self, into: type[dict] = dict) -> Mapping: + def to_dict( + self, + into: type[dict] = dict, + allow_large_results: Optional[bool] = None, + ) -> Mapping: """ Convert Series to {label -> value} dict or dict-like object. @@ -573,6 +584,9 @@ def to_dict(self, into: type[dict] = dict) -> Mapping: object. Can be the actual class or an empty instance of the mapping type you want. If you want a collections.defaultdict, you must pass it initialized. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. Returns: collections.abc.Mapping: @@ -611,7 +625,12 @@ def to_frame(self, name=None) -> DataFrame: """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_excel(self, excel_writer, sheet_name): + def to_excel( + self, + excel_writer, + sheet_name, + allow_large_results=None, + ): """ Write Series to an Excel sheet. @@ -630,10 +649,21 @@ def to_excel(self, excel_writer, sheet_name): File path or existing ExcelWriter. sheet_name (str, default 'Sheet1'): Name of sheet to contain Series. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_latex(self, buf=None, columns=None, header=True, index=True, **kwargs): + def to_latex( + self, + buf=None, + columns=None, + header=True, + index=True, + allow_large_results=None, + **kwargs, + ): """ Render object to a LaTeX tabular, longtable, or nested table. 
@@ -647,6 +677,9 @@ def to_latex(self, buf=None, columns=None, header=True, index=True, **kwargs): it is assumed to be aliases for the column names. index (bool, default True): Write row names (index). + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. Returns: str or None: @@ -655,7 +688,7 @@ def to_latex(self, buf=None, columns=None, header=True, index=True, **kwargs): """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def tolist(self) -> list: + def tolist(self, allow_large_results: Optional[bool] = None) -> list: """ Return a list of the values. @@ -678,6 +711,11 @@ def tolist(self) -> list: >>> s.to_list() [1, 2, 3] + Args: + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow + large query results over the default size limit of 10 GB. + Returns: list: list of the values. @@ -686,7 +724,7 @@ def tolist(self) -> list: to_list = tolist - def to_numpy(self, dtype, copy=False, na_value=None): + def to_numpy(self, dtype, copy=False, na_value=None, allow_large_results=None): """ A NumPy ndarray representing the values in this Series or Index. @@ -727,6 +765,9 @@ def to_numpy(self, dtype, copy=False, na_value=None): na_value (Any, optional): The value to use for missing values. The default value depends on `dtype` and the type of the array. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow + large query results over the default size limit of 10 GB. ``**kwargs``: Additional keywords passed through to the ``to_numpy`` method of the underlying array (for extension arrays). @@ -776,13 +817,16 @@ def to_pickle(self, path, **kwargs): String, path object (implementing ``os.PathLike[str]``), or file-like object implementing a binary ``write()`` function. File path where the pickled object will be stored. 
+ allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow + large query results over the default size limit of 10 GB. Returns: None """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_xarray(self): + def to_xarray(self, allow_large_results=None): """ Return an xarray object from the pandas object. @@ -791,6 +835,9 @@ def to_xarray(self): Data in the pandas structure converted to Dataset if the object is a DataFrame, or a DataArray if the object is a Series. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 50abb66e333117fd4742b69408b66a03cd47f6b7 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 26 Feb 2025 00:11:39 +0000 Subject: [PATCH 04/18] add to_csv/json/parquet --- bigframes/core/blocks.py | 9 +-------- bigframes/dataframe.py | 18 ++++++++++++++---- bigframes/series.py | 18 ++++++++++++++---- bigframes/session/executor.py | 9 +++++++-- .../bigframes_vendored/pandas/core/frame.py | 6 ++++-- .../bigframes_vendored/pandas/core/generic.py | 9 +++++++++ 6 files changed, 49 insertions(+), 20 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index fe62312168..50844fa953 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -113,7 +113,7 @@ class MaterializationOptions: downsampling: sampling_options.SamplingOptions = dataclasses.field( default_factory=sampling_options.SamplingOptions ) - allow_large_results: bool = True + allow_large_results: Optional[bool] = None ordered: bool = True @@ -491,8 +491,6 @@ def to_arrow( allow_large_results: Optional[bool] = None, ) -> Tuple[pa.Table, bigquery.QueryJob]: """Run query and download results as a pyarrow Table.""" - if allow_large_results is None: - allow_large_results = 
bigframes.options.bigquery.allow_large_results execute_result = self.session._executor.execute( self.expr, ordered=ordered, use_explicit_destination=allow_large_results ) @@ -558,9 +556,6 @@ def to_pandas( else: sampling = sampling.with_disabled() - if allow_large_results is None: - allow_large_results = bigframes.options.bigquery.allow_large_results - df, query_job = self._materialize_local( materialize_options=MaterializationOptions( downsampling=sampling, @@ -592,8 +587,6 @@ def to_pandas_batches( page_size and max_results determine the size and number of batches, see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result""" - if allow_large_results is None: - allow_large_results = bigframes.options.bigquery.allow_large_results execute_result = self.session._executor.execute( self.expr, ordered=True, diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 02877db803..e9738d786c 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3546,6 +3546,7 @@ def to_csv( *, header: bool = True, index: bool = True, + allow_large_results: Optional[bool] = None, ) -> Optional[str]: # TODO(swast): Can we support partition columns argument? # TODO(chelsealin): Support local file paths. @@ -3553,7 +3554,7 @@ def to_csv( # query results? 
See: # https://cloud.google.com/bigquery/docs/exporting-data#limit_the_exported_file_size if not utils.is_gcs_path(path_or_buf): - pd_df = self.to_pandas() + pd_df = self.to_pandas(allow_large_results=allow_large_results) return pd_df.to_csv(path_or_buf, sep=sep, header=header, index=index) if "*" not in path_or_buf: raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD) @@ -3572,6 +3573,7 @@ def to_csv( path_or_buf, format="csv", export_options=options, + allow_large_results=allow_large_results, ) self._set_internal_query_job(query_job) return None @@ -3585,10 +3587,11 @@ def to_json( *, lines: bool = False, index: bool = True, + allow_large_results: Optional[bool] = None, ) -> Optional[str]: # TODO(swast): Can we support partition columns argument? if not utils.is_gcs_path(path_or_buf): - pd_df = self.to_pandas() + pd_df = self.to_pandas(allow_large_results=allow_large_results) return pd_df.to_json( path_or_buf, orient=orient, @@ -3616,7 +3619,12 @@ def to_json( ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID, ) query_job = self._session._executor.export_gcs( - export_array, id_overrides, path_or_buf, format="json", export_options={} + export_array, + id_overrides, + path_or_buf, + format="json", + export_options={}, + allow_large_results=allow_large_results, ) self._set_internal_query_job(query_job) return None @@ -3737,6 +3745,7 @@ def to_parquet( *, compression: Optional[Literal["snappy", "gzip"]] = "snappy", index: bool = True, + allow_large_results: Optional[bool] = None, ) -> Optional[bytes]: # TODO(swast): Can we support partition columns argument? # TODO(chelsealin): Support local file paths. @@ -3744,7 +3753,7 @@ def to_parquet( # query results? 
See: # https://cloud.google.com/bigquery/docs/exporting-data#limit_the_exported_file_size if not utils.is_gcs_path(path): - pd_df = self.to_pandas() + pd_df = self.to_pandas(allow_large_results=allow_large_results) return pd_df.to_parquet(path, compression=compression, index=index) if "*" not in path: raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD) @@ -3766,6 +3775,7 @@ def to_parquet( path, format="parquet", export_options=export_options, + allow_large_results=allow_large_results, ) self._set_internal_query_job(query_job) return None diff --git a/bigframes/series.py b/bigframes/series.py index 00d4a59346..82d8bfecbc 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1753,13 +1753,18 @@ def to_csv( *, header: bool = True, index: bool = True, + allow_large_results: Optional[bool] = None, ) -> Optional[str]: if utils.is_gcs_path(path_or_buf): return self.to_frame().to_csv( - path_or_buf, sep=sep, header=header, index=index + path_or_buf, + sep=sep, + header=header, + index=index, + allow_large_results=allow_large_results, ) else: - pd_series = self.to_pandas() + pd_series = self.to_pandas(allow_large_results=allow_large_results) return pd_series.to_csv( path_or_buf=path_or_buf, sep=sep, header=header, index=index ) @@ -1787,13 +1792,18 @@ def to_json( *, lines: bool = False, index: bool = True, + allow_large_results: Optional[bool] = None, ) -> Optional[str]: if utils.is_gcs_path(path_or_buf): return self.to_frame().to_json( - path_or_buf=path_or_buf, orient=orient, lines=lines, index=index + path_or_buf=path_or_buf, + orient=orient, + lines=lines, + index=index, + allow_large_results=allow_large_results, ) else: - pd_series = self.to_pandas() + pd_series = self.to_pandas(allow_large_results=allow_large_results) return pd_series.to_json( path_or_buf=path_or_buf, orient=orient, lines=lines, index=index # type: ignore ) diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 502692929d..c3ed6efa3d 100644 --- 
a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -104,7 +104,7 @@ def execute( *, ordered: bool = True, col_id_overrides: Mapping[str, str] = {}, - use_explicit_destination: bool = False, + use_explicit_destination: Optional[bool] = False, get_size_bytes: bool = False, page_size: Optional[int] = None, max_results: Optional[int] = None, @@ -134,6 +134,7 @@ def export_gcs( uri: str, format: Literal["json", "csv", "parquet"], export_options: Mapping[str, Union[bool, str]], + allow_large_results: Optional[bool] = None, ) -> bigquery.QueryJob: """ Export the ArrayValue to gcs. @@ -243,11 +244,13 @@ def execute( *, ordered: bool = True, col_id_overrides: Mapping[str, str] = {}, - use_explicit_destination: bool = False, + use_explicit_destination: Optional[bool] = False, get_size_bytes: bool = False, page_size: Optional[int] = None, max_results: Optional[int] = None, ): + if use_explicit_destination is None: + use_explicit_destination = bigframes.options.bigquery.allow_large_results if bigframes.options.compute.enable_multi_query_execution: self._simplify_with_caching(array_value) @@ -333,11 +336,13 @@ def export_gcs( uri: str, format: Literal["json", "csv", "parquet"], export_options: Mapping[str, Union[bool, str]], + allow_large_results: Optional[bool] = None, ): query_job = self.execute( array_value, ordered=False, col_id_overrides=col_id_overrides, + use_explicit_destination=allow_large_results, ).query_job result_table = query_job.destination export_data_statement = bq_io.create_export_data_statement( diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 8aba2ad7af..babd36754a 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -513,6 +513,7 @@ def to_parquet( *, compression: Optional[Literal["snappy", "gzip"]] = "snappy", index: bool = True, + allow_large_results: Optional[bool] = None, ) -> 
Optional[bytes]: """Write a DataFrame to the binary Parquet format. @@ -538,14 +539,15 @@ def to_parquet( should be formatted ``gs:///``. If the data size is more than 1GB, you must use a wildcard to export the data into multiple files and the size of the files varies. - compression (str, default 'snappy'): Name of the compression to use. Use ``None`` for no compression. Supported options: ``'gzip'``, ``'snappy'``. - index (bool, default True): If ``True``, include the dataframe's index(es) in the file output. If ``False``, they will not be written to the file. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. Returns: None or bytes: diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index 9dae802b6e..a8ff16b6db 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -223,6 +223,7 @@ def to_json( *, index: bool = True, lines: bool = False, + allow_large_results: Optional[bool] = None, ) -> Optional[str]: """Convert the object to a JSON string, written to Cloud Storage. @@ -278,6 +279,10 @@ def to_json( throw ValueError if incorrect 'orient' since others are not list-like. + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. + Returns: None or str: If path_or_buf is None, returns the resulting json format as a @@ -313,6 +318,10 @@ def to_csv(self, path_or_buf, *, index: bool = True) -> Optional[str]: index (bool, default True): If True, write row names (index). + allow_large_results (bool, default None): + If not None, overrides the global setting to allow or disallow large + query results over the default size limit of 10 GB. 
+ Returns: None or str: If path_or_buf is None, returns the resulting json format as a string. Otherwise returns None. From 6ba3d12ea2ca66a3b8cf07f89d403f125e68aee5 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 26 Feb 2025 00:19:42 +0000 Subject: [PATCH 05/18] mypy fix --- tests/unit/polars_session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/polars_session.py b/tests/unit/polars_session.py index cffd8ff7ca..6cbb247587 100644 --- a/tests/unit/polars_session.py +++ b/tests/unit/polars_session.py @@ -40,7 +40,7 @@ def execute( *, ordered: bool = True, col_id_overrides: Mapping[str, str] = {}, - use_explicit_destination: bool = False, + use_explicit_destination: Optional[bool] = False, get_size_bytes: bool = False, page_size: Optional[int] = None, max_results: Optional[int] = None, From 63a422c54ef0284538f31f9070124bb75a0a4474 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 26 Feb 2025 00:42:57 +0000 Subject: [PATCH 06/18] gcs logic update and execute logic update. 
--- bigframes/dataframe.py | 3 --- bigframes/session/executor.py | 8 +++++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index e9738d786c..10b44973ac 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3573,7 +3573,6 @@ def to_csv( path_or_buf, format="csv", export_options=options, - allow_large_results=allow_large_results, ) self._set_internal_query_job(query_job) return None @@ -3624,7 +3623,6 @@ def to_json( path_or_buf, format="json", export_options={}, - allow_large_results=allow_large_results, ) self._set_internal_query_job(query_job) return None @@ -3775,7 +3773,6 @@ def to_parquet( path, format="parquet", export_options=export_options, - allow_large_results=allow_large_results, ) self._set_internal_query_job(query_job) return None diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index c3ed6efa3d..c60ca17201 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -134,7 +134,6 @@ def export_gcs( uri: str, format: Literal["json", "csv", "parquet"], export_options: Mapping[str, Union[bool, str]], - allow_large_results: Optional[bool] = None, ) -> bigquery.QueryJob: """ Export the ArrayValue to gcs. 
@@ -251,6 +250,10 @@ def execute( ): if use_explicit_destination is None: use_explicit_destination = bigframes.options.bigquery.allow_large_results + + if ordered and use_explicit_destination: + # Setting both 'ordered' and 'use_explicit_destination' to True is not allowed + use_explicit_destination = False if bigframes.options.compute.enable_multi_query_execution: self._simplify_with_caching(array_value) @@ -336,13 +339,12 @@ def export_gcs( uri: str, format: Literal["json", "csv", "parquet"], export_options: Mapping[str, Union[bool, str]], - allow_large_results: Optional[bool] = None, ): query_job = self.execute( array_value, ordered=False, col_id_overrides=col_id_overrides, - use_explicit_destination=allow_large_results, + use_explicit_destination=True, ).query_job result_table = query_job.destination export_data_statement = bq_io.create_export_data_statement( From 8585ab84d6c1bece1e433f1442c400eb5e509934 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 26 Feb 2025 02:29:15 +0000 Subject: [PATCH 07/18] add to_pandas_batches and to_pandas large test. --- bigframes/session/executor.py | 13 +++++++++---- third_party/bigframes_vendored/pandas/core/frame.py | 3 ++- .../bigframes_vendored/pandas/core/generic.py | 6 ++++-- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index c60ca17201..34d3537e43 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -60,6 +60,7 @@ _MAX_CLUSTER_COLUMNS = 4 # TODO: b/338258028 Enable pruning to reduce text size. 
ENABLE_PRUNING = False +MAX_SMALL_RESULT_BYTES = 10 * 1024 * 1024 * 1024 # 10G @dataclasses.dataclass(frozen=True) @@ -251,9 +252,6 @@ def execute( if use_explicit_destination is None: use_explicit_destination = bigframes.options.bigquery.allow_large_results - if ordered and use_explicit_destination: - # Setting both 'ordered' and 'use_explicit_destination' to True is not allowed - use_explicit_destination = False if bigframes.options.compute.enable_multi_query_execution: self._simplify_with_caching(array_value) @@ -281,11 +279,18 @@ def execute( def iterator_supplier(): return iterator.to_arrow_iterable(bqstorage_client=self.bqstoragereadclient) - if get_size_bytes is True: + if get_size_bytes is True or use_explicit_destination: size_bytes = self.bqclient.get_table(query_job.destination).num_bytes else: size_bytes = None + if size_bytes is not None and size_bytes >= MAX_SMALL_RESULT_BYTES: + warnings.warn( + "The query result size has exceeded 10 GB. In BigFrames 2.0 and " + "later, you might need to manually set allow_large_results = True.", + FutureWarning, + ) + # Runs strict validations to ensure internal type predictions and ibis are completely in sync # Do not execute these validations outside of testing suite. if "PYTEST_CURRENT_TEST" in os.environ and len(col_id_overrides) == 0: diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index b0baa06e17..0bb0300fa9 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -547,7 +547,8 @@ def to_parquet( If ``False``, they will not be written to the file. allow_large_results (bool, default None): If not None, overrides the global setting to allow or disallow large - query results over the default size limit of 10 GB. + query results over the default size limit of 10 GB. This parameter has + no effect when results are saved to Google Cloud Storage (GCS). 
Returns: None or bytes: diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index a8ff16b6db..83ac46321d 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -281,7 +281,8 @@ def to_json( allow_large_results (bool, default None): If not None, overrides the global setting to allow or disallow large - query results over the default size limit of 10 GB. + query results over the default size limit of 10 GB. This parameter has + no effect when results are saved to Google Cloud Storage (GCS). Returns: None or str: @@ -320,7 +321,8 @@ def to_csv(self, path_or_buf, *, index: bool = True) -> Optional[str]: allow_large_results (bool, default None): If not None, overrides the global setting to allow or disallow large - query results over the default size limit of 10 GB. + query results over the default size limit of 10 GB. This parameter has + no effect when results are saved to Google Cloud Storage (GCS). Returns: None or str: If path_or_buf is None, returns the resulting json format as a From 6bbeeea609af8d4a9cfed19e178c6c411128215a Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 26 Feb 2025 02:31:31 +0000 Subject: [PATCH 08/18] add to_pandas_batches and to_pandas large test. --- tests/system/large/test_dataframe_io.py | 60 +++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 tests/system/large/test_dataframe_io.py diff --git a/tests/system/large/test_dataframe_io.py b/tests/system/large/test_dataframe_io.py new file mode 100644 index 0000000000..73d096f753 --- /dev/null +++ b/tests/system/large/test_dataframe_io.py @@ -0,0 +1,60 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import warnings + +import google.api_core.exceptions +import pytest + +import bigframes + +WIKIPEDIA_TABLE = "bigquery-public-data.samples.wikipedia" +LARGE_TABLE_OPTION = "bigquery.allow_large_results" + + +def test_to_pandas_batches_raise_when_large_result_not_allowed(session): + with bigframes.option_context(LARGE_TABLE_OPTION, False), pytest.raises( + google.api_core.exceptions.Forbidden + ): + df = session.read_gbq(WIKIPEDIA_TABLE) + next(df.to_pandas_batches(page_size=500, max_results=1500)) + + +def test_to_pandas_batches_override_global_option( + session, +): + with bigframes.option_context(LARGE_TABLE_OPTION, False): + df = session.read_gbq(WIKIPEDIA_TABLE) + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + next( + df.to_pandas_batches( + page_size=500, max_results=1500, allow_large_results=True + ) + ) + assert len(w) == 2 + assert issubclass(w[0].category, FutureWarning) + assert str(w[0].message) == ( + "The query result size has exceeded 10 GB. In BigFrames 2.0 and " + "later, you might need to manually set allow_large_results = True." 
+ ) + + +def test_to_pandas_raise_when_large_result_not_allowed(session): + with bigframes.option_context(LARGE_TABLE_OPTION, False), pytest.raises( + google.api_core.exceptions.Forbidden + ): + df = session.read_gbq(WIKIPEDIA_TABLE) + next(df.to_pandas()) From c9e67b058dc49e2dba03a1f2cb8c3a38a7bb6e8f Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Wed, 26 Feb 2025 23:12:54 +0000 Subject: [PATCH 09/18] add unit tests --- bigframes/dataframe.py | 12 ++++++----- bigframes/series.py | 8 ++++--- tests/unit/resources.py | 1 + tests/unit/test_dataframe_io.py | 37 +++++++++++++++++++++++++++++++++ tests/unit/test_series_io.py | 36 ++++++++++++++++++++++++++++++++ 5 files changed, 86 insertions(+), 8 deletions(-) create mode 100644 tests/unit/test_dataframe_io.py create mode 100644 tests/unit/test_series_io.py diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index a4d88f3b87..817faf4906 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3819,9 +3819,9 @@ def to_records( index: bool = True, column_dtypes=None, index_dtypes=None, - allow_large_result=None, + allow_large_results=None, ) -> numpy.recarray: - return self.to_pandas(allow_large_results=allow_large_result).to_records( + return self.to_pandas(allow_large_results=allow_large_results).to_records( index, column_dtypes, index_dtypes ) @@ -3846,9 +3846,9 @@ def to_string( min_rows: int | None = None, max_colwidth: int | None = None, encoding: str | None = None, - allow_large_result=None, + allow_large_results=None, ) -> str | None: - return self.to_pandas(allow_large_results=allow_large_result).to_string( + return self.to_pandas(allow_large_results=allow_large_results).to_string( buf, columns, # type: ignore col_space, @@ -3934,7 +3934,9 @@ def to_markdown( return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode, index, **kwargs) # type: ignore def to_pickle(self, path, allow_large_results=None, **kwargs) -> None: - return self.to_pandas().to_pickle(path, 
**kwargs) + return self.to_pandas(allow_large_results=allow_large_results).to_pickle( + path, **kwargs + ) def to_orc(self, path=None, allow_large_results=None, **kwargs) -> bytes | None: as_pandas = self.to_pandas(allow_large_results=allow_large_results) diff --git a/bigframes/series.py b/bigframes/series.py index f73915520c..58eaa6692d 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1861,7 +1861,9 @@ def __array__(self, dtype=None, copy: Optional[bool] = None) -> numpy.ndarray: __array__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__array__) def to_pickle(self, path, allow_large_results=None, **kwargs) -> None: - return self.to_pandas().to_pickle(path, **kwargs) + return self.to_pandas(allow_large_results=allow_large_results).to_pickle( + path, **kwargs + ) def to_string( self, @@ -1875,9 +1877,9 @@ def to_string( name=False, max_rows=None, min_rows=None, - allow_large_result=None, + allow_large_results=None, ) -> typing.Optional[str]: - return self.to_pandas(allow_large_results=allow_large_result).to_string( + return self.to_pandas(allow_large_results=allow_large_results).to_string( buf, na_rep, float_format, diff --git a/tests/unit/resources.py b/tests/unit/resources.py index c091eac2a2..ebc1243eaf 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -24,6 +24,7 @@ import bigframes.clients import bigframes.core.ordering import bigframes.dataframe +import bigframes.series import bigframes.session.clients import bigframes.session.executor import bigframes.session.metrics diff --git a/tests/unit/test_dataframe_io.py b/tests/unit/test_dataframe_io.py new file mode 100644 index 0000000000..af6f22e09c --- /dev/null +++ b/tests/unit/test_dataframe_io.py @@ -0,0 +1,37 @@ +from unittest.mock import Mock + +import pytest + +from . 
import resources + + +@pytest.fixture +def mock_df(monkeypatch: pytest.MonkeyPatch): + dataframe = resources.create_dataframe(monkeypatch) + monkeypatch.setattr(dataframe, "to_pandas", Mock()) + return dataframe + + +@pytest.mark.parametrize( + "api_name, kwargs", + [ + ("to_csv", {"allow_large_results": True}), + ("to_json", {"allow_large_results": True}), + ("to_numpy", {"allow_large_results": True}), + ("to_parquet", {"allow_large_results": True}), + ("to_dict", {"allow_large_results": True}), + ("to_excel", {"excel_writer": "abc", "allow_large_results": True}), + ("to_latex", {"allow_large_results": True}), + ("to_records", {"allow_large_results": True}), + ("to_string", {"allow_large_results": True}), + ("to_html", {"allow_large_results": True}), + ("to_markdown", {"allow_large_results": True}), + ("to_pickle", {"path": "abc", "allow_large_results": True}), + ("to_orc", {"allow_large_results": True}), + ], +) +def test_dataframe_to_pandas(mock_df, api_name, kwargs): + getattr(mock_df, api_name)(**kwargs) + mock_df.to_pandas.assert_called_once_with( + allow_large_results=kwargs["allow_large_results"] + ) diff --git a/tests/unit/test_series_io.py b/tests/unit/test_series_io.py new file mode 100644 index 0000000000..00b796a39e --- /dev/null +++ b/tests/unit/test_series_io.py @@ -0,0 +1,36 @@ +from unittest.mock import Mock + +import pytest + +from . 
import resources + + +@pytest.fixture +def mock_series(monkeypatch: pytest.MonkeyPatch): + dataframe = resources.create_dataframe(monkeypatch) + series = dataframe["col"] + monkeypatch.setattr(series, "to_pandas", Mock()) + return series + + +@pytest.mark.parametrize( + "api_name, kwargs", + [ + ("to_csv", {"allow_large_results": True}), + ("to_dict", {"allow_large_results": True}), + ("to_excel", {"excel_writer": "abc", "allow_large_results": True}), + ("to_json", {"allow_large_results": True}), + ("to_latex", {"allow_large_results": True}), + ("to_list", {"allow_large_results": True}), + ("to_markdown", {"allow_large_results": True}), + ("to_numpy", {"allow_large_results": True}), + ("to_pickle", {"path": "abc", "allow_large_results": True}), + ("to_string", {"allow_large_results": True}), + ("to_xarray", {"allow_large_results": True}), + ], +) +def test_series_allow_large_results_param_passing(mock_series, api_name, kwargs): + getattr(mock_series, api_name)(**kwargs) + mock_series.to_pandas.assert_called_once_with( + allow_large_results=kwargs["allow_large_results"] + ) From 1277613fa46f71b927f6def79f08c5dd5309aabd Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 27 Feb 2025 00:12:34 +0000 Subject: [PATCH 10/18] add to_pandas and to_arrow override test --- tests/system/small/test_dataframe_io.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index b07213f943..264ca36157 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -249,6 +249,30 @@ def test_to_pandas_array_struct_correct_result(session): ) +def test_to_pandas_override_global_option(scalars_df_index): + # Direct call to_pandas uses global default setting (allow_large_results=True), + # table has 'bqdf' prefix. 
+ scalars_df_index.to_pandas() + assert scalars_df_index._query_job.destination.table_id.startswith("bqdf") + + # When allow_large_results=False, a destination table is implicitly created, + # table has 'anon' prefix. + scalars_df_index.to_pandas(allow_large_results=False) + assert scalars_df_index._query_job.destination.table_id.startswith("anon") + + +def test_to_arrow_override_global_option(scalars_df_index): + # Direct call to_pandas uses global default setting (allow_large_results=True), + # table has 'bqdf' prefix. + scalars_df_index.to_arrow() + assert scalars_df_index._query_job.destination.table_id.startswith("bqdf") + + # When allow_large_results=False, a destination table is implicitly created, + # table has 'anon' prefix. + scalars_df_index.to_arrow(allow_large_results=False) + assert scalars_df_index._query_job.destination.table_id.startswith("anon") + + def test_load_json_w_unboxed_py_value(session): sql = """ SELECT 0 AS id, JSON_OBJECT('boolean', True) AS json_col, From bb6c9bf531f0bce585cba57b8d7ae051ef34c272 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 27 Feb 2025 00:13:23 +0000 Subject: [PATCH 11/18] add to_pandas and to_arrow override test --- tests/system/small/test_series_io.py | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 tests/system/small/test_series_io.py diff --git a/tests/system/small/test_series_io.py b/tests/system/small/test_series_io.py new file mode 100644 index 0000000000..b114a23699 --- /dev/null +++ b/tests/system/small/test_series_io.py @@ -0,0 +1,11 @@ +def test_to_pandas_override_global_option(scalars_df_index): + bf_series = scalars_df_index["int64_col"] + # Direct call to_pandas uses global default setting (allow_large_results=True), + # table has 'bqdf' prefix. + bf_series.to_pandas() + assert bf_series._query_job.destination.table_id.startswith("bqdf") + + # When allow_large_results=False, a destination table is implicitly created, + # table has 'anon' prefix. 
+ bf_series.to_pandas(allow_large_results=False) + assert bf_series._query_job.destination.table_id.startswith("anon") From 8ced1b62dbf16716450c910453ee89375d71ad2f Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 27 Feb 2025 00:15:14 +0000 Subject: [PATCH 12/18] add copyright --- tests/system/small/test_series_io.py | 15 +++++++++++++++ tests/unit/test_dataframe_io.py | 14 ++++++++++++++ tests/unit/test_series_io.py | 14 ++++++++++++++ 3 files changed, 43 insertions(+) diff --git a/tests/system/small/test_series_io.py b/tests/system/small/test_series_io.py index b114a23699..ed27246a80 100644 --- a/tests/system/small/test_series_io.py +++ b/tests/system/small/test_series_io.py @@ -1,3 +1,18 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + def test_to_pandas_override_global_option(scalars_df_index): bf_series = scalars_df_index["int64_col"] # Direct call to_pandas uses global default setting (allow_large_results=True), diff --git a/tests/unit/test_dataframe_io.py b/tests/unit/test_dataframe_io.py index af6f22e09c..5deb0d7a24 100644 --- a/tests/unit/test_dataframe_io.py +++ b/tests/unit/test_dataframe_io.py @@ -1,3 +1,17 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from unittest.mock import Mock import pytest diff --git a/tests/unit/test_series_io.py b/tests/unit/test_series_io.py index 00b796a39e..a97293d3da 100644 --- a/tests/unit/test_series_io.py +++ b/tests/unit/test_series_io.py @@ -1,3 +1,17 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from unittest.mock import Mock import pytest From 475958cfa65386f832e1f2926b894604d3620cf0 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 27 Feb 2025 00:28:30 +0000 Subject: [PATCH 13/18] modify index to_pandas to match behavior of series and df, add tests. 
--- bigframes/core/blocks.py | 11 ++++---- bigframes/core/indexes/base.py | 4 ++- tests/system/small/test_index_io.py | 39 +++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 7 deletions(-) create mode 100644 tests/system/small/test_index_io.py diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 50844fa953..f6163eb6eb 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1719,7 +1719,7 @@ def transpose( original_row_index = ( original_row_index if original_row_index is not None - else self.index.to_pandas(ordered=True) + else self.index.to_pandas(ordered=True)[0] ) original_row_count = len(original_row_index) if original_row_count > bigframes.constants.MAX_COLUMNS: @@ -2683,18 +2683,17 @@ def to_pandas( *, ordered: Optional[bool] = None, allow_large_results: Optional[bool] = None, - ) -> pd.Index: + ) -> Tuple[pd.Index, Optional[bigquery.QueryJob]]: """Executes deferred operations and downloads the results.""" if len(self.column_ids) == 0: raise bigframes.exceptions.NullIndexError( "Cannot materialize index, as this object does not have an index. Set index column(s) using set_index." ) ordered = ordered if ordered is not None else True - return ( - self._block.select_columns([]) - .to_pandas(ordered=ordered, allow_large_results=allow_large_results)[0] - .index + df, query_job = self._block.select_columns([]).to_pandas( + ordered=ordered, allow_large_results=allow_large_results ) + return df.index, query_job def resolve_level(self, level: LevelsType) -> typing.Sequence[str]: if utils.is_list_like(level): diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 44cc8ba667..ddbf3cb4ed 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -502,9 +502,11 @@ def to_pandas(self, allow_large_results: Optional[bool] = None) -> pandas.Index: pandas.Index: A pandas Index with all of the labels from this Index. 
""" - return self._block.index.to_pandas( + df, query_job = self._block.index.to_pandas( ordered=True, allow_large_results=allow_large_results ) + self._query_job = query_job + return df def to_numpy(self, dtype=None, allow_large_results=None, **kwargs) -> np.ndarray: return self.to_pandas(allow_large_results=allow_large_results).to_numpy( diff --git a/tests/system/small/test_index_io.py b/tests/system/small/test_index_io.py new file mode 100644 index 0000000000..31818dfad8 --- /dev/null +++ b/tests/system/small/test_index_io.py @@ -0,0 +1,39 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def test_to_pandas_override_global_option(scalars_df_index): + bf_index = scalars_df_index.index + # Direct call to_pandas uses global default setting (allow_large_results=True), + # table has 'bqdf' prefix. + bf_index.to_pandas() + assert bf_index._query_job.destination.table_id.startswith("bqdf") + + # When allow_large_results=False, a destination table is implicitly created, + # table has 'anon' prefix. + bf_index.to_pandas(allow_large_results=False) + assert bf_index._query_job.destination.table_id.startswith("anon") + + +def test_to_numpy_override_global_option(scalars_df_index): + bf_index = scalars_df_index.index + # Direct call to_pandas uses global default setting (allow_large_results=True), + # table has 'bqdf' prefix. 
+ bf_index.to_numpy() + assert bf_index._query_job.destination.table_id.startswith("bqdf") + + # When allow_large_results=False, a destination table is implicitly created, + # table has 'anon' prefix. + bf_index.to_numpy(allow_large_results=False) + assert bf_index._query_job.destination.table_id.startswith("anon") From 584d27db04d132e32c69d21ecc48200a8c397bd6 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 27 Feb 2025 00:50:04 +0000 Subject: [PATCH 14/18] update warning message --- tests/system/large/test_dataframe_io.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/system/large/test_dataframe_io.py b/tests/system/large/test_dataframe_io.py index 73d096f753..cbe07fd835 100644 --- a/tests/system/large/test_dataframe_io.py +++ b/tests/system/large/test_dataframe_io.py @@ -48,7 +48,9 @@ def test_to_pandas_batches_override_global_option( assert issubclass(w[0].category, FutureWarning) assert str(w[0].message) == ( "The query result size has exceeded 10 GB. In BigFrames 2.0 and " - "later, you might need to manually set allow_large_results = True." + "later, you might need to manually set `allow_large_results=True` in " + "the IO method or adjust the BigFrames option: " + "`bigframes.options.bigquery.allow_large_results=True`.", ) From df078c307f6920131c596a845882df7bd5352cfd Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 27 Feb 2025 00:58:01 +0000 Subject: [PATCH 15/18] update warning message --- bigframes/session/executor.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 72f2be7982..7c9cec1fc5 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -286,10 +286,11 @@ def iterator_supplier(): if size_bytes is not None and size_bytes >= MAX_SMALL_RESULT_BYTES: warnings.warn( "The query result size has exceeded 10 GB. 
In BigFrames 2.0 and " - "later, you might need to manually set allow_large_results = True.", + "later, you might need to manually set `allow_large_results=True` in " + "the IO method or adjust the BigFrames option: " + "`bigframes.options.bigquery.allow_large_results=True`.", FutureWarning, ) - # Runs strict validations to ensure internal type predictions and ibis are completely in sync # Do not execute these validations outside of testing suite. if "PYTEST_CURRENT_TEST" in os.environ and len(col_id_overrides) == 0: From 3c75a46fb404ae6ce41d23062a7371dfeeed81eb Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 27 Feb 2025 01:04:55 +0000 Subject: [PATCH 16/18] update warning message test --- tests/system/large/test_dataframe_io.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/system/large/test_dataframe_io.py b/tests/system/large/test_dataframe_io.py index cbe07fd835..c055babce6 100644 --- a/tests/system/large/test_dataframe_io.py +++ b/tests/system/large/test_dataframe_io.py @@ -46,11 +46,8 @@ def test_to_pandas_batches_override_global_option( ) assert len(w) == 2 assert issubclass(w[0].category, FutureWarning) - assert str(w[0].message) == ( - "The query result size has exceeded 10 GB. In BigFrames 2.0 and " - "later, you might need to manually set `allow_large_results=True` in " - "the IO method or adjust the BigFrames option: " - "`bigframes.options.bigquery.allow_large_results=True`.", + assert str(w[0].message).startswith( + "The query result size has exceeded 10 GB." 
) From 1bc2eb11437a00d74b04d6739a2dad49def11fb6 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 27 Feb 2025 19:13:40 +0000 Subject: [PATCH 17/18] update parameters --- bigframes/core/indexes/base.py | 4 +-- bigframes/dataframe.py | 28 ++++++++++++++----- bigframes/series.py | 22 +++++++++++---- .../bigframes_vendored/pandas/core/frame.py | 26 +++++++++++++---- .../bigframes_vendored/pandas/core/generic.py | 8 +++++- .../pandas/core/indexes/base.py | 2 +- .../bigframes_vendored/pandas/core/series.py | 16 +++++++---- 7 files changed, 79 insertions(+), 27 deletions(-) diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index ddbf3cb4ed..3f48fd3db2 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -490,7 +490,7 @@ def __getitem__(self, key: int) -> typing.Any: else: raise NotImplementedError(f"Index key not supported {key}") - def to_pandas(self, allow_large_results: Optional[bool] = None) -> pandas.Index: + def to_pandas(self, *, allow_large_results: Optional[bool] = None) -> pandas.Index: """Gets the Index as a pandas Index. Args: @@ -508,7 +508,7 @@ def to_pandas(self, allow_large_results: Optional[bool] = None) -> pandas.Index: self._query_job = query_job return df - def to_numpy(self, dtype=None, allow_large_results=None, **kwargs) -> np.ndarray: + def to_numpy(self, dtype=None, *, allow_large_results=None, **kwargs) -> np.ndarray: return self.to_pandas(allow_large_results=allow_large_results).to_numpy( dtype, **kwargs ) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 14eb853373..b1d24cdfa8 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1644,6 +1644,7 @@ def to_pandas_batches( self, page_size: Optional[int] = None, max_results: Optional[int] = None, + *, allow_large_results: Optional[bool] = None, ) -> Iterable[pandas.DataFrame]: """Stream DataFrame results to an iterable of pandas DataFrame. 
@@ -3763,7 +3764,13 @@ def to_gbq( return destination_table def to_numpy( - self, dtype=None, copy=False, na_value=None, allow_large_results=None, **kwargs + self, + dtype=None, + copy=False, + na_value=None, + *, + allow_large_results=None, + **kwargs, ) -> numpy.ndarray: return self.to_pandas(allow_large_results=allow_large_results).to_numpy( dtype, copy, na_value, **kwargs @@ -3822,6 +3829,7 @@ def to_dict( "dict", "list", "series", "split", "tight", "records", "index" ] = "dict", into: type[dict] = dict, + *, allow_large_results: Optional[bool] = None, **kwargs, ) -> dict | list[dict]: @@ -3831,7 +3839,8 @@ def to_excel( self, excel_writer, sheet_name: str = "Sheet1", - allow_large_results=None, + *, + allow_large_results: Optional[bool] = None, **kwargs, ) -> None: return self.to_pandas(allow_large_results=allow_large_results).to_excel( @@ -3844,7 +3853,8 @@ def to_latex( columns: Sequence | None = None, header: bool | Sequence[str] = True, index: bool = True, - allow_large_results=None, + *, + allow_large_results: Optional[bool] = None, **kwargs, ) -> str | None: return self.to_pandas(allow_large_results=allow_large_results).to_latex( @@ -3856,6 +3866,7 @@ def to_records( index: bool = True, column_dtypes=None, index_dtypes=None, + *, allow_large_results=None, ) -> numpy.recarray: return self.to_pandas(allow_large_results=allow_large_results).to_records( @@ -3883,7 +3894,8 @@ def to_string( min_rows: int | None = None, max_colwidth: int | None = None, encoding: str | None = None, - allow_large_results=None, + *, + allow_large_results: Optional[bool] = None, ) -> str | None: return self.to_pandas(allow_large_results=allow_large_results).to_string( buf, @@ -3932,6 +3944,7 @@ def to_html( table_id: str | None = None, render_links: bool = False, encoding: str | None = None, + *, allow_large_results: bool | None = None, ) -> str: return self.to_pandas(allow_large_results=allow_large_results).to_html( @@ -3965,17 +3978,18 @@ def to_markdown( buf=None, mode: str = 
"wt", index: bool = True, - allow_large_results=None, + *, + allow_large_results: Optional[bool] = None, **kwargs, ) -> str | None: return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode, index, **kwargs) # type: ignore - def to_pickle(self, path, allow_large_results=None, **kwargs) -> None: + def to_pickle(self, path, *, allow_large_results=None, **kwargs) -> None: return self.to_pandas(allow_large_results=allow_large_results).to_pickle( path, **kwargs ) - def to_orc(self, path=None, allow_large_results=None, **kwargs) -> bytes | None: + def to_orc(self, path=None, *, allow_large_results=None, **kwargs) -> bytes | None: as_pandas = self.to_pandas(allow_large_results=allow_large_results) # to_orc only works with default index as_pandas_default_index = as_pandas.reset_index() diff --git a/bigframes/series.py b/bigframes/series.py index 58eaa6692d..33ba6f8599 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1778,12 +1778,13 @@ def to_csv( def to_dict( self, into: type[dict] = dict, + *, allow_large_results: Optional[bool] = None, ) -> typing.Mapping: return typing.cast(dict, self.to_pandas(allow_large_results=allow_large_results).to_dict(into)) # type: ignore def to_excel( - self, excel_writer, sheet_name="Sheet1", allow_large_results=None, **kwargs + self, excel_writer, sheet_name="Sheet1", *, allow_large_results=None, **kwargs ) -> None: return self.to_pandas(allow_large_results=allow_large_results).to_excel( excel_writer, sheet_name, **kwargs @@ -1820,6 +1821,7 @@ def to_latex( columns=None, header=True, index=True, + *, allow_large_results=None, **kwargs, ) -> typing.Optional[str]: @@ -1829,6 +1831,7 @@ def to_latex( def tolist( self, + *, allow_large_results: Optional[bool] = None, ) -> _list: return self.to_pandas(allow_large_results=allow_large_results).to_list() @@ -1841,13 +1844,20 @@ def to_markdown( buf: typing.IO[str] | None = None, mode: str = "wt", index: bool = True, - allow_large_results=None, + *, + 
allow_large_results: Optional[bool] = None, **kwargs, ) -> typing.Optional[str]: return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode=mode, index=index, **kwargs) # type: ignore def to_numpy( - self, dtype=None, copy=False, na_value=None, allow_large_results=None, **kwargs + self, + dtype=None, + copy=False, + na_value=None, + *, + allow_large_results=None, + **kwargs, ) -> numpy.ndarray: return self.to_pandas(allow_large_results=allow_large_results).to_numpy( dtype, copy, na_value, **kwargs @@ -1860,7 +1870,7 @@ def __array__(self, dtype=None, copy: Optional[bool] = None) -> numpy.ndarray: __array__.__doc__ = inspect.getdoc(vendored_pandas_series.Series.__array__) - def to_pickle(self, path, allow_large_results=None, **kwargs) -> None: + def to_pickle(self, path, *, allow_large_results=None, **kwargs) -> None: return self.to_pandas(allow_large_results=allow_large_results).to_pickle( path, **kwargs ) @@ -1877,6 +1887,7 @@ def to_string( name=False, max_rows=None, min_rows=None, + *, allow_large_results=None, ) -> typing.Optional[str]: return self.to_pandas(allow_large_results=allow_large_results).to_string( @@ -1894,7 +1905,8 @@ def to_string( def to_xarray( self, - allow_large_results=None, + *, + allow_large_results: Optional[bool] = None, ): return self.to_pandas(allow_large_results=allow_large_results).to_xarray() diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 0bb0300fa9..e59232ee85 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -366,7 +366,13 @@ def from_records( raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) def to_numpy( - self, dtype=None, copy=False, na_value=None, allow_large_results=None, **kwargs + self, + dtype=None, + copy=False, + na_value=None, + *, + allow_large_results=None, + **kwargs, ) -> np.ndarray: """ Convert the DataFrame to a NumPy 
array. @@ -567,6 +573,7 @@ def to_dict( "dict", "list", "series", "split", "tight", "records", "index" ] = "dict", into: type[dict] = dict, + *, allow_large_results: Optional[bool] = None, **kwargs, ) -> dict | list[dict]: @@ -639,7 +646,8 @@ def to_excel( self, excel_writer, sheet_name: str = "Sheet1", - allow_large_results=None, + *, + allow_large_results: Optional[bool] = None, **kwargs, ) -> None: """ @@ -681,6 +689,7 @@ def to_latex( columns=None, header=True, index=True, + *, allow_large_results=None, **kwargs, ) -> str | None: @@ -733,7 +742,8 @@ def to_records( index: bool = True, column_dtypes=None, index_dtypes=None, - allow_large_result=None, + *, + allow_large_results=None, ) -> np.recarray: """ Convert DataFrame to a NumPy record array. @@ -796,6 +806,8 @@ def to_string( min_rows: int | None = None, max_colwidth: int | None = None, encoding: str | None = None, + *, + allow_large_results: Optional[bool] = None, ): """Render a DataFrame to a console-friendly tabular output. @@ -894,6 +906,7 @@ def to_html( table_id: str | None = None, render_links: bool = False, encoding: str | None = None, + *, allow_large_results: bool | None = None, ): """Render a DataFrame as an HTML table. @@ -1002,7 +1015,8 @@ def to_markdown( buf=None, mode: str = "wt", index: bool = True, - allow_large_results=None, + *, + allow_large_results: Optional[bool] = None, **kwargs, ): """Print DataFrame in Markdown-friendly format. @@ -1038,7 +1052,7 @@ def to_markdown( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_pickle(self, path, **kwargs) -> None: + def to_pickle(self, path, *, allow_large_results, **kwargs) -> None: """Pickle (serialize) object to file. 
**Examples:** @@ -1059,7 +1073,7 @@ def to_pickle(self, path, **kwargs) -> None: """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_orc(self, path=None, allow_large_results=None, **kwargs) -> bytes | None: + def to_orc(self, path=None, *, allow_large_results=None, **kwargs) -> bytes | None: """ Write a DataFrame to the ORC format. diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index 83ac46321d..ee35bfa429 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -295,7 +295,13 @@ def to_json( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_csv(self, path_or_buf, *, index: bool = True) -> Optional[str]: + def to_csv( + self, + path_or_buf, + *, + index: bool = True, + allow_large_results: Optional[bool] = None, + ) -> Optional[str]: """Write object to a comma-separated values (csv) file on Cloud Storage. Args: diff --git a/third_party/bigframes_vendored/pandas/core/indexes/base.py b/third_party/bigframes_vendored/pandas/core/indexes/base.py index e86f2223a5..c94f707671 100644 --- a/third_party/bigframes_vendored/pandas/core/indexes/base.py +++ b/third_party/bigframes_vendored/pandas/core/indexes/base.py @@ -1061,7 +1061,7 @@ def drop_duplicates(self, *, keep: str = "first"): """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_numpy(self, dtype): + def to_numpy(self, dtype, *, allow_large_results=None): """ A NumPy ndarray representing the values in this Series or Index. 
diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index 0c5207e6dc..913a2e7c3e 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -458,6 +458,8 @@ def to_string( name: bool = False, max_rows: int | None = None, min_rows: int | None = None, + *, + allow_large_results: Optional[bool] = None, ) -> str | None: """ Render a string representation of the Series. @@ -501,7 +503,8 @@ def to_markdown( buf: IO[str] | None = None, mode: str = "wt", index: bool = True, - allow_large_results=None, + *, + allow_large_results: Optional[bool] = None, **kwargs, ) -> str | None: """ @@ -556,6 +559,7 @@ def to_markdown( def to_dict( self, into: type[dict] = dict, + *, allow_large_results: Optional[bool] = None, ) -> Mapping: """ @@ -629,6 +633,7 @@ def to_excel( self, excel_writer, sheet_name, + *, allow_large_results=None, ): """ @@ -661,6 +666,7 @@ def to_latex( columns=None, header=True, index=True, + *, allow_large_results=None, **kwargs, ): @@ -688,7 +694,7 @@ def to_latex( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def tolist(self, allow_large_results: Optional[bool] = None) -> list: + def tolist(self, *, allow_large_results: Optional[bool] = None) -> list: """ Return a list of the values. @@ -724,7 +730,7 @@ def tolist(self, allow_large_results: Optional[bool] = None) -> list: to_list = tolist - def to_numpy(self, dtype, copy=False, na_value=None, allow_large_results=None): + def to_numpy(self, dtype, copy=False, na_value=None, *, allow_large_results=None): """ A NumPy ndarray representing the values in this Series or Index. 
@@ -779,7 +785,7 @@ def to_numpy(self, dtype, copy=False, na_value=None, allow_large_results=None): """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_pickle(self, path, **kwargs): + def to_pickle(self, path, *, allow_large_results=None, **kwargs): """ Pickle (serialize) object to file. @@ -826,7 +832,7 @@ def to_pickle(self, path, **kwargs): """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) - def to_xarray(self, allow_large_results=None): + def to_xarray(self, *, allow_large_results=None): """ Return an xarray object from the pandas object. From b7d65924c76b264c7df34f2e20897030961f868a Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 27 Feb 2025 19:41:21 +0000 Subject: [PATCH 18/18] test fix --- tests/system/small/test_dataframe.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index ecece3462a..d73a7aaf4f 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2223,8 +2223,14 @@ def test_df_corr_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only # BigFrames and Pandas differ in their data type handling: # - Column types: BigFrames uses Float64, Pandas uses float64. # - Index types: BigFrames uses strign, Pandas uses object. + pd.testing.assert_index_equal(bf_result.columns, pd_result.columns) + # Only check row order in ordered mode. pd.testing.assert_frame_equal( - bf_result, pd_result, check_dtype=False, check_index_type=False + bf_result, + pd_result, + check_dtype=False, + check_index_type=False, + check_like=~scalars_df._block.session._strictly_ordered, ) @@ -2261,8 +2267,14 @@ def test_cov_w_numeric_only(scalars_dfs_maybe_ordered, columns, numeric_only): # BigFrames and Pandas differ in their data type handling: # - Column types: BigFrames uses Float64, Pandas uses float64. # - Index types: BigFrames uses strign, Pandas uses object. 
+ pd.testing.assert_index_equal(bf_result.columns, pd_result.columns)
+ # Only check row order in ordered mode.
 pd.testing.assert_frame_equal(
- bf_result, pd_result, check_dtype=False, check_index_type=False
+ bf_result,
+ pd_result,
+ check_dtype=False,
+ check_index_type=False,
+ check_like=not scalars_df._block.session._strictly_ordered,
+ )