from collections.abc import Sequence
from typing import Callable, cast
from cffi import FFI
from pyarrow import RecordBatchReader, Schema, Table
from .arrow_odbc import ffi, lib
from .batch_reader_protocol import BatchReaderProtocol
from .buffer import to_bytes_and_len
from .error import raise_on_error
from .pool import enable_odbc_connection_pooling
from .reader import (
DEFAULT_FETCH_BUFFER_LIMIT_IN_BYTES,
DEFAULT_FETCH_BUFFER_LIMIT_IN_ROWS,
BatchReader,
BatchReaderRaii,
)
from .text_encoding import TextEncoding
from .writer import BatchWriter
[docs]
class Connection:
    """
    A strong reference to an ODBC connection.

    The instance owns the native connection handle it wraps and releases it once the instance is
    finalized.
    """

    def __init__(self, handle: "FFI.CData") -> None:
        # Native connection handle. It is released in ``__del__``.
        self.handle: "FFI.CData" = handle

    @classmethod
    def enable_connection_pooling(cls) -> None:
        """
        Activates the connection pooling of the ODBC driver manager for the entire process. Best
        called before creating the ODBC environment, i.e. before the first connection is opened
        with arrow-odbc. This is useful in scenarios where you frequently read or write rows and
        the overhead of creating a connection for each query is significant.

        Example:

        .. code-block:: python

            from arrow_odbc import Connection, connect

            # Let the ODBC driver manager take care of connection pooling for us
            Connection.enable_connection_pooling()

            # Create the first connection after connection pooling is enabled
            connection_string = (
                "Driver={ODBC Driver 18 for SQL Server};"
                "Server=localhost;"
                "TrustServerCertificate=yes;"
            )
            connection = connect(
                connection_string=connection_string,
                user="SA",
                password="My@Test@Password",
            )
        """
        enable_odbc_connection_pooling()

    def read_arrow_batches(
        self,
        query: str,
        batch_size: int = DEFAULT_FETCH_BUFFER_LIMIT_IN_ROWS,
        parameters: Sequence[str | None] | None = None,
        max_bytes_per_batch: int | None = DEFAULT_FETCH_BUFFER_LIMIT_IN_BYTES,
        max_text_size: int | None = None,
        max_binary_size: int | None = None,
        falliable_allocations: bool = False,
        schema: Schema | None = None,
        map_schema: Callable[[Schema], Schema] | None = None,
        fetch_concurrently: bool = True,
        query_timeout_sec: int | None = None,
        payload_text_encoding: TextEncoding = TextEncoding.AUTO,
    ) -> BatchReader:
        """
        Execute the query and read the result as an iterator over Arrow batches.

        Example:

        .. code-block:: python

            from arrow_odbc import connect

            # Connect to the data source
            connection_string = (
                "Driver={ODBC Driver 18 for SQL Server};"
                "Server=localhost;"
                "TrustServerCertificate=yes;"
            )
            connection = connect(
                connection_string=connection_string,
                user="SA",
                password="My@Test@Password",
            )

            # Execute query and create reader
            reader = connection.read_arrow_batches(
                query="SELECT * FROM MyTable WHERE a=?",
                batch_size=1000,
                parameters=["I'm a positional query parameter"],
            )

            # Process results
            for batch in reader:
                # Process arrow batches
                df = batch.to_pandas()
                # ...

        :param query: The SQL statement yielding the result set which is converted into arrow
            record batches.
        :param batch_size: The maximum number of rows within each batch. The maximum number of
            rows can be less if the upper bound defined by ``max_bytes_per_batch`` is lower.
        :param parameters: ODBC allows you to use a question mark as placeholder marker (``?``)
            for positional parameters. This argument takes a list of parameters whose number must
            match the number of placeholders in the SQL statement. Using this instead of literals
            helps you avoid SQL injections or may otherwise simplify your code. Currently all
            parameters are passed as VARCHAR strings. You can use ``None`` to pass ``NULL``.
        :param max_bytes_per_batch: An upper limit for the total size (all columns) of the buffer
            used to transit data from the ODBC driver to the application. Please note that memory
            consumption of this buffer is determined not by the actual values, but by the maximum
            possible length of an individual row times the number of rows it can hold. Both
            ``batch_size`` and this parameter define upper bounds for the same buffer. Whichever
            bound is lower is used to determine the buffer size. ``None`` is passed on as ``0``.
        :param max_text_size: In order for fast bulk fetching to work, ``arrow-odbc`` needs to
            know the size of the largest possible field in each column. It will do so itself
            automatically by considering the schema information. However, trouble arises if the
            schema contains unbounded variadic fields like ``VARCHAR(MAX)`` which can hold really
            large values. These have a very high upper element size, if any. In order to work
            with such schemas we need an upper bound for the actual values in the column, as
            opposed to the largest value the column could theoretically store. There is no need
            for this to be precise, but just knowing that a value would never exceed 4KiB rather
            than 2GiB is enough to allow for tremendous efficiency gains. The size of the text is
            specified in UTF-8 encoded bytes if using a narrow encoding (typically all
            non-windows systems) and in UTF-16 encoded pairs of bytes on systems using a wide
            encoding (typically windows). This means about the size in letters, yet if you are
            using a lot of emojis or other special characters this number might need to be
            larger. ``None`` is passed on as ``0``.
        :param max_binary_size: An upper limit for the size of buffers bound to variadic binary
            columns of the data source. This limit does not (directly) apply to the size of the
            created arrow buffers, but rather applies to the buffers used for the data in
            transit. Use this option if you have e.g. ``VARBINARY(MAX)`` fields in your database
            schema. In such a case without an upper limit, the ODBC driver of your data source is
            asked for the maximum size of an element, and is likely to answer with either 0 or a
            value which is way larger than any actual entry in the column. If you can not adapt
            your database schema, this limit might be what you are looking for. This is the
            maximum size in bytes of the binary column. ``None`` is passed on as ``0``.
        :param falliable_allocations: If ``True`` a recoverable error is raised in case there is
            not enough memory to allocate the buffers. This option may incur a performance
            penalty which scales with the batch size parameter (but not with the amount of actual
            data in the source). In case you can test your query against the schema you can
            safely leave this at ``False``. The required memory will not depend on the amount of
            data in the data source. Default is ``False``.
        :param schema: Allows you to overwrite the automatically detected schema with one
            supplied by the application. Reasons for doing so include domain knowledge you have
            about the data which is not reflected in the schema information. E.g. you happen to
            know a field of timestamps contains strictly dates. Another reason could be that for
            certain use cases it can make sense to decide the type based on what you want to do
            with it, rather than its source. E.g. if you simply want to put everything into a CSV
            file it can make perfect sense to fetch everything as string independent of its
            source type.
        :param map_schema: Allows you to provide a custom schema based on the schema inferred
            from the meta information of the query. This would allow you to e.g. map every column
            type to string or replace any float32 with a float64, or anything else you might want
            to customize, for various reasons while still staying generic over the input schema.
            If both ``map_schema`` and ``schema`` are specified ``map_schema`` takes priority.
        :param fetch_concurrently: Trade memory for speed. Allocates another transit buffer and
            uses it to fetch row set groups (aka. batches) from the ODBC data source in a
            dedicated system thread, while the main thread converts the previous batch to arrow
            arrays and executes the application logic. The transit buffer may be the biggest part
            of the required memory so if ``True`` ``arrow-odbc`` consumes almost two times the
            memory as compared to ``False``. On the flip side the next batch can be fetched from
            the database immediately without waiting for the application logic to return control.
        :param query_timeout_sec: Use this to limit the time the query is allowed to take, before
            responding with data to the application. The driver may replace the number of seconds
            you provide with a minimum or maximum value. You can specify ``0`` to deactivate the
            timeout. With ``None`` (the default) no timeout is set explicitly. For this to work
            the driver must support this feature. E.g. PostgreSQL and Microsoft SQL Server do,
            but SQLite or MariaDB do not.
        :param payload_text_encoding: Controls the encoding used for transferring text data from
            the ODBC data source to the application. The resulting Arrow arrays will still be
            UTF-8 encoded. If you see garbage characters or invalid UTF-8 errors on non-windows
            systems, you may want to select UTF-16. On windows systems you may want to select
            UTF-8 to gain performance benefits, after you have verified that your system locale
            is set to UTF-8.
        :return: A ``BatchReader`` is returned, which implements the iterator protocol and
            iterates over individual arrow batches.
        """
        reader = BatchReaderRaii()

        self._execute(
            reader=reader,
            query=query,
            parameters=parameters,
            text_encoding=payload_text_encoding,
            query_timeout_sec=query_timeout_sec,
        )

        # The buffer binding call takes plain integers; ``None`` is translated to ``0``.
        max_text_size = 0 if max_text_size is None else max_text_size
        max_binary_size = 0 if max_binary_size is None else max_binary_size
        max_bytes_per_batch = 0 if max_bytes_per_batch is None else max_bytes_per_batch

        # Let us transition to reader state
        reader.bind_buffers(
            batch_size=batch_size,
            max_bytes_per_batch=max_bytes_per_batch,
            max_text_size=max_text_size,
            max_binary_size=max_binary_size,
            falliable_allocations=falliable_allocations,
            payload_text_encoding=payload_text_encoding,
            schema=schema,
            map_schema=map_schema,
            fetch_concurrently=fetch_concurrently,
        )

        return BatchReader(reader)

    def insert_into_table(
        self,
        reader: BatchReaderProtocol,
        table: str,
        chunk_size: int,
    ) -> None:
        """
        Consume the batches in the reader and insert them into a table on the database.

        Example:

        .. code-block:: python

            from arrow_odbc import connect
            import pyarrow as pa
            import pandas

            def dataframe_to_table(df):
                table = pa.Table.from_pandas(df)
                reader = pa.RecordBatchReader.from_batches(table.schema, table.to_batches())
                connection = connect(
                    connection_string=connection_string,
                    user="SA",
                    password="My@Test@Password",
                )
                connection.insert_into_table(
                    table="MyTable",
                    reader=reader,
                    chunk_size=1000,
                )

        :param reader: Reader is used to iterate over record batches. It must expose a ``schema``
            attribute, referencing an Arrow schema. Each field in the schema must correspond to a
            column in the table with identical name. Every item the iterator yields is handed to
            the writer as one batch.
        :param table: Name of a database table to insert into. Used to generate the insert
            statement for the bulk writer.
        :param chunk_size: Number of records to insert in each roundtrip to the database.
            Independent of batch size (i.e. number of rows in an individual record batch).
        """
        writer = BatchWriter.from_connection(
            connection_handle=self.handle,
            reader=reader,
            chunk_size=chunk_size,
            table=table,
        )

        # Write all batches in reader
        for batch in reader:
            writer.write_batch(batch)
        # Flush once after the last batch has been handed to the writer.
        writer.flush()

    def from_table_to_db(
        self,
        source: Table,
        target: str,
        chunk_size: int = 1000,
    ) -> None:
        """
        Reads an arrow table and inserts its contents into a relational table on the database.

        This is a convenience wrapper around ``insert_into_table`` which converts an arrow table
        into a record batch reader for you.

        Example:

        .. code-block:: python

            from arrow_odbc import connect
            import pyarrow as pa
            import pandas

            def dataframe_to_table(df):
                table = pa.Table.from_pandas(df)
                connection = connect(
                    connection_string=connection_string,
                    user="SA",
                    password="My@Test@Password",
                )
                connection.from_table_to_db(
                    source=table,
                    target="MyTable",
                    chunk_size=1000,
                )

        :param source: PyArrow table with content to be inserted into the target table on the
            database. Each column of the table must correspond to a column in the target table
            with identical name.
        :param target: Name of the database table to insert into.
        :param chunk_size: Number of records to insert in each roundtrip to the database. The
            number will be automatically reduced to the number of rows, if the table is small, in
            order to save memory.
        """
        # There is no need for chunk size to exceed the maximum amount of rows in the table
        chunk_size = min(chunk_size, source.num_rows)

        # We implement this in terms of the functionality to insert batches from a record batch
        # reader, so first we convert our table into a record batch reader.
        schema = source.schema
        batches = source.to_batches(chunk_size)
        reader = RecordBatchReader.from_batches(schema, batches)

        # Now we can insert from the reader
        self.insert_into_table(
            reader=reader,
            table=target,
            chunk_size=chunk_size,
        )

    def execute(
        self,
        query: str,
        parameters: Sequence[str | None] | None = None,
        query_timeout_sec: int | None = None,
        payload_text_encoding: TextEncoding = TextEncoding.AUTO,
    ) -> None:
        """
        Execute a SQL statement which does not return a result set, e.g. ``INSERT``, ``UPDATE``,
        ``DELETE`` or DDL like ``CREATE TABLE``. Any result set the statement might produce is
        discarded.

        Example:

        .. code-block:: python

            from arrow_odbc import connect

            connection = connect(
                connection_string=connection_string,
                user="SA",
                password="My@Test@Password",
            )
            connection.execute("CREATE TABLE MyTable (a INTEGER);")
            connection.execute("INSERT INTO MyTable (a) VALUES (?);", parameters=["42"])

        :param query: The SQL statement to execute.
        :param parameters: ODBC allows you to use a question mark as placeholder marker (``?``)
            for positional parameters. This argument takes a list of parameters whose number must
            match the number of placeholders in the SQL statement. Currently all parameters are
            passed as VARCHAR strings. You can use ``None`` to pass ``NULL``.
        :param query_timeout_sec: Use this to limit the time the query is allowed to take, before
            responding to the application. The driver may replace the number of seconds you
            provide with a minimum or maximum value. You can specify ``0`` to deactivate the
            timeout. With ``None`` (the default) no timeout is set explicitly. For this to work
            the driver must support this feature. E.g. PostgreSQL and Microsoft SQL Server do,
            but SQLite or MariaDB do not.
        :param payload_text_encoding: Controls the encoding used for the string parameters bound
            to the query. If you see garbage characters or invalid UTF-8 errors on non-windows
            systems, you may want to select UTF-16. On windows systems you may want to select
            UTF-8 to gain performance benefits, after you have verified that your system locale
            is set to UTF-8.
        """
        # Passing no reader means a NULL reader handle is handed to the native call.
        self._execute(
            reader=None,
            query=query,
            parameters=parameters,
            text_encoding=payload_text_encoding,
            query_timeout_sec=query_timeout_sec,
        )

    def rollback(self) -> None:
        """
        Rollback the current transaction. Behavior is only defined in manual commit mode, which
        can be set by setting ``autocommit`` to ``False`` when creating the connection.
        """
        error = lib.arrow_odbc_connection_rollback(self.handle)
        raise_on_error(error)

    def commit(self) -> None:
        """
        Commit the current transaction. Behavior is only defined in manual commit mode, which can
        be set by setting ``autocommit`` to ``False`` when creating the connection.
        """
        error = lib.arrow_odbc_connection_commit(self.handle)
        raise_on_error(error)

    def __del__(self) -> None:
        # The finalizer also runs for instances whose ``__init__`` never ran (e.g. created via
        # ``__new__``), so the attribute can not be assumed to exist.
        handle = getattr(self, "handle", None)
        if handle:
            lib.arrow_odbc_connection_free(handle)
            # Forget the freed handle, so a repeated finalizer call can not free it twice.
            self.handle = ffi.NULL

    def _execute(
        self,
        reader: BatchReaderRaii | None,
        query: str,
        parameters: Sequence[str | None] | None,
        text_encoding: TextEncoding,
        query_timeout_sec: int | None,
    ) -> None:
        """
        Shared implementation behind ``read_arrow_batches`` and ``execute``.

        Encodes the query and its parameters, then hands them to the native
        ``arrow_odbc_connection_execute`` call.

        :param reader: Reader whose handle is passed to the native call. If ``None`` a ``NULL``
            handle is passed instead.
        :param query: SQL statement. Sent UTF-8 encoded.
        :param parameters: Positional parameters, or ``None`` if there are none. Every element
            must be ``None`` or offer an ``encode`` method.
        :param text_encoding: Its ``value`` is forwarded for every parameter created.
        :param query_timeout_sec: Timeout in seconds. ``None`` is passed as a ``NULL`` pointer.
        :raises TypeError: If an element of ``parameters`` is neither ``None`` nor string like.
        """
        query_bytes = query.encode("utf-8")

        if parameters is None:
            parameters_array = ffi.NULL
            parameters_len = 0
            encoded_parameters = []
        else:
            # Check precondition in order to save users some debugging, in case they directly
            # pass a non-string argument and do not use a type linter.
            if not all(p is None or hasattr(p, "encode") for p in parameters):
                # NOTE: The message names a legacy entry point. It is kept unchanged, since
                # callers may match on the text.
                raise TypeError(
                    "read_arrow_batches_from_odbc only supports string arguments for SQL query "
                    "parameters"
                )

            parameters_array = ffi.new("ArrowOdbcParameter *[]", len(parameters))
            parameters_len = len(parameters)
            # Must be kept alive. Within Rust code we only allocate an additional indicator, the
            # string payload is just referenced.
            encoded_parameters = [to_bytes_and_len(p) for p in parameters]

        text_encoding_int = text_encoding.value
        for p_index, (p_bytes, p_len) in enumerate(encoded_parameters):
            parameters_array[p_index] = lib.arrow_odbc_parameter_string_make(
                p_bytes, p_len, text_encoding_int
            )

        # A pointer is used for the timeout, so NULL can represent ``None``.
        if query_timeout_sec is None:
            query_timeout_sec_pointer = ffi.NULL
        else:
            query_timeout_sec_pointer = ffi.new("uintptr_t *")
            query_timeout_sec_pointer[0] = query_timeout_sec

        reader_handle = ffi.NULL if reader is None else reader.handle

        error = lib.arrow_odbc_connection_execute(
            self.handle,
            reader_handle,
            query_bytes,
            len(query_bytes),
            parameters_array,
            parameters_len,
            query_timeout_sec_pointer,
        )
        raise_on_error(error)
