From a3ab4768a13540e8c8f566bca990a98f96cb3802 Mon Sep 17 00:00:00 2001 From: dzen Date: Thu, 5 Mar 2020 14:40:50 +0100 Subject: [PATCH 1/2] Support newer Python Combines upstream commits to suppress asyncio deprecations: - f004c52 - 388d38f - 0cdfe91 - b4f0120 - 23d84ca --- .travis.yml | 6 ++--- Dockerfile | 2 +- aioamqp/__init__.py | 4 ++-- aioamqp/channel.py | 4 ++-- aioamqp/protocol.py | 33 ++++++++++++++------------ aioamqp/tests/test_basic.py | 2 +- aioamqp/tests/test_connection_close.py | 2 +- aioamqp/tests/test_connection_lost.py | 2 +- aioamqp/tests/test_protocol.py | 4 ++-- docs/changelog.rst | 4 ++++ docs/introduction.rst | 2 +- setup.cfg | 2 +- setup.py | 2 +- tox.ini | 2 +- 14 files changed, 39 insertions(+), 32 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1069e7c..9d6e4ef 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,10 @@ language: python -dist: bionic +dist: focal python: -- 3.5 - 3.6 -- 3.7-dev +- 3.7 - 3.8 +- 3.9 services: - rabbitmq env: diff --git a/Dockerfile b/Dockerfile index 7ec4545..1ea92f1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.5 +FROM python:3.9 WORKDIR /usr/src/app diff --git a/aioamqp/__init__.py b/aioamqp/__init__.py index 3e67936..030e7db 100644 --- a/aioamqp/__init__.py +++ b/aioamqp/__init__.py @@ -10,7 +10,7 @@ from .version import __packagename__ async def connect(host='localhost', port=None, login='guest', password='guest', - virtualhost='/', ssl=None, login_method='AMQPLAIN', insist=False, + virtualhost='/', ssl=None, login_method='PLAIN', insist=False, protocol_factory=AmqpProtocol, *, loop=None, **kwargs): """Convenient method to connect to an AMQP broker @@ -69,7 +69,7 @@ async def connect(host='localhost', port=None, login='guest', password='guest', async def from_url( - url, login_method='AMQPLAIN', insist=False, protocol_factory=AmqpProtocol, **kwargs): + url, login_method='PLAIN', insist=False, protocol_factory=AmqpProtocol, **kwargs): """ Connect to the AMQP using a single url parameter and return the client. For instance: diff --git a/aioamqp/channel.py b/aioamqp/channel.py index 7f0f402..b58bd5a 100644 --- a/aioamqp/channel.py +++ b/aioamqp/channel.py @@ -31,7 +31,7 @@ class Channel: self.cancellation_callbacks = [] self.return_callback = return_callback self.response_future = None - self.close_event = asyncio.Event(loop=self._loop) + self.close_event = asyncio.Event() self.cancelled_consumers = set() self.last_consumer_tag = None self.publisher_confirms = False @@ -518,7 +518,7 @@ class Channel: } future = self._get_waiter('basic_consume' + ctag) future.set_result(results) - self._ctag_events[ctag] = asyncio.Event(loop=self._loop) + self._ctag_events[ctag] = asyncio.Event() async def basic_deliver(self, frame): consumer_tag = frame.consumer_tag diff --git a/aioamqp/protocol.py b/aioamqp/protocol.py index ddf1285..4938833 100644 --- a/aioamqp/protocol.py +++ b/aioamqp/protocol.py @@ -79,8 +79,8 @@ class AmqpProtocol(asyncio.StreamReaderProtocol): self.connection_tunning['heartbeat'] = kwargs.get('heartbeat') self.connecting = asyncio.Future(loop=self._loop) - self.connection_closed = asyncio.Event(loop=self._loop) - self.stop_now = asyncio.Future(loop=self._loop) + self.connection_closed = asyncio.Event() + self.stop_now = asyncio.Event() self.state = CONNECTING self.version_major = None self.version_minor = None @@ -91,14 +91,14 @@ class AmqpProtocol(asyncio.StreamReaderProtocol): self.server_heartbeat = None self._heartbeat_timer_recv = None self._heartbeat_timer_send = None - self._heartbeat_trigger_send = asyncio.Event(loop=self._loop) + self._heartbeat_trigger_send = asyncio.Event() self._heartbeat_worker = None self.channels = {} self.server_frame_max = None self.server_channel_max = None self.channels_ids_ceil = 0 self.channels_ids_free = set() - self._drain_lock = asyncio.Lock(loop=self._loop) + self._drain_lock = asyncio.Lock() def connection_made(self, transport): super().connection_made(transport) @@ -171,16 +171,13 @@ class AmqpProtocol(asyncio.StreamReaderProtocol): await self.wait_closed(timeout=timeout) async def wait_closed(self, timeout=None): - await asyncio.wait_for(self.connection_closed.wait(), timeout=timeout, loop=self._loop) + await asyncio.wait_for(self.connection_closed.wait(), timeout=timeout) if self._heartbeat_worker is not None: try: - await asyncio.wait_for(self._heartbeat_worker, timeout=timeout, loop=self._loop) + await asyncio.wait_for(self._heartbeat_worker, timeout=timeout) except asyncio.CancelledError: pass - async def close_ok(self, frame): - self._stream_writer.close() - async def start_connection(self, host, port, login, password, virtualhost, ssl=False, login_method='PLAIN', insist=False): """Initiate a connection at the protocol level @@ -188,7 +185,7 @@ class AmqpProtocol(asyncio.StreamReaderProtocol): """ if login_method != 'PLAIN': - logger.warning('only PLAIN login_method is supported, falling back to AMQPLAIN') + logger.warning('login_method %s is not supported, falling back to PLAIN', login_method) self._stream_writer.write(amqp_constants.PROTOCOL_HEADER) @@ -311,12 +308,12 @@ class AmqpProtocol(asyncio.StreamReaderProtocol): channel.connection_closed(reply_code, reply_text, exception) async def run(self): - while not self.stop_now.done(): + while not self.stop_now.is_set(): try: await self.dispatch_frame() except exceptions.AmqpClosedConnection as exc: logger.info("Close connection") - self.stop_now.set_result(None) + self.stop_now.set() self._close_channels(exception=exc) except Exception: # pylint: disable=broad-except @@ -329,7 +326,7 @@ class AmqpProtocol(asyncio.StreamReaderProtocol): the rest of the AmqpProtocol class. This is kept around for backwards compatibility purposes only. """ - await self.stop_now + await self.stop_now.wait() async def send_heartbeat(self): """Sends an heartbeat message. @@ -403,6 +400,11 @@ class AmqpProtocol(asyncio.StreamReaderProtocol): ) await self._write_frame(0, request) + async def close_ok(self, frame): + """In response to server close confirmation""" + self.stop_now.set() + self._stream_writer.close() + async def server_close(self, frame): """The server is closing the connection""" self.state = CLOSING @@ -414,6 +416,7 @@ class AmqpProtocol(asyncio.StreamReaderProtocol): reply_text, reply_code, class_id, method_id) self._close_channels(reply_code, reply_text) await self._close_ok() + self.stop_now.set() self._stream_writer.close() async def _close_ok(self): @@ -456,11 +459,11 @@ class AmqpProtocol(asyncio.StreamReaderProtocol): await self.ensure_open() try: channel_id = self.channels_ids_free.pop() - except KeyError: + except KeyError as ex: assert self.server_channel_max is not None, 'connection channel-max tuning not performed' # channel-max = 0 means no limit if self.server_channel_max and self.channels_ids_ceil > self.server_channel_max: - raise exceptions.NoChannelAvailable() + raise exceptions.NoChannelAvailable() from ex self.channels_ids_ceil += 1 channel_id = self.channels_ids_ceil channel = self.CHANNEL_FACTORY(self, channel_id, **kwargs) diff --git a/aioamqp/tests/test_basic.py b/aioamqp/tests/test_basic.py index d59ce19..00ec880 100644 --- a/aioamqp/tests/test_basic.py +++ b/aioamqp/tests/test_basic.py @@ -58,7 +58,7 @@ class BasicCancelTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase): result = await self.channel.publish("payload", exchange_name, routing_key='') - await asyncio.sleep(5, loop=self.loop) + await asyncio.sleep(5) result = await self.channel.queue_declare(queue_name, passive=True) self.assertEqual(result['message_count'], 1) diff --git a/aioamqp/tests/test_connection_close.py b/aioamqp/tests/test_connection_close.py index 9491548..c5d4188 100644 --- a/aioamqp/tests/test_connection_close.py +++ b/aioamqp/tests/test_connection_close.py @@ -22,7 +22,7 @@ class CloseTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase): # TODO: remove with python <3.4.4 support self.assertTrue(transport._closing) # make sure those 2 tasks/futures are properly set as finished - await amqp.stop_now + await amqp.stop_now.wait() await amqp.worker async def test_multiple_close(self): diff --git a/aioamqp/tests/test_connection_lost.py b/aioamqp/tests/test_connection_lost.py index 21c7819..d16d51b 100644 --- a/aioamqp/tests/test_connection_lost.py +++ b/aioamqp/tests/test_connection_lost.py @@ -24,7 +24,7 @@ class ConnectionLostTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase): self.assertEqual(amqp.state, OPEN) self.assertTrue(channel.is_open) amqp._stream_reader._transport.close() # this should have the same effect as the tcp connection being lost - await asyncio.wait_for(amqp.worker, 1, loop=self.loop) + await asyncio.wait_for(amqp.worker, 1) self.assertEqual(amqp.state, CLOSED) self.assertFalse(channel.is_open) self.assertTrue(self.callback_called) diff --git a/aioamqp/tests/test_protocol.py b/aioamqp/tests/test_protocol.py index 766a5d9..80d4187 100644 --- a/aioamqp/tests/test_protocol.py +++ b/aioamqp/tests/test_protocol.py @@ -55,7 +55,7 @@ class ProtocolTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase): connect.assert_called_once_with( insist=False, password='pass', - login_method='AMQPLAIN', + login_method='PLAIN', login='tom', host='example.com', protocol_factory=AmqpProtocol, @@ -74,7 +74,7 @@ class ProtocolTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase): connect.assert_called_once_with( insist=False, password='pass', - login_method='AMQPLAIN', + login_method='PLAIN', ssl=ssl_context, login='tom', host='example.com', diff --git a/docs/changelog.rst b/docs/changelog.rst index 0939ca4..8ac4f7b 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -4,6 +4,10 @@ Changelog Next release ------------ + * Add support for Python 3.9. + * Drop support for Python 3.5. + * Fix annoying auth method warning because of a wrong defined default argument (closes #214). + Aioamqp 0.14.0 -------------- diff --git a/docs/introduction.rst b/docs/introduction.rst index ec86f61..d0e183a 100644 --- a/docs/introduction.rst +++ b/docs/introduction.rst @@ -7,7 +7,7 @@ Aioamqp library is a pure-Python implementation of the AMQP 0.9.1 protocol using Prerequisites ------------- -Aioamqp works only with python >= 3.5 using asyncio library. +Aioamqp works only with python >= 3.6 using asyncio library. Installation ------------ diff --git a/setup.cfg b/setup.cfg index 0ab7d0b..9a9f5ab 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ [bdist_wheel] -python-tag = py35.py36.py37.py38 +python-tag = py36.py37.py38.py39 diff --git a/setup.py b/setup.py index a740243..ea0e79f 100644 --- a/setup.py +++ b/setup.py @@ -34,10 +34,10 @@ setuptools.setup( "Operating System :: OS Independent", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", ], platforms='all', license='BSD' diff --git a/tox.ini b/tox.ini index 127cc7b..a1f5fcd 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py35, py36, py37, py38 +envlist = py36, py37, py38, py39 skipsdist = true skip_missing_interpreters = true -- 2.34.1