from typing import cast
from cffi import FFI
from pyarrow import RecordBatch
from pyarrow.cffi import ffi as arrow_ffi
from .arrow_odbc import ffi, lib
from .batch_reader_protocol import BatchReaderProtocol
from .error import raise_on_error
[docs]
class BatchWriter:
    """
    Writes arrow batches to a database table.

    Instances own a writer allocated by the native ``arrow_odbc`` library and free it when
    they are deleted. Create them with :meth:`BatchWriter.from_connection`, or preferably with
    :meth:`Connection.insert_into_table`.
    """

    def __init__(
        self,
        handle: "FFI.CData",
    ):
        """
        Take ownership of a native writer handle.

        :param handle: Pointer to a native ``ArrowOdbcWriter``, as produced by
            ``lib.arrow_odbc_writer_make``. It is released via ``lib.arrow_odbc_writer_free``
            when ``self`` is deleted.
        """
        # We take ownership of the corresponding writer written in Rust and keep it alive until
        # `self` is deleted
        self.handle: "FFI.CData" = handle

    def __del__(self):
        # Free the resources associated with this handle.
        lib.arrow_odbc_writer_free(self.handle)

    @classmethod
    def from_connection(
        cls,
        connection_handle: "FFI.CData",
        reader: BatchReaderProtocol,
        chunk_size: int,
        table: str,
    ):
        """
        Create a ``BatchWriter`` from a connection handle and a ``RecordBatchReader`` or
        ``BatchReader``.

        This is a low-level constructor. Use :meth:`Connection.insert_into_table` instead.

        :param connection_handle: Native handle of the connection the writer inserts through.
        :param reader: Source of the batches. Only its ``schema`` is read here. The schema is
            exported through the Arrow C data interface and handed to the native writer.
        :param chunk_size: Forwarded unchanged to the native writer. It is presumably the number
            of rows buffered before they are sent to the database (see :meth:`flush`).
        :param table: Name of the target table. It is UTF-8 encoded before being passed on.
        :return: A new instance of ``cls`` owning the native writer.
        :raises: Whatever ``raise_on_error`` raises if the native writer could not be created.
        """
        table_bytes = table.encode("utf-8")
        # Allocate a structure to export the Arrow schema into. It is released when we exit the
        # with block.
        with arrow_ffi.new("struct ArrowSchema*") as c_schema:
            # Get the reference to the C Data structure.
            c_schema_ptr = int(arrow_ffi.cast("uintptr_t", c_schema))
            # Export the schema to the C Data structure.
            reader.schema._export_to_c(c_schema_ptr)
            # Out-parameter which receives the pointer to the newly created native writer.
            writer_out = ffi.new("ArrowOdbcWriter **")
            error = lib.arrow_odbc_writer_make(
                connection_handle,
                table_bytes,
                len(table_bytes),
                chunk_size,
                c_schema,
                writer_out,
            )
            raise_on_error(error)
        handle = cast(FFI.CData, writer_out[0])
        # Use `cls` rather than a hard-coded class so subclasses get instances of themselves.
        return cls(handle=handle)

    def write_batch(self, batch: RecordBatch):
        """
        Fills the internal buffers of the writer with data from the batch. Every time they are
        full, the data is sent to the database. To make sure all the data is sent, ``flush``
        must be called.

        :param batch: The record batch to write. It is exported through the Arrow C data
            interface together with its schema.
        :raises: Whatever ``raise_on_error`` raises if the native writer reports an error.
        """
        with (
            arrow_ffi.new("struct ArrowArray*") as c_array,
            arrow_ffi.new("struct ArrowSchema*") as c_schema,
        ):
            # Get the references to the C Data structures
            c_array_ptr = int(arrow_ffi.cast("uintptr_t", c_array))
            c_schema_ptr = int(arrow_ffi.cast("uintptr_t", c_schema))
            # Export the Array to the C Data structures.
            batch._export_to_c(c_array_ptr)
            batch.schema._export_to_c(c_schema_ptr)
            error = lib.arrow_odbc_writer_write_batch(self.handle, c_array, c_schema)
            raise_on_error(error)

    def flush(self):
        """
        Inserts the remaining rows of the last chunk to the database.

        :raises: Whatever ``raise_on_error`` raises if the native writer reports an error.
        """
        error = lib.arrow_odbc_writer_flush(self.handle)
        raise_on_error(error)