From e601ea00bf044fd1ac48db859c2c8491f45fda3f Mon Sep 17 00:00:00 2001
From: Constantin Michael <constantin@codesynthesis.com>
Date: Wed, 14 Sep 2011 15:26:43 +0200
Subject: Corrections and additions to LOB parameter and results callback
 implementation

---
 odb/oracle/oracle-types.hxx |  50 ++++++-----
 odb/oracle/statement.cxx    | 209 ++++++++++++++++++++++++++++----------------
 odb/oracle/statement.hxx    |   4 +-
 3 files changed, 165 insertions(+), 98 deletions(-)

diff --git a/odb/oracle/oracle-types.hxx b/odb/oracle/oracle-types.hxx
index 5911d4f..4d347d0 100644
--- a/odb/oracle/oracle-types.hxx
+++ b/odb/oracle/oracle-types.hxx
@@ -19,25 +19,35 @@ namespace odb
     {
       ub2 type;       // The type stored by buffer. This must be an external
                       // OCI type identifier of the form SQLT_XXX.
-      void* buffer;   // Image buffer pointer. If this bind is associated with
-                      // an output LOB column, interpret as an OCILobLocator*.
-      ub2* size;      // The number of bytes in buffer. Due to
-                      // inconsistencies in the OCI interface, this will be
-                      // interpreted as a ub4 when callbacks are in use.
-      sb4 capacity;   // The maximum number of bytes that can be stored in
+      void* buffer;   // Data buffer pointer.
+      ub2* size;      // The number of bytes in buffer. When parameter
+                      // callbacks are in use, this is interpreted as a ub4*
+                      // indicating the current position. When 'type'
+                      // specifies a LOB type, this is interpreted as an
+                      // OCILobLocator*.
+      ub4 capacity;   // The maximum number of bytes that can be stored in
                       // buffer.
       sb2* indicator; // Pointer to an OCI indicator variable.
 
+      enum piece
+      {
+        whole,
+        first,
+        next,
+        last
+      };
+
       // Callback function signature used to specify LOB parameters that are
       // passed to the database.
       //
       typedef bool (*param_callback_type) (
-        void** context,    // [in/out] The user context.
+        void* context,     // [in/out] The user context.
         void** buffer,     // [out] On return, a pointer to a buffer containing
                            // parameter data.
         ub4* size,         // [out] The parameter data length in bytes.
-        bool* last,        // [out] True if buffer contains the last piece of
-                           // data.
+        ub4* position,     // [in/out] A position context. This value remains
+                           // unchanged between callback invocations.
+        piece*,            // [out] The piece type for this segment of data.
         void* temp_buffer, // [in] A temporary buffer that may be used if
                            // required. The 'buffer' parameter should specify
                            // this buffer on return if it is used.
@@ -47,25 +57,21 @@ namespace odb
       // the database.
       //
       typedef bool (*result_callback_type) (
-        void** context, // [in/out] The user context.
-        void* buffer,   // [in] A buffer containing the result data.
-        ub4 size,       // [in] The result data length in bytes.
-        bool last);     // [in] True if this is the last piece of result data.
+        void* context, // [in/out] The user context.
+        void* buffer,  // [in] A buffer containing the result data.
+        ub4 size,      // [in] The result data length in bytes.
+        piece);        // [in] The piece type for this segment of data.
 
-      param_callback_type param_callback;
-      result_callback_type result_callback;
+      union
+      {
+        param_callback_type param;
+        result_callback_type result;
+      } callback;
 
       // This pointer is provided to the user through the 'context' parameter
       // in both parameter and result callback functions.
       //
       void* callback_context;
-
-      // This flag is used exclusively during parameter callback invocation.
-      // If set, it indicates that this is the first time the callback is
-      // being invoked for this bind instance, and thus OCI is requesting
-      // the first piece of parameter data.
-      //
-      bool first_piece;
     };
   }
 }
diff --git a/odb/oracle/statement.cxx b/odb/oracle/statement.cxx
index 4ab8af1..32b015a 100644
--- a/odb/oracle/statement.cxx
+++ b/odb/oracle/statement.cxx
@@ -8,7 +8,9 @@
 #include <oci.h>
 
 #include <odb/exceptions.hxx> // object_not_persistent
+#include <odb/details/unused.hxx>
 
+#include <odb/oracle/database.hxx>
 #include <odb/oracle/statement.hxx>
 #include <odb/oracle/connection.hxx>
 #include <odb/oracle/error.hxx>
