Skip to content
72 changes: 64 additions & 8 deletions mssql_python/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
InternalError,
ProgrammingError,
NotSupportedError,
sqlstate_to_exception,
)
from mssql_python.auth import extract_auth_type, process_connection_string
from mssql_python.constants import ConstantsDDBC, GetInfoConstants
Expand All @@ -57,6 +58,42 @@
# Note: "utf-16" with BOM is NOT included as it's problematic for SQL_WCHAR
UTF16_ENCODINGS: frozenset[str] = frozenset(["utf-16le", "utf-16be"])

_SQLSTATE_RE = re.compile(r"^SQLSTATE:([A-Z0-9]{0,5}):(.*)", re.DOTALL)


def _raise_connection_error(e: RuntimeError) -> None:
"""Map a RuntimeError from the C++ pybind layer to the correct DB-API 2.0 exception.

Connection::checkError() throws "SQLSTATE:XXXXX:<odbc_message>" so the SQLSTATE
can be mapped via sqlstate_to_exception(), consistent with cursor-level error handling.
"""
error_msg = str(e)
match = _SQLSTATE_RE.match(error_msg)
if match:
sqlstate, ddbc_error = match.group(1), match.group(2)
# Handle malformed SQLSTATE prefix (empty or invalid code)
if not sqlstate or len(sqlstate) != 5:
logger.error("Connection error (malformed SQLSTATE): %s", ddbc_error)
raise OperationalError(
driver_error="Connection operation failed",
ddbc_error=ddbc_error,
) from None
exc = sqlstate_to_exception(sqlstate, ddbc_error)
if exc is None:
logger.error("Unknown SQLSTATE %s, raising DatabaseError", sqlstate)
raise DatabaseError(
driver_error=f"An error occurred with SQLSTATE code: {sqlstate}",
ddbc_error=ddbc_error,
) from None
logger.error("Connection error (SQLSTATE %s): %s", sqlstate, ddbc_error)
raise exc from None
# Fallback: no SQLSTATE prefix — e.g. "Connection handle not allocated"
logger.error("Connection error: %s", error_msg)
raise OperationalError(
driver_error="Connection operation failed",
ddbc_error=error_msg,
) from None

Comment thread
gargsaumya marked this conversation as resolved.

