
Destination S3: date type written as dictionary for parquet format #14028

Open
marcosmarxm opened this issue Jun 22, 2022 · 32 comments
Assignees
Labels
area/databases, community, connectors/destination/s3, frozen (Not being actively worked on), team/destinations (Destinations team's backlog), type/bug (Something isn't working), zendesk

Comments

@marcosmarxm
Member

marcosmarxm commented Jun 22, 2022

This Github issue is synchronized with Zendesk:

Ticket ID: #1152
Priority: normal
Group: User Success Engineer
Assignee: Marcos Marx

Original ticket description:

  • Is this your first time deploying Airbyte?: No
  • OS Version / Instance: Kubernetes
  • Memory / Disk: unlimited (scalable cluster)
  • Deployment: Kubernetes
  • Airbyte Version: 0.35.64-alpha
  • Source name/version: source-google-analytics-v4 0.3.22
  • Destination name/version: destination-s3 / 0.3.5
  • Step: The issue is happening during sync
  • Description: the date column from Google Analytics is written as a dictionary in the Parquet file instead of a string

I did try to sync to the S3 destination in JSON format, and it worked (the dates were strings).
Is this a bug, or is this how Parquet encodes dates?

[Discourse post]

@marcosmarxm
Member Author

I was able to reproduce the issue with the integration account. Other fields are unnested correctly.

@marcosmarxm
Member Author

Comment made from Zendesk by Marcos Marx on 2022-06-22 at 20:39:

I created issue #14028; it looks like a bug with the S3 destination connector. I added it to the connector roadmap. I'll update you when this is fixed.

@blarghmatey
Contributor

I'm wondering if there has been any progress on this issue yet? We are currently using the S3 destination to populate our data lake environment with Parquet files, and this is preventing us from executing queries on any tables that contain a datetime value.

@tuliren
Contributor

tuliren commented Jul 5, 2022

This issue is very strange in that:

  • If the date from Google Analytics is defined as a logical date type, the Parquet schema should interpret it as an integer field representing epoch days (doc).
  • If the date is not defined as a logical type, the Parquet schema should represent it as a string.

The current { member0: string, member1: none } value is very confusing. I don't understand how it can be an object; it's probably something peculiar about the Google Analytics schema.

There has been no progress on this issue so far. Unfortunately, we don't currently have enough bandwidth for S3, so I also cannot give an estimate of when this will be fixed.
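
For reference, a minimal pyarrow sketch of the two expected encodings (the file path and column names here are arbitrary illustrations, not from the original ticket): a logical date is stored as date32 (epoch days) and a non-logical date stays a string; neither should surface as a struct.

import datetime
import pyarrow as pa
import pyarrow.parquet as pq

# A logical date column (stored as int32 epoch days) and a plain string date column.
table = pa.table({
    "event_date": pa.array([datetime.date(2022, 6, 22)], type=pa.date32()),
    "event_date_str": pa.array(["2022-06-22"], type=pa.string()),
})
pq.write_table(table, "/tmp/date_example.parquet")

# Reading the schema back shows date32[day] and string -- not a struct.
print(pq.read_table("/tmp/date_example.parquet").schema)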

@blarghmatey
Contributor

For my use case we are actually using Postgres as the source, so the format conversion seems to be specific to the S3 destination rather than the source format, unless there is a common issue in how the intermediate representation assigns type information in the schema passed to the S3 destination.

If anyone has a suggestion of where in the S3 plugin to look I am happy to try my hand at debugging and fixing the issue.

@blarghmatey
Contributor

blarghmatey commented Jul 6, 2022

I've been doing some further debugging to narrow down the potential source of the issue. So far I have the following information:

  • The data type that is confounding the writer is a timestamp with timezone in my source Postgres system
  • The _airbyte_emitted_at column is properly represented in the resulting parquet file
  • The Avro output properly encodes the date-time with timezone when accessed via the Python Avro library after downloading the synced file from S3 (see the inspection sketch at the end of this comment)
  • The dictionary encoding toggle does not have an impact on whether the date-time records are represented properly

Looking at the code in the S3 connector it seems that Airbyte is relying on the upstream Apache Parquet Java library for translating from Avro to Parquet. This suggests that there is either a bug in the upstream implementation, or a bug in how it is being used in Airbyte.

It is entirely possible that there is another source of the problem that I am overlooking, but this is what I have discovered so far.
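
A rough sketch of the kind of file inspection described above, assuming the synced Avro and Parquet files have already been downloaded locally and that fastavro and pyarrow are used (the paths and library choice are assumptions, not from the original comment):

from pprint import pprint

import pyarrow.parquet as pq
from fastavro import reader

# Avro: the writer schema shows the union types Airbyte emitted for each column.
with open("/tmp/sync_output.avro", "rb") as f:
    avro_reader = reader(f)
    pprint(avro_reader.writer_schema)

# Parquet: the timestamp-with-timezone columns show up as struct<member0, member1>.
pprint(pq.read_table("/tmp/sync_output.parquet").schema)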

@blarghmatey
Contributor

Looking again at the dependencies, the S3 destination is using the 1.12.0 release of the Parquet library, which is over a year old. The latest release is 1.12.3. Would it be possible to get that version bumped and an updated connector build pushed so that we can see if that resolves this dictionary issue?

@blarghmatey
Contributor

I believe that this change in the upstream Parquet library will resolve this bug, so updating to 1.12.3 will be the necessary fix:

apache/parquet-java@c72862b

@blarghmatey
Contributor

@marcosmarxm @tuliren I found what appears to be the necessary fix to this bug and pushed a PR that bumps the upstream Parquet dependency to the latest version. Can you take a look and see about getting a new release pushed so I can test it? Thanks!

#14502

@marcosmarxm
Member Author

Thanks @blarghmatey !

@blarghmatey
Contributor

Taking a closer look at the Avro file that I generated while testing, I noticed that the actual schema for the timestamp-with-TZ fields is an Avro union type, which is not supported in Parquet. It seems that the conversion is treating the Avro union as a Parquet struct, resulting in the behavior that we're seeing.

Avro schema that I'm working with:

>>> pprint(json.loads(dfr.meta['avro.schema'].decode('utf8')))
{'fields': [{'name': '_airbyte_ab_id',
             'type': {'logicalType': 'uuid', 'type': 'string'}},
            {'name': '_airbyte_emitted_at',
             'type': {'logicalType': 'timestamp-millis', 'type': 'long'}},
            {'default': None, 'name': 'id', 'type': ['null', 'double']},
            {'default': None, 'name': 'live', 'type': ['null', 'boolean']},
            {'default': None, 'name': 'title', 'type': ['null', 'string']},
            {'default': None, 'name': 'run_tag', 'type': ['null', 'string']},
            {'default': None,
             'name': 'end_date',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None, 'name': 'course_id', 'type': ['null', 'double']},
            {'default': None,
             'name': 'created_on',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'start_date',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'updated_on',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'courseware_id',
             'type': ['null', 'string']},
            {'default': None,
             'name': 'enrollment_end',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'expiration_date',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'enrollment_start',
             'type': ['null',
                      {'logicalType': 'timestamp-micros', 'type': 'long'},
                      'string']},
            {'default': None,
             'name': 'courseware_url_path',
             'type': ['null', 'string']},
            {'default': None,
             'name': '_airbyte_additional_properties',
             'type': ['null', {'type': 'map', 'values': 'string'}]}],
 'name': 'mitxonline__app__postgres__courses_courserun',
 'namespace': 'public',
 'type': 'record'}

Generated Parquet schema:

>>> pq.read_table("/tmp/2022_07_08_1657291363014_0.parquet")
pyarrow.Table
_airbyte_ab_id: string not null
_airbyte_emitted_at: timestamp[ms, tz=UTC] not null
id: double
live: bool
title: string
run_tag: string
end_date: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
course_id: double
created_on: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
start_date: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
updated_on: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
courseware_id: string
enrollment_end: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
expiration_date: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
enrollment_start: struct<member0: timestamp[us, tz=UTC], member1: string>
  child 0, member0: timestamp[us, tz=UTC]
  child 1, member1: string
courseware_url_path: string
_airbyte_additional_properties: map<string, string ('_airbyte_additional_properties')>
  child 0, _airbyte_additional_properties: struct<key: string not null, value: string not null> not null
      child 0, key: string not null
      child 1, value: string not null

@blarghmatey
Contributor

The block of code in the upstream Parquet library that generates the struct from a Union type is https://github.com/apache/parquet-mr/blob/e990eb3f14c39273e46a9fce07ec85d2edf7fccb/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java#L264-L272

It seems that the solution is to modify the Airbyte code that generates the Avro schema from the JSON schema so that it does not include string in the union of logical types for the record; then the Parquet conversion would not generate the struct. If there is another approach that I'm missing, I'm happy to hear suggestions.
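
To make the proposed change concrete, here is a sketch of what it would mean for one field from the schema dump above (illustrative only; the Airbyte schema-generation code itself is not shown here):

# Current Avro union emitted for a timestamp-with-timezone column; the extra
# 'string' branch is what forces the Avro-to-Parquet converter into a struct.
end_date_current = {
    "name": "end_date",
    "default": None,
    "type": ["null", {"logicalType": "timestamp-micros", "type": "long"}, "string"],
}

# Proposed shape: a plain nullable logical timestamp, which maps directly to a
# nullable timestamp[us] Parquet column instead of struct<member0, member1>.
end_date_proposed = {
    "name": "end_date",
    "default": None,
    "type": ["null", {"logicalType": "timestamp-micros", "type": "long"}],
}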

@tuliren tuliren self-assigned this Jul 27, 2022
@felcastro

Any updates? Currently trying to sync data from MySQL to S3 Parquet, and Timestamp fields are being converted to struct type.
{"member0": "2020-02-04T17:01:27.000+0000", "member1": null}

@harshithmullapudi harshithmullapudi changed the title Date type written as dictionnary for parquet format in s3 destination Date type written as dictionary for parquet format in s3 destination Sep 20, 2022
@grishick grishick added the team/destinations Destinations team's backlog label Sep 27, 2022
@liyinqiu

Hi there, will there be a fix here, or is it intentionally written as a struct?
The background is that we are building a MySQL to S3 sync and then creating a Hive table based on the output Parquet file. We want a datetime column type in the Hive metastore, but that does not work with this struct in the Parquet file.
Based on this post, we used a previous version of the MySQL source that outputs datetime as a string, which could mitigate the issue.

@marcosmarxm marcosmarxm changed the title Date type written as dictionary for parquet format in s3 destination Destination S3: date type written as dictionary for parquet format Nov 30, 2022
@sebas-500

Any updates on this?

I'm trying to copy a Postgres table into S3 but some of the datetime columns contain the aforementioned struct.

@robertomczak
Contributor

Observed the same during our MySQL to Parquet S3 testing: {'member0': datetime.datetime(2016, 9, 25, 19, 57, 31, tzinfo=<UTC>), 'member1': None}

@sebas-500

sebas-500 commented May 22, 2023


@robertomczak
Did you manage to work around it?

@robertomczak
Contributor


@robertomczak Did you manage to work around it?

Not directly in Airbyte. The current idea is to process the data in a further ETL step using Pandas and extract the column from the nested object into a top-level column of the data frame.

It would be nice to fix it, as keeping an empty-string artifact in the intermediate data lake is not good practice. 🤷‍♂️
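
A rough pandas sketch of that kind of extraction, assuming the struct columns come back as Python dicts when the Parquet file is read with the pyarrow engine (the file path and column handling are assumptions, not a tested Airbyte fix):

import pandas as pd

# Read the Airbyte-produced Parquet file; with the pyarrow engine, struct
# columns are materialized as Python dicts.
df = pd.read_parquet("/tmp/2022_07_08_1657291363014_0.parquet")

# Promote member0 (the actual timestamp) of every struct-valued column to a
# top-level column, leaving non-struct columns untouched.
for column in df.columns:
    sample = df[column].dropna().head(1)
    if not sample.empty and isinstance(sample.iloc[0], dict) and "member0" in sample.iloc[0]:
        df[column] = df[column].map(
            lambda value: value.get("member0") if isinstance(value, dict) else value
        )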

@sebas-500


Aren't the destination files getting bigger? For example in my case a table of 20MB was transformed to two files of 97 and 20MB each. I bet the struct has something to do with it.

@msaffitz

msaffitz commented Jun 7, 2023

I'm seeing this as well -- I'm curious if this is related to #17010 and #17011.

In either case, I poked around to see if I could get a fix, but unfortunately I wasn't able to get my environment fully set up to build and run the tests.

It'd be awesome to get a fix within Airbyte for this :)

@tevonsb

tevonsb commented Aug 21, 2023

We have also experienced this using the Snowflake connector, on any field of type DATE, when writing to a Parquet file in S3. Our data is not large (100k rows, chunked into a single file), so I don't think it has to do with data scale.

@qoqajr

qoqajr commented Sep 25, 2023

Hello,

I'm experiencing the same issue with PostgreSQL timestamptz data being written into S3 Parquet files. Any update on the issue since 2022...?

@jamsi

jamsi commented Nov 3, 2023

In case it helps anyone: I was processing some of the Parquet data in PySpark through AWS Glue and added this transformation to convert the struct.member0 values back into a DateTime field.

from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, TimestampType


def extract_timestamp_from_struct(df: DataFrame) -> DataFrame:
    '''
    This function converts any "structs" that are really datetimes into datetimes.
    This is documented here: https://github.com/airbytehq/airbyte/issues/14028

    Parameters:
    - df: The input DataFrame.

    Returns:
    - DataFrame with datetime columns as datetimes
    '''
    for field in df.schema:
        if isinstance(field.dataType, StructType):
            # Airbyte writes the timestamp into member0, so replace the struct
            # column with that nested field.
            if any(child.name == "member0" and isinstance(child.dataType, TimestampType) for child in field.dataType.fields):
                df = df.withColumn(field.name, col(f"{field.name}.member0"))
    return df
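
For context, usage inside a Glue/PySpark job would look roughly like this (the bucket path is hypothetical, and spark is assumed to be an existing SparkSession):

# Hypothetical usage in a Glue / PySpark job with an existing SparkSession.
df = spark.read.parquet("s3://my-bucket/airbyte/courses_courserun/")
df = extract_timestamp_from_struct(df)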

@qoqajr

qoqajr commented Nov 3, 2023

Thanks @jamsi for sharing. I ended up doing exactly the same in a Glue job, but it would be better if it weren't necessary to add another service to the mix just to fix date formats.

@lahdirakram

Any news about this? I'm facing the same behaviour.

@john-motif

I'm also seeing this issue with Snowflake > GCS in Parquet format

@OMK2186

OMK2186 commented Jan 3, 2024

MySQL to S3 connector:
_airbyte_emitted_at is in string format, but columns from the MySQL table are being converted to a struct:
{'member0': datetime.datetime(2023, 12, 7, 7, 48, 39, tzinfo=<UTC>), 'member1': None}

@davidfromtandym

This is an insanely frustrating bug

@nijuyonkadesu

I clung to the tiniest ember of hope and rebuilt the destination-s3 connector with

implementation ('org.apache.parquet:parquet-avro:1.13.1') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}

and

{'member0': datetime.datetime(2024, 2, 19, 10, 30, tzinfo=<UTC>), 'member1': None}


It's still the same ...

@bleonard bleonard added the frozen Not being actively worked on label Mar 22, 2024
@myonlinecode1988

I see the above in both parquet and avro files. Is it on the roadmap to address this?

@octavia-squidington-iii
Collaborator

Zendesk ticket #5666 has been linked to this issue.

@evantahler evantahler added type/bug Something isn't working and removed frozen Not being actively worked on labels Apr 5, 2024
@jbfbell jbfbell added the frozen Not being actively worked on label Apr 18, 2024
@kahlua-kol

Hey! Any hope to get this bug fixed soon? :)