@@ -32,26 +34,47 @@ namespace odb
     {
       bind& b (*static_cast<bind*> (context));
 
-      if (b.indicator != -1)
+      // Only callback to user if the parameter is not NULL.
+      //
+      if (*b.indicator != -1)
       {
-        bool last (false);
-        if (!(*b.param_callback) (&b.callback_context,
+        bind::piece p;
+
+        if (!(*b.callback.param) (&b.callback_context,
                                   buffer,
                                   len,
-                                  &last,
+                                  reinterpret_cast<ub4*> (b.size),
+                                  &p,
                                   b.buffer,
                                   b.capacity))
           return OCI_ERROR;
 
-        if (b.first_piece)
+        switch (p)
         {
-          b.first_piece = false;
-          *piece = last ? OCI_ONE_PIECE : OCI_FIRST_PIECE;
+        case bind::whole:
+          {
+            *piece = OCI_ONE_PIECE;
+            break;
+          }
+        case bind::first:
+          {
+            *piece = OCI_FIRST_PIECE;
+            break;
+          }
+        case bind::next:
+          {
+            *piece = OCI_NEXT_PIECE;
+            break;
+          }
+        case bind::last:
+          {
+            *piece = OCI_LAST_PIECE;
+            break;
+          }
+        default:
+          assert (0);
+          return OCI_ERROR;
         }
-        else if (last)
-          *piece = OCI_LAST_PIECE;
-        else
-          *piece = OCI_NEXT_PIECE;
       }
       else
         *piece = OCI_ONE_PIECE;
@@ -61,18 +84,6 @@ namespace odb
       return OCI_CONTINUE;
     }
 
-    static sb4
-    lob_result_callback_proxy (void* context,
-                               const void* buffer,
-                               ub8 len,
-                               ub1 piece,
-                               void** new_buffer,
-                               ub8* new_len)
-    {
-      bind& b (*static_cast<bind*> (context));
-      return OCI_CONTINUE;
-    }
-
     //
     // statement
     //
@@ -112,9 +123,8 @@ namespace odb
 
       for (size_t e (o + c); o < e; ++c, ++b)
       {
-        bool callback ((b->type == SQLT_BLOB ||
-                        b->type == SQLT_CLOB ||
-                        b->type == SQLT_NCLOB) && b->param_callback != 0);
+        bool callback ((b->type == SQLT_BLOB || b->type == SQLT_CLOB) &&
+                       b->callback.param != 0);
 
         OCIBind* h (0);
 
@@ -123,7 +133,7 @@ namespace odb
                                err,
                                o,
                                b->buffer,
-                               b->capacity,
+                               static_cast<sb4> (b->capacity),
                                b->type,
                                b->indicator,
                                b->size,
@@ -146,8 +156,10 @@ namespace odb
     }
 
     void statement::
-    bind_result (bind* b, size_t c)
+    bind_result (bind* b, size_t c, size_t p)
     {
+      ODB_POTENTIALLY_UNUSED (p);
+
       OCIError* err (conn_.error_handle ());
 
       // The parameter position in OCIDefineByPos is specified as a 1-based
@@ -155,30 +167,14 @@ namespace odb
       //
       for (size_t i (1); i <= c; ++i, ++b)
       {
-        if (b->type == SQLT_BLOB ||
-            b->type == SQLT_CLOB ||
-            b->type == SQLT_NCLOB)
+        if (b->type == SQLT_BLOB || b->type == SQLT_CLOB)
         {
-          auto_descriptor<OCILobLocator> locator;
-          {
-            sword r (OCIDescriptorAlloc (conn_.db ().environment (),
-                                         &b->buffer,
-                                         OCI_DTYPE_LOB,
-                                         0,
-                                         0));
-
-            if (r != OCI_SUCCESS)
-              throw bad_alloc ();
-
-            locator.reset (static_cast<OCILobLocator*> (b->buffer));
-          }
-
           OCIDefine* h (0);
           sword r (OCIDefineByPos (stmt_,
                                    &h,
                                    err,
                                    i,
-                                   &locator,
+                                   reinterpret_cast<OCILobLocator*> (b->size),
                                    sizeof (OCILobLocator*),
                                    b->type,
                                    b->indicator,
@@ -189,7 +185,37 @@ namespace odb
           if (r == OCI_ERROR || r == OCI_INVALID_HANDLE)
             translate_error (err, r);
 
-          locator.reset ();
+          // @@ LOB prefetching is only supported in OCI version 11.1 and
+          // greater and in Oracle server 11.1 and greater. If this code is
+          // called against a pre 11.1 server, the call to OCIAttrSet will
+          // return an error code.
+          //
+#if (OCI_MAJOR_VERSION == 11 && OCI_MINOR_VERSION >= 1) \
+  || OCI_MAJOR_VERSION > 11
+          if (p != 0)
+          {
+            r = OCIAttrSet (h,
+                            OCI_HTYPE_DEFINE,
+                            &p,
+                            0,
+                            OCI_ATTR_LOBPREFETCH_SIZE,
+                            err);
+
+            if (r == OCI_ERROR || r == OCI_INVALID_HANDLE)
+              translate_error (err, r);
+
+            bool b (true);
+            r = OCIAttrSet (h,
+                            OCI_HTYPE_DEFINE,
+                            &b,
+                            0,
+                            OCI_ATTR_LOBPREFETCH_LENGTH,
+                            err);
+
+            if (r == OCI_ERROR || r == OCI_INVALID_HANDLE)
+              translate_error (err, r);
+          }
+#endif
         }
         else
         {
@@ -199,7 +225,7 @@ namespace odb
                                    err,
                                    i,
                                    b->buffer,
-                                   b->capacity,
+                                   static_cast<sb4> (b->capacity),
                                    b->type,
                                    b->indicator,
                                    b->size,
@@ -237,7 +263,7 @@ namespace odb
           done_ (false)
     {
       bind_param (cond.bind, cond.count, 0);
-      bind_result (data.bind, data.count);
+      bind_result (data.bind, data.count, lob_prefetch_len);
     }
 
     void select_statement::
@@ -280,8 +306,6 @@ namespace odb
           translate_error (conn_.error_handle (), r);
         else if (r == OCI_NO_DATA)
           done_ = true;
-        else
-          finalize_result (data_.bind, data_.count);
       }
 
       return done_ ? no_data : success;
@@ -313,13 +337,21 @@ namespace odb
 
       for (size_t i (0); i < data_.count; ++i)
       {
+        // Only stream if the bind specifies a LOB type, and the LOB
+        // value is not NULL, and a result callback has been provided.
+        //
         if ((data_.bind[i].type == SQLT_BLOB ||
-             data_.bind[i].type == SQLT_CLOB ||
-             data_.bind[i].type == SQLT_NCLOB) &&
-            data_.bind[i].indicator != -1)
+             data_.bind[i].type == SQLT_CLOB) &&
+            *data_.bind[i].indicator != -1 &&
+            data_.bind[i].callback.result != 0)
         {
+          // @@ If data_.bind[i].capacity is 0, we will be stuck in an
+          // infinite loop.
+          //
+          assert (data_.bind[i].capacity != 0);
+
           OCILobLocator* locator (
-            reinterpret_cast<OCILobLocator> (data_.bind[i].buffer));
+            reinterpret_cast<OCILobLocator*> (data_.bind[i].size));
 
           ub8 length (0);
           sword r (OCILobGetLength2(conn_.handle (),
@@ -327,25 +359,52 @@ namespace odb
                                     locator,
                                     &length));
 
-          if (s == OCI_ERROR || s == OCI_INVALID_HANDLE)
-            translate_error (conn_.error_handle (), r);
-
-          sword r (OCILobRead2 (conn_.handle (),
-                                conn_.error_handle (),
-                                err,
-                                &length,
-                                0,
-                                1,  // offset - first position is 1.
-                                0,
-                                0,
-                                OCI_FIRST_PIECE,
-                                &data_.bind[i],
-                                &lob_result_callback_proxy,
-                                0,
-                                SQLCS_IMPLICIT));
-
           if (r == OCI_ERROR || r == OCI_INVALID_HANDLE)
-            translate_error (conn_.error_handle (), r);
+            translate_error (err, r);
+
+          for (ub8 total (0), read (0); total < length; total += read)
+          {
+            read = length - total;
+            read = read > data_.bind[i].capacity ?
+              data_.bind[i].capacity : read;
+
+            // The call to OCILobRead2 does not need to know when the last
+            // piece is being requested.
+            //
+            ub1 oci_piece (total == 0 ? OCI_FIRST_PIECE : OCI_NEXT_PIECE);
+
+            r = OCILobRead2 (conn_.handle (),
+                             err,
+                             locator,
+                             &read,
+                             0,
+                             1,  // Starting offset. The first position is 1.
+                                 // This parameter is only used by OCI on the
+                                 // first call when polling.
+                             data_.bind[i].buffer,
+                             data_.bind[i].capacity,
+                             oci_piece,
+                             0,
+                             0,
+                             0,
+                             SQLCS_IMPLICIT);
+
+            if (r == OCI_ERROR || r == OCI_INVALID_HANDLE)
+              translate_error (err, r);
+
+            bind::piece user_piece;
+            if (oci_piece == OCI_FIRST_PIECE)
+              user_piece = read == length ? bind::whole : bind::first;
+            else
+              user_piece = total + read < length ? bind::next : bind::last;
+
+            if (!(*data_.bind[i].callback.result) (
+                  data_.bind[i].callback_context,
+                  data_.bind[i].buffer,
+                  read,
+                  user_piece))
+              break;
+          }
         }
       }
     }
@@ -367,12 +426,12 @@ namespace odb
       typedef insert_statement::id_bind_type bind;
 
       bind& b (*static_cast<bind*> (context));
-      b.indicator = -1;
 
       *buffer = 0;
       *len = 0;
       *piece = OCI_ONE_PIECE;
-      *reinterpret_cast<sb2**> (indicator) = &b.indicator;
+      b.indicator = -1;
+      *indicator = &b.indicator;
 
       return OCI_CONTINUE;
     }
diff --git a/odb/oracle/statement.hxx b/odb/oracle/statement.hxx
index 3ab60bc..b4e6601 100644
--- a/odb/oracle/statement.hxx
+++ b/odb/oracle/statement.hxx
@@ -48,7 +48,9 @@ namespace odb
       // lost OCIDefine resources.
       //
       void
-      bind_result (bind*, std::size_t count);
+        bind_result (bind*,
+                     std::size_t count,
+                     std::size_t lob_prefetch_len = 0);
 
     protected:
       connection& conn_;
-- 
cgit v1.1