def _validate_utf16_wchar_compatibility(
encoding: str, wchar_type: int, context: str = "SQL_WCHAR"
Expand Down Expand Up @@ -333,9 +370,12 @@ def __init__(
if not PoolingManager.is_initialized():
PoolingManager.enable()
self._pooling = PoolingManager.is_enabled()
self._conn = ddbc_bindings.Connection(
self.connection_str, self._pooling, self._attrs_before
)
try:
self._conn = ddbc_bindings.Connection(
self.connection_str, self._pooling, self._attrs_before
)
except RuntimeError as e:
_raise_connection_error(e)
self.setautocommit(autocommit)

# Register this connection for cleanup before Python shutdown
Expand Down Expand Up @@ -456,7 +496,10 @@ def autocommit(self) -> bool:
Returns:
bool: True if autocommit is enabled, False otherwise.
"""
return self._conn.get_autocommit()
try:
return self._conn.get_autocommit()
except RuntimeError as e:
_raise_connection_error(e)

@autocommit.setter
def autocommit(self, value: bool) -> None:
Expand Down Expand Up @@ -496,7 +539,10 @@ def setautocommit(self, value: bool = False) -> None:
Raises:
DatabaseError: If there is an error while setting the autocommit mode.
"""
self._conn.set_autocommit(value)
try:
self._conn.set_autocommit(value)
except RuntimeError as e:
_raise_connection_error(e)

def setencoding(self, encoding: Optional[str] = None, ctype: Optional[int] = None) -> None:
"""
Expand Down Expand Up @@ -1491,7 +1537,10 @@ def commit(self) -> None:
)

# Commit the current transaction
self._conn.commit()
try:
self._conn.commit()
except RuntimeError as e:
_raise_connection_error(e)
logger.info("Transaction committed successfully.")

def rollback(self) -> None:
Expand All @@ -1514,7 +1563,10 @@ def rollback(self) -> None:
)

# Roll back the current transaction
self._conn.rollback()
try:
self._conn.rollback()
except RuntimeError as e:
_raise_connection_error(e)
logger.info("Transaction rolled back successfully.")

def close(self) -> None:
Expand Down Expand Up @@ -1570,7 +1622,11 @@ def close(self) -> None:
# For autocommit True, this is not necessary as each statement is
# committed immediately
logger.debug("Rolling back uncommitted changes before closing connection.")
self._conn.rollback()
try:
self._conn.rollback()
except RuntimeError as e:
# Handle C++ layer RuntimeError with proper DB-API exception mapping
_raise_connection_error(e)
# TODO: Check potential race conditions in case of multithreaded scenarios
# Close the connection
self._conn.close()
Expand Down
11 changes: 10 additions & 1 deletion mssql_python/pybind/connection/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,17 @@ void Connection::disconnect() {
// DB spec compliant
void Connection::checkError(SQLRETURN ret) const {
if (!SQL_SUCCEEDED(ret)) {
// Format: "SQLSTATE:XXXXX:<odbc_error_message>" — parsed by _raise_connection_error()
ErrorInfo err = SQLCheckError_Wrap(SQL_HANDLE_DBC, _dbcHandle, ret);
ThrowStdException(err.ddbcErrorMsg);
std::string sqlState = err.sqlState;
std::string errorMsg = err.ddbcErrorMsg;
// Only add SQLSTATE prefix if we have a valid 5-character code
if (sqlState.length() == 5) {
ThrowStdException("SQLSTATE:" + sqlState + ":" + errorMsg);
} else {
// No valid SQLSTATE (e.g., SQL_INVALID_HANDLE) — throw clean error message
ThrowStdException(errorMsg);
}
}
}

Expand Down
86 changes: 86 additions & 0 deletions tests/test_006_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,92 @@ def test_connection_error():
assert "Incomplete specification" in str(excinfo.value) or "has no value" in str(excinfo.value)


def test_connect_runtime_error_mapped_to_correct_dbapi_exception():
"""Regression test for https://github.com/microsoft/mssql-python/issues/532.

Connection failures from the C++ pybind layer (RuntimeError) must be mapped
to the correct DB-API 2.0 exception via the embedded SQLSTATE code.
This covers connect(), commit(), and rollback() — all of which go through
Connection::checkError() in the C++ layer.
"""
from unittest.mock import MagicMock, patch

# SQLSTATE 28000 = login failed -> OperationalError
with patch(
"mssql_python.connection.ddbc_bindings.Connection",
side_effect=RuntimeError("SQLSTATE:28000:Login failed for user 'baduser'."),
):
with pytest.raises(OperationalError) as exc_info:
connect("Server=testserver;Database=mydb;UID=baduser;PWD=wrongpassword;")
assert "Login failed for user" in exc_info.value.ddbc_error
assert not isinstance(exc_info.value, RuntimeError)

# SQLSTATE IM002 = driver not found -> OperationalError (per SQLSTATE mapping)
with patch(
"mssql_python.connection.ddbc_bindings.Connection",
side_effect=RuntimeError(
"SQLSTATE:IM002:Data source name not found and no default driver specified"
),
):
with pytest.raises(OperationalError) as exc_info:
connect("Server=testserver;Database=mydb;UID=u;PWD=p;")
assert "no default driver" in exc_info.value.ddbc_error
assert not isinstance(exc_info.value, RuntimeError)

Comment thread
gargsaumya marked this conversation as resolved.
# No SQLSTATE prefix -> fallback OperationalError
with patch(
"mssql_python.connection.ddbc_bindings.Connection",
side_effect=RuntimeError("Connection handle not allocated"),
):
with pytest.raises(OperationalError) as exc_info:
connect("Server=testserver;Database=mydb;UID=u;PWD=p;")
assert "Connection handle not allocated" in exc_info.value.ddbc_error
assert not isinstance(exc_info.value, RuntimeError)

# Unmapped SQLSTATE (sqlstate_to_exception returns None) -> DatabaseError
with patch(
"mssql_python.connection.ddbc_bindings.Connection",
side_effect=RuntimeError("SQLSTATE:99999:Unknown error with unmapped code"),
):
with pytest.raises(DatabaseError) as exc_info:
connect("Server=testserver;Database=mydb;UID=u;PWD=p;")
assert "Unknown error with unmapped code" in exc_info.value.ddbc_error
assert "SQLSTATE code: 99999" in exc_info.value.driver_error
assert not isinstance(exc_info.value, RuntimeError)

# Malformed SQLSTATE (empty code) -> OperationalError
with patch(
"mssql_python.connection.ddbc_bindings.Connection",
side_effect=RuntimeError("SQLSTATE::Invalid handle!"),
):
with pytest.raises(OperationalError) as exc_info:
connect("Server=testserver;Database=mydb;UID=u;PWD=p;")
assert "Invalid handle!" in exc_info.value.ddbc_error
assert not isinstance(exc_info.value, RuntimeError)

# commit() failure -> OperationalError via same _raise_connection_error path
mock_conn = MagicMock()
mock_conn.commit.side_effect = RuntimeError("SQLSTATE:08S01:Communication link failure")
mock_conn.get_autocommit.return_value = False
with patch("mssql_python.connection.ddbc_bindings.Connection", return_value=mock_conn):
conn = connect("Server=testserver;Database=mydb;UID=u;PWD=p;")
with pytest.raises(OperationalError) as exc_info:
conn.commit()
assert "Communication link failure" in exc_info.value.ddbc_error
assert not isinstance(exc_info.value, RuntimeError)

# rollback() failure -> OperationalError via same _raise_connection_error path
mock_conn2 = MagicMock()
mock_conn2.rollback.side_effect = RuntimeError("SQLSTATE:08S01:Communication link failure")
mock_conn2.get_autocommit.return_value = False
with patch("mssql_python.connection.ddbc_bindings.Connection", return_value=mock_conn2):
conn2 = connect("Server=testserver;Database=mydb;UID=u;PWD=p;")
with pytest.raises(OperationalError) as exc_info:
conn2.rollback()
assert "Communication link failure" in exc_info.value.ddbc_error
assert not isinstance(exc_info.value, RuntimeError)


def test_truncate_error_message_successful_cases():
"""Test truncate_error_message with valid Microsoft messages for comparison."""

Expand Down
Loading