from collections.abc import Callable
from typing import cast
import pyarrow
from cffi import FFI
from pyarrow import Array, RecordBatch, Schema, StructArray
from pyarrow.cffi import ffi as arrow_ffi
from .arrow_odbc import ffi, lib
from .error import raise_on_error
from .text_encoding import TextEncoding
# Default upper bound for the total size of the transit buffer: 512 MiB (2**29 bytes).
DEFAULT_FETCH_BUFFER_LIMIT_IN_BYTES = 512 * 1024 * 1024

# Default upper bound for the number of rows in the transit buffer: 65535 rows, i.e. 2**16 - 1,
# the largest value an *unsigned* 16 bit integer can hold. This is a reasonably large upper bound
# for batch sizes, yet is meant to avoid trouble with ODBC drivers which use 16 bit integers to
# represent some of the indices internally.
# NOTE(review): an earlier comment called this the maximum of a *signed* 16 bit integer, which
# would be 32767. Confirm which bound the affected drivers actually require.
DEFAULT_FETCH_BUFFER_LIMIT_IN_ROWS = 2**16 - 1
def _schema_from_handle(handle: "FFI.CData") -> Schema:
    """
    Return the pyarrow schema of the ArrowOdbcReader referenced by ``handle``.

    An Arrow C data interface ``struct ArrowSchema`` is allocated here, filled in by the Rust side,
    and then imported into pyarrow. The pattern follows
    https://github.com/apache/arrow/blob/5ead37593472c42f61c76396dde7dcb8954bde70/python/pyarrow/tests/test_cffi.py

    :param handle: Pointer to the ArrowOdbcReader owned by the caller.
    :return: The schema reported by the reader.
    """
    with arrow_ffi.new("struct ArrowSchema *") as c_schema:
        raise_on_error(lib.arrow_odbc_reader_schema(handle, c_schema))
        address = int(ffi.cast("uintptr_t", c_schema))
        # Import while the struct is still allocated; leaving the ``with`` block releases it.
        return Schema._import_from_c(address)
class BatchReaderRaii:
    """
    Takes ownership of the reader in its various states and makes sure its resources are freed if
    the object is deleted.
    """

    def __init__(self):
        """
        Create a new reader via ``arrow_odbc_reader_make`` and take ownership of its handle.
        """
        reader_out = ffi.new("ArrowOdbcReader **")
        lib.arrow_odbc_reader_make(reader_out)
        # We take ownership of the corresponding reader written in Rust and keep it alive until
        # `self` is deleted.
        self.handle: "FFI.CData" = reader_out[0]

    def __del__(self):
        # Free the resources associated with this handle. If ``__init__`` failed before a handle
        # was assigned there is nothing to free; reading ``self.handle`` unconditionally would
        # raise an ``AttributeError`` that Python reports as "Exception ignored in ...".
        handle = getattr(self, "handle", None)
        if handle is not None:
            lib.arrow_odbc_reader_free(handle)

    def schema(self) -> Schema:
        """
        Return the pyarrow schema currently reported by the reader.
        """
        return _schema_from_handle(self.handle)

    def next_batch(self) -> RecordBatch | None:
        """
        Fetch the next batch from the reader.

        :return: The next batch, or ``None`` if the reader reports that no further batch is
            available.
        """
        array = arrow_ffi.new("struct ArrowArray *")
        schema = arrow_ffi.new("struct ArrowSchema *")
        has_next_out = ffi.new("int*")
        error = lib.arrow_odbc_reader_next(self.handle, array, schema, has_next_out)
        raise_on_error(error)
        if has_next_out[0] == 0:
            return None
        array_ptr = int(ffi.cast("uintptr_t", array))
        schema_ptr = int(ffi.cast("uintptr_t", schema))
        # The batch arrives as a single struct array, which is converted into a record batch.
        struct_array = cast(StructArray, Array._import_from_c(array_ptr, schema_ptr))
        return RecordBatch.from_struct_array(struct_array)

    def bind_buffers(
        self,
        batch_size: int,
        max_bytes_per_batch: int,
        max_text_size: int,
        max_binary_size: int,
        falliable_allocations: bool,
        payload_text_encoding: TextEncoding,
        schema: Schema | None,
        map_schema: Callable[[Schema], Schema] | None,
        fetch_concurrently: bool,
    ) -> None:
        """
        Bind the transit buffers of the reader using the given limits and options.

        :param schema: Explicit schema handed to the Rust side, or ``None`` to pass ``NULL``.
        :param map_schema: If given, it is called with the reader's current schema and its result
            replaces ``schema``. Any explicitly passed ``schema`` is then ignored.
        """
        if map_schema is not None:
            schema = map_schema(self.schema())
        payload_text_encoding_int = payload_text_encoding.value
        # Keep the exported cdata referenced until after the FFI call so its memory stays valid.
        ptr_schema = _export_schema_to_c(schema)
        error = lib.arrow_odbc_reader_bind_buffers(
            self.handle,
            batch_size,
            max_bytes_per_batch,
            max_text_size,
            max_binary_size,
            falliable_allocations,
            fetch_concurrently,
            payload_text_encoding_int,
            ptr_schema,
        )
        # See if we managed to bind the buffers successfully and raise an error if not
        raise_on_error(error)

    def more_results(self) -> bool:
        """
        Ask the reader to move to the next result set.

        :return: ``True`` if the reader reports another result set, ``False`` otherwise.
        """
        with ffi.new("bool *") as has_more_results_c:
            error = lib.arrow_odbc_reader_more_results(
                self.handle,
                has_more_results_c,
            )
            # See if we managed to move to the next result set and raise an error if not
            raise_on_error(error)
            # Read the flag while the cdata is still allocated (the ``with`` releases it on exit)
            has_more_results = cast(int, has_more_results_c[0]) != 0
        return has_more_results
