Skip to content

Commit 21cf533

Browse files
committed
Python/SQLAlchemy: Demonstrate support for asyncpg and psycopg
The `sqlalchemy-cratedb` package supports the vanilla HTTP-based transport using urllib3, and the standard PostgreSQL drivers `asyncpg` and `psycopg`.
1 parent 3f15707 commit 21cf533

File tree

6 files changed

+549
-1
lines changed

6 files changed

+549
-1
lines changed

by-language/python-sqlalchemy/README.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ Run example programs::
9292

9393
time python insert_dask.py
9494

95+
time python sync_table.py urllib3 psycopg
96+
time python async_table.py asyncpg psycopg
97+
time python async_streaming.py asyncpg psycopg
98+
9599
Use ``insert_pandas.py`` to connect to any other database instance::
96100

97101
export DBURI="crate://crate@localhost:4200/"
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
"""
2+
About
3+
=====
4+
5+
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
6+
dialect, and exercise a few basic examples using the low-level table API, in
7+
asynchronous mode.
8+
9+
Specific to the asynchronous mode of SQLAlchemy is the streaming of results:
10+
11+
> The `AsyncConnection` also features a "streaming" API via the `AsyncConnection.stream()`
12+
> method that returns an `AsyncResult` object. This result object uses a server-side cursor
13+
> and provides an async/await API, such as an async iterator.
14+
>
15+
> -- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
16+
17+
Both the PostgreSQL drivers `asyncpg` and `psycopg` can be used.
18+
The corresponding SQLAlchemy dialect identifiers are::
19+
20+
# PostgreSQL protocol on port 5432, using `asyncpg`
21+
crate+asyncpg://crate@localhost:5432/doc
22+
23+
# PostgreSQL protocol on port 5432, using `psycopg`
24+
crate+psycopg://crate@localhost:5432/doc
25+
26+
27+
Synopsis
28+
========
29+
::
30+
31+
# Run CrateDB
32+
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate
33+
34+
# Use PostgreSQL protocol, with `asyncpg`
35+
python async_streaming.py asyncpg
36+
37+
# Use PostgreSQL protocol, with asynchronous support of `psycopg`
38+
python async_streaming.py psycopg
39+
40+
# Use with both variants
41+
python async_streaming.py asyncpg psycopg
42+
43+
"""
44+
import asyncio
45+
import sys
46+
import typing as t
47+
from functools import lru_cache
48+
49+
import sqlalchemy as sa
50+
from sqlalchemy.ext.asyncio import create_async_engine
51+
52+
metadata = sa.MetaData()
53+
table = sa.Table(
54+
"t1",
55+
metadata,
56+
sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
57+
sa.Column("name", sa.String),
58+
)
59+
60+
61+
class AsynchronousTableStreamingExample:
62+
"""
63+
Demonstrate reading streamed results when using the CrateDB SQLAlchemy
64+
dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers.
65+
66+
- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
67+
- https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html
68+
"""
69+
70+
def __init__(self, dsn: str):
71+
self.dsn = dsn
72+
73+
@property
74+
@lru_cache
75+
def engine(self):
76+
"""
77+
Provide an SQLAlchemy engine object.
78+
"""
79+
return create_async_engine(self.dsn, echo=True)
80+
81+
async def run(self):
82+
"""
83+
Run the whole recipe.
84+
"""
85+
await self.create_and_insert()
86+
await self.read_buffered()
87+
await self.read_streaming()
88+
89+
async def create_and_insert(self):
90+
"""
91+
Create table schema, completely dropping it upfront, and insert a few records.
92+
"""
93+
# conn is an instance of AsyncConnection
94+
async with self.engine.begin() as conn:
95+
# to support SQLAlchemy DDL methods as well as legacy functions, the
96+
# AsyncConnection.run_sync() awaitable method will pass a "sync"
97+
# version of the AsyncConnection object to any synchronous method,
98+
# where synchronous IO calls will be transparently translated for
99+
# await.
100+
await conn.run_sync(metadata.drop_all, checkfirst=True)
101+
await conn.run_sync(metadata.create_all)
102+
103+
# for normal statement execution, a traditional "await execute()"
104+
# pattern is used.
105+
await conn.execute(
106+
table.insert(),
107+
[{"id": 1, "name": "some name 1"}, {"id": 2, "name": "some name 2"}],
108+
)
109+
110+
# CrateDB specifics to flush/synchronize the write operation.
111+
await conn.execute(sa.text("REFRESH TABLE t1;"))
112+
113+
async def read_buffered(self):
114+
"""
115+
Read data from the database, in buffered mode.
116+
"""
117+
async with self.engine.connect() as conn:
118+
# the default result object is the
119+
# sqlalchemy.engine.Result object
120+
result = await conn.execute(table.select())
121+
122+
# the results are buffered so no await call is necessary
123+
# for this case.
124+
print(result.fetchall())
125+
126+
async def read_streaming(self):
127+
"""
128+
Read data from the database, in streaming mode.
129+
"""
130+
async with self.engine.connect() as conn:
131+
132+
# for a streaming result that buffers only segments of the
133+
# result at time, the AsyncConnection.stream() method is used.
134+
# this returns a sqlalchemy.ext.asyncio.AsyncResult object.
135+
async_result = await conn.stream(table.select())
136+
137+
# this object supports async iteration and awaitable
138+
# versions of methods like .all(), fetchmany(), etc.
139+
async for row in async_result:
140+
print(row)
141+
142+
143+
async def run_example(dsn: str):
144+
example = AsynchronousTableStreamingExample(dsn)
145+
146+
# Run a basic conversation.
147+
# It also includes a catalog inquiry at `table.drop(checkfirst=True)`.
148+
await example.run()
149+
150+
151+
def run_drivers(drivers: t.List[str]):
152+
for driver in drivers:
153+
if driver == "asyncpg":
154+
dsn = "crate+asyncpg://crate@localhost:5432/doc"
155+
elif driver == "psycopg":
156+
dsn = "crate+psycopg://crate@localhost:5432/doc"
157+
else:
158+
raise ValueError(f"Unknown driver: {driver}")
159+
160+
asyncio.run(run_example(dsn))
161+
162+
163+
if __name__ == "__main__":
164+
165+
drivers = sys.argv[1:]
166+
if not drivers:
167+
raise ValueError("Please select driver")
168+
run_drivers(drivers)
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
"""
2+
About
3+
=====
4+
5+
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
6+
dialect, and exercise a few basic examples using the low-level table API, in
7+
asynchronous mode.
8+
9+
Both the PostgreSQL drivers `asyncpg` and `psycopg` can be used.
10+
The corresponding SQLAlchemy dialect identifiers are::
11+
12+
# PostgreSQL protocol on port 5432, using `asyncpg`
13+
crate+asyncpg://crate@localhost:5432/doc
14+
15+
# PostgreSQL protocol on port 5432, using `psycopg`
16+
crate+psycopg://crate@localhost:5432/doc
17+
18+
Synopsis
19+
========
20+
::
21+
22+
# Run CrateDB
23+
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate
24+
25+
# Use PostgreSQL protocol, with `asyncpg`
26+
python async_table.py asyncpg
27+
28+
# Use PostgreSQL protocol, with asynchronous support of `psycopg`
29+
python async_table.py psycopg
30+
31+
# Use with both variants
32+
python async_table.py asyncpg psycopg
33+
34+
"""
35+
import asyncio
36+
import sys
37+
import typing as t
38+
from functools import lru_cache
39+
40+
import sqlalchemy as sa
41+
from sqlalchemy.ext.asyncio import create_async_engine
42+
43+
44+
class AsynchronousTableExample:
45+
"""
46+
Demonstrate the CrateDB SQLAlchemy dialect in asynchronous mode,
47+
using the `asyncpg` and `psycopg` drivers.
48+
"""
49+
50+
def __init__(self, dsn: str):
51+
self.dsn = dsn
52+
53+
@property
54+
@lru_cache
55+
def engine(self):
56+
"""
57+
Provide an SQLAlchemy engine object.
58+
"""
59+
return create_async_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True)
60+
61+
@property
62+
@lru_cache
63+
def table(self):
64+
"""
65+
Provide an SQLAlchemy table object.
66+
"""
67+
metadata = sa.MetaData()
68+
return sa.Table(
69+
"testdrive",
70+
metadata,
71+
sa.Column("x", sa.Integer, primary_key=True, autoincrement=False),
72+
sa.Column("y", sa.Integer),
73+
)
74+
75+
async def conn_run_sync(self, func: t.Callable, *args, **kwargs):
76+
"""
77+
To support SQLAlchemy DDL methods as well as legacy functions, the
78+
AsyncConnection.run_sync() awaitable method will pass a "sync"
79+
version of the AsyncConnection object to any synchronous method,
80+
where synchronous IO calls will be transparently translated for
81+
await.
82+
83+
https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html
84+
"""
85+
# `conn` is an instance of `AsyncConnection`
86+
async with self.engine.begin() as conn:
87+
return await conn.run_sync(func, *args, **kwargs)
88+
89+
async def run(self):
90+
"""
91+
Run the whole recipe, returning the result from the "read" step.
92+
"""
93+
await self.create()
94+
await self.insert(sync=True)
95+
return await self.read()
96+
97+
async def create(self):
98+
"""
99+
Create table schema, completely dropping it upfront.
100+
"""
101+
await self.conn_run_sync(self.table.drop, checkfirst=True)
102+
await self.conn_run_sync(self.table.create)
103+
104+
async def insert(self, sync: bool = False):
105+
"""
106+
Write data from the database, taking CrateDB-specific `REFRESH TABLE` into account.
107+
"""
108+
async with self.engine.begin() as conn:
109+
stmt = self.table.insert().values(x=1, y=42)
110+
await conn.execute(stmt)
111+
stmt = self.table.insert().values(x=2, y=42)
112+
await conn.execute(stmt)
113+
if sync and self.dsn.startswith("crate"):
114+
await conn.execute(sa.text("REFRESH TABLE testdrive;"))
115+
116+
async def read(self):
117+
"""
118+
Read data from the database.
119+
"""
120+
async with self.engine.begin() as conn:
121+
cursor = await conn.execute(sa.text("SELECT * FROM testdrive;"))
122+
return cursor.fetchall()
123+
124+
async def reflect(self):
125+
"""
126+
Reflect the table schema from the database.
127+
"""
128+
129+
# Optionally enable tracing SQLAlchemy calls.
130+
# self.trace()
131+
132+
def reflect(session):
133+
"""
134+
A function written in "synchronous" style that will be invoked
135+
within the asyncio event loop.
136+
137+
The session object passed is a traditional orm.Session object with
138+
synchronous interface.
139+
140+
https://docs.sqlalchemy.org/en/20/_modules/asyncio/greenlet_orm.html
141+
"""
142+
meta = sa.MetaData()
143+
reflected_table = sa.Table("testdrive", meta, autoload_with=session)
144+
print("Table information:")
145+
print(f"Table: {reflected_table}")
146+
print(f"Columns: {reflected_table.columns}")
147+
print(f"Constraints: {reflected_table.constraints}")
148+
print(f"Primary key: {reflected_table.primary_key}")
149+
150+
return await self.conn_run_sync(reflect)
151+
152+
@staticmethod
153+
def trace():
154+
"""
155+
Trace execution flow through SQLAlchemy.
156+
157+
pip install hunter
158+
"""
159+
from hunter import Q, trace
160+
161+
constraint = Q(module_startswith="sqlalchemy")
162+
trace(constraint)
163+
164+
165+
async def run_example(dsn: str):
166+
example = AsynchronousTableExample(dsn)
167+
168+
# Run a basic conversation.
169+
# It also includes a catalog inquiry at `table.drop(checkfirst=True)`.
170+
result = await example.run()
171+
print(result)
172+
173+
# Reflect the table schema.
174+
await example.reflect()
175+
176+
177+
def run_drivers(drivers: t.List[str]):
178+
for driver in drivers:
179+
if driver == "asyncpg":
180+
dsn = "crate+asyncpg://crate@localhost:5432/doc"
181+
elif driver == "psycopg":
182+
dsn = "crate+psycopg://crate@localhost:5432/doc"
183+
else:
184+
raise ValueError(f"Unknown driver: {driver}")
185+
186+
asyncio.run(run_example(dsn))
187+
188+
189+
if __name__ == "__main__":
190+
191+
drivers = sys.argv[1:]
192+
if not drivers:
193+
raise ValueError("Please select driver")
194+
run_drivers(drivers)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
click<9
22
colorlog<7
3-
crate[sqlalchemy]
43
dask==2023.12.1
54
pandas<2.2
65
sqlalchemy>=2,<2.1
6+
sqlalchemy-cratedb[all] @ git+https://github.com/crate-workbench/sqlalchemy-cratedb@amo/postgresql-async

0 commit comments

Comments
 (0)