diff --git a/CHANGES.txt b/CHANGES.txt index 40fddf4..3e9d562 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,95 @@ +1.3.3 (2021-11-01) +^^^^^^^^^^^^^^^^^^ + +* Support async-timeout 4.0+ + + +1.3.2 (2021-10-07) +^^^^^^^^^^^^^^^^^^ + + +1.3.2b2 (2021-10-07) +^^^^^^^^^^^^^^^^^^^^ + +* Respect use_labels for select statement `#882 `_ + + +1.3.2b1 (2021-07-11) +^^^^^^^^^^^^^^^^^^^^ + +* Fix compatibility with SQLAlchemy >= 1.4 `#870 `_ + + +1.3.1 (2021-07-08) +^^^^^^^^^^^^^^^^^^ + + +1.3.1b2 (2021-07-06) +^^^^^^^^^^^^^^^^^^^^ + +* Suppress "Future exception was never retrieved" `#862 `_ + + +1.3.1b1 (2021-07-05) +^^^^^^^^^^^^^^^^^^^^ + +* Fix ClosableQueue.get on cancellation, close it on Connection.close `#859 `_ + + +1.3.0 (2021-06-30) +^^^^^^^^^^^^^^^^^^ + + +1.3.0b4 (2021-06-28) +^^^^^^^^^^^^^^^^^^^^ + +* Fix "Unable to detect disconnect when using NOTIFY/LISTEN" `#559 `_ + + +1.3.0b3 (2021-04-03) +^^^^^^^^^^^^^^^^^^^^ + +* Reformat using black `#814 `_ + + +1.3.0b2 (2021-04-02) +^^^^^^^^^^^^^^^^^^^^ + +* Type annotations `#813 `_ + + +1.3.0b1 (2021-03-30) +^^^^^^^^^^^^^^^^^^^^ + +* Raise ResourceClosedError if we try to open a cursor on a closed SAConnection `#811 `_ + + +1.3.0b0 (2021-03-25) +^^^^^^^^^^^^^^^^^^^^ + +* Fix compatibility with SA 1.4 for IN statement `#806 `_ + + +1.2.1 (2021-03-23) +^^^^^^^^^^^^^^^^^^ + +* Pop loop in connection init due to backward compatibility `#808 `_ + + +1.2.0b4 (2021-03-23) +^^^^^^^^^^^^^^^^^^^^ + +* Set max supported sqlalchemy version `#805 `_ + + +1.2.0b3 (2021-03-22) +^^^^^^^^^^^^^^^^^^^^ + +* Don't run ROLLBACK when the connection is closed `#778 `_ + +* Multiple cursors support `#801 `_ + + 1.2.0b2 (2020-12-21) ^^^^^^^^^^^^^^^^^^^^ diff --git a/PKG-INFO b/PKG-INFO index b4151da..108223e 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: aiopg -Version: 1.2.0b2 +Version: 1.3.3 Summary: Postgres integration with asyncio. Home-page: https://aiopg.readthedocs.io Author: Andrew Svetlov @@ -15,392 +15,6 @@ Project-URL: Docs: RTD, https://aiopg.readthedocs.io Project-URL: GitHub: issues, https://github.com/aio-libs/aiopg/issues Project-URL: GitHub: repo, https://github.com/aio-libs/aiopg -Description: aiopg - ===== - .. image:: https://github.com/aio-libs/aiopg/workflows/CI/badge.svg - :target: https://github.com/aio-libs/aiopg/actions?query=workflow%3ACI - .. image:: https://codecov.io/gh/aio-libs/aiopg/branch/master/graph/badge.svg - :target: https://codecov.io/gh/aio-libs/aiopg - .. image:: https://badges.gitter.im/Join%20Chat.svg - :target: https://gitter.im/aio-libs/Lobby - :alt: Chat on Gitter - - **aiopg** is a library for accessing a PostgreSQL_ database - from the asyncio_ (PEP-3156/tulip) framework. It wraps - asynchronous features of the Psycopg database driver. - - Example - ------- - - .. code:: python - - import asyncio - import aiopg - - dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1' - - async def go(): - pool = await aiopg.create_pool(dsn) - async with pool.acquire() as conn: - async with conn.cursor() as cur: - await cur.execute("SELECT 1") - ret = [] - async for row in cur: - ret.append(row) - assert ret == [(1,)] - - loop = asyncio.get_event_loop() - loop.run_until_complete(go()) - - - Example of SQLAlchemy optional integration - ------------------------------------------ - - .. code:: python - - import asyncio - from aiopg.sa import create_engine - import sqlalchemy as sa - - metadata = sa.MetaData() - - tbl = sa.Table('tbl', metadata, - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('val', sa.String(255))) - - async def create_table(engine): - async with engine.acquire() as conn: - await conn.execute('DROP TABLE IF EXISTS tbl') - await conn.execute('''CREATE TABLE tbl ( - id serial PRIMARY KEY, - val varchar(255))''') - - async def go(): - async with create_engine(user='aiopg', - database='aiopg', - host='127.0.0.1', - password='passwd') as engine: - - async with engine.acquire() as conn: - await conn.execute(tbl.insert().values(val='abc')) - - async for row in conn.execute(tbl.select()): - print(row.id, row.val) - - loop = asyncio.get_event_loop() - loop.run_until_complete(go()) - - .. _PostgreSQL: http://www.postgresql.org/ - .. _asyncio: http://docs.python.org/3.4/library/asyncio.html - - Please use:: - - $ make test - - for executing the project's unittests. - See https://aiopg.readthedocs.io/en/stable/contributing.html for details - on how to set up your environment to run the tests. - - Changelog - --------- - - 1.2.0b2 (2020-12-21) - ^^^^^^^^^^^^^^^^^^^^ - - * Fix IsolationLevel.read_committed and introduce IsolationLevel.default `#770 `_ - - * Fix python 3.8 warnings in tests `#771 `_ - - - 1.2.0b1 (2020-12-16) - ^^^^^^^^^^^^^^^^^^^^ - - * Deprecate blocking connection.cancel() method `#570 `_ - - - 1.2.0b0 (2020-12-15) - ^^^^^^^^^^^^^^^^^^^^ - - * Implement timeout on acquiring connection from pool `#766 `_ - - - 1.1.0 (2020-12-10) - ^^^^^^^^^^^^^^^^^^ - - - 1.1.0b2 (2020-12-09) - ^^^^^^^^^^^^^^^^^^^^ - - * Added missing slots to context managers `#763 `_ - - - 1.1.0b1 (2020-12-07) - ^^^^^^^^^^^^^^^^^^^^ - - * Fix on_connect multiple call on acquire `#552 `_ - - * Fix python 3.8 warnings `#622 `_ - - * Bump minimum psycopg version to 2.8.4 `#754 `_ - - * Fix Engine.release method to release connection in any way `#756 `_ - - - 1.0.0 (2019-09-20) - ^^^^^^^^^^^^^^^^^^ - - * Removal of an asynchronous call in favor of issues # 550 - - * Big editing of documentation and minor bugs #534 - - - 0.16.0 (2019-01-25) - ^^^^^^^^^^^^^^^^^^^ - - * Fix select priority name `#525 `_ - - * Rename `psycopg2` to `psycopg2-binary` to fix deprecation warning `#507 `_ - - * Fix `#189 `_ hstore when using ReadDictCursor `#512 `_ - - * close cannot be used while an asynchronous query is underway `#452 `_ - - * sqlalchemy adapter trx begin allow transaction_mode `#498 `_ - - - 0.15.0 (2018-08-14) - ^^^^^^^^^^^^^^^^^^^ - - * Support Python 3.7 `#437 `_ - - - 0.14.0 (2018-05-10) - ^^^^^^^^^^^^^^^^^^^ - - * Add ``get_dialect`` func to have ability to pass ``json_serializer`` `#451 `_ - - - 0.13.2 (2018-01-03) - ^^^^^^^^^^^^^^^^^^^ - - * Fixed compatibility with SQLAlchemy 1.2.0 `#412 `_ - - * Added support for transaction isolation levels `#219 `_ - - - 0.13.1 (2017-09-10) - ^^^^^^^^^^^^^^^^^^^ - - * Added connection poll recycling logic `#373 `_ - - - 0.13.0 (2016-12-02) - ^^^^^^^^^^^^^^^^^^^ - - * Add `async with` support to `.begin_nested()` `#208 `_ - - * Fix connection.cancel() `#212 `_ `#223 `_ - - * Raise informative error on unexpected connection closing `#191 `_ - - * Added support for python types columns issues `#217 `_ - - * Added support for default values in SA table issues `#206 `_ - - - 0.12.0 (2016-10-09) - ^^^^^^^^^^^^^^^^^^^ - - * Add an on_connect callback parameter to pool `#141 `_ - - * Fixed connection to work under both windows and posix based systems `#142 `_ - - - 0.11.0 (2016-09-12) - ^^^^^^^^^^^^^^^^^^^ - - * Immediately remove callbacks from a closed file descriptor `#139 `_ - - * Drop Python 3.3 support - - - 0.10.0 (2016-07-16) - ^^^^^^^^^^^^^^^^^^^ - - * Refactor tests to use dockerized Postgres server `#107 `_ - - * Reduce default pool minsize to 1 `#106 `_ - - * Explicitly enumerate packages in setup.py `#85 `_ - - * Remove expired connections from pool on acquire `#116 `_ - - * Don't crash when Connection is GC'ed `#124 `_ - - * Use loop.create_future() if available - - - 0.9.2 (2016-01-31) - ^^^^^^^^^^^^^^^^^^ - - * Make pool.release return asyncio.Future, so we can wait on it in - `__aexit__` `#102 `_ - - * Add support for uuid type `#103 `_ - - - 0.9.1 (2016-01-17) - ^^^^^^^^^^^^^^^^^^ - - * Documentation update `#101 `_ - - - 0.9.0 (2016-01-14) - ^^^^^^^^^^^^^^^^^^ - - * Add async context managers for transactions `#91 `_ - - * Support async iterator in ResultProxy `#92 `_ - - * Add async with for engine `#90 `_ - - - 0.8.0 (2015-12-31) - ^^^^^^^^^^^^^^^^^^ - - * Add PostgreSQL notification support `#58 `_ - - * Support pools with unlimited size `#59 `_ - - * Cancel current DB operation on asyncio timeout `#66 `_ - - * Add async with support for Pool, Connection, Cursor `#88 `_ - - - 0.7.0 (2015-04-22) - ^^^^^^^^^^^^^^^^^^ - - * Get rid of resource leak on connection failure. - - * Report ResourceWarning on non-closed connections. - - * Deprecate iteration protocol support in cursor and ResultProxy. - - * Release sa connection to pool on `connection.close()`. - - - 0.6.0 (2015-02-03) - ^^^^^^^^^^^^^^^^^^ - - * Accept dict, list, tuple, named and positional parameters in - `SAConnection.execute()` - - - 0.5.2 (2014-12-08) - ^^^^^^^^^^^^^^^^^^ - - * Minor release, fixes a bug that leaves connection in broken state - after `cursor.execute()` failure. - - - 0.5.1 (2014-10-31) - ^^^^^^^^^^^^^^^^^^ - - * Fix a bug for processing transactions in line. - - - 0.5.0 (2014-10-31) - ^^^^^^^^^^^^^^^^^^ - - * Add .terminate() to Pool and Engine - - * Reimplement connection pool (now pool size cannot be greater than pool.maxsize) - - * Add .close() and .wait_closed() to Pool and Engine - - * Add minsize, maxsize, size and freesize properties to sa.Engine - - * Support *echo* parameter for logging executed SQL commands - - * Connection.close() is not a coroutine (but we keep backward compatibility). - - - 0.4.1 (2014-10-02) - ^^^^^^^^^^^^^^^^^^ - - * make cursor iterable - - * update docs - - - 0.4.0 (2014-10-02) - ^^^^^^^^^^^^^^^^^^ - - * add timeouts for database operations. - - * Autoregister psycopg2 support for json data type. - - * Support JSON in aiopg.sa - - * Support ARRAY in aiopg.sa - - * Autoregister hstore support if present in connected DB - - * Support HSTORE in aiopg.sa - - - 0.3.2 (2014-07-07) - ^^^^^^^^^^^^^^^^^^ - - * change signature to cursor.execute(operation, parameters=None) to - follow psycopg2 convention. - - - 0.3.1 (2014-07-04) - ^^^^^^^^^^^^^^^^^^ - - * Forward arguments to cursor constructor for pooled connections. - - - 0.3.0 (2014-06-22) - ^^^^^^^^^^^^^^^^^^ - - * Allow executing SQLAlchemy DDL statements. - - * Fix bug with race conditions on acquiring/releasing connections from pool. - - - 0.2.3 (2014-06-12) - ^^^^^^^^^^^^^^^^^^ - - * Fix bug in connection pool. - - - 0.2.2 (2014-06-07) - ^^^^^^^^^^^^^^^^^^ - - * Fix bug with passing parameters into SAConnection.execute when - executing raw SQL expression. - - - 0.2.1 (2014-05-08) - ^^^^^^^^^^^^^^^^^^ - - * Close connection with invalid transaction status on returning to pool. - - - 0.2.0 (2014-05-04) - ^^^^^^^^^^^^^^^^^^ - - * Implemented optional support for sqlalchemy functional sql layer. - - - 0.1.0 (2014-04-06) - ^^^^^^^^^^^^^^^^^^ - - * Implemented plain connections: connect, Connection, Cursor. - - * Implemented database pools: create_pool and Pool. Platform: macOS Platform: POSIX Platform: Windows @@ -412,6 +26,7 @@ Classifier: Programming Language :: Python :: 3.7 Classifier: Programming Language :: Python :: 3.8 Classifier: Programming Language :: Python :: 3.9 +Classifier: Programming Language :: Python :: 3.10 Classifier: Operating System :: POSIX Classifier: Operating System :: MacOS :: MacOS X Classifier: Operating System :: Microsoft :: Windows @@ -423,3 +38,484 @@ Requires-Python: >=3.6 Description-Content-Type: text/x-rst Provides-Extra: sa +License-File: LICENSE + +aiopg +===== +.. image:: https://github.com/aio-libs/aiopg/workflows/CI/badge.svg + :target: https://github.com/aio-libs/aiopg/actions?query=workflow%3ACI +.. image:: https://codecov.io/gh/aio-libs/aiopg/branch/master/graph/badge.svg + :target: https://codecov.io/gh/aio-libs/aiopg +.. image:: https://badges.gitter.im/Join%20Chat.svg + :target: https://gitter.im/aio-libs/Lobby + :alt: Chat on Gitter + +**aiopg** is a library for accessing a PostgreSQL_ database +from the asyncio_ (PEP-3156/tulip) framework. It wraps +asynchronous features of the Psycopg database driver. + +Example +------- + +.. code:: python + + import asyncio + import aiopg + + dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1' + + async def go(): + pool = await aiopg.create_pool(dsn) + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT 1") + ret = [] + async for row in cur: + ret.append(row) + assert ret == [(1,)] + + loop = asyncio.get_event_loop() + loop.run_until_complete(go()) + + +Example of SQLAlchemy optional integration +------------------------------------------ + +.. code:: python + + import asyncio + from aiopg.sa import create_engine + import sqlalchemy as sa + + metadata = sa.MetaData() + + tbl = sa.Table('tbl', metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('val', sa.String(255))) + + async def create_table(engine): + async with engine.acquire() as conn: + await conn.execute('DROP TABLE IF EXISTS tbl') + await conn.execute('''CREATE TABLE tbl ( + id serial PRIMARY KEY, + val varchar(255))''') + + async def go(): + async with create_engine(user='aiopg', + database='aiopg', + host='127.0.0.1', + password='passwd') as engine: + + async with engine.acquire() as conn: + await conn.execute(tbl.insert().values(val='abc')) + + async for row in conn.execute(tbl.select()): + print(row.id, row.val) + + loop = asyncio.get_event_loop() + loop.run_until_complete(go()) + +.. _PostgreSQL: http://www.postgresql.org/ +.. _asyncio: https://docs.python.org/3/library/asyncio.html + +Please use:: + + $ make test + +for executing the project's unittests. +See https://aiopg.readthedocs.io/en/stable/contributing.html for details +on how to set up your environment to run the tests. + +Changelog +--------- + +1.3.3 (2021-11-01) +^^^^^^^^^^^^^^^^^^ + +* Support async-timeout 4.0+ + + +1.3.2 (2021-10-07) +^^^^^^^^^^^^^^^^^^ + + +1.3.2b2 (2021-10-07) +^^^^^^^^^^^^^^^^^^^^ + +* Respect use_labels for select statement `#882 `_ + + +1.3.2b1 (2021-07-11) +^^^^^^^^^^^^^^^^^^^^ + +* Fix compatibility with SQLAlchemy >= 1.4 `#870 `_ + + +1.3.1 (2021-07-08) +^^^^^^^^^^^^^^^^^^ + + +1.3.1b2 (2021-07-06) +^^^^^^^^^^^^^^^^^^^^ + +* Suppress "Future exception was never retrieved" `#862 `_ + + +1.3.1b1 (2021-07-05) +^^^^^^^^^^^^^^^^^^^^ + +* Fix ClosableQueue.get on cancellation, close it on Connection.close `#859 `_ + + +1.3.0 (2021-06-30) +^^^^^^^^^^^^^^^^^^ + + +1.3.0b4 (2021-06-28) +^^^^^^^^^^^^^^^^^^^^ + +* Fix "Unable to detect disconnect when using NOTIFY/LISTEN" `#559 `_ + + +1.3.0b3 (2021-04-03) +^^^^^^^^^^^^^^^^^^^^ + +* Reformat using black `#814 `_ + + +1.3.0b2 (2021-04-02) +^^^^^^^^^^^^^^^^^^^^ + +* Type annotations `#813 `_ + + +1.3.0b1 (2021-03-30) +^^^^^^^^^^^^^^^^^^^^ + +* Raise ResourceClosedError if we try to open a cursor on a closed SAConnection `#811 `_ + + +1.3.0b0 (2021-03-25) +^^^^^^^^^^^^^^^^^^^^ + +* Fix compatibility with SA 1.4 for IN statement `#806 `_ + + +1.2.1 (2021-03-23) +^^^^^^^^^^^^^^^^^^ + +* Pop loop in connection init due to backward compatibility `#808 `_ + + +1.2.0b4 (2021-03-23) +^^^^^^^^^^^^^^^^^^^^ + +* Set max supported sqlalchemy version `#805 `_ + + +1.2.0b3 (2021-03-22) +^^^^^^^^^^^^^^^^^^^^ + +* Don't run ROLLBACK when the connection is closed `#778 `_ + +* Multiple cursors support `#801 `_ + + +1.2.0b2 (2020-12-21) +^^^^^^^^^^^^^^^^^^^^ + +* Fix IsolationLevel.read_committed and introduce IsolationLevel.default `#770 `_ + +* Fix python 3.8 warnings in tests `#771 `_ + + +1.2.0b1 (2020-12-16) +^^^^^^^^^^^^^^^^^^^^ + +* Deprecate blocking connection.cancel() method `#570 `_ + + +1.2.0b0 (2020-12-15) +^^^^^^^^^^^^^^^^^^^^ + +* Implement timeout on acquiring connection from pool `#766 `_ + + +1.1.0 (2020-12-10) +^^^^^^^^^^^^^^^^^^ + + +1.1.0b2 (2020-12-09) +^^^^^^^^^^^^^^^^^^^^ + +* Added missing slots to context managers `#763 `_ + + +1.1.0b1 (2020-12-07) +^^^^^^^^^^^^^^^^^^^^ + +* Fix on_connect multiple call on acquire `#552 `_ + +* Fix python 3.8 warnings `#622 `_ + +* Bump minimum psycopg version to 2.8.4 `#754 `_ + +* Fix Engine.release method to release connection in any way `#756 `_ + + +1.0.0 (2019-09-20) +^^^^^^^^^^^^^^^^^^ + +* Removal of an asynchronous call in favor of issues # 550 + +* Big editing of documentation and minor bugs #534 + + +0.16.0 (2019-01-25) +^^^^^^^^^^^^^^^^^^^ + +* Fix select priority name `#525 `_ + +* Rename `psycopg2` to `psycopg2-binary` to fix deprecation warning `#507 `_ + +* Fix `#189 `_ hstore when using ReadDictCursor `#512 `_ + +* close cannot be used while an asynchronous query is underway `#452 `_ + +* sqlalchemy adapter trx begin allow transaction_mode `#498 `_ + + +0.15.0 (2018-08-14) +^^^^^^^^^^^^^^^^^^^ + +* Support Python 3.7 `#437 `_ + + +0.14.0 (2018-05-10) +^^^^^^^^^^^^^^^^^^^ + +* Add ``get_dialect`` func to have ability to pass ``json_serializer`` `#451 `_ + + +0.13.2 (2018-01-03) +^^^^^^^^^^^^^^^^^^^ + +* Fixed compatibility with SQLAlchemy 1.2.0 `#412 `_ + +* Added support for transaction isolation levels `#219 `_ + + +0.13.1 (2017-09-10) +^^^^^^^^^^^^^^^^^^^ + +* Added connection poll recycling logic `#373 `_ + + +0.13.0 (2016-12-02) +^^^^^^^^^^^^^^^^^^^ + +* Add `async with` support to `.begin_nested()` `#208 `_ + +* Fix connection.cancel() `#212 `_ `#223 `_ + +* Raise informative error on unexpected connection closing `#191 `_ + +* Added support for python types columns issues `#217 `_ + +* Added support for default values in SA table issues `#206 `_ + + +0.12.0 (2016-10-09) +^^^^^^^^^^^^^^^^^^^ + +* Add an on_connect callback parameter to pool `#141 `_ + +* Fixed connection to work under both windows and posix based systems `#142 `_ + + +0.11.0 (2016-09-12) +^^^^^^^^^^^^^^^^^^^ + +* Immediately remove callbacks from a closed file descriptor `#139 `_ + +* Drop Python 3.3 support + + +0.10.0 (2016-07-16) +^^^^^^^^^^^^^^^^^^^ + +* Refactor tests to use dockerized Postgres server `#107 `_ + +* Reduce default pool minsize to 1 `#106 `_ + +* Explicitly enumerate packages in setup.py `#85 `_ + +* Remove expired connections from pool on acquire `#116 `_ + +* Don't crash when Connection is GC'ed `#124 `_ + +* Use loop.create_future() if available + + +0.9.2 (2016-01-31) +^^^^^^^^^^^^^^^^^^ + +* Make pool.release return asyncio.Future, so we can wait on it in + `__aexit__` `#102 `_ + +* Add support for uuid type `#103 `_ + + +0.9.1 (2016-01-17) +^^^^^^^^^^^^^^^^^^ + +* Documentation update `#101 `_ + + +0.9.0 (2016-01-14) +^^^^^^^^^^^^^^^^^^ + +* Add async context managers for transactions `#91 `_ + +* Support async iterator in ResultProxy `#92 `_ + +* Add async with for engine `#90 `_ + + +0.8.0 (2015-12-31) +^^^^^^^^^^^^^^^^^^ + +* Add PostgreSQL notification support `#58 `_ + +* Support pools with unlimited size `#59 `_ + +* Cancel current DB operation on asyncio timeout `#66 `_ + +* Add async with support for Pool, Connection, Cursor `#88 `_ + + +0.7.0 (2015-04-22) +^^^^^^^^^^^^^^^^^^ + +* Get rid of resource leak on connection failure. + +* Report ResourceWarning on non-closed connections. + +* Deprecate iteration protocol support in cursor and ResultProxy. + +* Release sa connection to pool on `connection.close()`. + + +0.6.0 (2015-02-03) +^^^^^^^^^^^^^^^^^^ + +* Accept dict, list, tuple, named and positional parameters in + `SAConnection.execute()` + + +0.5.2 (2014-12-08) +^^^^^^^^^^^^^^^^^^ + +* Minor release, fixes a bug that leaves connection in broken state + after `cursor.execute()` failure. + + +0.5.1 (2014-10-31) +^^^^^^^^^^^^^^^^^^ + +* Fix a bug for processing transactions in line. + + +0.5.0 (2014-10-31) +^^^^^^^^^^^^^^^^^^ + +* Add .terminate() to Pool and Engine + +* Reimplement connection pool (now pool size cannot be greater than pool.maxsize) + +* Add .close() and .wait_closed() to Pool and Engine + +* Add minsize, maxsize, size and freesize properties to sa.Engine + +* Support *echo* parameter for logging executed SQL commands + +* Connection.close() is not a coroutine (but we keep backward compatibility). + + +0.4.1 (2014-10-02) +^^^^^^^^^^^^^^^^^^ + +* make cursor iterable + +* update docs + + +0.4.0 (2014-10-02) +^^^^^^^^^^^^^^^^^^ + +* add timeouts for database operations. + +* Autoregister psycopg2 support for json data type. + +* Support JSON in aiopg.sa + +* Support ARRAY in aiopg.sa + +* Autoregister hstore support if present in connected DB + +* Support HSTORE in aiopg.sa + + +0.3.2 (2014-07-07) +^^^^^^^^^^^^^^^^^^ + +* change signature to cursor.execute(operation, parameters=None) to + follow psycopg2 convention. + + +0.3.1 (2014-07-04) +^^^^^^^^^^^^^^^^^^ + +* Forward arguments to cursor constructor for pooled connections. + + +0.3.0 (2014-06-22) +^^^^^^^^^^^^^^^^^^ + +* Allow executing SQLAlchemy DDL statements. + +* Fix bug with race conditions on acquiring/releasing connections from pool. + + +0.2.3 (2014-06-12) +^^^^^^^^^^^^^^^^^^ + +* Fix bug in connection pool. + + +0.2.2 (2014-06-07) +^^^^^^^^^^^^^^^^^^ + +* Fix bug with passing parameters into SAConnection.execute when + executing raw SQL expression. + + +0.2.1 (2014-05-08) +^^^^^^^^^^^^^^^^^^ + +* Close connection with invalid transaction status on returning to pool. + + +0.2.0 (2014-05-04) +^^^^^^^^^^^^^^^^^^ + +* Implemented optional support for sqlalchemy functional sql layer. + + +0.1.0 (2014-04-06) +^^^^^^^^^^^^^^^^^^ + +* Implemented plain connections: connect, Connection, Cursor. + +* Implemented database pools: create_pool and Pool. + diff --git a/README.rst b/README.rst index b9ffca0..0c4539f 100644 --- a/README.rst +++ b/README.rst @@ -74,7 +74,7 @@ loop.run_until_complete(go()) .. _PostgreSQL: http://www.postgresql.org/ -.. _asyncio: http://docs.python.org/3.4/library/asyncio.html +.. _asyncio: https://docs.python.org/3/library/asyncio.html Please use:: diff --git a/aiopg/__init__.py b/aiopg/__init__.py index d987be7..400fb05 100644 --- a/aiopg/__init__.py +++ b/aiopg/__init__.py @@ -3,57 +3,91 @@ import warnings from collections import namedtuple -from .connection import TIMEOUT as DEFAULT_TIMEOUT -from .connection import Connection, connect -from .cursor import Cursor +from .connection import ( + TIMEOUT as DEFAULT_TIMEOUT, + Connection, + Cursor, + DefaultCompiler, + IsolationCompiler, + IsolationLevel, + ReadCommittedCompiler, + RepeatableReadCompiler, + SerializableCompiler, + Transaction, + connect, +) from .pool import Pool, create_pool -from .transaction import IsolationLevel, Transaction from .utils import get_running_loop warnings.filterwarnings( - 'always', '.*', + "always", + ".*", category=ResourceWarning, - module=r'aiopg(\.\w+)+', - append=False + module=r"aiopg(\.\w+)+", + append=False, ) -__all__ = ('connect', 'create_pool', 'get_running_loop', - 'Connection', 'Cursor', 'Pool', 'version', 'version_info', - 'DEFAULT_TIMEOUT', 'IsolationLevel', 'Transaction') +__all__ = ( + "connect", + "create_pool", + "get_running_loop", + "Connection", + "Cursor", + "Pool", + "version", + "version_info", + "DEFAULT_TIMEOUT", + "IsolationLevel", + "Transaction", +) -__version__ = '1.2.0b2' +__version__ = "1.3.3" -version = __version__ + ' , Python ' + sys.version +version = f"{__version__}, Python {sys.version}" -VersionInfo = namedtuple('VersionInfo', - 'major minor micro releaselevel serial') +VersionInfo = namedtuple( + "VersionInfo", "major minor micro releaselevel serial" +) -def _parse_version(ver): +def _parse_version(ver: str) -> VersionInfo: RE = ( - r'^' - r'(?P\d+)\.(?P\d+)\.(?P\d+)' - r'((?P[a-z]+)(?P\d+)?)?' - r'$' + r"^" + r"(?P\d+)\.(?P\d+)\.(?P\d+)" + r"((?P[a-z]+)(?P\d+)?)?" + r"$" ) match = re.match(RE, ver) + if not match: + raise ImportError(f"Invalid package version {ver}") try: - major = int(match.group('major')) - minor = int(match.group('minor')) - micro = int(match.group('micro')) - levels = {'rc': 'candidate', - 'a': 'alpha', - 'b': 'beta', - None: 'final'} - releaselevel = levels[match.group('releaselevel')] - serial = int(match.group('serial')) if match.group('serial') else 0 + major = int(match.group("major")) + minor = int(match.group("minor")) + micro = int(match.group("micro")) + levels = {"rc": "candidate", "a": "alpha", "b": "beta", None: "final"} + releaselevel = levels[match.group("releaselevel")] + serial = int(match.group("serial")) if match.group("serial") else 0 return VersionInfo(major, minor, micro, releaselevel, serial) except Exception as e: - raise ImportError("Invalid package version {}".format(ver)) from e + raise ImportError(f"Invalid package version {ver}") from e version_info = _parse_version(__version__) # make pyflakes happy -(connect, create_pool, Connection, Cursor, Pool, DEFAULT_TIMEOUT, - IsolationLevel, Transaction, get_running_loop) +( + connect, + create_pool, + Connection, + Cursor, + Pool, + DEFAULT_TIMEOUT, + IsolationLevel, + Transaction, + get_running_loop, + IsolationCompiler, + DefaultCompiler, + ReadCommittedCompiler, + RepeatableReadCompiler, + SerializableCompiler, +) diff --git a/aiopg/connection.py b/aiopg/connection.py index 0cc1c3e..ca32bda 100755 --- a/aiopg/connection.py +++ b/aiopg/connection.py @@ -1,22 +1,41 @@ +import abc import asyncio import contextlib +import datetime +import enum import errno import platform import select import sys import traceback +import uuid import warnings import weakref from collections.abc import Mapping +from types import TracebackType +from typing import ( + Any, + Callable, + Generator, + List, + Optional, + Sequence, + Tuple, + Type, + cast, +) import psycopg2 -from psycopg2 import extras -from psycopg2.extensions import POLL_ERROR, POLL_OK, POLL_READ, POLL_WRITE - -from .cursor import Cursor -from .utils import _ContextManager, get_running_loop - -__all__ = ('connect',) +import psycopg2.extensions +import psycopg2.extras + +from .log import logger +from .utils import ( + ClosableQueue, + _ContextManager, + create_completed_future, + get_running_loop, +) TIMEOUT = 60.0 @@ -25,8 +44,16 @@ WSAENOTSOCK = 10038 -def connect(dsn=None, *, timeout=TIMEOUT, enable_json=True, - enable_hstore=True, enable_uuid=True, echo=False, **kwargs): +def connect( + dsn: Optional[str] = None, + *, + timeout: float = TIMEOUT, + enable_json: bool = True, + enable_hstore: bool = True, + enable_uuid: bool = True, + echo: bool = False, + **kwargs: Any, +) -> _ContextManager["Connection"]: """A factory for connecting to PostgreSQL. The coroutine accepts all parameters that psycopg2.connect() does @@ -35,22 +62,669 @@ Returns instantiated Connection object. """ - coro = Connection( - dsn, timeout, bool(echo), + connection = Connection( + dsn, + timeout, + bool(echo), enable_hstore=enable_hstore, enable_uuid=enable_uuid, enable_json=enable_json, - **kwargs + **kwargs, ) - - return _ContextManager(coro) - - -def _is_bad_descriptor_error(os_error): - if platform.system() == 'Windows': # pragma: no cover - return os_error.winerror == WSAENOTSOCK - else: - return os_error.errno == errno.EBADF + return _ContextManager[Connection](connection, disconnect) # type: ignore + + +async def disconnect(c: "Connection") -> None: + await c.close() + + +def _is_bad_descriptor_error(os_error: OSError) -> bool: + if platform.system() == "Windows": # pragma: no cover + winerror = int(getattr(os_error, "winerror", 0)) + return winerror == WSAENOTSOCK + return os_error.errno == errno.EBADF + + +class IsolationCompiler(abc.ABC): + __slots__ = ("_isolation_level", "_readonly", "_deferrable") + + def __init__( + self, isolation_level: Optional[str], readonly: bool, deferrable: bool + ): + self._isolation_level = isolation_level + self._readonly = readonly + self._deferrable = deferrable + + @property + def name(self) -> str: + return self._isolation_level or "Unknown" + + def savepoint(self, unique_id: str) -> str: + return f"SAVEPOINT {unique_id}" + + def release_savepoint(self, unique_id: str) -> str: + return f"RELEASE SAVEPOINT {unique_id}" + + def rollback_savepoint(self, unique_id: str) -> str: + return f"ROLLBACK TO SAVEPOINT {unique_id}" + + def commit(self) -> str: + return "COMMIT" + + def rollback(self) -> str: + return "ROLLBACK" + + def begin(self) -> str: + query = "BEGIN" + if self._isolation_level is not None: + query += f" ISOLATION LEVEL {self._isolation_level.upper()}" + + if self._readonly: + query += " READ ONLY" + + if self._deferrable: + query += " DEFERRABLE" + + return query + + def __repr__(self) -> str: + return self.name + + +class ReadCommittedCompiler(IsolationCompiler): + __slots__ = () + + def __init__(self, readonly: bool, deferrable: bool): + super().__init__("Read committed", readonly, deferrable) + + +class RepeatableReadCompiler(IsolationCompiler): + __slots__ = () + + def __init__(self, readonly: bool, deferrable: bool): + super().__init__("Repeatable read", readonly, deferrable) + + +class SerializableCompiler(IsolationCompiler): + __slots__ = () + + def __init__(self, readonly: bool, deferrable: bool): + super().__init__("Serializable", readonly, deferrable) + + +class DefaultCompiler(IsolationCompiler): + __slots__ = () + + def __init__(self, readonly: bool, deferrable: bool): + super().__init__(None, readonly, deferrable) + + @property + def name(self) -> str: + return "Default" + + +class IsolationLevel(enum.Enum): + serializable = SerializableCompiler + repeatable_read = RepeatableReadCompiler + read_committed = ReadCommittedCompiler + default = DefaultCompiler + + def __call__(self, readonly: bool, deferrable: bool) -> IsolationCompiler: + return self.value(readonly, deferrable) # type: ignore + + +async def _release_savepoint(t: "Transaction") -> None: + await t.release_savepoint() + + +async def _rollback_savepoint(t: "Transaction") -> None: + await t.rollback_savepoint() + + +class Transaction: + __slots__ = ("_cursor", "_is_begin", "_isolation", "_unique_id") + + def __init__( + self, + cursor: "Cursor", + isolation_level: Callable[[bool, bool], IsolationCompiler], + readonly: bool = False, + deferrable: bool = False, + ): + self._cursor = cursor + self._is_begin = False + self._unique_id: Optional[str] = None + self._isolation = isolation_level(readonly, deferrable) + + @property + def is_begin(self) -> bool: + return self._is_begin + + async def begin(self) -> "Transaction": + if self._is_begin: + raise psycopg2.ProgrammingError( + "You are trying to open a new transaction, use the save point" + ) + self._is_begin = True + await self._cursor.execute(self._isolation.begin()) + return self + + async def commit(self) -> None: + self._check_commit_rollback() + await self._cursor.execute(self._isolation.commit()) + self._is_begin = False + + async def rollback(self) -> None: + self._check_commit_rollback() + if not self._cursor.closed: + await self._cursor.execute(self._isolation.rollback()) + self._is_begin = False + + async def rollback_savepoint(self) -> None: + self._check_release_rollback() + if not self._cursor.closed: + await self._cursor.execute( + self._isolation.rollback_savepoint( + self._unique_id # type: ignore + ) + ) + self._unique_id = None + + async def release_savepoint(self) -> None: + self._check_release_rollback() + await self._cursor.execute( + self._isolation.release_savepoint(self._unique_id) # type: ignore + ) + self._unique_id = None + + async def savepoint(self) -> "Transaction": + self._check_commit_rollback() + if self._unique_id is not None: + raise psycopg2.ProgrammingError("You do not shut down savepoint") + + self._unique_id = f"s{uuid.uuid1().hex}" + await self._cursor.execute(self._isolation.savepoint(self._unique_id)) + + return self + + def point(self) -> _ContextManager["Transaction"]: + return _ContextManager[Transaction]( + self.savepoint(), + _release_savepoint, + _rollback_savepoint, + ) + + def _check_commit_rollback(self) -> None: + if not self._is_begin: + raise psycopg2.ProgrammingError( + "You are trying to commit " "the transaction does not open" + ) + + def _check_release_rollback(self) -> None: + self._check_commit_rollback() + if self._unique_id is None: + raise psycopg2.ProgrammingError("You do not start savepoint") + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__} " + f"transaction={self._isolation} id={id(self):#x}>" + ) + + def __del__(self) -> None: + if self._is_begin: + warnings.warn( + f"You have not closed transaction {self!r}", ResourceWarning + ) + + if self._unique_id is not None: + warnings.warn( + f"You have not closed savepoint {self!r}", ResourceWarning + ) + + async def __aenter__(self) -> "Transaction": + return await self.begin() + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: + if exc_type is not None: + await self.rollback() + else: + await self.commit() + + +async def _commit_transaction(t: Transaction) -> None: + await t.commit() + + +async def _rollback_transaction(t: Transaction) -> None: + await t.rollback() + + +class Cursor: + def __init__( + self, + conn: "Connection", + impl: Any, + timeout: float, + echo: bool, + isolation_level: Optional[IsolationLevel] = None, + ): + self._conn = conn + self._impl = impl + self._timeout = timeout + self._echo = echo + self._transaction = Transaction( + self, isolation_level or IsolationLevel.default + ) + + @property + def echo(self) -> bool: + """Return echo mode status.""" + return self._echo + + @property + def description(self) -> Optional[Sequence[Any]]: + """This read-only attribute is a sequence of 7-item sequences. + + Each of these sequences is a collections.namedtuple containing + information describing one result column: + + 0. name: the name of the column returned. + 1. type_code: the PostgreSQL OID of the column. + 2. display_size: the actual length of the column in bytes. + 3. internal_size: the size in bytes of the column associated to + this column on the server. + 4. precision: total number of significant digits in columns of + type NUMERIC. None for other types. + 5. scale: count of decimal digits in the fractional part in + columns of type NUMERIC. None for other types. + 6. null_ok: always None as not easy to retrieve from the libpq. + + This attribute will be None for operations that do not + return rows or if the cursor has not had an operation invoked + via the execute() method yet. + + """ + return self._impl.description # type: ignore + + def close(self) -> None: + """Close the cursor now.""" + if not self.closed: + self._impl.close() + + @property + def closed(self) -> bool: + """Read-only boolean attribute: specifies if the cursor is closed.""" + return self._impl.closed # type: ignore + + @property + def connection(self) -> "Connection": + """Read-only attribute returning a reference to the `Connection`.""" + return self._conn + + @property + def raw(self) -> Any: + """Underlying psycopg cursor object, readonly""" + return self._impl + + @property + def name(self) -> str: + # Not supported + return self._impl.name # type: ignore + + @property + def scrollable(self) -> Optional[bool]: + # Not supported + return self._impl.scrollable # type: ignore + + @scrollable.setter + def scrollable(self, val: bool) -> None: + # Not supported + self._impl.scrollable = val + + @property + def withhold(self) -> bool: + # Not supported + return self._impl.withhold # type: ignore + + @withhold.setter + def withhold(self, val: bool) -> None: + # Not supported + self._impl.withhold = val + + async def execute( + self, + operation: str, + parameters: Any = None, + *, + timeout: Optional[float] = None, + ) -> None: + """Prepare and execute a database operation (query or command). + + Parameters may be provided as sequence or mapping and will be + bound to variables in the operation. Variables are specified + either with positional %s or named %({name})s placeholders. + + """ + if timeout is None: + timeout = self._timeout + waiter = self._conn._create_waiter("cursor.execute") + if self._echo: + logger.info(operation) + logger.info("%r", parameters) + try: + self._impl.execute(operation, parameters) + except BaseException: + self._conn._waiter = None + raise + try: + await self._conn._poll(waiter, timeout) + except asyncio.TimeoutError: + self._impl.close() + raise + + async def executemany(self, *args: Any, **kwargs: Any) -> None: + # Not supported + raise psycopg2.ProgrammingError( + "executemany cannot be used in asynchronous mode" + ) + + async def callproc( + self, + procname: str, + parameters: Any = None, + *, + timeout: Optional[float] = None, + ) -> None: + """Call a stored database procedure with the given name. + + The sequence of parameters must contain one entry for each + argument that the procedure expects. The result of the call is + returned as modified copy of the input sequence. Input + parameters are left untouched, output and input/output + parameters replaced with possibly new values. + + """ + if timeout is None: + timeout = self._timeout + waiter = self._conn._create_waiter("cursor.callproc") + if self._echo: + logger.info("CALL %s", procname) + logger.info("%r", parameters) + try: + self._impl.callproc(procname, parameters) + except BaseException: + self._conn._waiter = None + raise + else: + await self._conn._poll(waiter, timeout) + + def begin(self) -> _ContextManager[Transaction]: + return _ContextManager[Transaction]( + self._transaction.begin(), + _commit_transaction, + _rollback_transaction, + ) + + def begin_nested(self) -> _ContextManager[Transaction]: + if self._transaction.is_begin: + return self._transaction.point() + + return _ContextManager[Transaction]( + self._transaction.begin(), + _commit_transaction, + _rollback_transaction, + ) + + def mogrify(self, operation: str, parameters: Any = None) -> bytes: + """Return a query string after arguments binding. + + The byte string returned is exactly the one that would be sent to + the database running the .execute() method or similar. + + """ + ret = self._impl.mogrify(operation, parameters) + assert ( + not self._conn.isexecuting() + ), "Don't support server side mogrify" + return ret # type: ignore + + async def setinputsizes(self, sizes: int) -> None: + """This method is exposed in compliance with the DBAPI. + + It currently does nothing but it is safe to call it. + + """ + self._impl.setinputsizes(sizes) + + async def fetchone(self) -> Any: + """Fetch the next row of a query result set. + + Returns a single tuple, or None when no more data is + available. + + """ + ret = self._impl.fetchone() + assert ( + not self._conn.isexecuting() + ), "Don't support server side cursors yet" + return ret + + async def fetchmany(self, size: Optional[int] = None) -> List[Any]: + """Fetch the next set of rows of a query result. + + Returns a list of tuples. An empty list is returned when no + more rows are available. + + The number of rows to fetch per call is specified by the + parameter. If it is not given, the cursor's .arraysize + determines the number of rows to be fetched. The method should + try to fetch as many rows as indicated by the size + parameter. If this is not possible due to the specified number + of rows not being available, fewer rows may be returned. + + """ + if size is None: + size = self._impl.arraysize + ret = self._impl.fetchmany(size) + assert ( + not self._conn.isexecuting() + ), "Don't support server side cursors yet" + return ret # type: ignore + + async def fetchall(self) -> List[Any]: + """Fetch all (remaining) rows of a query result. + + Returns them as a list of tuples. An empty list is returned + if there is no more record to fetch. + + """ + ret = self._impl.fetchall() + assert ( + not self._conn.isexecuting() + ), "Don't support server side cursors yet" + return ret # type: ignore + + async def scroll(self, value: int, mode: str = "relative") -> None: + """Scroll to a new position according to mode. + + If mode is relative (default), value is taken as offset + to the current position in the result set, if set to + absolute, value states an absolute target position. + + """ + self._impl.scroll(value, mode) + assert ( + not self._conn.isexecuting() + ), "Don't support server side cursors yet" + + @property + def arraysize(self) -> int: + """How many rows will be returned by fetchmany() call. + + This read/write attribute specifies the number of rows to + fetch at a time with fetchmany(). It defaults to + 1 meaning to fetch a single row at a time. + + """ + return self._impl.arraysize # type: ignore + + @arraysize.setter + def arraysize(self, val: int) -> None: + """How many rows will be returned by fetchmany() call. + + This read/write attribute specifies the number of rows to + fetch at a time with fetchmany(). It defaults to + 1 meaning to fetch a single row at a time. + + """ + self._impl.arraysize = val + + @property + def itersize(self) -> int: + # Not supported + return self._impl.itersize # type: ignore + + @itersize.setter + def itersize(self, val: int) -> None: + # Not supported + self._impl.itersize = val + + @property + def rowcount(self) -> int: + """Returns the number of rows that has been produced of affected. + + This read-only attribute specifies the number of rows that the + last :meth:`execute` produced (for Data Query Language + statements like SELECT) or affected (for Data Manipulation + Language statements like UPDATE or INSERT). + + The attribute is -1 in case no .execute() has been performed + on the cursor or the row count of the last operation if it + can't be determined by the interface. + + """ + return self._impl.rowcount # type: ignore + + @property + def rownumber(self) -> int: + """Row index. + + This read-only attribute provides the current 0-based index of the + cursor in the result set or ``None`` if the index cannot be + determined.""" + + return self._impl.rownumber # type: ignore + + @property + def lastrowid(self) -> int: + """OID of the last inserted row. + + This read-only attribute provides the OID of the last row + inserted by the cursor. If the table wasn't created with OID + support or the last operation is not a single record insert, + the attribute is set to None. + + """ + return self._impl.lastrowid # type: ignore + + @property + def query(self) -> Optional[str]: + """The last executed query string. + + Read-only attribute containing the body of the last query sent + to the backend (including bound arguments) as bytes + string. None if no query has been executed yet. + + """ + return self._impl.query # type: ignore + + @property + def statusmessage(self) -> str: + """the message returned by the last command.""" + return self._impl.statusmessage # type: ignore + + @property + def tzinfo_factory(self) -> datetime.tzinfo: + """The time zone factory used to handle data types such as + `TIMESTAMP WITH TIME ZONE`. + """ + return self._impl.tzinfo_factory # type: ignore + + @tzinfo_factory.setter + def tzinfo_factory(self, val: datetime.tzinfo) -> None: + """The time zone factory used to handle data types such as + `TIMESTAMP WITH TIME ZONE`. + """ + self._impl.tzinfo_factory = val + + async def nextset(self) -> None: + # Not supported + self._impl.nextset() # raises psycopg2.NotSupportedError + + async def setoutputsize( + self, size: int, column: Optional[int] = None + ) -> None: + # Does nothing + self._impl.setoutputsize(size, column) + + async def copy_from(self, *args: Any, **kwargs: Any) -> None: + raise psycopg2.ProgrammingError( + "copy_from cannot be used in asynchronous mode" + ) + + async def copy_to(self, *args: Any, **kwargs: Any) -> None: + raise psycopg2.ProgrammingError( + "copy_to cannot be used in asynchronous mode" + ) + + async def copy_expert(self, *args: Any, **kwargs: Any) -> None: + raise psycopg2.ProgrammingError( + "copy_expert cannot be used in asynchronous mode" + ) + + @property + def timeout(self) -> float: + """Return default timeout for cursor operations.""" + return self._timeout + + def __aiter__(self) -> "Cursor": + return self + + async def __anext__(self) -> Any: + ret = await self.fetchone() + if ret is not None: + return ret + raise StopAsyncIteration + + async def __aenter__(self) -> "Cursor": + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: + self.close() + + def __repr__(self) -> str: + return ( + f"<" + f"{type(self).__module__}::{type(self).__name__} " + f"name={self.name}, " + f"closed={self.closed}" + f">" + ) + + +async def _close_cursor(c: Cursor) -> None: + c.close() class Connection: @@ -64,37 +738,47 @@ _source_traceback = None def __init__( - self, dsn, timeout, echo, - *, enable_json=True, enable_hstore=True, - enable_uuid=True, **kwargs + self, + dsn: Optional[str], + timeout: float, + echo: bool = False, + enable_json: bool = True, + enable_hstore: bool = True, + enable_uuid: bool = True, + **kwargs: Any, ): self._enable_json = enable_json self._enable_hstore = enable_hstore self._enable_uuid = enable_uuid - self._loop = get_running_loop(kwargs.pop('loop', None) is not None) - self._waiter = self._loop.create_future() - - kwargs['async_'] = kwargs.pop('async', True) + self._loop = get_running_loop() + self._waiter: Optional[ + "asyncio.Future[None]" + ] = self._loop.create_future() + + kwargs["async_"] = kwargs.pop("async", True) + kwargs.pop("loop", None) # backward compatibility self._conn = psycopg2.connect(dsn, **kwargs) self._dsn = self._conn.dsn assert self._conn.isexecuting(), "Is conn an async at all???" - self._fileno = self._conn.fileno() + self._fileno: Optional[int] = self._conn.fileno() self._timeout = timeout self._last_usage = self._loop.time() self._writing = False self._echo = echo - self._cursor_instance = None - self._notifies = asyncio.Queue() + self._notifies = asyncio.Queue() # type: ignore + self._notifies_proxy = ClosableQueue(self._notifies, self._loop) self._weakref = weakref.ref(self) - self._loop.add_reader(self._fileno, self._ready, self._weakref) + self._loop.add_reader( + self._fileno, self._ready, self._weakref # type: ignore + ) if self._loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) @staticmethod - def _ready(weak_self): - self = weak_self() + def _ready(weak_self: "weakref.ref[Any]") -> None: + self = cast(Connection, weak_self()) if self is None: return @@ -128,6 +812,7 @@ # chain exception otherwise exc2.__cause__ = exc exc = exc2 + self._notifies_proxy.close(exc) if waiter is not None and not waiter.done(): waiter.set_exception(exc) else: @@ -135,48 +820,60 @@ # connection closed if waiter is not None and not waiter.done(): waiter.set_exception( - psycopg2.OperationalError("Connection closed")) - if state == POLL_OK: + psycopg2.OperationalError("Connection closed") + ) + if state == psycopg2.extensions.POLL_OK: if self._writing: - self._loop.remove_writer(self._fileno) + self._loop.remove_writer(self._fileno) # type: ignore self._writing = False if waiter is not None and not waiter.done(): waiter.set_result(None) - elif state == POLL_READ: + elif state == psycopg2.extensions.POLL_READ: if self._writing: - self._loop.remove_writer(self._fileno) + self._loop.remove_writer(self._fileno) # type: ignore self._writing = False - elif state == POLL_WRITE: + elif state == psycopg2.extensions.POLL_WRITE: if not self._writing: - self._loop.add_writer(self._fileno, self._ready, weak_self) + self._loop.add_writer( + self._fileno, self._ready, weak_self # type: ignore + ) self._writing = True - elif state == POLL_ERROR: - self._fatal_error("Fatal error on aiopg connection: " - "POLL_ERROR from underlying .poll() call") + elif state == psycopg2.extensions.POLL_ERROR: + self._fatal_error( + "Fatal error on aiopg connection: " + "POLL_ERROR from underlying .poll() call" + ) else: - self._fatal_error("Fatal error on aiopg connection: " - "unknown answer {} from underlying " - ".poll() call" - .format(state)) - - def _fatal_error(self, message): + self._fatal_error( + f"Fatal error on aiopg connection: " + f"unknown answer {state} from underlying " + f".poll() call" + ) + + def _fatal_error(self, message: str) -> None: # Should be called from exception handler only. - self._loop.call_exception_handler({ - 'message': message, - 'connection': self, - }) + self._loop.call_exception_handler( + { + "message": message, + "connection": self, + } + ) self.close() if self._waiter and not self._waiter.done(): self._waiter.set_exception(psycopg2.OperationalError(message)) - def _create_waiter(self, func_name): + def _create_waiter(self, func_name: str) -> "asyncio.Future[None]": if self._waiter is not None: - raise RuntimeError('%s() called while another coroutine is ' - 'already waiting for incoming data' % func_name) + raise RuntimeError( + f"{func_name}() called while another coroutine " + f"is already waiting for incoming data" + ) self._waiter = self._loop.create_future() return self._waiter - async def _poll(self, waiter, timeout): + async def _poll( + self, waiter: "asyncio.Future[None]", timeout: float + ) -> None: assert waiter is self._waiter, (waiter, self._waiter) self._ready(self._weakref) @@ -186,21 +883,29 @@ await asyncio.shield(self.close()) raise exc except psycopg2.extensions.QueryCanceledError as exc: - self._loop.call_exception_handler({ - 'message': exc.pgerror, - 'exception': exc, - 'future': self._waiter, - }) + self._loop.call_exception_handler( + { + "message": exc.pgerror, + "exception": exc, + "future": self._waiter, + } + ) raise asyncio.CancelledError finally: self._waiter = None - def _isexecuting(self): - return self._conn.isexecuting() - - def cursor(self, name=None, cursor_factory=None, - scrollable=None, withhold=False, timeout=None, - isolation_level=None): + def isexecuting(self) -> bool: + return self._conn.isexecuting() # type: ignore + + def cursor( + self, + name: Optional[str] = None, + cursor_factory: Any = None, + scrollable: Optional[bool] = None, + withhold: bool = False, + timeout: Optional[float] = None, + isolation_level: Optional[IsolationLevel] = None, + ) -> _ContextManager[Cursor]: """A coroutine that returns a new cursor object using the connection. *cursor_factory* argument can be used to create non-standard @@ -210,49 +915,61 @@ *name*, *scrollable* and *withhold* parameters are not supported by psycopg in asynchronous mode. - NOTE: as of [TODO] any previously created created cursor from this - connection will be closed - """ + """ + self._last_usage = self._loop.time() - coro = self._cursor(name=name, cursor_factory=cursor_factory, - scrollable=scrollable, withhold=withhold, - timeout=timeout, isolation_level=isolation_level) - return _ContextManager(coro) - - async def _cursor(self, name=None, cursor_factory=None, - scrollable=None, withhold=False, timeout=None, - isolation_level=None): - - if not self.closed_cursor: - warnings.warn(('You can only have one cursor per connection. ' - 'The cursor for connection will be closed forcibly' - ' {!r}.').format(self), ResourceWarning) - - self.free_cursor() - + coro = self._cursor( + name=name, + cursor_factory=cursor_factory, + scrollable=scrollable, + withhold=withhold, + timeout=timeout, + isolation_level=isolation_level, + ) + return _ContextManager[Cursor](coro, _close_cursor) + + async def _cursor( + self, + name: Optional[str] = None, + cursor_factory: Any = None, + scrollable: Optional[bool] = None, + withhold: bool = False, + timeout: Optional[float] = None, + isolation_level: Optional[IsolationLevel] = None, + ) -> Cursor: if timeout is None: timeout = self._timeout - impl = await self._cursor_impl(name=name, - cursor_factory=cursor_factory, - scrollable=scrollable, - withhold=withhold) - self._cursor_instance = Cursor( - self, impl, timeout, self._echo, isolation_level - ) - return self._cursor_instance - - async def _cursor_impl(self, name=None, cursor_factory=None, - scrollable=None, withhold=False): + impl = await self._cursor_impl( + name=name, + cursor_factory=cursor_factory, + scrollable=scrollable, + withhold=withhold, + ) + cursor = Cursor(self, impl, timeout, self._echo, isolation_level) + return cursor + + async def _cursor_impl( + self, + name: Optional[str] = None, + cursor_factory: Any = None, + scrollable: Optional[bool] = None, + withhold: bool = False, + ) -> Any: if cursor_factory is None: - impl = self._conn.cursor(name=name, - scrollable=scrollable, withhold=withhold) + impl = self._conn.cursor( + name=name, scrollable=scrollable, withhold=withhold + ) else: - impl = self._conn.cursor(name=name, cursor_factory=cursor_factory, - scrollable=scrollable, withhold=withhold) + impl = self._conn.cursor( + name=name, + cursor_factory=cursor_factory, + scrollable=scrollable, + withhold=withhold, + ) return impl - def _close(self): + def _close(self) -> None: """Remove the connection from the event_loop and close it.""" # N.B. If connection contains uncommitted transaction the # transaction will be discarded @@ -263,121 +980,123 @@ self._loop.remove_writer(self._fileno) self._conn.close() - self.free_cursor() - - if self._waiter is not None and not self._waiter.done(): - self._waiter.set_exception( - psycopg2.OperationalError("Connection closed")) - - @property - def closed_cursor(self): - if not self._cursor_instance: - return True - - return bool(self._cursor_instance.closed) - - def free_cursor(self): - if not self.closed_cursor: - self._cursor_instance.close() - self._cursor_instance = None - - def close(self): + + if not self._loop.is_closed(): + if self._waiter is not None and not self._waiter.done(): + self._waiter.set_exception( + psycopg2.OperationalError("Connection closed") + ) + + self._notifies_proxy.close( + psycopg2.OperationalError("Connection closed") + ) + + def close(self) -> "asyncio.Future[None]": self._close() - ret = self._loop.create_future() - ret.set_result(None) - return ret - - @property - def closed(self): + return create_completed_future(self._loop) + + @property + def closed(self) -> bool: """Connection status. Read-only attribute reporting whether the database connection is open (False) or closed (True). """ - return self._conn.closed - - @property - def raw(self): + return self._conn.closed # type: ignore + + @property + def raw(self) -> Any: """Underlying psycopg connection object, readonly""" return self._conn - async def commit(self): - raise psycopg2.ProgrammingError( - "commit cannot be used in asynchronous mode") - - async def rollback(self): - raise psycopg2.ProgrammingError( - "rollback cannot be used in asynchronous mode") + async def commit(self) -> None: + raise psycopg2.ProgrammingError( + "commit cannot be used in asynchronous mode" + ) + + async def rollback(self) -> None: + raise psycopg2.ProgrammingError( + "rollback cannot be used in asynchronous mode" + ) # TPC - async def xid(self, format_id, gtrid, bqual): - return self._conn.xid(format_id, gtrid, bqual) - - async def tpc_begin(self, xid=None): - raise psycopg2.ProgrammingError( - "tpc_begin cannot be used in asynchronous mode") - - async def tpc_prepare(self): - raise psycopg2.ProgrammingError( - "tpc_prepare cannot be used in asynchronous mode") - - async def tpc_commit(self, xid=None): - raise psycopg2.ProgrammingError( - "tpc_commit cannot be used in asynchronous mode") - - async def tpc_rollback(self, xid=None): - raise psycopg2.ProgrammingError( - "tpc_rollback cannot be used in asynchronous mode") - - async def tpc_recover(self): - raise psycopg2.ProgrammingError( - "tpc_recover cannot be used in asynchronous mode") - - async def cancel(self): - raise psycopg2.ProgrammingError( - "cancel cannot be used in asynchronous mode") - - async def reset(self): - raise psycopg2.ProgrammingError( - "reset cannot be used in asynchronous mode") - - @property - def dsn(self): + async def xid( + self, format_id: int, gtrid: str, bqual: str + ) -> Tuple[int, str, str]: + return self._conn.xid(format_id, gtrid, bqual) # type: ignore + + async def tpc_begin(self, *args: Any, **kwargs: Any) -> None: + raise psycopg2.ProgrammingError( + "tpc_begin cannot be used in asynchronous mode" + ) + + async def tpc_prepare(self) -> None: + raise psycopg2.ProgrammingError( + "tpc_prepare cannot be used in asynchronous mode" + ) + + async def tpc_commit(self, *args: Any, **kwargs: Any) -> None: + raise psycopg2.ProgrammingError( + "tpc_commit cannot be used in asynchronous mode" + ) + + async def tpc_rollback(self, *args: Any, **kwargs: Any) -> None: + raise psycopg2.ProgrammingError( + "tpc_rollback cannot be used in asynchronous mode" + ) + + async def tpc_recover(self) -> None: + raise psycopg2.ProgrammingError( + "tpc_recover cannot be used in asynchronous mode" + ) + + async def cancel(self) -> None: + raise psycopg2.ProgrammingError( + "cancel cannot be used in asynchronous mode" + ) + + async def reset(self) -> None: + raise psycopg2.ProgrammingError( + "reset cannot be used in asynchronous mode" + ) + + @property + def dsn(self) -> Optional[str]: """DSN connection string. Read-only attribute representing dsn connection string used for connectint to PostgreSQL server. """ - return self._dsn - - async def set_session(self, *, isolation_level=None, readonly=None, - deferrable=None, autocommit=None): - raise psycopg2.ProgrammingError( - "set_session cannot be used in asynchronous mode") - - @property - def autocommit(self): + return self._dsn # type: ignore + + async def set_session(self, *args: Any, **kwargs: Any) -> None: + raise psycopg2.ProgrammingError( + "set_session cannot be used in asynchronous mode" + ) + + @property + def autocommit(self) -> bool: """Autocommit status""" - return self._conn.autocommit + return self._conn.autocommit # type: ignore @autocommit.setter - def autocommit(self, val): + def autocommit(self, val: bool) -> None: """Autocommit status""" self._conn.autocommit = val @property - def isolation_level(self): + def isolation_level(self) -> int: """Transaction isolation level. The only allowed value is ISOLATION_LEVEL_READ_COMMITTED. """ - return self._conn.isolation_level - - async def set_isolation_level(self, val): + return self._conn.isolation_level # type: ignore + + async def set_isolation_level(self, val: int) -> None: """Transaction isolation level. The only allowed value is ISOLATION_LEVEL_READ_COMMITTED. @@ -386,158 +1105,149 @@ self._conn.set_isolation_level(val) @property - def encoding(self): + def encoding(self) -> str: """Client encoding for SQL operations.""" - return self._conn.encoding - - async def set_client_encoding(self, val): + return self._conn.encoding # type: ignore + + async def set_client_encoding(self, val: str) -> None: self._conn.set_client_encoding(val) @property - def notices(self): + def notices(self) -> List[str]: """A list of all db messages sent to the client during the session.""" - return self._conn.notices - - @property - def cursor_factory(self): + return self._conn.notices # type: ignore + + @property + def cursor_factory(self) -> Any: """The default cursor factory used by .cursor().""" return self._conn.cursor_factory - async def get_backend_pid(self): + async def get_backend_pid(self) -> int: """Returns the PID of the backend server process.""" - return self._conn.get_backend_pid() - - async def get_parameter_status(self, parameter): + return self._conn.get_backend_pid() # type: ignore + + async def get_parameter_status(self, parameter: str) -> Optional[str]: """Look up a current parameter setting of the server.""" - return self._conn.get_parameter_status(parameter) - - async def get_transaction_status(self): + return self._conn.get_parameter_status(parameter) # type: ignore + + async def get_transaction_status(self) -> int: """Return the current session transaction status as an integer.""" - return self._conn.get_transaction_status() - - @property - def protocol_version(self): + return self._conn.get_transaction_status() # type: ignore + + @property + def protocol_version(self) -> int: """A read-only integer representing protocol being used.""" - return self._conn.protocol_version - - @property - def server_version(self): + return self._conn.protocol_version # type: ignore + + @property + def server_version(self) -> int: """A read-only integer representing the backend version.""" - return self._conn.server_version - - @property - def status(self): + return self._conn.server_version # type: ignore + + @property + def status(self) -> int: """A read-only integer representing the status of the connection.""" - return self._conn.status - - async def lobject(self, *args, **kwargs): - raise psycopg2.ProgrammingError( - "lobject cannot be used in asynchronous mode") - - @property - def timeout(self): + return self._conn.status # type: ignore + + async def lobject(self, *args: Any, **kwargs: Any) -> None: + raise psycopg2.ProgrammingError( + "lobject cannot be used in asynchronous mode" + ) + + @property + def timeout(self) -> float: """Return default timeout for connection operations.""" return self._timeout @property - def last_usage(self): + def last_usage(self) -> float: """Return time() when connection was used.""" return self._last_usage @property - def echo(self): + def echo(self) -> bool: """Return echo mode status.""" return self._echo - def __repr__(self): - msg = ( - '<' - '{module_name}::{class_name} ' - 'isexecuting={isexecuting}, ' - 'closed={closed}, ' - 'echo={echo}, ' - 'cursor={cursor}' - '>' - ) - return msg.format( - module_name=type(self).__module__, - class_name=type(self).__name__, - echo=self.echo, - isexecuting=self._isexecuting(), - closed=bool(self.closed), - cursor=repr(self._cursor_instance) - ) - - def __del__(self): + def __repr__(self) -> str: + return ( + f"<" + f"{type(self).__module__}::{type(self).__name__} " + f"isexecuting={self.isexecuting()}, " + f"closed={self.closed}, " + f"echo={self.echo}, " + f">" + ) + + def __del__(self) -> None: try: _conn = self._conn except AttributeError: return if _conn is not None and not _conn.closed: self.close() - warnings.warn("Unclosed connection {!r}".format(self), - ResourceWarning) - - context = {'connection': self, - 'message': 'Unclosed connection'} + warnings.warn(f"Unclosed connection {self!r}", ResourceWarning) + + context = {"connection": self, "message": "Unclosed connection"} if self._source_traceback is not None: - context['source_traceback'] = self._source_traceback + context["source_traceback"] = self._source_traceback self._loop.call_exception_handler(context) @property - def notifies(self): - """Return notification queue.""" - return self._notifies - - async def _get_oids(self): - cur = await self.cursor() + def notifies(self) -> ClosableQueue: + """Return notification queue (an asyncio.Queue -like object).""" + return self._notifies_proxy + + async def _get_oids(self) -> Tuple[Any, Any]: + cursor = await self.cursor() rv0, rv1 = [], [] try: - await cur.execute( + await cursor.execute( "SELECT t.oid, typarray " "FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid " "WHERE typname = 'hstore';" ) - async for oids in cur: + async for oids in cursor: if isinstance(oids, Mapping): - rv0.append(oids['oid']) - rv1.append(oids['typarray']) + rv0.append(oids["oid"]) + rv1.append(oids["typarray"]) else: rv0.append(oids[0]) rv1.append(oids[1]) finally: - cur.close() + cursor.close() return tuple(rv0), tuple(rv1) - async def _connect(self): + async def _connect(self) -> "Connection": try: - await self._poll(self._waiter, self._timeout) - except Exception: - self.close() + await self._poll(self._waiter, self._timeout) # type: ignore + except BaseException: + await asyncio.shield(self.close()) raise if self._enable_json: - extras.register_default_json(self._conn) + psycopg2.extras.register_default_json(self._conn) if self._enable_uuid: - extras.register_uuid(conn_or_curs=self._conn) + psycopg2.extras.register_uuid(conn_or_curs=self._conn) if self._enable_hstore: - oids = await self._get_oids() - if oids is not None: - oid, array_oid = oids - extras.register_hstore( - self._conn, - oid=oid, - array_oid=array_oid - ) + oid, array_oid = await self._get_oids() + psycopg2.extras.register_hstore( + self._conn, oid=oid, array_oid=array_oid + ) return self - def __await__(self): + def __await__(self) -> Generator[Any, None, "Connection"]: return self._connect().__await__() - async def __aenter__(self): + async def __aenter__(self) -> "Connection": return self - async def __aexit__(self, exc_type, exc_val, exc_tb): - self.close() + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: + await self.close() diff --git a/aiopg/cursor.py b/aiopg/cursor.py deleted file mode 100644 index ac8ad44..0000000 --- a/aiopg/cursor.py +++ /dev/null @@ -1,397 +0,0 @@ -import asyncio - -import psycopg2 - -from .log import logger -from .transaction import IsolationLevel, Transaction -from .utils import _TransactionBeginContextManager - - -class Cursor: - def __init__(self, conn, impl, timeout, echo, isolation_level): - self._conn = conn - self._impl = impl - self._timeout = timeout - self._echo = echo - self._transaction = Transaction( - self, isolation_level or IsolationLevel.default - ) - - @property - def echo(self): - """Return echo mode status.""" - return self._echo - - @property - def description(self): - """This read-only attribute is a sequence of 7-item sequences. - - Each of these sequences is a collections.namedtuple containing - information describing one result column: - - 0. name: the name of the column returned. - 1. type_code: the PostgreSQL OID of the column. - 2. display_size: the actual length of the column in bytes. - 3. internal_size: the size in bytes of the column associated to - this column on the server. - 4. precision: total number of significant digits in columns of - type NUMERIC. None for other types. - 5. scale: count of decimal digits in the fractional part in - columns of type NUMERIC. None for other types. - 6. null_ok: always None as not easy to retrieve from the libpq. - - This attribute will be None for operations that do not - return rows or if the cursor has not had an operation invoked - via the execute() method yet. - - """ - return self._impl.description - - def close(self): - """Close the cursor now.""" - if not self.closed: - self._impl.close() - - @property - def closed(self): - """Read-only boolean attribute: specifies if the cursor is closed.""" - return self._impl.closed - - @property - def connection(self): - """Read-only attribute returning a reference to the `Connection`.""" - return self._conn - - @property - def raw(self): - """Underlying psycopg cursor object, readonly""" - return self._impl - - @property - def name(self): - # Not supported - return self._impl.name - - @property - def scrollable(self): - # Not supported - return self._impl.scrollable - - @scrollable.setter - def scrollable(self, val): - # Not supported - self._impl.scrollable = val - - @property - def withhold(self): - # Not supported - return self._impl.withhold - - @withhold.setter - def withhold(self, val): - # Not supported - self._impl.withhold = val - - async def execute(self, operation, parameters=None, *, timeout=None): - """Prepare and execute a database operation (query or command). - - Parameters may be provided as sequence or mapping and will be - bound to variables in the operation. Variables are specified - either with positional %s or named %({name})s placeholders. - - """ - if timeout is None: - timeout = self._timeout - waiter = self._conn._create_waiter('cursor.execute') - if self._echo: - logger.info(operation) - logger.info("%r", parameters) - try: - self._impl.execute(operation, parameters) - except BaseException: - self._conn._waiter = None - raise - try: - await self._conn._poll(waiter, timeout) - except asyncio.TimeoutError: - self._impl.close() - raise - - async def executemany(self, operation, seq_of_parameters): - # Not supported - raise psycopg2.ProgrammingError( - "executemany cannot be used in asynchronous mode") - - async def callproc(self, procname, parameters=None, *, timeout=None): - """Call a stored database procedure with the given name. - - The sequence of parameters must contain one entry for each - argument that the procedure expects. The result of the call is - returned as modified copy of the input sequence. Input - parameters are left untouched, output and input/output - parameters replaced with possibly new values. - - """ - if timeout is None: - timeout = self._timeout - waiter = self._conn._create_waiter('cursor.callproc') - if self._echo: - logger.info("CALL %s", procname) - logger.info("%r", parameters) - try: - self._impl.callproc(procname, parameters) - except BaseException: - self._conn._waiter = None - raise - else: - await self._conn._poll(waiter, timeout) - - def begin(self): - return _TransactionBeginContextManager(self._transaction.begin()) - - def begin_nested(self): - if not self._transaction.is_begin: - return _TransactionBeginContextManager( - self._transaction.begin()) - else: - return self._transaction.point() - - def mogrify(self, operation, parameters=None): - """Return a query string after arguments binding. - - The string returned is exactly the one that would be sent to - the database running the .execute() method or similar. - - """ - ret = self._impl.mogrify(operation, parameters) - assert not self._conn._isexecuting(), ("Don't support server side " - "mogrify") - return ret - - async def setinputsizes(self, sizes): - """This method is exposed in compliance with the DBAPI. - - It currently does nothing but it is safe to call it. - - """ - self._impl.setinputsizes(sizes) - - async def fetchone(self): - """Fetch the next row of a query result set. - - Returns a single tuple, or None when no more data is - available. - - """ - ret = self._impl.fetchone() - assert not self._conn._isexecuting(), ("Don't support server side " - "cursors yet") - return ret - - async def fetchmany(self, size=None): - """Fetch the next set of rows of a query result. - - Returns a list of tuples. An empty list is returned when no - more rows are available. - - The number of rows to fetch per call is specified by the - parameter. If it is not given, the cursor's .arraysize - determines the number of rows to be fetched. The method should - try to fetch as many rows as indicated by the size - parameter. If this is not possible due to the specified number - of rows not being available, fewer rows may be returned. - - """ - if size is None: - size = self._impl.arraysize - ret = self._impl.fetchmany(size) - assert not self._conn._isexecuting(), ("Don't support server side " - "cursors yet") - return ret - - async def fetchall(self): - """Fetch all (remaining) rows of a query result. - - Returns them as a list of tuples. An empty list is returned - if there is no more record to fetch. - - """ - ret = self._impl.fetchall() - assert not self._conn._isexecuting(), ("Don't support server side " - "cursors yet") - return ret - - async def scroll(self, value, mode="relative"): - """Scroll to a new position according to mode. - - If mode is relative (default), value is taken as offset - to the current position in the result set, if set to - absolute, value states an absolute target position. - - """ - ret = self._impl.scroll(value, mode) - assert not self._conn._isexecuting(), ("Don't support server side " - "cursors yet") - return ret - - @property - def arraysize(self): - """How many rows will be returned by fetchmany() call. - - This read/write attribute specifies the number of rows to - fetch at a time with fetchmany(). It defaults to - 1 meaning to fetch a single row at a time. - - """ - return self._impl.arraysize - - @arraysize.setter - def arraysize(self, val): - """How many rows will be returned by fetchmany() call. - - This read/write attribute specifies the number of rows to - fetch at a time with fetchmany(). It defaults to - 1 meaning to fetch a single row at a time. - - """ - self._impl.arraysize = val - - @property - def itersize(self): - # Not supported - return self._impl.itersize - - @itersize.setter - def itersize(self, val): - # Not supported - self._impl.itersize = val - - @property - def rowcount(self): - """Returns the number of rows that has been produced of affected. - - This read-only attribute specifies the number of rows that the - last :meth:`execute` produced (for Data Query Language - statements like SELECT) or affected (for Data Manipulation - Language statements like UPDATE or INSERT). - - The attribute is -1 in case no .execute() has been performed - on the cursor or the row count of the last operation if it - can't be determined by the interface. - - """ - return self._impl.rowcount - - @property - def rownumber(self): - """Row index. - - This read-only attribute provides the current 0-based index of the - cursor in the result set or ``None`` if the index cannot be - determined.""" - - return self._impl.rownumber - - @property - def lastrowid(self): - """OID of the last inserted row. - - This read-only attribute provides the OID of the last row - inserted by the cursor. If the table wasn't created with OID - support or the last operation is not a single record insert, - the attribute is set to None. - - """ - return self._impl.lastrowid - - @property - def query(self): - """The last executed query string. - - Read-only attribute containing the body of the last query sent - to the backend (including bound arguments) as bytes - string. None if no query has been executed yet. - - """ - return self._impl.query - - @property - def statusmessage(self): - """the message returned by the last command.""" - - return self._impl.statusmessage - - # async def cast(self, old, s): - # ... - - @property - def tzinfo_factory(self): - """The time zone factory used to handle data types such as - `TIMESTAMP WITH TIME ZONE`. - """ - return self._impl.tzinfo_factory - - @tzinfo_factory.setter - def tzinfo_factory(self, val): - """The time zone factory used to handle data types such as - `TIMESTAMP WITH TIME ZONE`. - """ - self._impl.tzinfo_factory = val - - async def nextset(self): - # Not supported - self._impl.nextset() # raises psycopg2.NotSupportedError - - async def setoutputsize(self, size, column=None): - # Does nothing - self._impl.setoutputsize(size, column) - - async def copy_from(self, file, table, sep='\t', null='\\N', size=8192, - columns=None): - raise psycopg2.ProgrammingError( - "copy_from cannot be used in asynchronous mode") - - async def copy_to(self, file, table, sep='\t', null='\\N', columns=None): - raise psycopg2.ProgrammingError( - "copy_to cannot be used in asynchronous mode") - - async def copy_expert(self, sql, file, size=8192): - raise psycopg2.ProgrammingError( - "copy_expert cannot be used in asynchronous mode") - - @property - def timeout(self): - """Return default timeout for cursor operations.""" - return self._timeout - - def __aiter__(self): - return self - - async def __anext__(self): - ret = await self.fetchone() - if ret is not None: - return ret - else: - raise StopAsyncIteration - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - self.close() - return - - def __repr__(self): - msg = ( - '<' - '{module_name}::{class_name} ' - 'name={name}, ' - 'closed={closed}' - '>' - ) - return msg.format( - module_name=type(self).__module__, - class_name=type(self).__name__, - name=self.name, - closed=self.closed - ) diff --git a/aiopg/pool.py b/aiopg/pool.py index 6ec45bc..e1878af 100644 --- a/aiopg/pool.py +++ b/aiopg/pool.py @@ -1,49 +1,191 @@ import asyncio import collections import warnings +from types import TracebackType +from typing import ( + Any, + Awaitable, + Callable, + Deque, + Generator, + Optional, + Set, + Type, +) import async_timeout -from psycopg2.extensions import TRANSACTION_STATUS_IDLE - -from .connection import TIMEOUT, connect -from .utils import ( - _PoolAcquireContextManager, - _PoolConnectionContextManager, - _PoolContextManager, - _PoolCursorContextManager, - ensure_future, - get_running_loop, -) - - -def create_pool(dsn=None, *, minsize=1, maxsize=10, - timeout=TIMEOUT, pool_recycle=-1, - enable_json=True, enable_hstore=True, enable_uuid=True, - echo=False, on_connect=None, - **kwargs): +import psycopg2.extensions + +from .connection import TIMEOUT, Connection, Cursor, connect +from .utils import _ContextManager, create_completed_future, get_running_loop + + +def create_pool( + dsn: Optional[str] = None, + *, + minsize: int = 1, + maxsize: int = 10, + timeout: float = TIMEOUT, + pool_recycle: float = -1.0, + enable_json: bool = True, + enable_hstore: bool = True, + enable_uuid: bool = True, + echo: bool = False, + on_connect: Optional[Callable[[Connection], Awaitable[None]]] = None, + **kwargs: Any, +) -> _ContextManager["Pool"]: coro = Pool.from_pool_fill( - dsn, minsize, maxsize, timeout, - enable_json=enable_json, enable_hstore=enable_hstore, - enable_uuid=enable_uuid, echo=echo, on_connect=on_connect, - pool_recycle=pool_recycle, **kwargs + dsn, + minsize, + maxsize, + timeout, + enable_json=enable_json, + enable_hstore=enable_hstore, + enable_uuid=enable_uuid, + echo=echo, + on_connect=on_connect, + pool_recycle=pool_recycle, + **kwargs, ) - - return _PoolContextManager(coro) - - -class Pool(asyncio.AbstractServer): + return _ContextManager[Pool](coro, _destroy_pool) + + +async def _destroy_pool(pool: "Pool") -> None: + pool.close() + await pool.wait_closed() + + +class _PoolConnectionContextManager: + """Context manager. + + This enables the following idiom for acquiring and releasing a + connection around a block: + + async with pool as conn: + cur = await conn.cursor() + + while failing loudly when accidentally using: + + with pool: + + """ + + __slots__ = ("_pool", "_conn") + + def __init__(self, pool: "Pool", conn: Connection): + self._pool: Optional[Pool] = pool + self._conn: Optional[Connection] = conn + + def __enter__(self) -> Connection: + assert self._conn + return self._conn + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: + if self._pool is None or self._conn is None: + return + try: + self._pool.release(self._conn) + finally: + self._pool = None + self._conn = None + + async def __aenter__(self) -> Connection: + assert self._conn + return self._conn + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: + if self._pool is None or self._conn is None: + return + try: + await self._pool.release(self._conn) + finally: + self._pool = None + self._conn = None + + +class _PoolCursorContextManager: + """Context manager. + + This enables the following idiom for acquiring and releasing a + cursor around a block: + + async with pool.cursor() as cur: + await cur.execute("SELECT 1") + + while failing loudly when accidentally using: + + with pool: + + """ + + __slots__ = ("_pool", "_conn", "_cursor") + + def __init__(self, pool: "Pool", conn: Connection, cursor: Cursor): + self._pool = pool + self._conn = conn + self._cursor = cursor + + def __enter__(self) -> Cursor: + return self._cursor + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: + try: + self._cursor.close() + except psycopg2.ProgrammingError: + # seen instances where the cursor fails to close: + # https://github.com/aio-libs/aiopg/issues/364 + # We close it here so we don't return a bad connection to the pool + self._conn.close() + raise + finally: + try: + self._pool.release(self._conn) + finally: + self._pool = None # type: ignore + self._conn = None # type: ignore + self._cursor = None # type: ignore + + +class Pool: """Connection pool""" - def __init__(self, dsn, minsize, maxsize, timeout, *, - enable_json, enable_hstore, enable_uuid, echo, - on_connect, pool_recycle, **kwargs): + def __init__( + self, + dsn: str, + minsize: int, + maxsize: int, + timeout: float, + *, + enable_json: bool, + enable_hstore: bool, + enable_uuid: bool, + echo: bool, + on_connect: Optional[Callable[[Connection], Awaitable[None]]], + pool_recycle: float, + **kwargs: Any, + ): if minsize < 0: raise ValueError("minsize should be zero or greater") if maxsize < minsize and maxsize != 0: raise ValueError("maxsize should be not less than minsize") self._dsn = dsn self._minsize = minsize - self._loop = get_running_loop(kwargs.pop('loop', None) is not None) + self._loop = get_running_loop() self._timeout = timeout self._recycle = pool_recycle self._enable_json = enable_json @@ -53,38 +195,40 @@ self._on_connect = on_connect self._conn_kwargs = kwargs self._acquiring = 0 - self._free = collections.deque(maxlen=maxsize or None) + self._free: Deque[Connection] = collections.deque( + maxlen=maxsize or None + ) self._cond = asyncio.Condition() - self._used = set() - self._terminated = set() + self._used: Set[Connection] = set() + self._terminated: Set[Connection] = set() self._closing = False self._closed = False @property - def echo(self): + def echo(self) -> bool: return self._echo @property - def minsize(self): + def minsize(self) -> int: return self._minsize @property - def maxsize(self): + def maxsize(self) -> Optional[int]: return self._free.maxlen @property - def size(self): + def size(self) -> int: return self.freesize + len(self._used) + self._acquiring @property - def freesize(self): + def freesize(self) -> int: return len(self._free) @property - def timeout(self): + def timeout(self) -> float: return self._timeout - async def clear(self): + async def clear(self) -> None: """Close all free connections in pool.""" async with self._cond: while self._free: @@ -93,10 +237,10 @@ self._cond.notify() @property - def closed(self): + def closed(self) -> bool: return self._closed - def close(self): + def close(self) -> None: """Close pool. Mark all pool connections to be closed on getting back to pool. @@ -106,7 +250,7 @@ return self._closing = True - def terminate(self): + def terminate(self) -> None: """Terminate pool. Close pool with instantly closing all acquired connections also. @@ -120,18 +264,19 @@ self._used.clear() - async def wait_closed(self): + async def wait_closed(self) -> None: """Wait for closing all pool's connections.""" if self._closed: return if not self._closing: - raise RuntimeError(".wait_closed() should be called " - "after .close()") + raise RuntimeError( + ".wait_closed() should be called " "after .close()" + ) while self._free: conn = self._free.popleft() - conn.close() + await conn.close() async with self._cond: while self.size > self.freesize: @@ -139,13 +284,13 @@ self._closed = True - def acquire(self): + def acquire(self) -> _ContextManager[Connection]: """Acquire free connection from the pool.""" coro = self._acquire() - return _PoolAcquireContextManager(coro, self) + return _ContextManager[Connection](coro, self.release) @classmethod - async def from_pool_fill(cls, *args, **kwargs): + async def from_pool_fill(cls, *args: Any, **kwargs: Any) -> "Pool": """constructor for filling the free pool with connections, the number is controlled by the minsize parameter """ @@ -156,7 +301,7 @@ return self - async def _acquire(self): + async def _acquire(self) -> Connection: if self._closing: raise RuntimeError("Cannot acquire connection after closing pool") async with async_timeout.timeout(self._timeout), self._cond: @@ -171,7 +316,7 @@ else: await self._cond.wait() - async def _fill_free_pool(self, override_min): + async def _fill_free_pool(self, override_min: bool) -> None: # iterate over free connections and remove timeouted ones n, free = 0, len(self._free) while n < free: @@ -179,7 +324,7 @@ if conn.closed: self._free.pop() elif -1 < self._recycle < self._loop.time() - conn.last_usage: - conn.close() + await conn.close() self._free.pop() else: self._free.rotate() @@ -189,12 +334,14 @@ self._acquiring += 1 try: conn = await connect( - self._dsn, timeout=self._timeout, + self._dsn, + timeout=self._timeout, enable_json=self._enable_json, enable_hstore=self._enable_hstore, enable_uuid=self._enable_uuid, echo=self._echo, - **self._conn_kwargs) + **self._conn_kwargs, + ) if self._on_connect is not None: await self._on_connect(conn) # raise exception if pool is closing @@ -205,16 +352,18 @@ if self._free: return - if override_min and self.size < self.maxsize: + if override_min and self.size < (self.maxsize or 0): self._acquiring += 1 try: conn = await connect( - self._dsn, timeout=self._timeout, + self._dsn, + timeout=self._timeout, enable_json=self._enable_json, enable_hstore=self._enable_hstore, enable_uuid=self._enable_uuid, echo=self._echo, - **self._conn_kwargs) + **self._conn_kwargs, + ) if self._on_connect is not None: await self._on_connect(conn) # raise exception if pool is closing @@ -223,48 +372,56 @@ finally: self._acquiring -= 1 - async def _wakeup(self): + async def _wakeup(self) -> None: async with self._cond: self._cond.notify() - def release(self, conn): - """Release free connection back to the connection pool. - """ - fut = self._loop.create_future() - fut.set_result(None) + def release(self, conn: Connection) -> "asyncio.Future[None]": + """Release free connection back to the connection pool.""" + future = create_completed_future(self._loop) if conn in self._terminated: assert conn.closed, conn self._terminated.remove(conn) - return fut + return future assert conn in self._used, (conn, self._used) self._used.remove(conn) - if not conn.closed: - tran_status = conn._conn.get_transaction_status() - if tran_status != TRANSACTION_STATUS_IDLE: - warnings.warn( - ("Invalid transaction status on " - "released connection: {}").format(tran_status), - ResourceWarning - ) - conn.close() - return fut - if self._closing: - conn.close() - else: - conn.free_cursor() - self._free.append(conn) - fut = ensure_future(self._wakeup(), loop=self._loop) - return fut - - async def cursor(self, name=None, cursor_factory=None, - scrollable=None, withhold=False, *, timeout=None): + if conn.closed: + return future + transaction_status = conn.raw.get_transaction_status() + if transaction_status != psycopg2.extensions.TRANSACTION_STATUS_IDLE: + warnings.warn( + f"Invalid transaction status on " + f"released connection: {transaction_status}", + ResourceWarning, + ) + conn.close() + return future + if self._closing: + conn.close() + else: + self._free.append(conn) + return asyncio.ensure_future(self._wakeup(), loop=self._loop) + + async def cursor( + self, + name: Optional[str] = None, + cursor_factory: Any = None, + scrollable: Optional[bool] = None, + withhold: bool = False, + *, + timeout: Optional[float] = None, + ) -> _PoolCursorContextManager: conn = await self.acquire() - cur = await conn.cursor(name=name, cursor_factory=cursor_factory, - scrollable=scrollable, withhold=withhold, - timeout=timeout) - return _PoolCursorContextManager(self, conn, cur) - - def __await__(self): + cursor = await conn.cursor( + name=name, + cursor_factory=cursor_factory, + scrollable=scrollable, + withhold=withhold, + timeout=timeout, + ) + return _PoolCursorContextManager(self, conn, cursor) + + def __await__(self) -> Generator[Any, Any, _PoolConnectionContextManager]: # This is not a coroutine. It is meant to enable the idiom: # # with (await pool) as conn: @@ -280,23 +437,34 @@ conn = yield from self._acquire().__await__() return _PoolConnectionContextManager(self, conn) - def __enter__(self): + def __enter__(self) -> "Pool": raise RuntimeError( - '"await" should be used as context manager expression') - - def __exit__(self, *args): + '"await" should be used as context manager expression' + ) + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: # This must exist because __enter__ exists, even though that # always raises; that's how the with-statement works. pass # pragma: nocover - async def __aenter__(self): + async def __aenter__(self) -> "Pool": return self - async def __aexit__(self, exc_type, exc_val, exc_tb): + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: self.close() await self.wait_closed() - def __del__(self): + def __del__(self) -> None: try: self._free except AttributeError: @@ -308,5 +476,5 @@ conn.close() left += 1 warnings.warn( - "Unclosed {} connections in {!r}".format(left, self), - ResourceWarning) + f"Unclosed {left} connections in {self!r}", ResourceWarning + ) diff --git a/aiopg/sa/__init__.py b/aiopg/sa/__init__.py index 7e4f1c9..20bb09c 100644 --- a/aiopg/sa/__init__.py +++ b/aiopg/sa/__init__.py @@ -9,10 +9,25 @@ ResourceClosedError, ) -__all__ = ('create_engine', 'SAConnection', 'Error', - 'ArgumentError', 'InvalidRequestError', 'NoSuchColumnError', - 'ResourceClosedError', 'Engine') +__all__ = ( + "create_engine", + "SAConnection", + "Error", + "ArgumentError", + "InvalidRequestError", + "NoSuchColumnError", + "ResourceClosedError", + "Engine", +) -(SAConnection, Error, ArgumentError, InvalidRequestError, - NoSuchColumnError, ResourceClosedError, create_engine, Engine) +( + SAConnection, + Error, + ArgumentError, + InvalidRequestError, + NoSuchColumnError, + ResourceClosedError, + create_engine, + Engine, +) diff --git a/aiopg/sa/connection.py b/aiopg/sa/connection.py index d79e512..8280dba 100644 --- a/aiopg/sa/connection.py +++ b/aiopg/sa/connection.py @@ -1,8 +1,12 @@ +import asyncio +import contextlib +import weakref + from sqlalchemy.sql import ClauseElement from sqlalchemy.sql.ddl import DDLElement from sqlalchemy.sql.dml import UpdateBase -from ..utils import _SAConnectionContextManager, _TransactionContextManager +from ..utils import _ContextManager, _IterableContextManager from . import exc from .result import ResultProxy from .transaction import ( @@ -13,14 +17,40 @@ ) +async def _commit_transaction_if_active(t: Transaction) -> None: + if t.is_active: + await t.commit() + + +async def _rollback_transaction(t: Transaction) -> None: + await t.rollback() + + +async def _close_result_proxy(c: "ResultProxy") -> None: + c.close() + + class SAConnection: + _QUERY_COMPILE_KWARGS = (("render_postcompile", True),) + + __slots__ = ( + "_connection", + "_transaction", + "_savepoint_seq", + "_engine", + "_dialect", + "_cursors", + "_query_compile_kwargs", + ) + def __init__(self, connection, engine): self._connection = connection self._transaction = None self._savepoint_seq = 0 self._engine = engine self._dialect = engine.dialect - self._cursor = None + self._cursors = weakref.WeakSet() + self._query_compile_kwargs = dict(self._QUERY_COMPILE_KWARGS) def execute(self, query, *multiparams, **params): """Executes a SQL query with optional parameters. @@ -60,17 +90,21 @@ """ coro = self._execute(query, *multiparams, **params) - return _SAConnectionContextManager(coro) - - async def _get_cursor(self): - if self._cursor and not self._cursor.closed: - return self._cursor - - self._cursor = await self._connection.cursor() - return self._cursor + return _IterableContextManager[ResultProxy](coro, _close_result_proxy) + + async def _open_cursor(self): + if self._connection is None: + raise exc.ResourceClosedError("This connection is closed.") + cursor = await self._connection.cursor() + self._cursors.add(cursor) + return cursor + + def _close_cursor(self, cursor): + self._cursors.remove(cursor) + cursor.close() async def _execute(self, query, *multiparams, **params): - cursor = await self._get_cursor() + cursor = await self._open_cursor() dp = _distill_params(multiparams, params) if len(dp) > 1: raise exc.ArgumentError("aiopg doesn't support executemany") @@ -82,46 +116,62 @@ if isinstance(query, str): await cursor.execute(query, dp) elif isinstance(query, ClauseElement): - compiled = query.compile(dialect=self._dialect) # parameters = compiled.params if not isinstance(query, DDLElement): + compiled = query.compile( + dialect=self._dialect, + compile_kwargs=self._query_compile_kwargs, + ) if dp and isinstance(dp, (list, tuple)): if isinstance(query, UpdateBase): - dp = {c.key: pval - for c, pval in zip(query.table.c, dp)} + dp = { + c.key: pval for c, pval in zip(query.table.c, dp) + } else: - raise exc.ArgumentError("Don't mix sqlalchemy SELECT " - "clause with positional " - "parameters") + raise exc.ArgumentError( + "Don't mix sqlalchemy SELECT " + "clause with positional " + "parameters" + ) compiled_parameters = [compiled.construct_params(dp)] processed_parameters = [] processors = compiled._bind_processors for compiled_params in compiled_parameters: - params = {key: (processors[key](compiled_params[key]) - if key in processors - else compiled_params[key]) - for key in compiled_params} + params = { + key: ( + processors[key](compiled_params[key]) + if key in processors + else compiled_params[key] + ) + for key in compiled_params + } processed_parameters.append(params) post_processed_params = self._dialect.execute_sequence_format( - processed_parameters) + processed_parameters + ) # _result_columns is a private API of Compiled, # but I couldn't find any public API exposing this data. result_map = compiled._result_columns else: + compiled = query.compile(dialect=self._dialect) if dp: - raise exc.ArgumentError("Don't mix sqlalchemy DDL clause " - "and execution with parameters") + raise exc.ArgumentError( + "Don't mix sqlalchemy DDL clause " + "and execution with parameters" + ) post_processed_params = [compiled.construct_params()] result_map = None await cursor.execute(str(compiled), post_processed_params[0]) else: - raise exc.ArgumentError("sql statement should be str or " - "SQLAlchemy data " - "selection/modification clause") + raise exc.ArgumentError( + "sql statement should be str or " + "SQLAlchemy data " + "selection/modification clause" + ) return ResultProxy(self, cursor, self._dialect, result_map) @@ -177,45 +227,50 @@ """ coro = self._begin(isolation_level, readonly, deferrable) - return _TransactionContextManager(coro) + return _ContextManager[Transaction]( + coro, _commit_transaction_if_active, _rollback_transaction + ) async def _begin(self, isolation_level, readonly, deferrable): if self._transaction is None: self._transaction = RootTransaction(self) await self._begin_impl(isolation_level, readonly, deferrable) return self._transaction - else: - return Transaction(self, self._transaction) + return Transaction(self, self._transaction) async def _begin_impl(self, isolation_level, readonly, deferrable): - stmt = 'BEGIN' + stmt = "BEGIN" if isolation_level is not None: - stmt += ' ISOLATION LEVEL ' + isolation_level + stmt += f" ISOLATION LEVEL {isolation_level}" if readonly: - stmt += ' READ ONLY' + stmt += " READ ONLY" if deferrable: - stmt += ' DEFERRABLE' - - cur = await self._get_cursor() - try: - await cur.execute(stmt) - finally: - cur.close() + stmt += " DEFERRABLE" + + cursor = await self._open_cursor() + try: + await cursor.execute(stmt) + finally: + self._close_cursor(cursor) async def _commit_impl(self): - cur = await self._get_cursor() - try: - await cur.execute('COMMIT') - finally: - cur.close() + cursor = await self._open_cursor() + try: + await cursor.execute("COMMIT") + finally: + self._close_cursor(cursor) self._transaction = None async def _rollback_impl(self): - cur = await self._get_cursor() - try: - await cur.execute('ROLLBACK') - finally: - cur.close() + try: + if self._connection.closed: + return + cursor = await self._open_cursor() + try: + await cursor.execute("ROLLBACK") + finally: + self._close_cursor(cursor) + finally: self._transaction = None def begin_nested(self): @@ -230,7 +285,9 @@ transaction of a whole. """ coro = self._begin_nested() - return _TransactionContextManager(coro) + return _ContextManager( + coro, _commit_transaction_if_active, _rollback_transaction + ) async def _begin_nested(self): if self._transaction is None: @@ -241,31 +298,36 @@ self._transaction._savepoint = await self._savepoint_impl() return self._transaction - async def _savepoint_impl(self, name=None): + async def _savepoint_impl(self): self._savepoint_seq += 1 - name = 'aiopg_sa_savepoint_%s' % self._savepoint_seq - - cur = await self._get_cursor() - try: - await cur.execute('SAVEPOINT ' + name) + name = f"aiopg_sa_savepoint_{self._savepoint_seq}" + + cursor = await self._open_cursor() + try: + await cursor.execute(f"SAVEPOINT {name}") return name finally: - cur.close() + self._close_cursor(cursor) async def _rollback_to_savepoint_impl(self, name, parent): - cur = await self._get_cursor() - try: - await cur.execute('ROLLBACK TO SAVEPOINT ' + name) - finally: - cur.close() - self._transaction = parent + try: + if self._connection.closed: + return + cursor = await self._open_cursor() + try: + await cursor.execute(f"ROLLBACK TO SAVEPOINT {name}") + finally: + self._close_cursor(cursor) + finally: + self._transaction = parent async def _release_savepoint_impl(self, name, parent): - cur = await self._get_cursor() - try: - await cur.execute('RELEASE SAVEPOINT ' + name) - finally: - cur.close() + cursor = await self._open_cursor() + try: + await cursor.execute(f"RELEASE SAVEPOINT {name}") + finally: + self._close_cursor(cursor) + self._transaction = parent async def begin_twophase(self, xid=None): @@ -284,15 +346,16 @@ if self._transaction is not None: raise exc.InvalidRequestError( "Cannot start a two phase transaction when a transaction " - "is already in progress.") + "is already in progress." + ) if xid is None: xid = self._dialect.create_xid() self._transaction = TwoPhaseTransaction(self, xid) - await self._begin_impl() + await self._begin_impl(None, False, False) return self._transaction async def _prepare_twophase_impl(self, xid): - await self.execute("PREPARE TRANSACTION '%s'" % xid) + await self.execute(f"PREPARE TRANSACTION {xid!r}") async def recover_twophase(self): """Return a list of prepared twophase transaction ids.""" @@ -302,14 +365,14 @@ async def rollback_prepared(self, xid, *, is_prepared=True): """Rollback prepared twophase transaction.""" if is_prepared: - await self.execute("ROLLBACK PREPARED '%s'" % xid) + await self.execute(f"ROLLBACK PREPARED {xid:!r}") else: await self._rollback_impl() async def commit_prepared(self, xid, *, is_prepared=True): """Commit prepared twophase transaction.""" if is_prepared: - await self.execute("COMMIT PREPARED '%s'" % xid) + await self.execute(f"COMMIT PREPARED {xid!r}") else: await self._commit_impl() @@ -332,18 +395,27 @@ After .close() is called, the SAConnection is permanently in a closed state, and will allow no further operations. """ + if self.connection is None: return + await asyncio.shield(self._close()) + + async def _close(self): if self._transaction is not None: - await self._transaction.rollback() + with contextlib.suppress(Exception): + await self._transaction.rollback() self._transaction = None - # don't close underlying connection, it can be reused by pool - # conn.close() - - self._engine.release(self) - self._connection = None - self._engine = None + + for cursor in self._cursors: + cursor.close() + self._cursors.clear() + + if self._engine is not None: + with contextlib.suppress(Exception): + await self._engine.release(self) + self._connection = None + self._engine = None def _distill_params(multiparams, params): @@ -364,23 +436,27 @@ elif len(multiparams) == 1: zero = multiparams[0] if isinstance(zero, (list, tuple)): - if not zero or hasattr(zero[0], '__iter__') and \ - not hasattr(zero[0], 'strip'): + if ( + not zero + or hasattr(zero[0], "__iter__") + and not hasattr(zero[0], "strip") + ): # execute(stmt, [{}, {}, {}, ...]) # execute(stmt, [(), (), (), ...]) return zero else: # execute(stmt, ("value", "value")) return [zero] - elif hasattr(zero, 'keys'): + elif hasattr(zero, "keys"): # execute(stmt, {"key":"value"}) return [zero] else: # execute(stmt, "value") return [[zero]] else: - if (hasattr(multiparams[0], '__iter__') and - not hasattr(multiparams[0], 'strip')): + if hasattr(multiparams[0], "__iter__") and not hasattr( + multiparams[0], "strip" + ): return multiparams else: return [multiparams] diff --git a/aiopg/sa/engine.py b/aiopg/sa/engine.py index 1e77268..dec29f4 100644 --- a/aiopg/sa/engine.py +++ b/aiopg/sa/engine.py @@ -1,9 +1,10 @@ +import asyncio import json import aiopg from ..connection import TIMEOUT -from ..utils import _PoolAcquireContextManager, _PoolContextManager +from ..utils import _ContextManager, get_running_loop from .connection import SAConnection try: @@ -12,7 +13,7 @@ PGDialect_psycopg2, ) except ImportError: # pragma: no cover - raise ImportError('aiopg.sa requires sqlalchemy') + raise ImportError("aiopg.sa requires sqlalchemy") class APGCompiler_psycopg2(PGCompiler_psycopg2): @@ -32,8 +33,9 @@ def get_dialect(json_serializer=json.dumps, json_deserializer=lambda x: x): - dialect = PGDialect_psycopg2(json_serializer=json_serializer, - json_deserializer=json_deserializer) + dialect = PGDialect_psycopg2( + json_serializer=json_serializer, json_deserializer=json_deserializer + ) dialect.statement_compiler = APGCompiler_psycopg2 dialect.implicit_returning = True @@ -49,8 +51,16 @@ _dialect = get_dialect() -def create_engine(dsn=None, *, minsize=1, maxsize=10, dialect=_dialect, - timeout=TIMEOUT, pool_recycle=-1, **kwargs): +def create_engine( + dsn=None, + *, + minsize=1, + maxsize=10, + dialect=_dialect, + timeout=TIMEOUT, + pool_recycle=-1, + **kwargs +): """A coroutine for Engine creation. Returns Engine instance with embedded connection pool. @@ -58,18 +68,36 @@ The pool has *minsize* opened connections to PostgreSQL server. """ - coro = _create_engine(dsn=dsn, minsize=minsize, maxsize=maxsize, - dialect=dialect, timeout=timeout, - pool_recycle=pool_recycle, **kwargs) - return _EngineContextManager(coro) - - -async def _create_engine(dsn=None, *, minsize=1, maxsize=10, dialect=_dialect, - timeout=TIMEOUT, pool_recycle=-1, **kwargs): + coro = _create_engine( + dsn=dsn, + minsize=minsize, + maxsize=maxsize, + dialect=dialect, + timeout=timeout, + pool_recycle=pool_recycle, + **kwargs + ) + return _ContextManager(coro, _close_engine) + + +async def _create_engine( + dsn=None, + *, + minsize=1, + maxsize=10, + dialect=_dialect, + timeout=TIMEOUT, + pool_recycle=-1, + **kwargs +): pool = await aiopg.create_pool( - dsn, minsize=minsize, maxsize=maxsize, - timeout=timeout, pool_recycle=pool_recycle, **kwargs + dsn, + minsize=minsize, + maxsize=maxsize, + timeout=timeout, + pool_recycle=pool_recycle, + **kwargs ) conn = await pool.acquire() try: @@ -79,6 +107,15 @@ await pool.release(conn) +async def _close_engine(engine: "Engine") -> None: + engine.close() + await engine.wait_closed() + + +async def _close_connection(c: SAConnection) -> None: + await c.close() + + class Engine: """Connects a aiopg.Pool and sqlalchemy.engine.interfaces.Dialect together to provide a @@ -88,10 +125,13 @@ create_engine coroutine. """ + __slots__ = ("_dialect", "_pool", "_dsn", "_loop") + def __init__(self, dialect, pool, dsn): self._dialect = dialect self._pool = pool self._dsn = dsn + self._loop = get_running_loop() @property def dialect(self): @@ -160,21 +200,19 @@ def acquire(self): """Get a connection from pool.""" coro = self._acquire() - return _EngineAcquireContextManager(coro, self) + return _ContextManager[SAConnection](coro, _close_connection) async def _acquire(self): raw = await self._pool.acquire() - conn = SAConnection(raw, self) - return conn + return SAConnection(raw, self) def release(self, conn): - raw = conn.connection - fut = self._pool.release(raw) - return fut + return self._pool.release(conn.connection) def __enter__(self): raise RuntimeError( - '"await" should be used as context manager expression') + '"await" should be used as context manager expression' + ) def __exit__(self, *args): # This must exist because __enter__ exists, even though that @@ -195,7 +233,7 @@ # finally: # engine.release(conn) conn = yield from self._acquire().__await__() - return _ConnectionContextManager(self, conn) + return _ConnectionContextManager(conn, self._loop) async def __aenter__(self): return self @@ -205,10 +243,6 @@ await self.wait_closed() -_EngineContextManager = _PoolContextManager -_EngineAcquireContextManager = _PoolAcquireContextManager - - class _ConnectionContextManager: """Context manager. @@ -224,18 +258,15 @@ """ - __slots__ = ('_engine', '_conn') - - def __init__(self, engine, conn): - self._engine = engine + __slots__ = ("_conn", "_loop") + + def __init__(self, conn: SAConnection, loop: asyncio.AbstractEventLoop): self._conn = conn + self._loop = loop def __enter__(self): return self._conn def __exit__(self, *args): - try: - self._engine.release(self._conn) - finally: - self._engine = None - self._conn = None + asyncio.ensure_future(self._conn.close(), loop=self._loop) + self._conn = None diff --git a/aiopg/sa/result.py b/aiopg/sa/result.py index d58bf7b..b3966e9 100644 --- a/aiopg/sa/result.py +++ b/aiopg/sa/result.py @@ -4,10 +4,18 @@ from sqlalchemy.sql import expression, sqltypes from . import exc +from .utils import SQLALCHEMY_VERSION + +if SQLALCHEMY_VERSION >= ["1", "4"]: + from sqlalchemy.util import string_or_unprintable +else: + from sqlalchemy.sql.expression import ( + _string_or_unprintable as string_or_unprintable, + ) class RowProxy(Mapping): - __slots__ = ('_result_proxy', '_row', '_processors', '_keymap') + __slots__ = ("_result_proxy", "_row", "_processors", "_keymap") def __init__(self, result_proxy, row, processors, keymap): """RowProxy objects are constructed by ResultProxy objects.""" @@ -42,8 +50,9 @@ # raise if index is None: raise exc.InvalidRequestError( - "Ambiguous column name '%s' in result set! " - "try 'use_labels' option on select statement." % key) + f"Ambiguous column name {key!r} in result set! " + f"try 'use_labels' option on select statement." + ) if processor is not None: return processor(self._row[index]) else: @@ -78,7 +87,7 @@ return repr(self.as_tuple()) -class ResultMetaData(object): +class ResultMetaData: """Handle cursor.description, applying additional info from an execution context.""" @@ -97,16 +106,18 @@ # `dbapi_type_map` property removed in SQLAlchemy 1.2+. # Usage of `getattr` only needed for backward compatibility with # older versions of SQLAlchemy. - typemap = getattr(dialect, 'dbapi_type_map', {}) - - assert dialect.case_sensitive, \ - "Doesn't support case insensitive database connection" + typemap = getattr(dialect, "dbapi_type_map", {}) + + assert ( + dialect.case_sensitive + ), "Doesn't support case insensitive database connection" # high precedence key values. primary_keymap = {} - assert not dialect.description_encoding, \ - "psycopg in py3k should not use this" + assert ( + not dialect.description_encoding + ), "psycopg in py3k should not use this" for i, rec in enumerate(cursor_description): colname = rec[0] @@ -119,7 +130,7 @@ name, obj, type_ = ( map_column_name.get(colname, colname), None, - map_type.get(colname, typemap.get(coltype, sqltypes.NULLTYPE)) + map_type.get(colname, typemap.get(coltype, sqltypes.NULLTYPE)), ) processor = type_._cached_result_processor(dialect, coltype) @@ -132,7 +143,7 @@ primary_keymap[i] = rec # populate primary keymap, looking for conflicts. - if primary_keymap.setdefault(name, rec) is not rec: + if primary_keymap.setdefault(name, rec) != rec: # place a record that doesn't have the "index" - this # is interpreted later as an AmbiguousColumnError, # but only when actually accessed. Columns @@ -160,7 +171,7 @@ map_column_name = {} for elem in data_map: name = elem[0] - priority_name = getattr(elem[2][0], 'key', name) + priority_name = getattr(elem[2][0], "key", None) or name map_type[name] = elem[3] # type column map_column_name[name] = priority_name @@ -176,9 +187,9 @@ # or colummn('name') constructs to ColumnElements, or after a # pickle/unpickle roundtrip elif isinstance(key, expression.ColumnElement): - if (key._label and key._label in map): + if key._label and key._label in map: result = map[key._label] - elif (hasattr(key, 'key') and key.key in map): + elif hasattr(key, "key") and key.key in map: # match is only on name. result = map[key.key] # search extra hard to make sure this @@ -194,8 +205,9 @@ if result is None: if raiseerr: raise exc.NoSuchColumnError( - "Could not locate column in row for column '%s'" % - expression._string_or_unprintable(key)) + f"Could not locate column in row for column " + f"{string_or_unprintable(key)!r}" + ) else: return None else: @@ -290,10 +302,9 @@ cursor_description = self.cursor.description if cursor_description is not None: self._metadata = ResultMetaData(self, cursor_description) - self._weak = weakref.ref(self, lambda wr: self.cursor.close()) + self._weak = weakref.ref(self, lambda _: self.close()) else: self.close() - self._weak = None @property def returns_rows(self): @@ -329,11 +340,14 @@ * cursor.description is None. """ - if not self.closed: - self.cursor.close() - # allow consistent errors - self._cursor = None - self._weak = None + if self._cursor is None: + return + + if not self._cursor.closed: + self._cursor.close() + + self._cursor = None + self._weak = None def __aiter__(self): return self @@ -342,14 +356,14 @@ ret = await self.fetchone() if ret is not None: return ret - else: - raise StopAsyncIteration + raise StopAsyncIteration def _non_result(self): if self._metadata is None: raise exc.ResourceClosedError( "This result object does not return rows. " - "It has been closed automatically.") + "It has been closed automatically." + ) else: raise exc.ResourceClosedError("This result object is closed.") @@ -358,8 +372,7 @@ metadata = self._metadata keymap = metadata._keymap processors = metadata._processors - return [process_row(metadata, row, processors, keymap) - for row in rows] + return [process_row(metadata, row, processors, keymap) for row in rows] async def fetchall(self): """Fetch all rows, just like DB-API cursor.fetchall().""" diff --git a/aiopg/sa/transaction.py b/aiopg/sa/transaction.py index 8158473..099cd30 100644 --- a/aiopg/sa/transaction.py +++ b/aiopg/sa/transaction.py @@ -1,7 +1,7 @@ from . import exc -class Transaction(object): +class Transaction: """Represent a database transaction in progress. The Transaction object is procured by @@ -23,6 +23,8 @@ See also: SAConnection.begin(), SAConnection.begin_twophase(), SAConnection.begin_nested(). """ + + __slots__ = ("_connection", "_parent", "_is_active") def __init__(self, connection, parent): self._connection = connection @@ -83,12 +85,12 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): if exc_type: await self.rollback() - else: - if self._is_active: - await self.commit() + elif self._is_active: + await self.commit() class RootTransaction(Transaction): + __slots__ = () def __init__(self, connection): super().__init__(connection, None) @@ -109,22 +111,25 @@ The interface is the same as that of Transaction class. """ - _savepoint = None + __slots__ = ("_savepoint",) def __init__(self, connection, parent): - super(NestedTransaction, self).__init__(connection, parent) + super().__init__(connection, parent) + self._savepoint = None async def _do_rollback(self): assert self._savepoint is not None, "Broken transaction logic" if self._is_active: await self._connection._rollback_to_savepoint_impl( - self._savepoint, self._parent) + self._savepoint, self._parent + ) async def _do_commit(self): assert self._savepoint is not None, "Broken transaction logic" if self._is_active: await self._connection._release_savepoint_impl( - self._savepoint, self._parent) + self._savepoint, self._parent + ) class TwoPhaseTransaction(Transaction): @@ -136,6 +141,8 @@ The interface is the same as that of Transaction class with the addition of the .prepare() method. """ + + __slots__ = ("_is_prepared", "_xid") def __init__(self, connection, xid): super().__init__(connection, None) @@ -160,8 +167,10 @@ async def _do_rollback(self): await self._connection._rollback_twophase_impl( - self._xid, is_prepared=self._is_prepared) + self._xid, is_prepared=self._is_prepared + ) async def _do_commit(self): await self._connection._commit_twophase_impl( - self._xid, is_prepared=self._is_prepared) + self._xid, is_prepared=self._is_prepared + ) diff --git a/aiopg/sa/utils.py b/aiopg/sa/utils.py new file mode 100644 index 0000000..6e5ff50 --- /dev/null +++ b/aiopg/sa/utils.py @@ -0,0 +1,3 @@ +import sqlalchemy + +SQLALCHEMY_VERSION = sqlalchemy.__version__.split(".") diff --git a/aiopg/transaction.py b/aiopg/transaction.py deleted file mode 100644 index 16bea52..0000000 --- a/aiopg/transaction.py +++ /dev/null @@ -1,194 +0,0 @@ -import enum -import uuid -import warnings -from abc import ABC - -import psycopg2 - -from aiopg.utils import _TransactionPointContextManager - -__all__ = ('IsolationLevel', 'Transaction') - - -class IsolationCompiler(ABC): - __slots__ = ('_isolation_level', '_readonly', '_deferrable') - - def __init__(self, isolation_level, readonly, deferrable): - self._isolation_level = isolation_level - self._readonly = readonly - self._deferrable = deferrable - - @property - def name(self): - return self._isolation_level - - def savepoint(self, unique_id): - return 'SAVEPOINT {}'.format(unique_id) - - def release_savepoint(self, unique_id): - return 'RELEASE SAVEPOINT {}'.format(unique_id) - - def rollback_savepoint(self, unique_id): - return 'ROLLBACK TO SAVEPOINT {}'.format(unique_id) - - def commit(self): - return 'COMMIT' - - def rollback(self): - return 'ROLLBACK' - - def begin(self): - query = 'BEGIN' - if self._isolation_level is not None: - query += ( - ' ISOLATION LEVEL {}'.format(self._isolation_level.upper()) - ) - - if self._readonly: - query += ' READ ONLY' - - if self._deferrable: - query += ' DEFERRABLE' - - return query - - def __repr__(self): - return self.name - - -class ReadCommittedCompiler(IsolationCompiler): - __slots__ = () - - def __init__(self, readonly, deferrable): - super().__init__('Read committed', readonly, deferrable) - - -class RepeatableReadCompiler(IsolationCompiler): - __slots__ = () - - def __init__(self, readonly, deferrable): - super().__init__('Repeatable read', readonly, deferrable) - - -class SerializableCompiler(IsolationCompiler): - __slots__ = () - - def __init__(self, readonly, deferrable): - super().__init__('Serializable', readonly, deferrable) - - -class DefaultCompiler(IsolationCompiler): - __slots__ = () - - def __init__(self, readonly, deferrable): - super().__init__(None, readonly, deferrable) - - @property - def name(self): - return 'Default' - - -class IsolationLevel(enum.Enum): - serializable = SerializableCompiler - repeatable_read = RepeatableReadCompiler - read_committed = ReadCommittedCompiler - default = DefaultCompiler - - def __call__(self, readonly, deferrable): - return self.value(readonly, deferrable) - - -class Transaction: - __slots__ = ('_cur', '_is_begin', '_isolation', '_unique_id') - - def __init__(self, cur, isolation_level, - readonly=False, deferrable=False): - self._cur = cur - self._is_begin = False - self._unique_id = None - self._isolation = isolation_level(readonly, deferrable) - - @property - def is_begin(self): - return self._is_begin - - async def begin(self): - if self._is_begin: - raise psycopg2.ProgrammingError( - 'You are trying to open a new transaction, use the save point') - self._is_begin = True - await self._cur.execute(self._isolation.begin()) - return self - - async def commit(self): - self._check_commit_rollback() - await self._cur.execute(self._isolation.commit()) - self._is_begin = False - - async def rollback(self): - self._check_commit_rollback() - await self._cur.execute(self._isolation.rollback()) - self._is_begin = False - - async def rollback_savepoint(self): - self._check_release_rollback() - await self._cur.execute( - self._isolation.rollback_savepoint(self._unique_id)) - self._unique_id = None - - async def release_savepoint(self): - self._check_release_rollback() - await self._cur.execute( - self._isolation.release_savepoint(self._unique_id)) - self._unique_id = None - - async def savepoint(self): - self._check_commit_rollback() - if self._unique_id is not None: - raise psycopg2.ProgrammingError('You do not shut down savepoint') - - self._unique_id = 's{}'.format(uuid.uuid1().hex) - await self._cur.execute( - self._isolation.savepoint(self._unique_id)) - - return self - - def point(self): - return _TransactionPointContextManager(self.savepoint()) - - def _check_commit_rollback(self): - if not self._is_begin: - raise psycopg2.ProgrammingError('You are trying to commit ' - 'the transaction does not open') - - def _check_release_rollback(self): - self._check_commit_rollback() - if self._unique_id is None: - raise psycopg2.ProgrammingError('You do not start savepoint') - - def __repr__(self): - return "<{} transaction={} id={:#x}>".format( - self.__class__.__name__, - self._isolation, - id(self) - ) - - def __del__(self): - if self._is_begin: - warnings.warn( - "You have not closed transaction {!r}".format(self), - ResourceWarning) - - if self._unique_id is not None: - warnings.warn( - "You have not closed savepoint {!r}".format(self), - ResourceWarning) - - async def __aenter__(self): - return await self.begin() - - async def __aexit__(self, exc_type, exc, tb): - if exc_type is not None: - await self.rollback() - else: - await self.commit() diff --git a/aiopg/utils.py b/aiopg/utils.py index 67c9f0e..86d66cc 100644 --- a/aiopg/utils.py +++ b/aiopg/utils.py @@ -1,256 +1,195 @@ import asyncio import sys -import warnings -from collections.abc import Coroutine - -import psycopg2 - -from .log import logger - -try: - ensure_future = asyncio.ensure_future -except AttributeError: - ensure_future = getattr(asyncio, 'async') +from types import TracebackType +from typing import ( + Any, + Awaitable, + Callable, + Coroutine, + Generator, + Generic, + Optional, + Type, + TypeVar, + Union, +) if sys.version_info >= (3, 7, 0): __get_running_loop = asyncio.get_running_loop else: + def __get_running_loop() -> asyncio.AbstractEventLoop: loop = asyncio.get_event_loop() if not loop.is_running(): - raise RuntimeError('no running event loop') + raise RuntimeError("no running event loop") return loop -def get_running_loop(is_warn: bool = False) -> asyncio.AbstractEventLoop: - loop = __get_running_loop() +def get_running_loop() -> asyncio.AbstractEventLoop: + return __get_running_loop() - if is_warn: - warnings.warn( - 'aiopg always uses "aiopg.get_running_loop", ' - 'look the documentation.', - DeprecationWarning, - stacklevel=3 + +def create_completed_future( + loop: asyncio.AbstractEventLoop, +) -> "asyncio.Future[Any]": + future = loop.create_future() + future.set_result(None) + return future + + +_TObj = TypeVar("_TObj") +_Release = Callable[[_TObj], Awaitable[None]] + + +class _ContextManager(Coroutine[Any, None, _TObj], Generic[_TObj]): + __slots__ = ("_coro", "_obj", "_release", "_release_on_exception") + + def __init__( + self, + coro: Coroutine[Any, None, _TObj], + release: _Release[_TObj], + release_on_exception: Optional[_Release[_TObj]] = None, + ): + self._coro = coro + self._obj: Optional[_TObj] = None + self._release = release + self._release_on_exception = ( + release if release_on_exception is None else release_on_exception ) - if loop.get_debug(): - logger.warning( - 'aiopg always uses "aiopg.get_running_loop", ' - 'look the documentation.', - exc_info=True - ) + def send(self, value: Any) -> "Any": + return self._coro.send(value) - return loop + def throw( # type: ignore + self, + typ: Type[BaseException], + val: Optional[Union[BaseException, object]] = None, + tb: Optional[TracebackType] = None, + ) -> Any: + if val is None: + return self._coro.throw(typ) + if tb is None: + return self._coro.throw(typ, val) + return self._coro.throw(typ, val, tb) + + def close(self) -> None: + self._coro.close() + + def __await__(self) -> Generator[Any, None, _TObj]: + return self._coro.__await__() + + async def __aenter__(self) -> _TObj: + self._obj = await self._coro + assert self._obj + return self._obj + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> None: + if self._obj is None: + return + + try: + if exc_type is not None: + await self._release_on_exception(self._obj) + else: + await self._release(self._obj) + finally: + self._obj = None -class _ContextManager(Coroutine): - __slots__ = ('_coro', '_obj') - - def __init__(self, coro): - self._coro = coro - self._obj = None - - def send(self, value): - return self._coro.send(value) - - def throw(self, typ, val=None, tb=None): - if val is None: - return self._coro.throw(typ) - elif tb is None: - return self._coro.throw(typ, val) - else: - return self._coro.throw(typ, val, tb) - - def close(self): - return self._coro.close() - - @property - def gi_frame(self): - return self._coro.gi_frame - - @property - def gi_running(self): - return self._coro.gi_running - - @property - def gi_code(self): - return self._coro.gi_code - - def __next__(self): - return self.send(None) - - def __await__(self): - resp = self._coro.__await__() - return resp - - async def __aenter__(self): - self._obj = await self._coro - return self._obj - - async def __aexit__(self, exc_type, exc, tb): - self._obj.close() - self._obj = None - - -class _SAConnectionContextManager(_ContextManager): +class _IterableContextManager(_ContextManager[_TObj]): __slots__ = () - def __aiter__(self): + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) + + def __aiter__(self) -> "_IterableContextManager[_TObj]": return self - async def __anext__(self): + async def __anext__(self) -> _TObj: if self._obj is None: self._obj = await self._coro try: - return await self._obj.__anext__() + return await self._obj.__anext__() # type: ignore except StopAsyncIteration: - self._obj.close() - self._obj = None + try: + await self._release(self._obj) + finally: + self._obj = None raise -class _PoolContextManager(_ContextManager): - __slots__ = () +class ClosableQueue: + """ + Proxy object for an asyncio.Queue that is "closable" - async def __aexit__(self, exc_type, exc, tb): - self._obj.close() - await self._obj.wait_closed() - self._obj = None + When the ClosableQueue is closed, with an exception object as parameter, + subsequent or ongoing attempts to read from the queue will result in that + exception being result in that exception being raised. - -class _TransactionPointContextManager(_ContextManager): - __slots__ = () - - async def __aexit__(self, exc_type, exc_val, exc_tb): - if exc_type is not None: - await self._obj.rollback_savepoint() - else: - await self._obj.release_savepoint() - - self._obj = None - - -class _TransactionBeginContextManager(_ContextManager): - __slots__ = () - - async def __aexit__(self, exc_type, exc_val, exc_tb): - if exc_type is not None: - await self._obj.rollback() - else: - await self._obj.commit() - - self._obj = None - - -class _TransactionContextManager(_ContextManager): - __slots__ = () - - async def __aexit__(self, exc_type, exc, tb): - if exc_type: - await self._obj.rollback() - else: - if self._obj.is_active: - await self._obj.commit() - self._obj = None - - -class _PoolAcquireContextManager(_ContextManager): - __slots__ = ('_coro', '_obj', '_pool') - - def __init__(self, coro, pool): - super().__init__(coro) - self._pool = pool - - async def __aexit__(self, exc_type, exc, tb): - await self._pool.release(self._obj) - self._pool = None - self._obj = None - - -class _PoolConnectionContextManager: - """Context manager. - - This enables the following idiom for acquiring and releasing a - connection around a block: - - async with pool as conn: - cur = await conn.cursor() - - while failing loudly when accidentally using: - - with pool: - + Note: closing a queue with exception will still allow to read any items + pending in the queue. The close exception is raised only once all items + are consumed. """ - __slots__ = ('_pool', '_conn') + __slots__ = ("_loop", "_queue", "_close_event") - def __init__(self, pool, conn): - self._pool = pool - self._conn = conn + def __init__( + self, + queue: asyncio.Queue, # type: ignore + loop: asyncio.AbstractEventLoop, + ): + self._loop = loop + self._queue = queue + self._close_event = loop.create_future() + # suppress Future exception was never retrieved + self._close_event.add_done_callback(lambda f: f.exception()) - def __enter__(self): - assert self._conn - return self._conn + def close(self, exception: Exception) -> None: + if self._close_event.done(): + return + self._close_event.set_exception(exception) - def __exit__(self, exc_type, exc_val, exc_tb): + async def get(self) -> Any: + if self._close_event.done(): + try: + return self._queue.get_nowait() + except asyncio.QueueEmpty: + return self._close_event.result() + + get = asyncio.ensure_future(self._queue.get(), loop=self._loop) try: - self._pool.release(self._conn) + await asyncio.wait( + [get, self._close_event], return_when=asyncio.FIRST_COMPLETED + ) + except asyncio.CancelledError: + get.cancel() + raise + + if get.done(): + return get.result() + + try: + return self._close_event.result() finally: - self._pool = None - self._conn = None + get.cancel() - async def __aenter__(self): - assert not self._conn - self._conn = await self._pool.acquire() - return self._conn + def empty(self) -> bool: + return self._queue.empty() - async def __aexit__(self, exc_type, exc_val, exc_tb): - try: - await self._pool.release(self._conn) - finally: - self._pool = None - self._conn = None + def qsize(self) -> int: + return self._queue.qsize() + def get_nowait(self) -> Any: + if self._close_event.done(): + try: + return self._queue.get_nowait() + except asyncio.QueueEmpty: + return self._close_event.result() -class _PoolCursorContextManager: - """Context manager. - - This enables the following idiom for acquiring and releasing a - cursor around a block: - - async with pool.cursor() as cur: - await cur.execute("SELECT 1") - - while failing loudly when accidentally using: - - with pool: - - """ - - __slots__ = ('_pool', '_conn', '_cur') - - def __init__(self, pool, conn, cur): - self._pool = pool - self._conn = conn - self._cur = cur - - def __enter__(self): - return self._cur - - def __exit__(self, *args): - try: - self._cur.close() - except psycopg2.ProgrammingError: - # seen instances where the cursor fails to close: - # https://github.com/aio-libs/aiopg/issues/364 - # We close it here so we don't return a bad connection to the pool - self._conn.close() - raise - finally: - try: - self._pool.release(self._conn) - finally: - self._pool = None - self._conn = None - self._cur = None + return self._queue.get_nowait() diff --git a/aiopg.egg-info/PKG-INFO b/aiopg.egg-info/PKG-INFO index b4151da..108223e 100644 --- a/aiopg.egg-info/PKG-INFO +++ b/aiopg.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: aiopg -Version: 1.2.0b2 +Version: 1.3.3 Summary: Postgres integration with asyncio. Home-page: https://aiopg.readthedocs.io Author: Andrew Svetlov @@ -15,392 +15,6 @@ Project-URL: Docs: RTD, https://aiopg.readthedocs.io Project-URL: GitHub: issues, https://github.com/aio-libs/aiopg/issues Project-URL: GitHub: repo, https://github.com/aio-libs/aiopg -Description: aiopg - ===== - .. image:: https://github.com/aio-libs/aiopg/workflows/CI/badge.svg - :target: https://github.com/aio-libs/aiopg/actions?query=workflow%3ACI - .. image:: https://codecov.io/gh/aio-libs/aiopg/branch/master/graph/badge.svg - :target: https://codecov.io/gh/aio-libs/aiopg - .. image:: https://badges.gitter.im/Join%20Chat.svg - :target: https://gitter.im/aio-libs/Lobby - :alt: Chat on Gitter - - **aiopg** is a library for accessing a PostgreSQL_ database - from the asyncio_ (PEP-3156/tulip) framework. It wraps - asynchronous features of the Psycopg database driver. - - Example - ------- - - .. code:: python - - import asyncio - import aiopg - - dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1' - - async def go(): - pool = await aiopg.create_pool(dsn) - async with pool.acquire() as conn: - async with conn.cursor() as cur: - await cur.execute("SELECT 1") - ret = [] - async for row in cur: - ret.append(row) - assert ret == [(1,)] - - loop = asyncio.get_event_loop() - loop.run_until_complete(go()) - - - Example of SQLAlchemy optional integration - ------------------------------------------ - - .. code:: python - - import asyncio - from aiopg.sa import create_engine - import sqlalchemy as sa - - metadata = sa.MetaData() - - tbl = sa.Table('tbl', metadata, - sa.Column('id', sa.Integer, primary_key=True), - sa.Column('val', sa.String(255))) - - async def create_table(engine): - async with engine.acquire() as conn: - await conn.execute('DROP TABLE IF EXISTS tbl') - await conn.execute('''CREATE TABLE tbl ( - id serial PRIMARY KEY, - val varchar(255))''') - - async def go(): - async with create_engine(user='aiopg', - database='aiopg', - host='127.0.0.1', - password='passwd') as engine: - - async with engine.acquire() as conn: - await conn.execute(tbl.insert().values(val='abc')) - - async for row in conn.execute(tbl.select()): - print(row.id, row.val) - - loop = asyncio.get_event_loop() - loop.run_until_complete(go()) - - .. _PostgreSQL: http://www.postgresql.org/ - .. _asyncio: http://docs.python.org/3.4/library/asyncio.html - - Please use:: - - $ make test - - for executing the project's unittests. - See https://aiopg.readthedocs.io/en/stable/contributing.html for details - on how to set up your environment to run the tests. - - Changelog - --------- - - 1.2.0b2 (2020-12-21) - ^^^^^^^^^^^^^^^^^^^^ - - * Fix IsolationLevel.read_committed and introduce IsolationLevel.default `#770 `_ - - * Fix python 3.8 warnings in tests `#771 `_ - - - 1.2.0b1 (2020-12-16) - ^^^^^^^^^^^^^^^^^^^^ - - * Deprecate blocking connection.cancel() method `#570 `_ - - - 1.2.0b0 (2020-12-15) - ^^^^^^^^^^^^^^^^^^^^ - - * Implement timeout on acquiring connection from pool `#766 `_ - - - 1.1.0 (2020-12-10) - ^^^^^^^^^^^^^^^^^^ - - - 1.1.0b2 (2020-12-09) - ^^^^^^^^^^^^^^^^^^^^ - - * Added missing slots to context managers `#763 `_ - - - 1.1.0b1 (2020-12-07) - ^^^^^^^^^^^^^^^^^^^^ - - * Fix on_connect multiple call on acquire `#552 `_ - - * Fix python 3.8 warnings `#622 `_ - - * Bump minimum psycopg version to 2.8.4 `#754 `_ - - * Fix Engine.release method to release connection in any way `#756 `_ - - - 1.0.0 (2019-09-20) - ^^^^^^^^^^^^^^^^^^ - - * Removal of an asynchronous call in favor of issues # 550 - - * Big editing of documentation and minor bugs #534 - - - 0.16.0 (2019-01-25) - ^^^^^^^^^^^^^^^^^^^ - - * Fix select priority name `#525 `_ - - * Rename `psycopg2` to `psycopg2-binary` to fix deprecation warning `#507 `_ - - * Fix `#189 `_ hstore when using ReadDictCursor `#512 `_ - - * close cannot be used while an asynchronous query is underway `#452 `_ - - * sqlalchemy adapter trx begin allow transaction_mode `#498 `_ - - - 0.15.0 (2018-08-14) - ^^^^^^^^^^^^^^^^^^^ - - * Support Python 3.7 `#437 `_ - - - 0.14.0 (2018-05-10) - ^^^^^^^^^^^^^^^^^^^ - - * Add ``get_dialect`` func to have ability to pass ``json_serializer`` `#451 `_ - - - 0.13.2 (2018-01-03) - ^^^^^^^^^^^^^^^^^^^ - - * Fixed compatibility with SQLAlchemy 1.2.0 `#412 `_ - - * Added support for transaction isolation levels `#219 `_ - - - 0.13.1 (2017-09-10) - ^^^^^^^^^^^^^^^^^^^ - - * Added connection poll recycling logic `#373 `_ - - - 0.13.0 (2016-12-02) - ^^^^^^^^^^^^^^^^^^^ - - * Add `async with` support to `.begin_nested()` `#208 `_ - - * Fix connection.cancel() `#212 `_ `#223 `_ - - * Raise informative error on unexpected connection closing `#191 `_ - - * Added support for python types columns issues `#217 `_ - - * Added support for default values in SA table issues `#206 `_ - - - 0.12.0 (2016-10-09) - ^^^^^^^^^^^^^^^^^^^ - - * Add an on_connect callback parameter to pool `#141 `_ - - * Fixed connection to work under both windows and posix based systems `#142 `_ - - - 0.11.0 (2016-09-12) - ^^^^^^^^^^^^^^^^^^^ - - * Immediately remove callbacks from a closed file descriptor `#139 `_ - - * Drop Python 3.3 support - - - 0.10.0 (2016-07-16) - ^^^^^^^^^^^^^^^^^^^ - - * Refactor tests to use dockerized Postgres server `#107 `_ - - * Reduce default pool minsize to 1 `#106 `_ - - * Explicitly enumerate packages in setup.py `#85 `_ - - * Remove expired connections from pool on acquire `#116 `_ - - * Don't crash when Connection is GC'ed `#124 `_ - - * Use loop.create_future() if available - - - 0.9.2 (2016-01-31) - ^^^^^^^^^^^^^^^^^^ - - * Make pool.release return asyncio.Future, so we can wait on it in - `__aexit__` `#102 `_ - - * Add support for uuid type `#103 `_ - - - 0.9.1 (2016-01-17) - ^^^^^^^^^^^^^^^^^^ - - * Documentation update `#101 `_ - - - 0.9.0 (2016-01-14) - ^^^^^^^^^^^^^^^^^^ - - * Add async context managers for transactions `#91 `_ - - * Support async iterator in ResultProxy `#92 `_ - - * Add async with for engine `#90 `_ - - - 0.8.0 (2015-12-31) - ^^^^^^^^^^^^^^^^^^ - - * Add PostgreSQL notification support `#58 `_ - - * Support pools with unlimited size `#59 `_ - - * Cancel current DB operation on asyncio timeout `#66 `_ - - * Add async with support for Pool, Connection, Cursor `#88 `_ - - - 0.7.0 (2015-04-22) - ^^^^^^^^^^^^^^^^^^ - - * Get rid of resource leak on connection failure. - - * Report ResourceWarning on non-closed connections. - - * Deprecate iteration protocol support in cursor and ResultProxy. - - * Release sa connection to pool on `connection.close()`. - - - 0.6.0 (2015-02-03) - ^^^^^^^^^^^^^^^^^^ - - * Accept dict, list, tuple, named and positional parameters in - `SAConnection.execute()` - - - 0.5.2 (2014-12-08) - ^^^^^^^^^^^^^^^^^^ - - * Minor release, fixes a bug that leaves connection in broken state - after `cursor.execute()` failure. - - - 0.5.1 (2014-10-31) - ^^^^^^^^^^^^^^^^^^ - - * Fix a bug for processing transactions in line. - - - 0.5.0 (2014-10-31) - ^^^^^^^^^^^^^^^^^^ - - * Add .terminate() to Pool and Engine - - * Reimplement connection pool (now pool size cannot be greater than pool.maxsize) - - * Add .close() and .wait_closed() to Pool and Engine - - * Add minsize, maxsize, size and freesize properties to sa.Engine - - * Support *echo* parameter for logging executed SQL commands - - * Connection.close() is not a coroutine (but we keep backward compatibility). - - - 0.4.1 (2014-10-02) - ^^^^^^^^^^^^^^^^^^ - - * make cursor iterable - - * update docs - - - 0.4.0 (2014-10-02) - ^^^^^^^^^^^^^^^^^^ - - * add timeouts for database operations. - - * Autoregister psycopg2 support for json data type. - - * Support JSON in aiopg.sa - - * Support ARRAY in aiopg.sa - - * Autoregister hstore support if present in connected DB - - * Support HSTORE in aiopg.sa - - - 0.3.2 (2014-07-07) - ^^^^^^^^^^^^^^^^^^ - - * change signature to cursor.execute(operation, parameters=None) to - follow psycopg2 convention. - - - 0.3.1 (2014-07-04) - ^^^^^^^^^^^^^^^^^^ - - * Forward arguments to cursor constructor for pooled connections. - - - 0.3.0 (2014-06-22) - ^^^^^^^^^^^^^^^^^^ - - * Allow executing SQLAlchemy DDL statements. - - * Fix bug with race conditions on acquiring/releasing connections from pool. - - - 0.2.3 (2014-06-12) - ^^^^^^^^^^^^^^^^^^ - - * Fix bug in connection pool. - - - 0.2.2 (2014-06-07) - ^^^^^^^^^^^^^^^^^^ - - * Fix bug with passing parameters into SAConnection.execute when - executing raw SQL expression. - - - 0.2.1 (2014-05-08) - ^^^^^^^^^^^^^^^^^^ - - * Close connection with invalid transaction status on returning to pool. - - - 0.2.0 (2014-05-04) - ^^^^^^^^^^^^^^^^^^ - - * Implemented optional support for sqlalchemy functional sql layer. - - - 0.1.0 (2014-04-06) - ^^^^^^^^^^^^^^^^^^ - - * Implemented plain connections: connect, Connection, Cursor. - - * Implemented database pools: create_pool and Pool. Platform: macOS Platform: POSIX Platform: Windows @@ -412,6 +26,7 @@ Classifier: Programming Language :: Python :: 3.7 Classifier: Programming Language :: Python :: 3.8 Classifier: Programming Language :: Python :: 3.9 +Classifier: Programming Language :: Python :: 3.10 Classifier: Operating System :: POSIX Classifier: Operating System :: MacOS :: MacOS X Classifier: Operating System :: Microsoft :: Windows @@ -423,3 +38,484 @@ Requires-Python: >=3.6 Description-Content-Type: text/x-rst Provides-Extra: sa +License-File: LICENSE + +aiopg +===== +.. image:: https://github.com/aio-libs/aiopg/workflows/CI/badge.svg + :target: https://github.com/aio-libs/aiopg/actions?query=workflow%3ACI +.. image:: https://codecov.io/gh/aio-libs/aiopg/branch/master/graph/badge.svg + :target: https://codecov.io/gh/aio-libs/aiopg +.. image:: https://badges.gitter.im/Join%20Chat.svg + :target: https://gitter.im/aio-libs/Lobby + :alt: Chat on Gitter + +**aiopg** is a library for accessing a PostgreSQL_ database +from the asyncio_ (PEP-3156/tulip) framework. It wraps +asynchronous features of the Psycopg database driver. + +Example +------- + +.. code:: python + + import asyncio + import aiopg + + dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1' + + async def go(): + pool = await aiopg.create_pool(dsn) + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT 1") + ret = [] + async for row in cur: + ret.append(row) + assert ret == [(1,)] + + loop = asyncio.get_event_loop() + loop.run_until_complete(go()) + + +Example of SQLAlchemy optional integration +------------------------------------------ + +.. code:: python + + import asyncio + from aiopg.sa import create_engine + import sqlalchemy as sa + + metadata = sa.MetaData() + + tbl = sa.Table('tbl', metadata, + sa.Column('id', sa.Integer, primary_key=True), + sa.Column('val', sa.String(255))) + + async def create_table(engine): + async with engine.acquire() as conn: + await conn.execute('DROP TABLE IF EXISTS tbl') + await conn.execute('''CREATE TABLE tbl ( + id serial PRIMARY KEY, + val varchar(255))''') + + async def go(): + async with create_engine(user='aiopg', + database='aiopg', + host='127.0.0.1', + password='passwd') as engine: + + async with engine.acquire() as conn: + await conn.execute(tbl.insert().values(val='abc')) + + async for row in conn.execute(tbl.select()): + print(row.id, row.val) + + loop = asyncio.get_event_loop() + loop.run_until_complete(go()) + +.. _PostgreSQL: http://www.postgresql.org/ +.. _asyncio: https://docs.python.org/3/library/asyncio.html + +Please use:: + + $ make test + +for executing the project's unittests. +See https://aiopg.readthedocs.io/en/stable/contributing.html for details +on how to set up your environment to run the tests. + +Changelog +--------- + +1.3.3 (2021-11-01) +^^^^^^^^^^^^^^^^^^ + +* Support async-timeout 4.0+ + + +1.3.2 (2021-10-07) +^^^^^^^^^^^^^^^^^^ + + +1.3.2b2 (2021-10-07) +^^^^^^^^^^^^^^^^^^^^ + +* Respect use_labels for select statement `#882 `_ + + +1.3.2b1 (2021-07-11) +^^^^^^^^^^^^^^^^^^^^ + +* Fix compatibility with SQLAlchemy >= 1.4 `#870 `_ + + +1.3.1 (2021-07-08) +^^^^^^^^^^^^^^^^^^ + + +1.3.1b2 (2021-07-06) +^^^^^^^^^^^^^^^^^^^^ + +* Suppress "Future exception was never retrieved" `#862 `_ + + +1.3.1b1 (2021-07-05) +^^^^^^^^^^^^^^^^^^^^ + +* Fix ClosableQueue.get on cancellation, close it on Connection.close `#859 `_ + + +1.3.0 (2021-06-30) +^^^^^^^^^^^^^^^^^^ + + +1.3.0b4 (2021-06-28) +^^^^^^^^^^^^^^^^^^^^ + +* Fix "Unable to detect disconnect when using NOTIFY/LISTEN" `#559 `_ + + +1.3.0b3 (2021-04-03) +^^^^^^^^^^^^^^^^^^^^ + +* Reformat using black `#814 `_ + + +1.3.0b2 (2021-04-02) +^^^^^^^^^^^^^^^^^^^^ + +* Type annotations `#813 `_ + + +1.3.0b1 (2021-03-30) +^^^^^^^^^^^^^^^^^^^^ + +* Raise ResourceClosedError if we try to open a cursor on a closed SAConnection `#811 `_ + + +1.3.0b0 (2021-03-25) +^^^^^^^^^^^^^^^^^^^^ + +* Fix compatibility with SA 1.4 for IN statement `#806 `_ + + +1.2.1 (2021-03-23) +^^^^^^^^^^^^^^^^^^ + +* Pop loop in connection init due to backward compatibility `#808 `_ + + +1.2.0b4 (2021-03-23) +^^^^^^^^^^^^^^^^^^^^ + +* Set max supported sqlalchemy version `#805 `_ + + +1.2.0b3 (2021-03-22) +^^^^^^^^^^^^^^^^^^^^ + +* Don't run ROLLBACK when the connection is closed `#778 `_ + +* Multiple cursors support `#801 `_ + + +1.2.0b2 (2020-12-21) +^^^^^^^^^^^^^^^^^^^^ + +* Fix IsolationLevel.read_committed and introduce IsolationLevel.default `#770 `_ + +* Fix python 3.8 warnings in tests `#771 `_ + + +1.2.0b1 (2020-12-16) +^^^^^^^^^^^^^^^^^^^^ + +* Deprecate blocking connection.cancel() method `#570 `_ + + +1.2.0b0 (2020-12-15) +^^^^^^^^^^^^^^^^^^^^ + +* Implement timeout on acquiring connection from pool `#766 `_ + + +1.1.0 (2020-12-10) +^^^^^^^^^^^^^^^^^^ + + +1.1.0b2 (2020-12-09) +^^^^^^^^^^^^^^^^^^^^ + +* Added missing slots to context managers `#763 `_ + + +1.1.0b1 (2020-12-07) +^^^^^^^^^^^^^^^^^^^^ + +* Fix on_connect multiple call on acquire `#552 `_ + +* Fix python 3.8 warnings `#622 `_ + +* Bump minimum psycopg version to 2.8.4 `#754 `_ + +* Fix Engine.release method to release connection in any way `#756 `_ + + +1.0.0 (2019-09-20) +^^^^^^^^^^^^^^^^^^ + +* Removal of an asynchronous call in favor of issues # 550 + +* Big editing of documentation and minor bugs #534 + + +0.16.0 (2019-01-25) +^^^^^^^^^^^^^^^^^^^ + +* Fix select priority name `#525 `_ + +* Rename `psycopg2` to `psycopg2-binary` to fix deprecation warning `#507 `_ + +* Fix `#189 `_ hstore when using ReadDictCursor `#512 `_ + +* close cannot be used while an asynchronous query is underway `#452 `_ + +* sqlalchemy adapter trx begin allow transaction_mode `#498 `_ + + +0.15.0 (2018-08-14) +^^^^^^^^^^^^^^^^^^^ + +* Support Python 3.7 `#437 `_ + + +0.14.0 (2018-05-10) +^^^^^^^^^^^^^^^^^^^ + +* Add ``get_dialect`` func to have ability to pass ``json_serializer`` `#451 `_ + + +0.13.2 (2018-01-03) +^^^^^^^^^^^^^^^^^^^ + +* Fixed compatibility with SQLAlchemy 1.2.0 `#412 `_ + +* Added support for transaction isolation levels `#219 `_ + + +0.13.1 (2017-09-10) +^^^^^^^^^^^^^^^^^^^ + +* Added connection poll recycling logic `#373 `_ + + +0.13.0 (2016-12-02) +^^^^^^^^^^^^^^^^^^^ + +* Add `async with` support to `.begin_nested()` `#208 `_ + +* Fix connection.cancel() `#212 `_ `#223 `_ + +* Raise informative error on unexpected connection closing `#191 `_ + +* Added support for python types columns issues `#217 `_ + +* Added support for default values in SA table issues `#206 `_ + + +0.12.0 (2016-10-09) +^^^^^^^^^^^^^^^^^^^ + +* Add an on_connect callback parameter to pool `#141 `_ + +* Fixed connection to work under both windows and posix based systems `#142 `_ + + +0.11.0 (2016-09-12) +^^^^^^^^^^^^^^^^^^^ + +* Immediately remove callbacks from a closed file descriptor `#139 `_ + +* Drop Python 3.3 support + + +0.10.0 (2016-07-16) +^^^^^^^^^^^^^^^^^^^ + +* Refactor tests to use dockerized Postgres server `#107 `_ + +* Reduce default pool minsize to 1 `#106 `_ + +* Explicitly enumerate packages in setup.py `#85 `_ + +* Remove expired connections from pool on acquire `#116 `_ + +* Don't crash when Connection is GC'ed `#124 `_ + +* Use loop.create_future() if available + + +0.9.2 (2016-01-31) +^^^^^^^^^^^^^^^^^^ + +* Make pool.release return asyncio.Future, so we can wait on it in + `__aexit__` `#102 `_ + +* Add support for uuid type `#103 `_ + + +0.9.1 (2016-01-17) +^^^^^^^^^^^^^^^^^^ + +* Documentation update `#101 `_ + + +0.9.0 (2016-01-14) +^^^^^^^^^^^^^^^^^^ + +* Add async context managers for transactions `#91 `_ + +* Support async iterator in ResultProxy `#92 `_ + +* Add async with for engine `#90 `_ + + +0.8.0 (2015-12-31) +^^^^^^^^^^^^^^^^^^ + +* Add PostgreSQL notification support `#58 `_ + +* Support pools with unlimited size `#59 `_ + +* Cancel current DB operation on asyncio timeout `#66 `_ + +* Add async with support for Pool, Connection, Cursor `#88 `_ + + +0.7.0 (2015-04-22) +^^^^^^^^^^^^^^^^^^ + +* Get rid of resource leak on connection failure. + +* Report ResourceWarning on non-closed connections. + +* Deprecate iteration protocol support in cursor and ResultProxy. + +* Release sa connection to pool on `connection.close()`. + + +0.6.0 (2015-02-03) +^^^^^^^^^^^^^^^^^^ + +* Accept dict, list, tuple, named and positional parameters in + `SAConnection.execute()` + + +0.5.2 (2014-12-08) +^^^^^^^^^^^^^^^^^^ + +* Minor release, fixes a bug that leaves connection in broken state + after `cursor.execute()` failure. + + +0.5.1 (2014-10-31) +^^^^^^^^^^^^^^^^^^ + +* Fix a bug for processing transactions in line. + + +0.5.0 (2014-10-31) +^^^^^^^^^^^^^^^^^^ + +* Add .terminate() to Pool and Engine + +* Reimplement connection pool (now pool size cannot be greater than pool.maxsize) + +* Add .close() and .wait_closed() to Pool and Engine + +* Add minsize, maxsize, size and freesize properties to sa.Engine + +* Support *echo* parameter for logging executed SQL commands + +* Connection.close() is not a coroutine (but we keep backward compatibility). + + +0.4.1 (2014-10-02) +^^^^^^^^^^^^^^^^^^ + +* make cursor iterable + +* update docs + + +0.4.0 (2014-10-02) +^^^^^^^^^^^^^^^^^^ + +* add timeouts for database operations. + +* Autoregister psycopg2 support for json data type. + +* Support JSON in aiopg.sa + +* Support ARRAY in aiopg.sa + +* Autoregister hstore support if present in connected DB + +* Support HSTORE in aiopg.sa + + +0.3.2 (2014-07-07) +^^^^^^^^^^^^^^^^^^ + +* change signature to cursor.execute(operation, parameters=None) to + follow psycopg2 convention. + + +0.3.1 (2014-07-04) +^^^^^^^^^^^^^^^^^^ + +* Forward arguments to cursor constructor for pooled connections. + + +0.3.0 (2014-06-22) +^^^^^^^^^^^^^^^^^^ + +* Allow executing SQLAlchemy DDL statements. + +* Fix bug with race conditions on acquiring/releasing connections from pool. + + +0.2.3 (2014-06-12) +^^^^^^^^^^^^^^^^^^ + +* Fix bug in connection pool. + + +0.2.2 (2014-06-07) +^^^^^^^^^^^^^^^^^^ + +* Fix bug with passing parameters into SAConnection.execute when + executing raw SQL expression. + + +0.2.1 (2014-05-08) +^^^^^^^^^^^^^^^^^^ + +* Close connection with invalid transaction status on returning to pool. + + +0.2.0 (2014-05-04) +^^^^^^^^^^^^^^^^^^ + +* Implemented optional support for sqlalchemy functional sql layer. + + +0.1.0 (2014-04-06) +^^^^^^^^^^^^^^^^^^ + +* Implemented plain connections: connect, Connection, Cursor. + +* Implemented database pools: create_pool and Pool. + diff --git a/aiopg.egg-info/SOURCES.txt b/aiopg.egg-info/SOURCES.txt index c8e8955..54b5cce 100644 --- a/aiopg.egg-info/SOURCES.txt +++ b/aiopg.egg-info/SOURCES.txt @@ -7,10 +7,8 @@ setup.py aiopg/__init__.py aiopg/connection.py -aiopg/cursor.py aiopg/log.py aiopg/pool.py -aiopg/transaction.py aiopg/utils.py aiopg.egg-info/PKG-INFO aiopg.egg-info/SOURCES.txt @@ -22,4 +20,5 @@ aiopg/sa/engine.py aiopg/sa/exc.py aiopg/sa/result.py -aiopg/sa/transaction.py \ No newline at end of file +aiopg/sa/transaction.py +aiopg/sa/utils.py \ No newline at end of file diff --git a/aiopg.egg-info/requires.txt b/aiopg.egg-info/requires.txt index 4bad241..7fb62c8 100644 --- a/aiopg.egg-info/requires.txt +++ b/aiopg.egg-info/requires.txt @@ -1,5 +1,5 @@ psycopg2-binary>=2.8.4 -async_timeout<4.0,>=3.0 +async_timeout<5.0,>=3.0 [sa] -sqlalchemy[postgresql_psycopg2binary]>=1.1 +sqlalchemy[postgresql_psycopg2binary]<1.5,>=1.3 diff --git a/setup.py b/setup.py index 5231006..33e2913 100644 --- a/setup.py +++ b/setup.py @@ -1,82 +1,80 @@ -import os import re +from pathlib import Path from setuptools import setup, find_packages -install_requires = ['psycopg2-binary>=2.8.4', 'async_timeout>=3.0,<4.0'] -extras_require = {'sa': ['sqlalchemy[postgresql_psycopg2binary]>=1.1']} +install_requires = ["psycopg2-binary>=2.8.4", "async_timeout>=3.0,<5.0"] +extras_require = {"sa": ["sqlalchemy[postgresql_psycopg2binary]>=1.3,<1.5"]} -def read(f): - return open(os.path.join(os.path.dirname(__file__), f)).read().strip() +def read(*parts): + return Path(__file__).resolve().parent.joinpath(*parts).read_text().strip() -def get_maintainers(path='MAINTAINERS.txt'): - with open(os.path.join(os.path.dirname(__file__), path)) as f: - return ', '.join(x.strip().strip('*').strip() for x in f.readlines()) +def get_maintainers(path="MAINTAINERS.txt"): + return ", ".join(x.strip().strip("*").strip() for x in read(path).splitlines()) def read_version(): - regexp = re.compile(r"^__version__\W*=\W*'([\d.abrc]+)'") - init_py = os.path.join(os.path.dirname(__file__), 'aiopg', '__init__.py') - with open(init_py) as f: - for line in f: - match = regexp.match(line) - if match is not None: - return match.group(1) - else: - raise RuntimeError('Cannot find version in aiopg/__init__.py') + regexp = re.compile(r"^__version__\W*=\W*\"([\d.abrc]+)\"") + for line in read("aiopg", "__init__.py").splitlines(): + match = regexp.match(line) + if match is not None: + return match.group(1) + + raise RuntimeError("Cannot find version in aiopg/__init__.py") -def read_changelog(path='CHANGES.txt'): - return 'Changelog\n---------\n\n{}'.format(read(path)) +def read_changelog(path="CHANGES.txt"): + return f"Changelog\n---------\n\n{read(path)}" classifiers = [ - 'License :: OSI Approved :: BSD License', - 'Intended Audience :: Developers', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3 :: Only', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Operating System :: POSIX', - 'Operating System :: MacOS :: MacOS X', - 'Operating System :: Microsoft :: Windows', - 'Environment :: Web Environment', - 'Development Status :: 5 - Production/Stable', - 'Topic :: Database', - 'Topic :: Database :: Front-Ends', - 'Framework :: AsyncIO', + "License :: OSI Approved :: BSD License", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Operating System :: POSIX", + "Operating System :: MacOS :: MacOS X", + "Operating System :: Microsoft :: Windows", + "Environment :: Web Environment", + "Development Status :: 5 - Production/Stable", + "Topic :: Database", + "Topic :: Database :: Front-Ends", + "Framework :: AsyncIO", ] setup( - name='aiopg', + name="aiopg", version=read_version(), - description='Postgres integration with asyncio.', - long_description='\n\n'.join((read('README.rst'), read_changelog())), - long_description_content_type='text/x-rst', + description="Postgres integration with asyncio.", + long_description="\n\n".join((read("README.rst"), read_changelog())), + long_description_content_type="text/x-rst", classifiers=classifiers, - platforms=['macOS', 'POSIX', 'Windows'], - author='Andrew Svetlov', - python_requires='>=3.6', + platforms=["macOS", "POSIX", "Windows"], + author="Andrew Svetlov", + python_requires=">=3.6", project_urls={ - 'Chat: Gitter': 'https://gitter.im/aio-libs/Lobby', - 'CI: GA': 'https://github.com/aio-libs/aiopg/actions?query=workflow%3ACI', - 'Coverage: codecov': 'https://codecov.io/gh/aio-libs/aiopg', - 'Docs: RTD': 'https://aiopg.readthedocs.io', - 'GitHub: issues': 'https://github.com/aio-libs/aiopg/issues', - 'GitHub: repo': 'https://github.com/aio-libs/aiopg', + "Chat: Gitter": "https://gitter.im/aio-libs/Lobby", + "CI: GA": "https://github.com/aio-libs/aiopg/actions?query=workflow%3ACI", + "Coverage: codecov": "https://codecov.io/gh/aio-libs/aiopg", + "Docs: RTD": "https://aiopg.readthedocs.io", + "GitHub: issues": "https://github.com/aio-libs/aiopg/issues", + "GitHub: repo": "https://github.com/aio-libs/aiopg", }, - author_email='andrew.svetlov@gmail.com', + author_email="andrew.svetlov@gmail.com", maintainer=get_maintainers(), - maintainer_email='virmir49@gmail.com', - url='https://aiopg.readthedocs.io', - download_url='https://pypi.python.org/pypi/aiopg', - license='BSD', + maintainer_email="virmir49@gmail.com", + url="https://aiopg.readthedocs.io", + download_url="https://pypi.python.org/pypi/aiopg", + license="BSD", packages=find_packages(), install_requires=install_requires, extras_require=extras_require, - include_package_data=True + include_package_data=True, )