Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,17 @@ async def handle_async_request(self, request: Request) -> Response:
#
# In this case we clear the connection and try again.
pool_request.clear_connection()
with self._optional_thread_lock:
closing = []
# If the connection still claims to be available then
# it would be immediately assigned again, so drop it.
if (
connection in self._connections
and connection.is_available()
):
self._connections.remove(connection)
closing.append(connection)
await self._close_connections(closing)
else:
break # pragma: nocover

Expand Down
1 change: 1 addition & 0 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ def can_handle_request(self, origin: Origin) -> bool:
def is_available(self) -> bool:
return (
self._state != HTTPConnectionState.CLOSED
and self._connection_terminated is None
and not self._connection_error
and not self._used_all_stream_ids
and not (
Expand Down
11 changes: 11 additions & 0 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,17 @@ def handle_request(self, request: Request) -> Response:
#
# In this case we clear the connection and try again.
pool_request.clear_connection()
with self._optional_thread_lock:
closing = []
# If the connection still claims to be available then
# it would be immediately assigned again, so drop it.
if (
connection in self._connections
and connection.is_available()
):
self._connections.remove(connection)
closing.append(connection)
self._close_connections(closing)
else:
break # pragma: nocover

Expand Down
1 change: 1 addition & 0 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ def can_handle_request(self, origin: Origin) -> bool:
def is_available(self) -> bool:
return (
self._state != HTTPConnectionState.CLOSED
and self._connection_terminated is None
and not self._connection_error
and not self._used_all_stream_ids
and not (
Expand Down
106 changes: 106 additions & 0 deletions tests/_async/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,112 @@ async def trace(name, kwargs):
]


@pytest.mark.anyio
async def test_connection_pool_drops_stale_available_connection():
"""
If a connection claims to be available but then refuses a request,
the pool should not keep assigning requests to that stale connection.
"""

class StaleConnection(httpcore.AsyncConnectionInterface):
def __init__(self) -> None:
self.closed = False

async def handle_async_request(
self, request: httpcore.Request
) -> httpcore.Response:
raise httpcore.ConnectionNotAvailable()

def can_handle_request(self, origin: httpcore.Origin) -> bool:
return True

def is_available(self) -> bool:
return True

def has_expired(self) -> bool:
return False

def is_idle(self) -> bool:
return True

def is_closed(self) -> bool:
return False

async def aclose(self) -> None:
self.closed = True

def info(self) -> str:
return "STALE"

class SuccessConnection(httpcore.AsyncConnectionInterface):
def __init__(self) -> None:
self.closed = False

async def handle_async_request(
self, request: httpcore.Request
) -> httpcore.Response:
async def content() -> typing.AsyncIterator[bytes]:
yield b"ok"

return httpcore.Response(200, content=content())

def can_handle_request(self, origin: httpcore.Origin) -> bool:
return True

def is_available(self) -> bool:
return True

def has_expired(self) -> bool:
return False

def is_idle(self) -> bool:
return True

def is_closed(self) -> bool:
return False

async def aclose(self) -> None:
self.closed = True

def info(self) -> str:
return "OK"

class Pool(httpcore.AsyncConnectionPool):
def __init__(self) -> None:
super().__init__()
self.stale_connection = StaleConnection()
self.success_connection = SuccessConnection()
self._created = 0

def create_connection(
self, origin: httpcore.Origin
) -> httpcore.AsyncConnectionInterface:
self._created += 1
if self._created == 1:
return self.stale_connection
return self.success_connection

origin = httpcore.Origin(b"https", b"example.com", 443)
pool = Pool()
async with pool:
response = await pool.request("GET", "https://example.com/")

assert response.status == 200
assert response.content == b"ok"
assert pool.stale_connection.closed
assert len(pool.connections) == 1
connection = typing.cast(typing.Any, pool.connections[0])
assert connection is pool.success_connection
assert pool.stale_connection.can_handle_request(origin)
assert not pool.stale_connection.has_expired()
assert pool.stale_connection.is_idle()
assert not pool.stale_connection.is_closed()
assert pool.stale_connection.info() == "STALE"
assert pool.success_connection.can_handle_request(origin)
assert pool.success_connection.is_available()
assert pool.success_connection.info() == "OK"


@pytest.mark.anyio
async def test_connection_pool_with_immediate_expiry():
"""
Expand Down
106 changes: 106 additions & 0 deletions tests/_sync/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,112 @@ def trace(name, kwargs):



def test_connection_pool_drops_stale_available_connection():
"""
If a connection claims to be available but then refuses a request,
the pool should not keep assigning requests to that stale connection.
"""

class StaleConnection(httpcore.ConnectionInterface):
def __init__(self) -> None:
self.closed = False

def handle_request(
self, request: httpcore.Request
) -> httpcore.Response:
raise httpcore.ConnectionNotAvailable()

def can_handle_request(self, origin: httpcore.Origin) -> bool:
return True

def is_available(self) -> bool:
return True

def has_expired(self) -> bool:
return False

def is_idle(self) -> bool:
return True

def is_closed(self) -> bool:
return False

def close(self) -> None:
self.closed = True

def info(self) -> str:
return "STALE"

class SuccessConnection(httpcore.ConnectionInterface):
def __init__(self) -> None:
self.closed = False

def handle_request(
self, request: httpcore.Request
) -> httpcore.Response:
def content() -> typing.Iterator[bytes]:
yield b"ok"

return httpcore.Response(200, content=content())

def can_handle_request(self, origin: httpcore.Origin) -> bool:
return True

def is_available(self) -> bool:
return True

def has_expired(self) -> bool:
return False

def is_idle(self) -> bool:
return True

def is_closed(self) -> bool:
return False

def close(self) -> None:
self.closed = True

def info(self) -> str:
return "OK"

class Pool(httpcore.ConnectionPool):
def __init__(self) -> None:
super().__init__()
self.stale_connection = StaleConnection()
self.success_connection = SuccessConnection()
self._created = 0

def create_connection(
self, origin: httpcore.Origin
) -> httpcore.ConnectionInterface:
self._created += 1
if self._created == 1:
return self.stale_connection
return self.success_connection

origin = httpcore.Origin(b"https", b"example.com", 443)
pool = Pool()
with pool:
response = pool.request("GET", "https://example.com/")

assert response.status == 200
assert response.content == b"ok"
assert pool.stale_connection.closed
assert len(pool.connections) == 1
connection = typing.cast(typing.Any, pool.connections[0])
assert connection is pool.success_connection
assert pool.stale_connection.can_handle_request(origin)
assert not pool.stale_connection.has_expired()
assert pool.stale_connection.is_idle()
assert not pool.stale_connection.is_closed()
assert pool.stale_connection.info() == "STALE"
assert pool.success_connection.can_handle_request(origin)
assert pool.success_connection.is_available()
assert pool.success_connection.info() == "OK"



def test_connection_pool_with_immediate_expiry():
"""
Connection pools with keepalive_expiry=0.0 should immediately expire
Expand Down
Loading