from typing import List, Optional, Callable
from cffi.api import FFI # type: ignore
import pyarrow
from pyarrow.cffi import ffi as arrow_ffi # type: ignore
from pyarrow import RecordBatch, Schema, Array # type: ignore
from arrow_odbc.connect import to_bytes_and_len, connect_to_database # type: ignore
from .arrow_odbc import ffi, lib # type: ignore
from .error import raise_on_error
# Default upper bound for the transit buffer: 2**29 bytes = 512 MiB.
DEFAULT_FETCH_BUFFER_LIMIT_IN_BYTES = 2**29
def _schema_from_handle(handle) -> Schema:
"""
Take a handle to an ArrowOdbcReader and return the associated pyarrow schema
"""
    # Import the schema via the Arrow C Data Interface. See:
    # https://github.com/apache/arrow/blob/5ead37593472c42f61c76396dde7dcb8954bde70/python/pyarrow/tests/test_cffi.py
with arrow_ffi.new("struct ArrowSchema *") as schema_out:
error = lib.arrow_odbc_reader_schema(handle, schema_out)
raise_on_error(error)
ptr_schema = int(ffi.cast("uintptr_t", schema_out))
return Schema._import_from_c(ptr_schema)
class _BatchReaderRaii:
"""
Takes ownership of the reader in its various states and makes sure its resources are freed if
the object is deleted.
"""
def __init__(self):
reader_out = ffi.new("ArrowOdbcReader **")
lib.arrow_odbc_reader_make_empty(reader_out)
# We take ownership of the corresponding reader written in Rust and keep it alive until
# `self` is deleted.
self.handle = reader_out[0]
def __del__(self):
# Free the resources associated with this handle.
lib.arrow_odbc_reader_free(self.handle)
def schema(self) -> Schema:
return _schema_from_handle(self.handle)
def next_batch(self):
array = arrow_ffi.new("struct ArrowArray *")
schema = arrow_ffi.new("struct ArrowSchema *")
has_next_out = ffi.new("int*")
error = lib.arrow_odbc_reader_next(self.handle, array, schema, has_next_out)
raise_on_error(error)
if has_next_out[0] == 0:
return None
else:
array_ptr = int(ffi.cast("uintptr_t", array))
schema_ptr = int(ffi.cast("uintptr_t", schema))
struct_array = Array._import_from_c(array_ptr, schema_ptr)
return RecordBatch.from_struct_array(struct_array)
def into_concurrent(self):
error = lib.arrow_odbc_reader_into_concurrent(self.handle)
raise_on_error(error)
def query(
self,
query: str,
connection_string: str,
user: Optional[str],
password: Optional[str],
parameters: Optional[List[Optional[str]]],
login_timeout_sec: Optional[int],
packet_size: Optional[int],
):
query_bytes = query.encode("utf-8")
if parameters is None:
parameters_array = FFI.NULL
parameters_len = 0
encoded_parameters = []
else:
# Check precondition in order to save users some debugging, in case they directly pass a
# non-string argument and do not use a type linter.
if not all([p is None or hasattr(p, "encode") for p in parameters]):
raise TypeError(
"read_arrow_batches_from_odbc only supports string arguments for SQL query "
"parameters"
)
parameters_array = ffi.new("ArrowOdbcParameter *[]", len(parameters))
parameters_len = len(parameters)
            # Must be kept alive. Within the Rust code we only allocate an additional
            # indicator; the string payload itself is just referenced.
encoded_parameters = [to_bytes_and_len(p) for p in parameters]
connection = connect_to_database(
connection_string, user, password, login_timeout_sec, packet_size
)
        # Connecting to the database has been successful. Note that `connection` does not truly
        # take ownership of the connection. If it runs out of scope (e.g. due to a raised
        # exception) the connection would not be closed and its associated resources would not
        # be freed. However, this is fine since everything from here on out until we call
        # arrow_odbc_reader_query is infallible. arrow_odbc_reader_query will truly take
        # ownership of the connection. Even if it should fail, the connection will be closed
        # correctly.
for p_index in range(0, parameters_len):
(p_bytes, p_len) = encoded_parameters[p_index]
parameters_array[p_index] = lib.arrow_odbc_parameter_string_make(p_bytes, p_len)
error = lib.arrow_odbc_reader_query(
self.handle,
connection,
query_bytes,
len(query_bytes),
parameters_array,
parameters_len,
)
        # See if we managed to execute the query successfully and raise an error if not
raise_on_error(error)
def bind_buffers(
self,
batch_size: int,
max_bytes_per_batch: int,
max_text_size: int,
max_binary_size: int,
falliable_allocations: bool = False,
schema: Optional[Schema] = None,
map_schema: Optional[Callable[[Schema], Schema]] = None,
):
if map_schema is not None:
schema = map_schema(self.schema())
ptr_schema = _export_schema_to_c(schema)
error = lib.arrow_odbc_reader_bind_buffers(
self.handle,
batch_size,
max_bytes_per_batch,
max_text_size,
max_binary_size,
falliable_allocations,
ptr_schema,
)
        # See if we managed to bind the buffers successfully and raise an error if not
raise_on_error(error)
def more_results(
self,
) -> bool:
with ffi.new("bool *") as has_more_results_c:
error = lib.arrow_odbc_reader_more_results(
self.handle,
has_more_results_c,
)
            # See if moving to the next result set succeeded and raise an error if not
raise_on_error(error)
            # Remember whether there is another result set in a boolean
has_more_results = has_more_results_c[0] != 0
return has_more_results
class BatchReader:
"""
Iterates over Arrow batches from an ODBC data source
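    Example collecting all batches into a single ``pyarrow`` ``Table`` (a sketch; this loads
    the entire result set into memory, and ``reader`` is assumed to have been obtained from
    ``read_arrow_batches_from_odbc``):

    .. code-block:: python

        import pyarrow as pa

        batches = list(reader)
        table = pa.Table.from_batches(batches, schema=reader.schema)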
"""
def __init__(self, reader: _BatchReaderRaii):
"""
        Low level constructor. Users should rather invoke ``read_arrow_batches_from_odbc`` in
        order to create instances of ``BatchReader``.
"""
# We take ownership of the corresponding reader written in Rust and keep it alive until
# `self` is deleted. We also take care to keep this reader either in empty or reader state,
# meaning we always have it ready to produce batches, or we consumed everything. We avoid
# exposing the intermediate cursor state directly to users.
self.reader = reader
# This is the schema of the batches returned by reader. We take care to keep it in sync in
# case the state of reader changes.
self.schema = self.reader.schema()
def __iter__(self):
# Implement iterable protocol so reader can be used in for loops.
return self
def __next__(self) -> RecordBatch:
# Implement iterator protocol
batch = self.reader.next_batch()
if batch is None:
raise StopIteration()
else:
return batch
def more_results(
self,
batch_size: int = 65535,
max_bytes_per_batch: int = DEFAULT_FETCH_BUFFER_LIMIT_IN_BYTES,
max_text_size: Optional[int] = None,
max_binary_size: Optional[int] = None,
falliable_allocations: bool = False,
schema: Optional[Schema] = None,
map_schema: Optional[Callable[[Schema], Schema]] = None,
) -> bool:
"""
Move the reader to the next result set returned by the data source.
        A data source may return multiple result sets if multiple SQL statements are executed
        in a single query or a stored procedure is called. This method closes the current
        cursor and moves to the next result set. You may move to the next result set without
        extracting the current one.
Example:
.. code-block:: python
from arrow_odbc import read_arrow_batches_from_odbc
connection_string="Driver={ODBC Driver 17 for SQL Server};Server=localhost;"
reader = read_arrow_batches_from_odbc(
query=f"SELECT * FROM MyTable; SELECT * FROM OtherTable;",
connection_string=connection_string,
batch_size=1000,
user="SA",
password="My@Test@Password",
)
# Process first result
for batch in reader:
# Process arrow batches
df = batch.to_pandas()
# ...
reader.more_results()
# Process second result
for batch in reader:
# Process arrow batches
df = batch.to_pandas()
# ...
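        To process an unknown number of result sets, you can use the return value of
        ``more_results`` in a loop (a sketch):

        .. code-block:: python

            while True:
                for batch in reader:
                    ...  # process batches of the current result set
                if not reader.more_results():
                    break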
        :param batch_size: The maximum number of rows within each batch. The batch size can be
            chosen individually for each result set. The actual number of rows can be less if
            the upper bound defined by ``max_bytes_per_batch`` is lower.
        :param max_bytes_per_batch: An upper limit for the total size (all columns) of the
            buffer used to transit data from the ODBC driver to the application. Please note
            that the memory consumption of this buffer is determined not by the actual values,
            but by the maximum possible length of an individual row times the number of rows it
            can hold. Both ``batch_size`` and this parameter define upper bounds for the same
            buffer. Whichever bound is lower is used to determine the buffer size.
        :param max_text_size: An upper limit for the size of buffers bound to variadic text
            columns of the data source. This limit does not (directly) apply to the size of the
            created arrow buffers, but rather applies to the buffers used for the data in
            transit. Use this option if you have e.g. VARCHAR(MAX) fields in your database
            schema. For such columns without an upper limit, the ODBC driver of your data
            source is asked for the maximum size of an element, and is likely to answer with
            either 0 or a value which is way larger than any actual entry in the column. If you
            can not adapt your database schema, this limit might be what you are looking for.
            On Windows systems the size is in 16-bit units, as Windows utilizes a UTF-16
            encoding, so this translates to roughly the size in letters. On non-Windows systems
            this is the size in bytes and the data source is assumed to utilize a UTF-8
            encoding. ``None`` means no upper limit is set and the maximum element size
            reported by ODBC is used to determine buffer sizes. Lower values result in better
            memory utilization and can significantly lower the number of bytes needed per row.
            Higher values allow for larger values to go through without truncation.
        :param max_binary_size: An upper limit for the size of buffers bound to variadic binary
            columns of the data source. This limit does not (directly) apply to the size of the
            created arrow buffers, but rather applies to the buffers used for the data in
            transit. Use this option if you have e.g. VARBINARY(MAX) fields in your database
            schema.
        :param falliable_allocations: If ``True`` a recoverable error is raised in case there
            is not enough memory to allocate the buffers. This option may incur a performance
            penalty which scales with the batch size parameter (but not with the amount of
            actual data in the source). In case you can test your query against the schema you
            can safely keep this set to ``False``. The required memory will not depend on the
            amount of data in the data source. Default is ``False``.
        :return: ``True`` in case there is another result set. ``False`` in case the last
            result set has been processed.
"""
if max_text_size is None:
max_text_size = 0
if max_binary_size is None:
max_binary_size = 0
has_more_results = self.reader.more_results()
self.reader.bind_buffers(
batch_size=batch_size,
max_bytes_per_batch=max_bytes_per_batch,
max_text_size=max_text_size,
max_binary_size=max_binary_size,
falliable_allocations=falliable_allocations,
schema=schema,
map_schema=map_schema,
)
# Every result set can have its own schema, so we must update our member
self.schema = self.reader.schema()
return has_more_results
def into_pyarrow_record_batch_reader(self):
"""
        Converts the ``arrow-odbc`` ``BatchReader`` into a ``pyarrow`` ``RecordBatchReader``.
        This method fully passes ownership to the new reader and leaves ``self`` empty.

        ``arrow-odbc``'s ``BatchReader`` interface offers some functionality specific to ODBC
        data sources, e.g. the ability to move to the next result set of a stored procedure.
        You may not need this extra functionality and may rather want to integrate the
        ``BatchReader`` with other libraries like e.g. DuckDB. In order to do this you can use
        this method to convert the ``arrow-odbc`` ``BatchReader`` into a ``pyarrow``
        ``RecordBatchReader``.
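        A sketch of the DuckDB integration mentioned above (assuming an installed ``duckdb``
        package; the exact DuckDB API for consuming Arrow readers may vary between versions):

        .. code-block:: python

            import duckdb

            record_batch_reader = reader.into_pyarrow_record_batch_reader()
            # DuckDB can pick up the pyarrow reader from the local variable via a
            # replacement scan.
            duckdb.sql("SELECT COUNT(*) FROM record_batch_reader").show()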
"""
# New empty tmp reader
reader = _BatchReaderRaii()
tmp = BatchReader(reader)
# Swap self and tmp
tmp.reader, self.reader = self.reader, tmp.reader
tmp.schema, self.schema = self.schema, tmp.schema
return pyarrow.RecordBatchReader.from_batches(tmp.schema, tmp)
def fetch_concurrently(self):
"""
Allocate another transit buffer and use it to fetch row set groups (aka. batches) from the
ODBC data source in a dedicated system thread, while the main thread converts the previous
batch to arrow arrays and executes the application logic.
        If you extract more than one result set from the cursor, you need to call this method
        for each result set you want to extract concurrently. This has been done so it is
        possible to skip result sets without worrying that the fetching thread starts fetching
        row groups from a result set you intended to skip.
Calling this method on an already concurrent reader has no effect.
Example:
.. code-block:: python
from arrow_odbc import read_arrow_batches_from_odbc
connection_string="Driver={ODBC Driver 17 for SQL Server};Server=localhost;"
reader = read_arrow_batches_from_odbc(
query=f"SELECT * FROM MyTable,
connection_string=connection_string,
batch_size=1000,
user="SA",
password="My@Test@Password",
)
            # Trade memory for speed. For the price of an additional transit buffer and a
            # native system thread we now fetch batches concurrently with our application
            # logic.
reader.fetch_concurrently()
for batch in reader:
# Process arrow batches
df = batch.to_pandas()
# ...
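        When extracting multiple result sets, call it again for each result set you want to
        fetch concurrently:

        .. code-block:: python

            reader.more_results()
            reader.fetch_concurrently()  # also fetch the second result set concurrently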
"""
try:
self.reader.into_concurrent()
except Exception:
            # Making a reader concurrent will not change its schema, yet if there is an error
            # the reader is destroyed and its schema is empty.
            # `self.schema == self.reader.schema()` should always hold, so assigning it would
            # never make the code incorrect. Yet we only need to do so if it actually changes.
self.schema = self.reader.schema()
raise
def read_arrow_batches_from_odbc(
query: str,
connection_string: str,
batch_size: int = 65535,
user: Optional[str] = None,
password: Optional[str] = None,
parameters: Optional[List[Optional[str]]] = None,
max_bytes_per_batch: Optional[int] = DEFAULT_FETCH_BUFFER_LIMIT_IN_BYTES,
max_text_size: Optional[int] = None,
max_binary_size: Optional[int] = None,
falliable_allocations: bool = False,
login_timeout_sec: Optional[int] = None,
packet_size: Optional[int] = None,
schema: Optional[Schema] = None,
map_schema: Optional[Callable[[Schema], Schema]] = None,
) -> BatchReader:
"""
Execute the query and read the result as an iterator over Arrow batches.
Example:
.. code-block:: python
from arrow_odbc import read_arrow_batches_from_odbc
connection_string="Driver={ODBC Driver 17 for SQL Server};Server=localhost;"
reader = read_arrow_batches_from_odbc(
query=f"SELECT * FROM MyTable WHERE a=?",
connection_string=connection_string,
batch_size=1000,
parameters=["I'm a positional query parameter"],
user="SA",
password="My@Test@Password",
)
for batch in reader:
# Process arrow batches
df = batch.to_pandas()
# ...
:param query: The SQL statement yielding the result set which is converted into arrow record
batches.
    :param batch_size: The maximum number of rows within each batch. The actual number of rows
        can be less if the upper bound defined by ``max_bytes_per_batch`` is lower.
:param connection_string: ODBC Connection string used to connect to the data source. To find a
connection string for your data source try https://www.connectionstrings.com/.
    :param user: Allows for specifying the user separately from the connection string if it is
        not already part of it. The value will eventually be escaped and attached to the
        connection string as ``UID``.
    :param password: Allows for specifying the password separately from the connection string
        if it is not already part of it. The value will eventually be escaped and attached to
        the connection string as ``PWD``.
    :param parameters: ODBC allows you to use a question mark (``?``) as a placeholder marker
        for positional parameters. This argument takes a list of parameters whose number must
        match the number of placeholders in the SQL statement. Using this instead of literals
        helps you avoid SQL injection and may otherwise simplify your code. Currently all
        parameters are passed as VARCHAR strings. You can use ``None`` to pass ``NULL``.
    :param max_bytes_per_batch: An upper limit for the total size (all columns) of the buffer
        used to transit data from the ODBC driver to the application. Please note that the
        memory consumption of this buffer is determined not by the actual values, but by the
        maximum possible length of an individual row times the number of rows it can hold.
        Both ``batch_size`` and this parameter define upper bounds for the same buffer.
        Whichever bound is lower is used to determine the buffer size.
    :param max_text_size: An upper limit for the size of buffers bound to variadic text columns
        of the data source. This limit does not (directly) apply to the size of the created
        arrow buffers, but rather applies to the buffers used for the data in transit. Use this
        option if you have e.g. VARCHAR(MAX) fields in your database schema. For such columns
        without an upper limit, the ODBC driver of your data source is asked for the maximum
        size of an element, and is likely to answer with either 0 or a value which is way
        larger than any actual entry in the column. If you can not adapt your database schema,
        this limit might be what you are looking for. On Windows systems the size is in 16-bit
        units, as Windows utilizes a UTF-16 encoding, so this translates to roughly the size in
        letters. On non-Windows systems this is the size in bytes and the data source is
        assumed to utilize a UTF-8 encoding. ``None`` means no upper limit is set and the
        maximum element size reported by ODBC is used to determine buffer sizes. Lower values
        result in better memory utilization and can significantly lower the number of bytes
        needed per row. Higher values allow for larger values to go through without truncation.
:param max_binary_size: An upper limit for the size of buffers bound to variadic binary columns
of the data source. This limit does not (directly) apply to the size of the created arrow
buffers, but rather applies to the buffers used for the data in transit. Use this option if
you have e.g. VARBINARY(MAX) fields in your database schema. In such a case without an upper
limit, the ODBC driver of your data source is asked for the maximum size of an element, and
is likely to answer with either 0 or a value which is way larger than any actual entry in
the column. If you can not adapt your database schema, this limit might be what you are
looking for. This is the maximum size in bytes of the binary column.
    :param falliable_allocations: If ``True`` a recoverable error is raised in case there is
        not enough memory to allocate the buffers. This option may incur a performance penalty
        which scales with the batch size parameter (but not with the amount of actual data in
        the source). In case you can test your query against the schema you can safely keep
        this set to ``False``. The required memory will not depend on the amount of data in the
        data source. Default is ``False``.
:param login_timeout_sec: Number of seconds to wait for a login request to complete before
returning to the application. The default is driver-dependent. If ``0``, the timeout is
disabled and a connection attempt will wait indefinitely. If the specified timeout exceeds
the maximum login timeout in the data source, the driver substitutes that value and uses
that instead.
    :param packet_size: Specifies the network packet size in bytes. Many ODBC drivers do not
        support this option. If the specified size exceeds the maximum packet size or is
        smaller than the minimum packet size, the driver substitutes that value and returns
        SQLSTATE 01S02 (Option value changed). You may want to enable logging to standard error
        using ``log_to_stderr``.
    :param schema: Allows you to overwrite the automatically detected schema with one supplied
        by the application. Reasons for doing so include domain knowledge you have about the
        data which is not reflected in the schema information, e.g. you happen to know a field
        of timestamps contains strictly dates. Another reason could be that for certain use
        cases it can make sense to decide the type based on what you want to do with it, rather
        than on its source. E.g. if you simply want to put everything into a CSV file it can
        make perfect sense to fetch everything as string, independent of its source type.
    :param map_schema: Allows you to provide a custom schema based on the schema inferred from
        the metainformation of the query, as shown in the example below. This would allow you
        to e.g. map every column type to string, or replace any float32 with a float64, or
        anything else you might want to customize, for various reasons, while still staying
        generic over the input schema. If both ``map_schema`` and ``schema`` are specified,
        ``map_schema`` takes priority.
:return: A ``BatchReader`` is returned, which implements the iterator protocol and iterates over
individual arrow batches.
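    Example for ``map_schema``, mapping every column to string while staying generic over the
    input schema (a sketch using public ``pyarrow`` APIs; ``connection_string`` is assumed to
    be defined as in the example above):

    .. code-block:: python

        import pyarrow as pa

        def all_strings(schema: pa.Schema) -> pa.Schema:
            # Keep field names and order, but fetch every column as utf8.
            return pa.schema([pa.field(field.name, pa.string()) for field in schema])

        reader = read_arrow_batches_from_odbc(
            query="SELECT * FROM MyTable",
            connection_string=connection_string,
            map_schema=all_strings,
        )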
"""
reader = _BatchReaderRaii()
reader.query(
query=query,
connection_string=connection_string,
user=user,
password=password,
parameters=parameters,
login_timeout_sec=login_timeout_sec,
packet_size=packet_size,
)
if max_text_size is None:
max_text_size = 0
if max_binary_size is None:
max_binary_size = 0
if max_bytes_per_batch is None:
max_bytes_per_batch = 0
# Let us transition to reader state
reader.bind_buffers(
batch_size=batch_size,
max_bytes_per_batch=max_bytes_per_batch,
max_text_size=max_text_size,
max_binary_size=max_binary_size,
falliable_allocations=falliable_allocations,
schema=schema,
map_schema=map_schema,
)
return BatchReader(reader)
def _export_schema_to_c(schema):
    if schema is None:
        ptr_schema = ffi.NULL
    else:
        # Export the pyarrow schema via the Arrow C Data Interface so the Rust side can
        # import it from the raw pointer.
        ptr_schema = arrow_ffi.new("struct ArrowSchema *")
        int_schema = int(ffi.cast("uintptr_t", ptr_schema))
        schema._export_to_c(int_schema)
    return ptr_schema