diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py
index d22112417c..6e99a7c774 100644
--- a/bigframes/core/block_transforms.py
+++ b/bigframes/core/block_transforms.py
@@ -504,3 +504,101 @@ def _kurt_from_moments_and_count(
         kurt_id, na_cond_id, ops.partial_arg3(ops.where_op, None)
     )
     return block, kurt_id
+
+
+def align(
+    left_block: blocks.Block,
+    right_block: blocks.Block,
+    join: str = "outer",
+    axis: typing.Union[str, int, None] = None,
+) -> typing.Tuple[blocks.Block, blocks.Block]:
+    """Align two blocks on columns, rows, or both.
+
+    Args:
+        left_block: Left-hand block.
+        right_block: Right-hand block.
+        join: Join type forwarded to ``align_columns`` and ``align_rows``.
+        axis: Axis to align on. ``None`` aligns columns and then rows; any
+            other value is resolved by ``core.utils.get_axis_number``.
+
+    Returns:
+        The aligned ``(left, right)`` pair of blocks.
+    """
+    axis_n = core.utils.get_axis_number(axis) if axis is not None else None
+    # Must align columns first as other way will likely create extra joins
+    if (axis_n is None) or axis_n == 1:
+        left_block, right_block = align_columns(left_block, right_block, join=join)
+    if (axis_n is None) or axis_n == 0:
+        left_block, right_block = align_rows(left_block, right_block, join=join)
+    return left_block, right_block
+
+
+def align_rows(
+    left_block: blocks.Block,
+    right_block: blocks.Block,
+    join: str = "outer",
+) -> typing.Tuple[blocks.Block, blocks.Block]:
+    """Align two blocks on their row index.
+
+    The indexes are joined with ``how=join`` and both results are column
+    selections taken from the block of the joined index.
+    """
+    joined_index, (get_column_left, get_column_right) = left_block.index.join(
+        right_block.index, how=join
+    )
+    left_columns = [get_column_left(col) for col in left_block.value_columns]
+    right_columns = [get_column_right(col) for col in right_block.value_columns]
+
+    left_block = joined_index._block.select_columns(left_columns)
+    right_block = joined_index._block.select_columns(right_columns)
+    return left_block, right_block
+
+
+def align_columns(
+    left_block: blocks.Block,
+    right_block: blocks.Block,
+    join: str = "outer",
+) -> typing.Tuple[blocks.Block, blocks.Block]:
+    """Align two blocks so that both carry the same column labels.
+
+    The column labels are joined with ``how=join``. A label that one side
+    lacks is added to that side as a constant ``None`` column whose dtype
+    and label are taken from the other side.
+    """
+    columns, lcol_indexer, rcol_indexer = left_block.column_labels.join(
+        right_block.column_labels, how=join, return_indexers=True
+    )
+    column_indices = zip(
+        lcol_indexer if (lcol_indexer is not None) else range(len(columns)),
+        rcol_indexer if (rcol_indexer is not None) else range(len(columns)),
+    )
+    left_column_ids = []
+    right_column_ids = []
+
+    # Positions refer to the input blocks, so always read them from the originals;
+    # left_block/right_block are rebound below when constant columns are added.
+    original_left_block = left_block
+    original_right_block = right_block
+
+    # A negative position means the label is absent from that side.
+    for left_index, right_index in column_indices:
+        if left_index >= 0:
+            left_col_id = original_left_block.value_columns[left_index]
+        else:
+            dtype = original_right_block.dtypes[right_index]
+            left_block, left_col_id = left_block.create_constant(
+                None, dtype=dtype, label=original_right_block.column_labels[right_index]
+            )
+        left_column_ids.append(left_col_id)
+
+        if right_index >= 0:
+            right_col_id = original_right_block.value_columns[right_index]
+        else:
+            dtype = original_left_block.dtypes[left_index]
+            right_block, right_col_id = right_block.create_constant(
+                None, dtype=dtype, label=original_left_block.column_labels[left_index]
+            )
+        right_column_ids.append(right_col_id)
+    left_final = left_block.select_columns(left_column_ids)
+    right_final = right_block.select_columns(right_column_ids)
+    return left_final, right_final
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index 113355589b..828d2df64e 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -745,6 +745,59 @@ def rpow(
 
     __rpow__ = rpow
 
+    def align(
+        self,
+        other: typing.Union[DataFrame, bigframes.series.Series],
+        join: str = "outer",
+        axis: typing.Union[str, int, None] = None,
+    ) -> typing.Tuple[
+        typing.Union[DataFrame, bigframes.series.Series],
+        typing.Union[DataFrame, bigframes.series.Series],
+    ]:
+        # Test against None explicitly so that axis=0 is resolved like any other axis.
+        axis_n = utils.get_axis_number(axis) if axis is not None else None
+        if axis_n == 1 and isinstance(other, bigframes.series.Series):
+            raise NotImplementedError(
+                f"align with series and axis=1 not supported. {constants.FEEDBACK_LINK}"
+            )
+        left_block, right_block = block_ops.align(
+            self._block, other._block, join=join, axis=axis
+        )
+        return DataFrame(left_block), other.__class__(right_block)
+
+    def update(
+        self, other, join: str = "left", overwrite=True, filter_func=None
+    ) -> None:
+        other = other if isinstance(other, DataFrame) else DataFrame(other)
+        if join != "left":
+            raise ValueError("Only 'left' join supported for update")
+
+        if filter_func is not None:  # Take non-null other where filter_func passes
+
+            def update_func(
+                left: bigframes.series.Series, right: bigframes.series.Series
+            ) -> bigframes.series.Series:
+                return left.mask(right.notna() & filter_func(left), right)
+
+        elif overwrite:  # Take other wherever it is non-null
+
+            def update_func(
+                left: bigframes.series.Series, right: bigframes.series.Series
+            ) -> bigframes.series.Series:
+                return left.mask(right.notna(), right)
+
+        else:  # Only fill in values that are null in self
+
+            def update_func(
+                left: bigframes.series.Series, right: bigframes.series.Series
+            ) -> bigframes.series.Series:
+                return left.mask(left.isna(), right)
+
+        result = self.combine(other, update_func, how=join)
+
+        # update() works in place: adopt the combined block and return nothing.
+        self._set_block(result._block)
+
     def combine(
         self,
         other: DataFrame,
@@ -753,56 +806,32 @@ def combine(
         ],
         fill_value=None,
         overwrite: bool = True,
+        *,
+        how: str = "outer",
     ) -> DataFrame:
-        # Join rows
-        joined_index, (get_column_left, get_column_right) = self._block.index.join(
-            other._block.index, how="outer"
-        )
-        columns, lcol_indexer, rcol_indexer = self.columns.join(
-            other.columns, how="outer", return_indexers=True
-        )
+        l_aligned, r_aligned = block_ops.align(self._block, other._block, join=how)
 
-        column_indices = zip(
-            lcol_indexer if (lcol_indexer is not None) else range(len(columns)),
-            rcol_indexer if (lcol_indexer is not None) else range(len(columns)),
+        other_missing_labels = self._block.column_labels.difference(
+            other._block.column_labels
         )
 
-        block = joined_index._block
+        l_frame = DataFrame(l_aligned)
+        r_frame = DataFrame(r_aligned)
         results = []
-        for left_index, right_index in column_indices:
-            if left_index >= 0 and right_index >= 0:  # -1 indices indicate missing
-                left_col_id = get_column_left(self._block.value_columns[left_index])
-                right_col_id = get_column_right(other._block.value_columns[right_index])
-                left_series = bigframes.series.Series(block.select_column(left_col_id))
-                right_series = bigframes.series.Series(
-                    block.select_column(right_col_id)
-                )
+        for (label, lseries), (_, rseries) in zip(l_frame.items(), r_frame.items()):
+            if overwrite or label not in other_missing_labels:
                 if fill_value is not None:
-                    left_series = left_series.fillna(fill_value)
-                    right_series = right_series.fillna(fill_value)
-                results.append(func(left_series, right_series))
-            elif left_index >= 0:
-                # Does not exist in other
-                if overwrite:
-                    dtype = self.dtypes[left_index]
-                    block, null_col_id = block.create_constant(None, dtype=dtype)
-                    result = bigframes.series.Series(block.select_column(null_col_id))
-                    results.append(result)
+                    result = func(
+                        lseries.fillna(fill_value), rseries.fillna(fill_value)
+                    )
                 else:
-                    left_col_id = get_column_left(self._block.value_columns[left_index])
-                    result = bigframes.series.Series(block.select_column(left_col_id))
-                    if fill_value is not None:
-                        result = result.fillna(fill_value)
-                    results.append(result)
-            elif right_index >= 0:
-                right_col_id = get_column_right(other._block.value_columns[right_index])
-                result = bigframes.series.Series(block.select_column(right_col_id))
-                if fill_value is not None:
-                    result = result.fillna(fill_value)
-                results.append(result)
+                    result = func(lseries, rseries)
             else:
-                # Should not be possible
-                raise ValueError("No right or left index.")
+                # Label is missing from other and overwrite is off: keep self's column.
+                result = (
+                    lseries.fillna(fill_value) if fill_value is not None else lseries
+                )
+            results.append(result)
 
         if all([isinstance(val, bigframes.series.Series) for val in results]):
             import bigframes.core.reshape as rs
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index adf17848ee..ba76c4b0d3 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -1211,6 +1211,77 @@ def test_combine(
     pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)
 
 
+@pytest.mark.parametrize(
+    ("overwrite", "filter_func"),
+    [
+        (True, None),
+        (False, None),
+        (True, lambda x: x.isna() | (x % 2 == 0)),
+    ],
+    ids=[
+        "default",
+        "overwritefalse",
+        "customfilter",
+    ],
+)
+def test_df_update(overwrite, filter_func):
+    if pd.__version__.startswith("1."):
+        pytest.skip("dtype handled differently in pandas 1.x.")
+    index1 = pd.Index([1, 2, 3, 4], dtype="Int64")
+    index2 = pd.Index([1, 2, 4, 5], dtype="Int64")
+    pd_df1 = pd.DataFrame(
+        {"a": [1, None, 3, 4], "b": [5, 6, None, 8]}, dtype="Int64", index=index1
+    )
+    pd_df2 = pd.DataFrame(
+        {"a": [None, 20, 30, 40], "c": [90, None, 110, 120]},
+        dtype="Int64",
+        index=index2,
+    )
+
+    bf_df1 = dataframe.DataFrame(pd_df1)
+    bf_df2 = dataframe.DataFrame(pd_df2)
+
+    bf_df1.update(bf_df2, overwrite=overwrite, filter_func=filter_func)
+    pd_df1.update(pd_df2, overwrite=overwrite, filter_func=filter_func)
+
+    pd.testing.assert_frame_equal(bf_df1.to_pandas(), pd_df1)
+
+
+@pytest.mark.parametrize(
+    ("join", "axis"),
+    [
+        ("outer", None),
+        ("outer", 0),
+        ("outer", 1),
+        ("left", 0),
+        ("right", 1),
+        ("inner", None),
+        ("inner", 1),
+    ],
+)
+def test_df_align(join, axis):
+    index1 = pd.Index([1, 2, 3, 4], dtype="Int64")
+    index2 = pd.Index([1, 2, 4, 5], dtype="Int64")
+    pd_df1 = pd.DataFrame(
+        {"a": [1, None, 3, 4], "b": [5, 6, None, 8]}, dtype="Int64", index=index1
+    )
+    pd_df2 = pd.DataFrame(
+        {"a": [None, 20, 30, 40], "c": [90, None, 110, 120]},
+        dtype="Int64",
+        index=index2,
+    )
+
+    bf_df1 = dataframe.DataFrame(pd_df1)
+    bf_df2 = dataframe.DataFrame(pd_df2)
+
+    bf_result1, bf_result2 = bf_df1.align(bf_df2, join=join, axis=axis)
+    pd_result1, pd_result2 = pd_df1.align(pd_df2, join=join, axis=axis)
+
+    # Don't check dtype as pandas does unnecessary float conversion
+    pd.testing.assert_frame_equal(bf_result1.to_pandas(), pd_result1, check_dtype=False)
+    pd.testing.assert_frame_equal(bf_result2.to_pandas(), pd_result2, check_dtype=False)
+
+
 def test_combine_first(
     scalars_df_index,
     scalars_df_2_index,
@@ -1232,11 +1303,6 @@ def test_combine_first(
     pd_df_b.columns = ["b", "a", "d"]
     pd_result = pd_df_a.combine_first(pd_df_b)
 
-    print("pandas")
-    print(pd_result.to_string())
-    print("bigframes")
-    print(bf_result.to_string())
-
     # Some dtype inconsistency for all-NULL columns
     pd.testing.assert_frame_equal(bf_result, pd_result, check_dtype=False)
 
diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py
index 6ce11cd7e9..5cd9fe5163 100644
--- a/third_party/bigframes_vendored/pandas/core/frame.py
+++ b/third_party/bigframes_vendored/pandas/core/frame.py
@@ -503,6 +503,36 @@ def drop(
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
 
+    def align(
+        self,
+        other,
+        join="outer",
+        axis=None,
+    ) -> tuple:
+        """
+        Align two objects on their axes with the specified join method.
+
+        Join method is specified for each axis Index.
+
+        Args:
+            other (DataFrame or Series):
+                The object to align with.
+            join ({'outer', 'inner', 'left', 'right'}, default 'outer'):
+                Type of alignment to be performed.
+                left: use only keys from left frame, preserve key order.
+                right: use only keys from right frame, preserve key order.
+                outer: use union of keys from both frames, sort keys lexicographically.
+                inner: use intersection of keys from both frames,
+                preserve the order of the left keys.
+
+            axis (allowed axis of the other object, default None):
+                Align on index (0), columns (1), or both (None).
+
+        Returns:
+            tuple of (DataFrame, type of other): Aligned objects.
+        """
+        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
+
     def rename(
         self,
         *,
@@ -1265,6 +1295,39 @@ def combine_first(self, other) -> DataFrame:
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
 
+    def update(
+        self, other, join: str = "left", overwrite: bool = True, filter_func=None
+    ) -> None:
+        """
+        Modify in place using non-NA values from another DataFrame.
+
+        Aligns on indices. There is no return value.
+
+        Args:
+            other (DataFrame, or object coercible into a DataFrame):
+                Should have at least one matching index/column label
+                with the original DataFrame. If a Series is passed,
+                its name attribute must be set, and that will be
+                used as the column name to align with the original DataFrame.
+            join ({'left'}, default 'left'):
+                Only left join is implemented, keeping the index and columns of the
+                original object.
+            overwrite (bool, default True):
+                How to handle non-NA values for overlapping keys:
+                True: overwrite original DataFrame's values
+                with values from `other`.
+                False: only update values that are NA in
+                the original DataFrame.
+
+            filter_func (callable(1d-array) -> bool 1d-array, optional):
+                Can choose to replace values other than NA. Return True for values
+                that should be updated.
+
+        Returns:
+            None: This method directly changes calling object.
+        """
+        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
+
     # ----------------------------------------------------------------------
     # Data reshaping
 