class BatchReader:
    """
    Iterates over Arrow batches from an ODBC data source
    """

    def __init__(self, reader: BatchReaderRaii):
        """
        Low level constructor, users should rather invoke `read_arrow_batches_from_odbc` in order to
        create instances of `BatchReader`.
        """
        # We take ownership of the corresponding reader written in Rust and keep it alive until
        # `self` is deleted. We also take care to keep this reader either in empty or reader state,
        # meaning we always have it ready to produce batches, or we consumed everything. We avoid
        # exposing the intermediate cursor state directly to users.
        self.reader: BatchReaderRaii = reader
        # This is the schema of the batches returned by reader. We take care to keep it in sync in
        # case the state of reader changes.
        self.schema: Schema = self.reader.schema()

    def __iter__(self) -> "BatchReader":
        # Implement iterable protocol so reader can be used in for loops.
        return self

    def __next__(self) -> RecordBatch:
        # Implement iterator protocol. ``next_batch`` returns ``None`` once there is no further
        # batch in the current result set.
        batch = self.reader.next_batch()
        if batch is None:
            raise StopIteration()
        return batch

    def more_results(
        self,
        batch_size: int = DEFAULT_FETCH_BUFFER_LIMIT_IN_ROWS,
        max_bytes_per_batch: int = DEFAULT_FETCH_BUFFER_LIMIT_IN_BYTES,
        max_text_size: int | None = None,
        max_binary_size: int | None = None,
        falliable_allocations: bool = False,
        schema: Schema | None = None,
        map_schema: Callable[[Schema], Schema] | None = None,
        fetch_concurrently: bool = True,
        payload_text_encoding: TextEncoding = TextEncoding.AUTO,
    ) -> bool:
        """
        Move the reader to the next result set returned by the data source.

        A datasource may return multiple results if multiple SQL statements are executed in a single
        query or a stored procedure is called. This method closes the current cursor and moves it to
        the next result set. You may move to the next result set without extracting the current one.
        Afterwards the transit buffers are bound using the parameters passed to this method, and the
        ``schema`` attribute is updated to the schema reported by the reader.

        Example:

        .. code-block:: python

            from arrow_odbc import read_arrow_batches_from_odbc

            connection_string = (
                "Driver={ODBC Driver 18 for SQL Server};"
                "Server=localhost;"
                "TrustServerCertificate=yes;"
            )

            reader = read_arrow_batches_from_odbc(
                query="SELECT * FROM MyTable; SELECT * FROM OtherTable;",
                connection_string=connection_string,
                batch_size=1000,
                user="SA",
                password="My@Test@Password",
            )

            # Process first result
            for batch in reader:
                # Process arrow batches
                df = batch.to_pandas()
                # ...

            reader.more_results()

            # Process second result
            for batch in reader:
                # Process arrow batches
                df = batch.to_pandas()
                # ...

        :param batch_size: The maximum number of rows within each batch. Batch size can be
            individually chosen for each result set. The maximum number of rows can be less if the
            upper bound defined by ``max_bytes_per_batch`` is lower.
        :param max_bytes_per_batch: An upper limit for the total size (all columns) of the buffer
            used to transit data from the ODBC driver to the application. Please note that memory
            consumption of this buffer is determined not by the actual values, but by the maximum
            possible length of an individual row times the number of rows it can hold. Both
            ``batch_size`` and this parameter define upper bounds for the same buffer. Whichever
            bound is lower is used to determine the buffer size.
        :param max_text_size: In order for fast bulk fetching to work, `arrow-odbc` needs to know the
            size of the largest possible field in each column. It will do so itself automatically by
            considering the schema information. However, trouble arises if the schema contains
            unbounded variadic fields like `VARCHAR(MAX)` which can hold really large values. These
            have a very high upper element size, if any. In order to work with such schemas we need
            a limit on what the actual values in the column can be, as opposed to the largest
            value the column could theoretically store. There is no need for this to be precise,
            but just knowing that a value would never exceed 4KiB rather than 2GiB is enough to
            allow for tremendous efficiency gains. The size of the text is specified in UTF-8
            encoded bytes if using a narrow encoding (typically all non-windows systems) and in
            UTF-16 encoded pairs of bytes on systems using a wide encoding (typically windows). This
            roughly corresponds to the size in letters, yet if you are using a lot of emojis or
            other special characters this number might need to be larger. ``None`` is passed on to
            the reader as ``0``.
        :param max_binary_size: An upper limit for the size of buffers bound to variadic binary
            columns of the data source. This limit does not (directly) apply to the size of the
            created arrow buffers, but rather applies to the buffers used for the data in transit.
            Use this option if you have e.g. VARBINARY(MAX) fields in your next batch. ``None`` is
            passed on to the reader as ``0``.
        :param falliable_allocations: If ``True`` a recoverable error is raised in case there is
            not enough memory to allocate the buffers. This option may incur a performance penalty
            which scales with the batch size parameter (but not with the amount of actual data in
            the source). In case you can test your query against the schema you can safely set this
            to ``False``. The required memory will not depend on the amount of data in the data
            source. Defaults to ``False``.
        :param schema: An explicit arrow schema for the batches of the next result set, passed on to
            the reader when binding its buffers. Ignored if ``map_schema`` is given.
        :param map_schema: If given, it is called with the schema reported by the reader for the
            next result set, and its return value is used as the schema instead.
        :param fetch_concurrently: Trade memory for speed. Allocates another transit buffer and uses
            it to fetch row set groups (aka. batches) from the ODBC data source in a dedicated
            system thread, while the main thread converts the previous batch to arrow arrays and
            executes the application logic. The transit buffer may be the biggest part of the
            required memory, so if ``True`` ``arrow-odbc`` consumes almost two times the memory
            compared to ``False``. On the flip side, the next batch can be fetched from the database
            immediately without waiting for the application logic to return control.
        :param payload_text_encoding: Controls the encoding used for transferring text data from the
            ODBC data source to the application. The resulting Arrow arrays will still be UTF-8
            encoded. If you see garbage characters or invalid UTF-8 errors on non-windows systems,
            you may want to set the encoding to ``TextEncoding.UTF16``. On windows systems you may
            want to set this to ``TextEncoding.UTF8`` to gain performance benefits, after you have
            verified that your system locale is set to UTF-8.
        :return: ``True`` in case there is another result set. ``False`` in case that the last
            result set has been processed.
        """
        if max_text_size is None:
            max_text_size = 0
        if max_binary_size is None:
            max_binary_size = 0

        has_more_results = self.reader.more_results()

        self.reader.bind_buffers(
            batch_size=batch_size,
            max_bytes_per_batch=max_bytes_per_batch,
            max_text_size=max_text_size,
            max_binary_size=max_binary_size,
            falliable_allocations=falliable_allocations,
            payload_text_encoding=payload_text_encoding,
            schema=schema,
            map_schema=map_schema,
            fetch_concurrently=fetch_concurrently,
        )

        # Every result set can have its own schema, so we must update our member
        self.schema = self.reader.schema()

        return has_more_results

    def into_pyarrow_record_batch_reader(self) -> pyarrow.RecordBatchReader:
        """
        Converts the ``arrow-odbc`` ``BatchReader`` into a ``pyarrow`` ``RecordBatchReader``. This
        method fully passes ownership to the new reader and leaves ``self`` holding a new, empty
        reader.

        ``arrow-odbc``'s BatchReader interface offers some functionality specific to ODBC
        datasources, e.g. the ability to move to the next result set of a stored procedure. You may
        not need this extra functionality and would rather like to integrate the ``BatchReader``
        with other libraries like e.g. DuckDB. In order to do this you can use this method to
        convert the ``arrow-odbc`` BatchReader into a ``pyarrow`` ``RecordBatchReader``.
        """
        # New empty tmp reader
        reader = BatchReaderRaii()
        tmp = BatchReader(reader)

        # Swap self and tmp, so ``tmp`` owns the live reader and ``self`` the empty one
        tmp.reader, self.reader = self.reader, tmp.reader
        tmp.schema, self.schema = self.schema, tmp.schema

        return pyarrow.RecordBatchReader.from_batches(tmp.schema, tmp)
def _export_schema_to_c(schema: Schema | None) -> "FFI.CData":
    """
    Export ``schema`` into a newly allocated ``struct ArrowSchema`` and return the pointer to it.

    :param schema: The schema to export, or ``None``.
    :return: ``ffi.NULL`` if ``schema`` is ``None``, otherwise the cffi pointer to the filled
        struct. Its memory is owned by the returned cdata object, so callers must keep it
        referenced for as long as the struct is in use.
    """
    if schema is None:
        return ffi.NULL
    c_schema = arrow_ffi.new("struct ArrowSchema *")
    schema._export_to_c(int(ffi.cast("uintptr_t", c_schema)))
    return c_schema