[docs]
def connect(
    connection_string: str,
    user: str | None = None,
    password: str | None = None,
    login_timeout_sec: int | None = None,
    packet_size: int | None = None,
    autocommit: bool = True,
) -> Connection:
    """
    Opens a connection to an ODBC data source.

    In case you want to use connection pooling, call ``enable_odbc_connection_pooling()`` before
    calling this function.

    Example:

    .. code-block:: python

        from arrow_odbc import connect, enable_odbc_connection_pooling

        # Let the ODBC driver manager take care of connection pooling for us
        enable_odbc_connection_pooling()

        # Connect to the data source
        connection_string = (
            "Driver={ODBC Driver 18 for SQL Server};"
            "Server=localhost;"
            "TrustServerCertificate=yes;"
        )
        connection = connect(
            connection_string=connection_string,
            user="SA",
            password="My@Test@Password",
        )

    :param connection_string: ODBC connection string used to connect to the data source. To find
        a connection string for your data source try https://www.connectionstrings.com/.
    :param user: Allows for specifying the user separately from the connection string if it is
        not already part of it. The value will eventually be escaped and attached to the
        connection string as ``UID``.
    :param password: Allows for specifying the password separately from the connection string if
        it is not already part of it. The value will eventually be escaped and attached to the
        connection string as ``PWD``.
    :param login_timeout_sec: Number of seconds to wait for a login request to complete before
        returning to the application. The default is driver-dependent. If ``0``, the timeout is
        disabled and a connection attempt will wait indefinitely. If the specified timeout
        exceeds the maximum login timeout in the data source, the driver substitutes that value
        and uses that instead.
    :param packet_size: Specifies the network packet size in bytes. Many ODBC drivers do not
        support this option. If the specified size exceeds the maximum packet size or is smaller
        than the minimum packet size, the driver substitutes that value and returns SQLSTATE
        01S02 (Option value changed). You may want to enable logging to standard error using
        ``log_to_stderr``.
    :param autocommit: If ``True`` the connection is set to autocommit mode, which means that
        each individual statement is committed immediately after it is executed. This is the
        default for ODBC connections, but some drivers may choose to use manual commit mode by
        default. If ``False`` the connection is set to manual commit mode. In manual commit mode
        you need to explicitly call ``commit()`` on the connection after executing a statement to
        make the changes visible to other connections. If you do not do so, your changes will not
        be visible to other connections and will be rolled back when the connection is closed.
        Setting this parameter to ``True`` ensures that you do not have to worry about this and
        that your changes are always visible immediately. Setting it to ``False`` allows you to
        execute multiple inserts and queries in the same transaction. Insert performance might
        also differ based on commit mode.
    :return: A ``Connection`` is returned.
    """

    def optional_u32(value):
        # Optional integers travel as pointers, so that NULL can stand in for ``None``.
        if value is None:
            return FFI.NULL
        slot = ffi.new("uint32_t *")
        slot[0] = value
        return slot

    dsn_bytes = connection_string.encode("utf-8")
    user_bytes, user_len = to_bytes_and_len(user)
    password_bytes, password_len = to_bytes_and_len(password)

    timeout_ptr = optional_u32(login_timeout_sec)
    packet_ptr = optional_u32(packet_size)

    # Receives the pointer to the newly created connection.
    out = ffi.new("ArrowOdbcConnection **")

    raise_on_error(
        lib.arrow_odbc_connection_make(
            dsn_bytes,
            len(dsn_bytes),
            user_bytes,
            user_len,
            password_bytes,
            password_len,
            timeout_ptr,
            packet_ptr,
            autocommit,
            out,
        )
    )

    # Take ownership of the ArrowOdbcConnection. The destructor of Connection will free it.
    return Connection(handle=cast("FFI.CData", out[0]))