Querying and Modifying Data | Ignite Documentation

Querying and Modifying Data

Overview

This page elaborates on how to connect to a cluster and execute a variety of SQL queries using the ODBC driver.

At the implementation layer, the ODBC driver uses SQL Fields queries to retrieve data from the cluster. This means that from the ODBC side you can access only those fields that are defined in the cluster configuration.

The ODBC driver also supports DML (Data Manipulation Language), which means that you can modify your data using an ODBC connection.

Note
Refer to the ODBC example, which contains the complete logic and the example queries described below.

Configuring the Cluster

As the first step, you need to set up a configuration that will be used by the cluster nodes. The configuration should include cache configurations with properly defined QueryEntities properties. QueryEntities are essential when your application (or the ODBC driver in our scenario) is going to query and modify the data using SQL statements. Alternatively, you can create tables using DDL.

SQLHENV env;

// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);

// Use ODBC ver 3
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);

SQLHDBC dbc;

// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);

// Prepare the connection string
SQLCHAR connectStr[] = "DSN=My Ignite DSN";

// Connecting to the Cluster.
SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);

SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query1[] = "CREATE TABLE Person ( "
    "id LONG PRIMARY KEY, "
    "firstName VARCHAR, "
    "lastName VARCHAR, "
    "salary FLOAT) "
    "WITH \"template=partitioned\"";

SQLExecDirect(stmt, query1, SQL_NTS);

SQLCHAR query2[] = "CREATE TABLE Organization ( "
    "id LONG PRIMARY KEY, "
    "name VARCHAR) "
    "WITH \"template=partitioned\"";

SQLExecDirect(stmt, query2, SQL_NTS);

SQLCHAR query3[] = "CREATE INDEX idx_organization_name ON Organization (name)";

SQLExecDirect(stmt, query3, SQL_NTS);

The Spring XML configuration that defines the Person and Organization caches:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="cacheConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="name" value="Person"/>
                <property name="cacheMode" value="PARTITIONED"/>
                <property name="atomicityMode" value="TRANSACTIONAL"/>
                <property name="writeSynchronizationMode" value="FULL_SYNC"/>
                <property name="queryEntities">
                    <list>
                        <bean class="org.apache.ignite.cache.QueryEntity">
                            <property name="keyType" value="java.lang.Long"/>
                            <property name="keyFieldName" value="id"/>
                            <property name="valueType" value="Person"/>
                            <property name="fields">
                                <map>
                                    <entry key="id" value="java.lang.Long"/>
                                    <entry key="firstName" value="java.lang.String"/>
                                    <entry key="lastName" value="java.lang.String"/>
                                    <entry key="salary" value="java.lang.Double"/>
                                </map>
                            </property>
                        </bean>
                    </list>
                </property>
            </bean>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="name" value="Organization"/>
                <property name="cacheMode" value="PARTITIONED"/>
                <property name="atomicityMode" value="TRANSACTIONAL"/>
                <property name="writeSynchronizationMode" value="FULL_SYNC"/>
                <property name="queryEntities">
                    <list>
                        <bean class="org.apache.ignite.cache.QueryEntity">
                            <property name="keyType" value="java.lang.Long"/>
                            <property name="keyFieldName" value="id"/>
                            <property name="valueType" value="Organization"/>
                            <property name="fields">
                                <map>
                                    <entry key="id" value="java.lang.Long"/>
                                    <entry key="name" value="java.lang.String"/>
                                </map>
                            </property>
                            <property name="indexes">
                                <list>
                                    <bean class="org.apache.ignite.cache.QueryIndex">
                                        <constructor-arg value="name"/>
                                    </bean>
                                </list>
                            </property>
                        </bean>
                    </list>
                </property>
            </bean>
        </list>
    </property>
</bean>

As you can see, we defined two caches that will contain the data of Person and Organization types. For both types, we listed specific fields and indexes that will be read or updated using SQL.

Connecting to the Cluster

After the cluster is configured and started, we can connect to it from the ODBC driver side. To do this, you need to prepare a valid connection string and pass it to the ODBC driver at connection time. Refer to the Connection String page for more details.
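As a minimal sketch of preparing such a string, the helper below joins key/value pairs into the usual `KEY=value;KEY=value` form. `BuildConnectionString` and `ExampleConnectionString` are hypothetical names introduced here, not part of the ODBC API or the driver, and the `DRIVER`, `ADDRESS`, and `SCHEMA` attributes with their values are assumptions that should be checked against the Connection String page.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not part of the ODBC API or the Ignite driver):
// joins key/value pairs into a "KEY=value;KEY=value" connection string.
std::string BuildConnectionString(
    const std::vector<std::pair<std::string, std::string> >& attrs)
{
  std::string res;

  for (std::size_t i = 0; i < attrs.size(); ++i)
  {
    // Separate attributes with a semicolon, but do not add a trailing one.
    if (i != 0)
      res += ';';

    res += attrs[i].first + "=" + attrs[i].second;
  }

  return res;
}

// Example attributes. The keys and values are assumptions made for this
// sketch; verify them against the Connection String page.
std::string ExampleConnectionString()
{
  std::vector<std::pair<std::string, std::string> > attrs;

  attrs.push_back(std::make_pair("DRIVER", "{Apache Ignite}"));
  attrs.push_back(std::make_pair("ADDRESS", "localhost:10800"));
  attrs.push_back(std::make_pair("SCHEMA", "PUBLIC"));

  return BuildConnectionString(attrs);
}
```

The resulting string could be copied into an SQLCHAR buffer and passed to SQLDriverConnect in place of a DSN-based string.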

Alternatively, you can also use a pre-configured DSN for connection purposes as shown in the example below.

SQLHENV env;

// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);

// Use ODBC ver 3
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);

SQLHDBC dbc;

// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);

// Prepare the connection string
SQLCHAR connectStr[] = "DSN=My Ignite DSN";

// Connecting to Ignite Cluster.
SQLRETURN ret = SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);

if (!SQL_SUCCEEDED(ret))
{
  SQLCHAR sqlstate[7] = { 0 };
  SQLINTEGER nativeCode;

  SQLCHAR errMsg[BUFFER_SIZE] = { 0 };
  SQLSMALLINT errMsgLen = static_cast<SQLSMALLINT>(sizeof(errMsg));

  SQLGetDiagRec(SQL_HANDLE_DBC, dbc, 1, sqlstate, &nativeCode, errMsg, errMsgLen, &errMsgLen);

  std::cerr << "Failed to connect to Ignite: "
            << reinterpret_cast<char*>(sqlstate) << ": "
            << reinterpret_cast<char*>(errMsg) << ", "
            << "Native error code: " << nativeCode
            << std::endl;

  // Releasing allocated handles.
  SQLFreeHandle(SQL_HANDLE_DBC, dbc);
  SQLFreeHandle(SQL_HANDLE_ENV, env);

  return;
}

Querying Data

After everything is up and running, we’re ready to execute SQL SELECT queries using the ODBC API.

SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query[] = "SELECT firstName, lastName, salary, Organization.name FROM Person "
  "INNER JOIN \"Organization\".Organization ON Person.orgId = Organization.id";
// Length of the query text, excluding the terminating null character.
SQLSMALLINT queryLen = static_cast<SQLSMALLINT>(sizeof(query) - 1);

SQLRETURN ret = SQLExecDirect(stmt, query, queryLen);

if (!SQL_SUCCEEDED(ret))
{
  SQLCHAR sqlstate[7] = { 0 };
  SQLINTEGER nativeCode;

  SQLCHAR errMsg[BUFFER_SIZE] = { 0 };
  SQLSMALLINT errMsgLen = static_cast<SQLSMALLINT>(sizeof(errMsg));

  // The error was raised on the statement, so read diagnostics from its handle.
  SQLGetDiagRec(SQL_HANDLE_STMT, stmt, 1, sqlstate, &nativeCode, errMsg, errMsgLen, &errMsgLen);

  std::cerr << "Failed to perform SQL query: "
            << reinterpret_cast<char*>(sqlstate) << ": "
            << reinterpret_cast<char*>(errMsg) << ", "
            << "Native error code: " << nativeCode
            << std::endl;
}
else
{
  // Printing the result set.
  struct OdbcStringBuffer
  {
    SQLCHAR buffer[BUFFER_SIZE];
    SQLLEN resLen;
  };

  // Getting a number of columns in the result set.
  SQLSMALLINT columnsCnt = 0;
  SQLNumResultCols(stmt, &columnsCnt);

  // Allocating buffers for columns.
  std::vector<OdbcStringBuffer> columns(columnsCnt);

  // Binding columns. For simplicity we are going to use only
  // string buffers here.
  for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
    SQLBindCol(stmt, i + 1, SQL_C_CHAR, columns[i].buffer, BUFFER_SIZE, &columns[i].resLen);

  // Fetching and printing data in a loop.
  ret = SQLFetch(stmt);
  while (SQL_SUCCEEDED(ret))
  {
    for (size_t i = 0; i < columns.size(); ++i)
      std::cout << std::setw(16) << std::left << columns[i].buffer << " ";

    std::cout << std::endl;

    ret = SQLFetch(stmt);
  }
}

// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
Note

Columns binding

In the example above, we bind all columns as SQL_C_CHAR. This means that all values are converted to strings upon fetching. This is done for the sake of simplicity. Value conversion upon fetching can be slow, so by default you should fetch each value in the same type in which it is stored.
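The fetch loop above prints each buffer without consulting resLen. The sketch below shows one way the length indicator might be checked before a fetched SQL_C_CHAR value is used. `ColumnToString` and `kSqlNullData` are hypothetical names introduced here; `kSqlNullData` mirrors the value of SQL_NULL_DATA (-1) from `<sql.h>` so that the sketch compiles without the ODBC headers.

```cpp
#include <cstddef>
#include <string>

// Stand-in for SQL_NULL_DATA from <sql.h> (defined there as -1), so that
// this sketch compiles without the ODBC headers.
const long kSqlNullData = -1;

// Hypothetical helper: converts a fetched SQL_C_CHAR buffer and its length
// indicator into a string. Returns false if the column value is SQL NULL.
bool ColumnToString(const char* buffer, std::size_t bufferSize,
                    long indicator, std::string& out)
{
  out.clear();

  if (indicator == kSqlNullData)
    return false;

  // If the value was truncated, the indicator holds the full length rather
  // than the number of characters in the buffer. Read up to the terminating
  // null character, and never more than bufferSize - 1 characters.
  std::size_t maxLen = bufferSize > 0 ? bufferSize - 1 : 0;
  std::size_t len = 0;

  while (len < maxLen && buffer[len] != '\0')
    ++len;

  out.assign(buffer, len);

  return true;
}
```

In the loop above, such a helper could be called with columns[i].buffer (cast to const char*), BUFFER_SIZE, and columns[i].resLen, printing a placeholder such as NULL when it returns false.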

Inserting Data

To insert new data into the cluster, SQL INSERT statements can be used from the ODBC side.

SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

// Each "?" placeholder must have a matching SQLBindParameter call.
SQLCHAR query[] =
  "INSERT INTO Person (id, orgId, firstName, salary) "
  "VALUES (?, ?, ?, ?)";

SQLPrepare(stmt, query, SQL_NTS);

// Binding parameters.
int64_t key = 0;
int64_t orgId = 0;
char name[1024] = { 0 };
SQLLEN nameLen = SQL_NTS;
double salary = 0.0;

// int64_t buffers are bound as SQL_C_SBIGINT (64-bit); SQL_C_SLONG is 32-bit.
SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, &key, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, &orgId, 0, 0);
SQLBindParameter(stmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, sizeof(name), 0, name, sizeof(name), &nameLen);
SQLBindParameter(stmt, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, &salary, 0, 0);

// Filling cache.
key = 1;
orgId = 1;
strncpy(name, "John", sizeof(name));
salary = 2200.0;

SQLExecute(stmt);
SQLMoreResults(stmt);

++key;
orgId = 1;
strncpy(name, "Jane", sizeof(name));
salary = 1300.0;

SQLExecute(stmt);
SQLMoreResults(stmt);

++key;
orgId = 2;
strncpy(name, "Richard", sizeof(name));
salary = 900.0;

SQLExecute(stmt);
SQLMoreResults(stmt);

++key;
orgId = 2;
strncpy(name, "Mary", sizeof(name));
salary = 2400.0;

SQLExecute(stmt);

// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);

Next, we are going to insert additional organizations without using prepared statements.

SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query1[] = "INSERT INTO \"Organization\".Organization (id, name) VALUES (1L, 'Some company')";

SQLExecDirect(stmt, query1, SQL_NTS);

SQLFreeStmt(stmt, SQL_CLOSE);

SQLCHAR query2[] = "INSERT INTO \"Organization\".Organization (id, name) VALUES (2L, 'Some other company')";

SQLExecDirect(stmt, query2, SQL_NTS);

// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
Warning

Error Checking

For simplicity, the example code above does not check the return codes of the ODBC calls. In production code, check the result of every call, as in the connection and query examples above.

Updating Data

Let’s now update the salary for some of the people stored in the cluster using the SQL UPDATE statement.

void AdjustSalary(SQLHDBC dbc, int64_t key, double salary)
{
  SQLHSTMT stmt;

  // Allocate a statement handle
  SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

  SQLCHAR query[] = "UPDATE Person SET salary=? WHERE id=?";

  SQLBindParameter(stmt, 1, SQL_PARAM_INPUT,
      SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, &salary, 0, 0);

  // The int64_t key is bound as SQL_C_SBIGINT (64-bit); SQL_C_SLONG is 32-bit.
  SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SBIGINT,
      SQL_BIGINT, 0, 0, &key, 0, 0);

  SQLExecDirect(stmt, query, SQL_NTS);

  // Releasing statement handle.
  SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}

...
AdjustSalary(dbc, 3, 1200.0);
AdjustSalary(dbc, 1, 2500.0);

Deleting Data

Finally, let’s remove a few records using the SQL DELETE statement.

void DeletePerson(SQLHDBC dbc, int64_t key)
{
  SQLHSTMT stmt;

  // Allocate a statement handle
  SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

  SQLCHAR query[] = "DELETE FROM Person WHERE id=?";

  // The int64_t key is bound as SQL_C_SBIGINT (64-bit); SQL_C_SLONG is 32-bit.
  SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT,
      0, 0, &key, 0, 0);

  SQLExecDirect(stmt, query, SQL_NTS);

  // Releasing statement handle.
  SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}

...
DeletePerson(dbc, 1);
DeletePerson(dbc, 4);

Batching With Arrays of Parameters

The ODBC driver supports batching with arrays of parameters for DML statements.

Let’s insert the same records as in the example above, but now with a single SQLExecute call:

SQLHSTMT stmt;

// Allocating a statement handle.
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

// Each "?" placeholder must have a matching SQLBindParameter call.
SQLCHAR query[] =
  "INSERT INTO Person (id, orgId, firstName, salary) "
  "VALUES (?, ?, ?, ?)";

SQLPrepare(stmt, query, SQL_NTS);

// Binding parameter arrays (one element per row).
int64_t key[4] = {0};
int64_t orgId[4] = {0};
char name[1024 * 4] = {0};
SQLLEN nameLen[4] = {0};
double salary[4] = {0};

// int64_t buffers are bound as SQL_C_SBIGINT (64-bit); SQL_C_SLONG is 32-bit.
SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, key, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, orgId, 0, 0);
// For character data, BufferLength (1024) is the size of one array element.
SQLBindParameter(stmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, 1024, 0, name, 1024, nameLen);
SQLBindParameter(stmt, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, salary, 0, 0);

// Filling cache.
key[0] = 1;
orgId[0] = 1;
strncpy(name, "John", 1023);
salary[0] = 2200.0;
nameLen[0] = SQL_NTS;

key[1] = 2;
orgId[1] = 1;
strncpy(name + 1024, "Jane", 1023);
salary[1] = 1300.0;
nameLen[1] = SQL_NTS;

key[2] = 3;
orgId[2] = 2;
strncpy(name + 1024 * 2, "Richard", 1023);
salary[2] = 900.0;
nameLen[2] = SQL_NTS;

key[3] = 4;
orgId[3] = 2;
strncpy(name + 1024 * 3, "Mary", 1023);
salary[3] = 2400.0;
nameLen[3] = SQL_NTS;

// Asking the driver to store the total number of processed argument sets
// in the following variable.
SQLULEN setsProcessed = 0;
SQLSetStmtAttr(stmt, SQL_ATTR_PARAMS_PROCESSED_PTR, &setsProcessed, SQL_IS_POINTER);

// Setting the size of the arguments array. This is 4 in our case.
SQLSetStmtAttr(stmt, SQL_ATTR_PARAMSET_SIZE, reinterpret_cast<SQLPOINTER>(4), 0);

// Executing the statement.
SQLExecute(stmt);

// Releasing the statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
Note
This type of batching is currently supported for INSERT, UPDATE, DELETE, and MERGE statements and does not work for SELECT statements. Data-at-execution parameters are not supported with arrays-of-parameters batching either.
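In the batching example, the name values live in one contiguous buffer split into fixed-width, null-terminated slots of 1024 bytes each. As a sketch of that layout, the helper below packs a list of strings into such a buffer. `PackFixedWidth` is a hypothetical name introduced here and is not part of the ODBC API or the driver.

```cpp
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical helper: lays out strings as consecutive fixed-width,
// null-terminated slots, the layout used for the name array above.
std::vector<char> PackFixedWidth(const std::vector<std::string>& values,
                                 std::size_t width)
{
  // Zero-filled, so every slot is null-terminated after the copy.
  std::vector<char> buf(values.size() * width, '\0');

  if (width == 0)
    return buf;

  for (std::size_t i = 0; i < values.size(); ++i)
  {
    // Copy at most width - 1 characters so each slot keeps its terminator.
    std::size_t len = values[i].size() < width - 1 ? values[i].size() : width - 1;

    std::memcpy(&buf[i * width], values[i].data(), len);
  }

  return buf;
}
```

A buffer produced this way could be passed as the parameter value pointer for the string parameter, with the slot width as the buffer length and SQL_NTS in every element of the length array.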

Streaming

The ODBC driver allows streaming data in bulk using the SET command. See the SET command documentation for more information.

Note
In streaming mode, arrays of parameters and data-at-execution parameters are not supported.