Querying and Modifying Data
Overview
This page describes how to connect to a cluster and execute a variety of SQL queries using the ODBC driver.
At the implementation layer, the ODBC driver uses SQL Fields queries to retrieve data from the cluster. This means that from the ODBC side you can access only those fields that are defined in the cluster configuration.
Moreover, the ODBC driver supports DML (Data Manipulation Language), which means that you can modify your data using an ODBC connection.
Note: Refer to the ODBC example, which contains the complete logic and the example queries described below.
Configuring the Cluster
As the first step, set up the configuration that the cluster nodes will use.
The configuration should include cache configurations with properly defined QueryEntities properties.
QueryEntities are essential when your application (or the ODBC driver in our scenario) queries and modifies data using SQL statements.
Alternatively, you can create tables using DDL. The first listing below creates the tables with DDL statements sent over ODBC; the second listing defines the same caches in the XML configuration.
SQLHENV env;
// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
// Use ODBC ver 3
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);
SQLHDBC dbc;
// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
// Prepare the connection string
SQLCHAR connectStr[] = "DSN=My Ignite DSN";
// Connecting to the Cluster.
SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);
SQLHSTMT stmt;
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query1[] = "CREATE TABLE Person ( "
"id LONG PRIMARY KEY, "
"firstName VARCHAR, "
"lastName VARCHAR, "
"salary FLOAT) "
"WITH \"template=partitioned\"";
SQLExecDirect(stmt, query1, SQL_NTS);
SQLCHAR query2[] = "CREATE TABLE Organization ( "
"id LONG PRIMARY KEY, "
"name VARCHAR) "
"WITH \"template=partitioned\"";
SQLExecDirect(stmt, query2, SQL_NTS);
SQLCHAR query3[] = "CREATE INDEX idx_organization_name ON Organization (name)";
SQLExecDirect(stmt, query3, SQL_NTS);
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="cacheConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="name" value="Person"/>
                <property name="cacheMode" value="PARTITIONED"/>
                <property name="atomicityMode" value="TRANSACTIONAL"/>
                <property name="writeSynchronizationMode" value="FULL_SYNC"/>
                <property name="queryEntities">
                    <list>
                        <bean class="org.apache.ignite.cache.QueryEntity">
                            <property name="keyType" value="java.lang.Long"/>
                            <property name="keyFieldName" value="id"/>
                            <property name="valueType" value="Person"/>
                            <property name="fields">
                                <map>
                                    <entry key="id" value="java.lang.Long"/>
                                    <entry key="firstName" value="java.lang.String"/>
                                    <entry key="lastName" value="java.lang.String"/>
                                    <entry key="salary" value="java.lang.Double"/>
                                </map>
                            </property>
                        </bean>
                    </list>
                </property>
            </bean>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="name" value="Organization"/>
                <property name="cacheMode" value="PARTITIONED"/>
                <property name="atomicityMode" value="TRANSACTIONAL"/>
                <property name="writeSynchronizationMode" value="FULL_SYNC"/>
                <property name="queryEntities">
                    <list>
                        <bean class="org.apache.ignite.cache.QueryEntity">
                            <property name="keyType" value="java.lang.Long"/>
                            <property name="keyFieldName" value="id"/>
                            <property name="valueType" value="Organization"/>
                            <property name="fields">
                                <map>
                                    <entry key="id" value="java.lang.Long"/>
                                    <entry key="name" value="java.lang.String"/>
                                </map>
                            </property>
                            <property name="indexes">
                                <list>
                                    <bean class="org.apache.ignite.cache.QueryIndex">
                                        <constructor-arg value="name"/>
                                    </bean>
                                </list>
                            </property>
                        </bean>
                    </list>
                </property>
            </bean>
        </list>
    </property>
</bean>
As you can see, we defined two caches that will contain the data of the Person and Organization types.
For both types, we listed the specific fields and indexes that will be read or updated using SQL.
Connecting to the Cluster
After the cluster is configured and started, we can connect to it from the ODBC driver side. To do this, you need to prepare a valid connection string and pass it as a parameter to the ODBC driver at the connection time. Refer to the Connection String page for more details.
Alternatively, you can also use a pre-configured DSN for connection purposes as shown in the example below.
SQLHENV env;
// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
// Use ODBC ver 3
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);
SQLHDBC dbc;
// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
// Prepare the connection string
SQLCHAR connectStr[] = "DSN=My Ignite DSN";
// Connecting to Ignite Cluster.
SQLRETURN ret = SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);
if (!SQL_SUCCEEDED(ret))
{
    SQLCHAR sqlstate[7] = { 0 };
    SQLINTEGER nativeCode;
    SQLCHAR errMsg[BUFFER_SIZE] = { 0 };
    SQLSMALLINT errMsgLen = static_cast<SQLSMALLINT>(sizeof(errMsg));
    SQLGetDiagRec(SQL_HANDLE_DBC, dbc, 1, sqlstate, &nativeCode, errMsg, errMsgLen, &errMsgLen);
    std::cerr << "Failed to connect to Ignite: "
              << reinterpret_cast<char*>(sqlstate) << ": "
              << reinterpret_cast<char*>(errMsg) << ", "
              << "Native error code: " << nativeCode
              << std::endl;
    // Releasing allocated handles.
    SQLFreeHandle(SQL_HANDLE_DBC, dbc);
    SQLFreeHandle(SQL_HANDLE_ENV, env);
    return;
}
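If you prefer a DSN-less connection, the connection string mentioned above can be assembled from key/value pairs. The following is a minimal sketch, not part of the driver: `BuildConnectionString` and `ExampleConnectionString` are hypothetical helpers, and the `DRIVER`, `ADDRESS`, and `SCHEMA` keys and their values are assumptions that should be checked against the Connection String page.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: joins key/value pairs into "KEY=value;KEY=value".
std::string BuildConnectionString(
    const std::vector<std::pair<std::string, std::string>>& attrs)
{
    std::string result;
    for (const auto& attr : attrs)
    {
        // An attribute without a key cannot be expressed in a connection string.
        assert(!attr.first.empty());
        if (!result.empty())
            result += ';';
        result += attr.first;
        result += '=';
        result += attr.second;
    }
    return result;
}

// Assumed attribute names and values; verify them for your installation.
std::string ExampleConnectionString()
{
    std::vector<std::pair<std::string, std::string>> attrs;
    attrs.push_back(std::make_pair("DRIVER", "{Apache Ignite}"));
    attrs.push_back(std::make_pair("ADDRESS", "localhost:10800"));
    attrs.push_back(std::make_pair("SCHEMA", "Person"));
    return BuildConnectionString(attrs);
}
```

The resulting string can be copied into a `SQLCHAR` buffer and passed to `SQLDriverConnect` in place of the DSN string used in the example above.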
Querying Data
After everything is up and running, we’re ready to execute SQL SELECT queries using the ODBC API.
SQLHSTMT stmt;
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query[] = "SELECT firstName, lastName, salary, Organization.name FROM Person "
    "INNER JOIN \"Organization\".Organization ON Person.orgId = Organization.id";
// The query is null-terminated, so let the driver compute its length.
SQLRETURN ret = SQLExecDirect(stmt, query, SQL_NTS);
if (!SQL_SUCCEEDED(ret))
{
    SQLCHAR sqlstate[7] = { 0 };
    SQLINTEGER nativeCode;
    SQLCHAR errMsg[BUFFER_SIZE] = { 0 };
    SQLSMALLINT errMsgLen = static_cast<SQLSMALLINT>(sizeof(errMsg));
    // The failure belongs to the statement, so read diagnostics from its handle.
    SQLGetDiagRec(SQL_HANDLE_STMT, stmt, 1, sqlstate, &nativeCode, errMsg, errMsgLen, &errMsgLen);
    std::cerr << "Failed to perform SQL query: "
              << reinterpret_cast<char*>(sqlstate) << ": "
              << reinterpret_cast<char*>(errMsg) << ", "
              << "Native error code: " << nativeCode
              << std::endl;
}
else
{
    // Printing the result set.
    struct OdbcStringBuffer
    {
        SQLCHAR buffer[BUFFER_SIZE];
        SQLLEN resLen;
    };
    // Getting the number of columns in the result set.
    SQLSMALLINT columnsCnt = 0;
    SQLNumResultCols(stmt, &columnsCnt);
    // Allocating buffers for columns.
    std::vector<OdbcStringBuffer> columns(columnsCnt);
    // Binding columns. For simplicity we are going to use only
    // string buffers here.
    for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
        SQLBindCol(stmt, i + 1, SQL_C_CHAR, columns[i].buffer, BUFFER_SIZE, &columns[i].resLen);
    // Fetching and printing data in a loop.
    ret = SQLFetch(stmt);
    while (SQL_SUCCEEDED(ret))
    {
        for (size_t i = 0; i < columns.size(); ++i)
            std::cout << std::setw(16) << std::left << columns[i].buffer << " ";
        std::cout << std::endl;
        ret = SQLFetch(stmt);
    }
}
// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
Note (column binding): In the example above, we bind all columns as SQL_C_CHAR. This means that all values are converted to strings upon fetching. This is done for the sake of simplicity. Value conversion upon fetching can be quite slow, so your default choice should be to fetch each value in the same type in which it is stored.
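The fetch loop in the querying example prints each bound buffer directly and ignores the length/indicator value (`resLen`) that the driver writes next to it. The sketch below shows one way to interpret that value for string-bound columns. `CellToString` is a hypothetical helper, and the two constants are reproduced from the ODBC headers only so that the sketch compiles on its own; real code should include `<sql.h>` and use `SQL_NULL_DATA` and `SQL_NO_TOTAL`, and should pass the indicator as `SQLLEN`.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Stand-ins for SQL_NULL_DATA and SQL_NO_TOTAL from the ODBC headers.
const long kSqlNullData = -1;
const long kSqlNoTotal = -4;

// Hypothetical helper: turns a string-bound column buffer plus its
// length/indicator value into a printable std::string.
std::string CellToString(const char* buffer, std::size_t bufferSize, long indicator)
{
    assert(buffer != nullptr);
    if (indicator == kSqlNullData)
        return "NULL";
    // Characters that fit in the buffer once the terminator is accounted for.
    std::size_t capacity = bufferSize > 0 ? bufferSize - 1 : 0;
    std::size_t len = capacity;
    // A non-negative indicator smaller than the capacity is the real length;
    // a larger one (or kSqlNoTotal) means the value was truncated to fit.
    if (indicator != kSqlNoTotal && indicator >= 0 &&
        static_cast<std::size_t>(indicator) < capacity)
        len = static_cast<std::size_t>(indicator);
    return std::string(buffer, len);
}
```

In the fetch loop this could replace the direct use of `columns[i].buffer`, so that SQL NULL values are printed explicitly instead of as stale or empty buffer contents.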
Inserting Data
To insert new data into the cluster, SQL INSERT statements can be used from the ODBC side.
SQLHSTMT stmt;
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query[] =
    "INSERT INTO Person (id, orgId, firstName, salary) "
    "VALUES (?, ?, ?, ?)";
// The query is null-terminated, so let the driver compute its length.
SQLPrepare(stmt, query, SQL_NTS);
// Binding parameters. One parameter is bound for each '?' in the query.
int64_t key = 0;
int64_t orgId = 0;
char name[1024] = { 0 };
SQLLEN nameLen = SQL_NTS;
double salary = 0.0;
// int64_t values are 64 bits wide, so they are bound as SQL_C_SBIGINT.
SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, &key, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, &orgId, 0, 0);
SQLBindParameter(stmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, sizeof(name), 0, name, sizeof(name), &nameLen);
SQLBindParameter(stmt, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, &salary, 0, 0);
// Filling cache.
key = 1;
orgId = 1;
strncpy(name, "John", sizeof(name));
salary = 2200.0;
SQLExecute(stmt);
SQLMoreResults(stmt);
++key;
orgId = 1;
strncpy(name, "Jane", sizeof(name));
salary = 1300.0;
SQLExecute(stmt);
SQLMoreResults(stmt);
++key;
orgId = 2;
strncpy(name, "Richard", sizeof(name));
salary = 900.0;
SQLExecute(stmt);
SQLMoreResults(stmt);
++key;
orgId = 2;
strncpy(name, "Mary", sizeof(name));
salary = 2400.0;
SQLExecute(stmt);
// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
Next, we are going to insert additional organizations without using prepared statements.
SQLHSTMT stmt;
// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query1[] = "INSERT INTO \"Organization\".Organization (id, name) VALUES (1L, 'Some company')";
// The query is null-terminated, so let the driver compute its length.
SQLExecDirect(stmt, query1, SQL_NTS);
SQLFreeStmt(stmt, SQL_CLOSE);
SQLCHAR query2[] = "INSERT INTO \"Organization\".Organization (id, name) VALUES (2L, 'Some other company')";
SQLExecDirect(stmt, query2, SQL_NTS);
// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
Warning (error checking): For simplicity, the example code above does not check the return codes for errors. You will want to do error checking in production.
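One common way to add the missing checks without cluttering every call is a small helper that turns a failing return code into an exception. The sketch below is an illustration under stated assumptions: `Succeeded` and `ThrowIfFailed` are hypothetical names, and the two constants are copied from the ODBC headers only to keep the sketch standalone. Real code should include `<sql.h>`, use the `SQL_SUCCEEDED` macro, and add the text returned by `SQLGetDiagRec` to the message.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Stand-ins for SQL_SUCCESS and SQL_SUCCESS_WITH_INFO from <sql.h>.
const int kSqlSuccess = 0;
const int kSqlSuccessWithInfo = 1;

// Mirrors what the SQL_SUCCEEDED macro checks.
inline bool Succeeded(int rc)
{
    return rc == kSqlSuccess || rc == kSqlSuccessWithInfo;
}

// Hypothetical helper: throws if an ODBC call did not succeed.
// 'what' names the call so that the message points at the failing step.
void ThrowIfFailed(int rc, const std::string& what)
{
    assert(!what.empty());
    if (Succeeded(rc))
        return;
    throw std::runtime_error(
        what + " failed with ODBC return code " + std::to_string(rc));
}
```

With such a helper, a call from the examples above would read `ThrowIfFailed(SQLExecute(stmt), "SQLExecute");`. Note that `SQLFetch` returning `SQL_NO_DATA` at the end of a result set is not an error and should be handled before the check.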
Updating Data
Let’s now update the salary of some of the persons stored in the cluster using an SQL UPDATE statement.
void AdjustSalary(SQLHDBC dbc, int64_t key, double salary)
{
    SQLHSTMT stmt;
    // Allocate a statement handle
    SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
    SQLCHAR query[] = "UPDATE Person SET salary=? WHERE id=?";
    SQLBindParameter(stmt, 1, SQL_PARAM_INPUT,
        SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, &salary, 0, 0);
    // int64_t is 64 bits wide, so it is bound as SQL_C_SBIGINT.
    SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SBIGINT,
        SQL_BIGINT, 0, 0, &key, 0, 0);
    // The query is null-terminated, so let the driver compute its length.
    SQLExecDirect(stmt, query, SQL_NTS);
    // Releasing statement handle.
    SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}
...
AdjustSalary(dbc, 3, 1200.0);
AdjustSalary(dbc, 1, 2500.0);
Deleting Data
Finally, let’s remove a few records with the help of an SQL DELETE statement.
void DeletePerson(SQLHDBC dbc, int64_t key)
{
    SQLHSTMT stmt;
    // Allocate a statement handle
    SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
    SQLCHAR query[] = "DELETE FROM Person WHERE id=?";
    // int64_t is 64 bits wide, so it is bound as SQL_C_SBIGINT.
    SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT,
        0, 0, &key, 0, 0);
    // The query is null-terminated, so let the driver compute its length.
    SQLExecDirect(stmt, query, SQL_NTS);
    // Releasing statement handle.
    SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}
...
DeletePerson(dbc, 1);
DeletePerson(dbc, 4);
Batching With Arrays of Parameters
The ODBC driver supports batching with arrays of parameters for DML statements.
Let’s try to insert the same records as in the example above, but now with a single SQLExecute call:
SQLHSTMT stmt;
// Allocating a statement handle.
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
SQLCHAR query[] =
    "INSERT INTO Person (id, orgId, firstName, salary) "
    "VALUES (?, ?, ?, ?)";
// The query is null-terminated, so let the driver compute its length.
SQLPrepare(stmt, query, SQL_NTS);
// Binding parameter arrays. One array is bound for each '?' in the query.
int64_t key[4] = {0};
int64_t orgId[4] = {0};
char name[1024 * 4] = {0};
SQLLEN nameLen[4] = {0};
double salary[4] = {0};
// int64_t values are 64 bits wide, so they are bound as SQL_C_SBIGINT.
SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, key, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, orgId, 0, 0);
// Each name occupies a 1024-byte slot; the buffer length tells the driver
// where the next value in the array starts.
SQLBindParameter(stmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, 1024, 0, name, 1024, nameLen);
SQLBindParameter(stmt, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, salary, 0, 0);
// Filling cache.
key[0] = 1;
orgId[0] = 1;
strncpy(name, "John", 1023);
salary[0] = 2200.0;
nameLen[0] = SQL_NTS;
key[1] = 2;
orgId[1] = 1;
strncpy(name + 1024, "Jane", 1023);
salary[1] = 1300.0;
nameLen[1] = SQL_NTS;
key[2] = 3;
orgId[2] = 2;
strncpy(name + 1024 * 2, "Richard", 1023);
salary[2] = 900.0;
nameLen[2] = SQL_NTS;
key[3] = 4;
orgId[3] = 2;
strncpy(name + 1024 * 3, "Mary", 1023);
salary[3] = 2400.0;
nameLen[3] = SQL_NTS;
// Asking the driver to store the total number of processed argument sets
// in the following variable.
SQLULEN setsProcessed = 0;
SQLSetStmtAttr(stmt, SQL_ATTR_PARAMS_PROCESSED_PTR, &setsProcessed, SQL_IS_POINTER);
// Setting the size of the arguments array. This is 4 in our case.
SQLSetStmtAttr(stmt, SQL_ATTR_PARAMSET_SIZE, reinterpret_cast<SQLPOINTER>(4), 0);
// Executing the statement.
SQLExecute(stmt);
// Releasing the statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);
Note: This type of batching is currently supported for INSERT, UPDATE, DELETE, and MERGE statements and does not work for SELECT statements. The data-at-execution capability is not supported with arrays-of-parameters batching either.
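The string column in the batching example is a single flat buffer divided into fixed-size slots, which is why the code writes to `name + 1024 * i`. The sketch below wraps that layout in a small type so that the offsets are computed in one place. `StringColumn` is a hypothetical helper, not part of the driver, and `long` stands in for `SQLLEN` so that the sketch compiles without the ODBC headers; real code should store the lengths as `SQLLEN`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical helper: a column of fixed-width string slots laid out
// back to back, as expected for column-wise parameter arrays.
struct StringColumn
{
    StringColumn(std::size_t rows, std::size_t slotSize)
        : stride(slotSize), data(rows * slotSize, '\0'), lengths(rows, 0)
    {
        assert(slotSize > 0);
    }

    // Copies 'value' into the slot of the given row, truncating it so that
    // the terminating null always fits, and records the stored length.
    void Set(std::size_t row, const char* value)
    {
        assert(row < lengths.size());
        std::size_t len = std::strlen(value);
        if (len > stride - 1)
            len = stride - 1;
        char* slot = &data[row * stride];
        std::memcpy(slot, value, len);
        slot[len] = '\0';
        lengths[row] = static_cast<long>(len);
    }

    std::size_t stride;        // size of one slot in bytes
    std::vector<char> data;    // rows * stride bytes
    std::vector<long> lengths; // one length per row (SQLLEN in real code)
};
```

In the example above, `data.data()` would be passed as the parameter value pointer, `stride` as the buffer length, and the lengths array as the length/indicator pointer of `SQLBindParameter`.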
Streaming
The ODBC driver allows streaming data in bulk using the SET command. See the SET command documentation for more information.
Note: In streaming mode, arrays of parameters and data-at-execution parameters are not supported.
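Because streaming is switched on and off with SQL commands, it is easy to leave a connection in streaming mode when an error interrupts the load. The sketch below shows one possible way to pair the two commands with a scope guard. It assumes the `SET STREAMING ON` and `SET STREAMING OFF` forms described in the SET command documentation; `StreamingScope` and the `exec` callback are hypothetical, and in real code the callback would wrap `SQLExecDirect` on a statement handle.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical scope guard: enables streaming on construction and
// disables it again when the scope is left.
class StreamingScope
{
public:
    explicit StreamingScope(std::function<void(const std::string&)> exec)
        : exec_(std::move(exec))
    {
        assert(exec_ != nullptr);
        exec_("SET STREAMING ON");
    }

    ~StreamingScope()
    {
        // Destructors must not throw, so failures are swallowed here.
        try { exec_("SET STREAMING OFF"); } catch (...) {}
    }

    StreamingScope(const StreamingScope&) = delete;
    StreamingScope& operator=(const StreamingScope&) = delete;

private:
    std::function<void(const std::string&)> exec_;
};

// Demonstrates the order of commands using a recording callback
// instead of a real ODBC statement.
std::vector<std::string> DemoStreamingOrder()
{
    std::vector<std::string> sent;
    auto exec = [&sent](const std::string& sql) { sent.push_back(sql); };
    {
        StreamingScope scope(exec);
        exec("INSERT INTO Person (id, firstName) VALUES (1, 'John')");
    }
    return sent;
}
```

The guard only orders the commands; whether data is flushed when streaming is turned off, and which options the command accepts, is described in the SET command documentation.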