[source-dynamodb] Incorrect detection of primary key #40730

Open
1 task done
guillemcomerma opened this issue Jul 4, 2024 · 0 comments

guillemcomerma commented Jul 4, 2024

Connector Name

Dynamodb

Connector Version

0.3.2

What step the error happened?

Other

Relevant information

Summary

I am experiencing an issue when creating a new connection with Airbyte, where the source is DynamoDB and the destination is Snowflake. The primary key auto-detected by the system is incorrect. The detected primary key uses a dot notation chain built from all keys defined for the DynamoDB table, rather than correctly identifying the top-level primary key.

Details

My DynamoDB table has the following key structure:

  • Primary Key: id (partition key)
  • Secondary Indexes:
    • Partition Key: merchantId
    • Composite Key: Partition Key: externalBankAccountId, Sort Key: bookedAt

Issue
Airbyte incorrectly detects the primary key as bookedAt.externalBankAccountId.id.merchantId, which is not a valid key in the table. The sync then fails, because the destination supports only top-level primary keys (see the log output below).

Code Analysis

The problem seems to originate from the method used to detect the primary key. The relevant code snippet can be found here and here

According to the AWS CLI documentation, attributeDefinitions returns every attribute used in a key of the table or of its indexes, not just the primary key. See official docs. Below is what I get back in AttributeDefinitions:

"AttributeDefinitions": [
  {
    "AttributeName": "bookedAt",
    "AttributeType": "S"
  },
  {
    "AttributeName": "externalBankAccountId",
    "AttributeType": "S"
  },
  {
    "AttributeName": "id",
    "AttributeType": "S"
  },
  {
    "AttributeName": "merchantId",
    "AttributeType": "S"
  }
]

However, my assumption is that the correct property to read is KeySchema, which for this table contains only the primary key:

"KeySchema": [
  {
    "AttributeName": "id",
    "KeyType": "HASH"
  }
]

Proposed Solution

Replace the use of attributeDefinitions with keySchema when retrieving the primary key.
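
To make the difference concrete, here is a minimal sketch. It is not the connector's actual code: the class `PrimaryKeySketch`, both records, and both method names are hypothetical stand-ins I made up for the two fields of the DescribeTable response, so that the example runs without the AWS SDK. The first method only reproduces the symptom I am seeing (all attribute names joined with dots); how the connector really builds that string is an assumption on my side.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class PrimaryKeySketch {

    // Hypothetical stand-ins for the entries of "AttributeDefinitions" and "KeySchema".
    record AttributeDefinition(String attributeName, String attributeType) {}
    record KeySchemaElement(String attributeName, String keyType) {}

    // Reproduces the reported symptom: every key attribute of the table AND its
    // indexes ends up in one dot-separated "primary key".
    static String fromAttributeDefinitions(List<AttributeDefinition> definitions) {
        return definitions.stream()
                .map(AttributeDefinition::attributeName)
                .collect(Collectors.joining("."));
    }

    // Proposed behaviour: read only the table's own key schema.
    // The partition (HASH) key is listed first, then the sort (RANGE) key if present.
    static List<String> fromKeySchema(List<KeySchemaElement> keySchema) {
        return keySchema.stream()
                .sorted(Comparator.comparingInt(
                        (KeySchemaElement e) -> "HASH".equals(e.keyType()) ? 0 : 1))
                .map(KeySchemaElement::attributeName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<AttributeDefinition> definitions = List.of(
                new AttributeDefinition("bookedAt", "S"),
                new AttributeDefinition("externalBankAccountId", "S"),
                new AttributeDefinition("id", "S"),
                new AttributeDefinition("merchantId", "S"));
        List<KeySchemaElement> keySchema = List.of(new KeySchemaElement("id", "HASH"));

        // prints "bookedAt.externalBankAccountId.id.merchantId"
        System.out.println(fromAttributeDefinitions(definitions));
        // prints "[id]"
        System.out.println(fromKeySchema(keySchema));
    }
}
```

With the data from my table, the first call gives exactly the invalid key Airbyte detects, while the second returns only the top-level key id.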
I am willing to test and develop this fix but would appreciate guidance on the setup, development process, and contribution workflow as this is my first time contributing to this project and using Java. 🙏 #first-issue #help-welcome

Relevant log output

2024-07-04 08:15:42 platform > failures: [ {
  "failureOrigin" : "source",
  "failureType" : "system_error",
  "internalMessage" : "java.lang.UnsupportedOperationException: Unsupported attribute type for filtering",
  "externalMessage" : "Something went wrong in the connector. See the logs for more details.",
  "metadata" : {
    "attemptNumber" : 2,
    "jobId" : 4,
    "from_trace_message" : true,
    "connector_command" : "read"
  },
  "stacktrace" : "java.lang.UnsupportedOperationException: Unsupported attribute type for filtering\n\tat io.airbyte.integrations.source.dynamodb.DynamodbSource.lambda$scanIncremental$2(DynamodbSource.java:164)\n\tat java.base/java.util.Optional.map(Optional.java:260)\n\tat io.airbyte.integrations.source.dynamodb.DynamodbSource.scanIncremental(DynamodbSource.java:151)\n\tat io.airbyte.integrations.source.dynamodb.DynamodbSource.lambda$read$1(DynamodbSource.java:126)\n\tat java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)\n\tat java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)\n\tat java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)\n\tat java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)\n\tat java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575)\n\tat java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)\n\tat java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)\n\tat java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622)\n\tat java.base/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627)\n\tat io.airbyte.integrations.source.dynamodb.DynamodbSource.read(DynamodbSource.java:129)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.readSerial(IntegrationRunner.java:275)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:173)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.java:125)\n\tat io.airbyte.integrations.source.dynamodb.DynamodbSource.main(DynamodbSource.java:50)\n",
  "timestamp" : 1720080942144
}, {
  "failureOrigin" : "destination",
  "failureType" : "system_error",
  "internalMessage" : "java.lang.IllegalArgumentException: Only top-level primary keys are supported",
  "externalMessage" : "Something went wrong in the connector. See the logs for more details.",
  "metadata" : {
    "attemptNumber" : 2,
    "jobId" : 4,
    "from_trace_message" : true,
    "connector_command" : "write"
  },
  "stacktrace" : "java.lang.IllegalArgumentException: Only top-level primary keys are supported\n\tat io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.toStreamConfig(CatalogParser.kt:146)\n\tat io.airbyte.integrations.base.destination.typing_deduping.CatalogParser.parseCatalog(CatalogParser.kt:42)\n\tat io.airbyte.integrations.destination.snowflake.SnowflakeDestination.getSerializedMessageConsumer(SnowflakeDestination.kt:214)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.kt:208)\n\tat io.airbyte.cdk.integrations.base.IntegrationRunner.run(IntegrationRunner.kt:116)\n\tat io.airbyte.cdk.integrations.base.adaptive.AdaptiveDestinationRunner$Runner.run(AdaptiveDestinationRunner.kt:68)\n\tat io.airbyte.integrations.destination.snowflake.SnowflakeDestinationKt.main(SnowflakeDestination.kt:345)\n",
  "timestamp" : 1720080942323
},

Contribute

  • Yes, I want to contribute
@guillemcomerma guillemcomerma added area/connectors Connector related issues needs-triage type/bug Something isn't working labels Jul 4, 2024
@marcosmarxm marcosmarxm changed the title Incorrect detection of primary key for DynamoDB source [source-dynamodb] Incorrect detection of primary key Jul 4, 2024