
Conversation

@aaronsteers aaronsteers commented Oct 23, 2025

What

Updates destination-pgvector to use CDK v7.4.0, which includes langchain 1.0 support, and removes the PyAirbyte dependency. This addresses the langchain version lag by bumping langchain from 0.1.x to 1.0.x via the latest CDK.

Related to airbytehq/airbyte-python-cdk#810 (CDK langchain upgrade)

How

Dependency Changes:

  • Upgraded airbyte-cdk from >=1.0 to >=7.4.0
  • Removed PyAirbyte dependency (airbyte = "^0.12.0") entirely
  • Updated Python constraint from ^3.9,<3.12 to ^3.10,<3.13
  • Added ulid-py = "^1.1.0" for ULID generation

Code Changes:

  1. Vendored minimal code from PyAirbyte:

    • Created WriteStrategy enum in new write_strategy.py module
    • Created BatchHandle class in record_processor.py
    • Created custom JsonlWriter class (110 lines) for JSONL file writing
  2. Migrated to CDK equivalents:

    • Updated all imports to use airbyte_cdk.sql.* instead of airbyte.*
    • Changed from pydantic to pydantic.v1 for BaseModel
  3. Refactored SQL data loading (⚠️ CRITICAL CHANGE):

    • Removed pandas dependency - no longer using pandas.to_sql()
    • Implemented direct SQL INSERT statements in _write_files_to_new_table() (see the sketch after this list):
      • Reads JSONL files line by line with json.loads()
      • Normalizes column names using LowerCaseNormalizer
      • Converts dict and list values to JSON strings for PostgreSQL compatibility
      • Builds parameterized INSERT statements with SQLAlchemy text()
      • Executes row-by-row with bound parameters
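
For concreteness, here is a rough sketch of that flow (not the connector code verbatim: the normalizer import path, the fallback normalizer, and the table/connection handling are assumptions made for illustration):

    # Rough sketch of the row-by-row load described above -- illustrative only.
    import json
    from pathlib import Path

    from sqlalchemy import text
    from sqlalchemy.engine import Connection

    try:
        # Assumed CDK location for the normalizer; verify against airbyte_cdk.sql.
        from airbyte_cdk.sql.name_normalizers import LowerCaseNormalizer
        normalize = LowerCaseNormalizer().normalize
    except ImportError:
        def normalize(name: str) -> str:  # trivial fallback for this sketch
            return name.lower().replace(" ", "_")


    def load_jsonl_file(conn: Connection, table_name: str, jsonl_path: Path) -> None:
        """Insert each JSONL record with one parameterized INSERT per row."""
        with jsonl_path.open("r", encoding="utf-8") as f:
            for line in f:
                record = json.loads(line)
                normalized: dict = {}
                for col, value in record.items():
                    # dicts and lists (metadata, embeddings) become JSON strings
                    if isinstance(value, (dict, list)):
                        value = json.dumps(value)
                    normalized[normalize(col)] = value
                # Identifiers are interpolated; values are bound parameters.
                columns = ", ".join(normalized)
                placeholders = ", ".join(f":{c}" for c in normalized)
                stmt = text(f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})")
                conn.execute(stmt, normalized)  # one execute() per row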

Review guide

High priority - potential correctness issues:

  1. sql_processor.py:574-638 - The new _write_files_to_new_table() implementation:

    • ⚠️ Verify JSON serialization handles all Python types correctly (lines 617-620)
    • ⚠️ Check for SQL injection risks (should be safe with parameterized queries, but verify)
    • ⚠️ Confirm null handling works correctly (not explicitly handled in new code)
    • ⚠️ Verify special characters and Unicode are handled properly
  2. sql_processor.py:617-620 - JSON serialization logic:

    if isinstance(value, (dict, list)):
        normalized_record[normalized_col] = json.dumps(value)

    This is critical for pgvector's metadata (dict) and embedding (list) fields. Verify this works for:

    • Nested dicts/lists
    • Arrays with special float values (NaN, Inf, -Inf)
    • Empty dicts/lists
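
A quick way to exercise the cases in item 2 (illustrative only): Python's json.dumps accepts NaN/Infinity by default and emits non-standard tokens that PostgreSQL's json/jsonb parsing rejects, so that case in particular deserves an explicit check.

    import json
    import math

    print(json.dumps({"a": {"b": [1, 2]}}))    # nested dicts/lists: '{"a": {"b": [1, 2]}}'
    print(json.dumps({}), json.dumps([]))      # empty containers: '{}' '[]'
    print(json.dumps([math.nan, math.inf]))    # '[NaN, Infinity]' -- not valid JSON
    # json.dumps(..., allow_nan=False) raises ValueError instead of emitting bad JSON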

Medium priority - performance and maintainability:
3. Performance implications - Row-by-row INSERT vs pandas bulk operations:

  • The new implementation inserts one row per execute() call
  • This could be significantly slower for large datasets
  • Consider batch insert optimization in the future (a rough sketch follows this list)
4. jsonl_writer.py - Custom JSONL writer implementation:

  • Review for correctness and edge case handling
  • Ensure it properly closes files and handles errors
5. Test coverage - Only 4 unit tests:

  • Manual testing was done with docker pgvector (3 test records)
  • Consider adding integration tests for the new SQL insertion logic
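
A rough sketch of the kind of batching that could replace the row-by-row loop (for discussion only, not part of this PR; the helper name and table handling are illustrative):

    # Possible future optimization, sketched for discussion -- not part of this PR.
    # SQLAlchemy treats an execute() call with a list of parameter dicts as an
    # executemany, which avoids one round trip per row.
    from sqlalchemy import text
    from sqlalchemy.engine import Connection


    def insert_batch(conn: Connection, table_name: str, rows: list[dict], batch_size: int = 1000) -> None:
        """Insert rows in batches instead of one execute() per row."""
        if not rows:
            return
        columns = ", ".join(rows[0])
        placeholders = ", ".join(f":{c}" for c in rows[0])
        stmt = text(f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})")
        for start in range(0, len(rows), batch_size):
            # A list of parameter dicts triggers executemany-style execution.
            conn.execute(stmt, rows[start:start + batch_size])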

Low priority - imports and structure:
6. write_strategy.py - New module to avoid circular imports
7. record_processor.py:42-54 - New BatchHandle class
8. Import changes across all files - migrating from airbyte.* to airbyte_cdk.sql.*

User Impact

Positive:

  • Connector now uses langchain 1.0.x (up from 0.1.x)
  • Python 3.12 support added
  • Removes PyAirbyte transitive dependency

Potential negative:

  • ⚠️ Performance regression - Row-by-row inserts may be slower than pandas bulk operations for large datasets
  • Python 3.9 support removed (now requires 3.10+)

Compatibility:

  • No breaking changes to connector configuration or data format
  • Data is still written to PostgreSQL with pgvector extension
  • Embeddings and metadata are properly stored (verified in manual testing)

Can this PR be safely reverted and rolled back?

  • YES 💚

This PR can be safely reverted. The changes are isolated to the destination-pgvector connector and don't affect other connectors. However, note that reverting would restore the old langchain 0.1.x dependencies and Python 3.9 compatibility.


Testing performed:

  • ✅ All 4 unit tests passing
  • ✅ Manual end-to-end test with docker pgvector:
    • Started pgvector container
    • Wrote 3 test records with embeddings
    • Verified data landed correctly in PostgreSQL
    • Confirmed embeddings stored as vector(1536) type
    • Verified metadata stored as JSON
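
This kind of verification can be reproduced with queries along these lines (illustrative only: the connection string and table/column names below are placeholders, not values from the actual test):

    # Rough verification sketch -- connection string and table name are placeholders.
    import sqlalchemy
    from sqlalchemy import text

    engine = sqlalchemy.create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

    with engine.connect() as conn:
        # Column types: the embedding column should report udt_name = 'vector'.
        cols = conn.execute(
            text(
                "SELECT column_name, data_type, udt_name "
                "FROM information_schema.columns WHERE table_name = :t"
            ),
            {"t": "documents"},
        ).fetchall()
        for name, data_type, udt in cols:
            print(name, data_type, udt)

        # Spot-check one row: embedding dimension (pgvector's vector_dims) and JSON metadata.
        row = conn.execute(
            text("SELECT vector_dims(embedding), metadata FROM documents LIMIT 1")
        ).first()
        print(row)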

Session info:

- Update CDK dependency to use local path (langchain upgrade branch)
- Temporarily comment out PyAirbyte dependency (will restore after CDK PR merges)
- Update Python version constraint to ^3.10,<3.13
- Run poetry lock to update dependencies

This is Stage 3 of the langchain upgrade plan:
- Stage 1: ✅ Bump pgvector to latest CDK (skipped, handled by PyAirbyte)
- Stage 2: ✅ Bump langchain in CDK (PR #810)
- Stage 3: 🔄 Update pgvector to use new CDK with updated langchain

Related PR: airbytehq/airbyte-python-cdk#810

Co-Authored-By: AJ Steers <[email protected]>
@devin-ai-integration (Contributor)

Original prompt from AJ Steers
Received message in Slack channel #ask-devin-ai:

@Devin - We need to bump our langchain dependencies in the python CDK. Can you make a plan to bump the CDK dependencies, and then use the pgvector destination to test if your changes are successful? I expect you to fully ignore CI for this, until such time as you have tests locally succeeding. You should expect significant breaking changes since we are many versions behind for the langchain dependency.
Thread URL: https://airbytehq-team.slack.com/archives/C08BHPUMEPJ/p1761175248996799?thread_ts=1761175248.996799

@devin-ai-integration (Contributor)

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions (bot)

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Helpful Resources

PR Slash Commands

Airbyte Maintainers (that's you!) can execute the following slash commands on your PR:

  • /format-fix - Fixes most formatting issues.
  • /bump-version - Bumps connector versions.
    • You can specify a custom changelog by passing changelog. Example: /bump-version changelog="My cool update"
    • Leaving the changelog arg blank will auto-populate the changelog from the PR title.
  • /run-cat-tests - Runs legacy CAT tests (Connector Acceptance Tests)
  • /build-connector-images - Builds and publishes a pre-release docker image for the modified connector(s).
  • JVM connectors:
    • /update-connector-cdk-version connector=<CONNECTOR_NAME> - Updates the specified connector to the latest CDK version.
      Example: /update-connector-cdk-version connector=destination-bigquery
    • /bump-bulk-cdk-version type=patch changelog='foo' - Bump the Bulk CDK's version. type can be major/minor/patch.
  • Python connectors:
    • /poe connector source-example lock - Run the Poe lock task on the source-example connector, committing the results back to the branch.
    • /poe source example lock - Alias for /poe connector source-example lock.
    • /poe source example use-cdk-branch my/branch - Pin the source-example CDK reference to the branch name specified.
    • /poe source example use-cdk-latest - Update the source-example CDK dependency to the latest available version.


github-actions bot commented Oct 23, 2025

destination-pgvector Connector Test Results

1 test   1 suite   1 file   4s ⏱️
0 ✅ passed   0 💤 skipped   0 ❌ failed   1 🔥 error

For more details on these errors, see this check.

Results for commit 616f083.

♻️ This comment has been updated with latest results.

devin-ai-integration bot and others added 6 commits October 23, 2025 00:39
Switched from local path to git branch reference to avoid Poetry version resolution issues. The CDK branch uses dynamic versioning (0.0.0) which conflicts with PyAirbyte's requirement for CDK >=7.3.9.

This is a temporary approach until CDK PR #810 is merged and we can use the published version.

Co-Authored-By: AJ Steers <[email protected]>
- Update pyproject.toml to use published CDK v7.4.0 instead of git branch
- Fix PyAirbyte import paths (_processors.file -> _writers)
- Update pydantic imports to use pydantic.v1 for compatibility with CDK
- Remove deprecated progress logging calls
- All unit tests passing (4/4)

Co-Authored-By: AJ Steers <[email protected]>
- Created simple JSONL writer to replace PyAirbyte's JsonlWriter
- Updated all imports to use CDK's sql module (exceptions, constants, types, etc.)
- Added WriteStrategy enum and BatchHandle class to vendored code
- Removed PyAirbyte dependency from pyproject.toml
- Added ulid-py dependency (needed by sql_processor)
- Fixed circular import by moving WriteStrategy to separate file
- All unit tests passing (4/4)

Co-Authored-By: AJ Steers <[email protected]>
- Remove pandas dependency (no longer needed)
- Implement direct SQL INSERT statements with proper parameter binding
- Convert dict and list values to JSON strings for PostgreSQL compatibility
- Fixes pandas/SQLAlchemy compatibility issues
- Manual end-to-end test successful: data lands correctly in pgvector with embeddings

Co-Authored-By: AJ Steers <[email protected]>
Comment on lines +9 to +14 of write_strategy.py
class WriteStrategy(enum.Enum):
    """Write strategy for SQL destinations."""
    AUTO = "auto"
    APPEND = "append"
    REPLACE = "replace"
    MERGE = "merge"
@aaronsteers (Collaborator, Author)

Can we refactor to remove WriteStrategy and instead use destination write modes as defined in the airbyte protocol? (You should have a model that defines it?)

@devin-ai-integration (Contributor)

Good question! I looked into this. The WriteStrategy enum serves a slightly different purpose than DestinationSyncMode:

DestinationSyncMode (from Airbyte protocol):

  • overwrite - replace all data
  • append - add new data
  • append_dedup - add new data with deduplication

WriteStrategy (internal to connector):

  • AUTO - automatically determine strategy based on DestinationSyncMode
  • REPLACE - maps to overwrite
  • APPEND - maps to append
  • MERGE - maps to append_dedup

The key difference is the AUTO value, which allows the connector to automatically determine the write strategy based on the catalog's DestinationSyncMode (see lines 708-725 in sql_processor.py).
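
In code, the AUTO resolution amounts to roughly this mapping (a sketch; the write_strategy import path and helper name are illustrative, not the actual code):

    from airbyte_cdk.models import DestinationSyncMode

    from destination_pgvector.write_strategy import WriteStrategy  # illustrative path


    def resolve_write_strategy(strategy: WriteStrategy, sync_mode: DestinationSyncMode) -> WriteStrategy:
        """Resolve AUTO into a concrete strategy based on the stream's sync mode."""
        if strategy is not WriteStrategy.AUTO:
            return strategy
        return {
            DestinationSyncMode.overwrite: WriteStrategy.REPLACE,
            DestinationSyncMode.append: WriteStrategy.APPEND,
            DestinationSyncMode.append_dedup: WriteStrategy.MERGE,
        }[sync_mode]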

Options:

  1. Keep WriteStrategy - It provides the AUTO mode which is useful for the connector's internal logic
  2. Remove WriteStrategy - Use DestinationSyncMode directly and handle the AUTO logic differently (maybe with an Optional[DestinationSyncMode] where None means AUTO?)
  3. Hybrid approach - Use DestinationSyncMode where possible, but keep a minimal enum for the AUTO case

Which approach would you prefer? I'm leaning toward option 1 (keeping it) since the AUTO mode seems useful, but I can refactor if you think option 2 or 3 is better.
