/*
virtualpg.c -- SQLite3 extension [VIRTUAL TABLE accessing PostgreSQL tables]
version 2.0.0-RC0, 2018 July 14
Author: Sandro Furieri a.furieri@lqt.it
-----------------------------------------------------------------------------
Version: MPL 1.1/GPL 2.0/LGPL 2.1
The contents of this file are subject to the Mozilla Public License Version
1.1 (the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the
License.
The Original Code is the SpatiaLite library
The Initial Developer of the Original Code is Alessandro Furieri
Portions created by the Initial Developer are Copyright (C) 2013-2018
the Initial Developer. All Rights Reserved.
Contributor(s):
Alternatively, the contents of this file may be used under the terms of
either the GNU General Public License Version 2 or later (the "GPL"), or
the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
in which case the provisions of the GPL or the LGPL are applicable instead
of those above. If you wish to allow use of your version of this file only
under the terms of either the GPL or the LGPL, and not to allow others to
use your version of this file under the terms of the MPL, indicate your
decision by deleting the provisions above and replace them with the notice
and other provisions required by the GPL or the LGPL. If you do not delete
the provisions above, a recipient may use your version of this file under
the terms of any one of the MPL, the GPL or the LGPL.
*/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#ifndef LOADABLE_EXTENSION
#include <sqlite3.h>
#else
#include <sqlite3ext.h>
#endif
#include "virtualpg.h"
#include "config.h"
/*
/ the sqlite3_module descriptor for this virtual table;
/ when built as a loadable extension SQLITE_EXTENSION_INIT1 is expanded
/ immediately before the declaration
*/
#ifndef LOADABLE_EXTENSION
struct sqlite3_module my_pg_module;
#else
SQLITE_EXTENSION_INIT1 struct sqlite3_module my_pg_module;
#endif
/*
/ extended column-type codes returned by vpgMapType() in addition to the
/ standard SQLITE_INTEGER / SQLITE_FLOAT / SQLITE_TEXT codes
*/
#define SQLITEX_DATE 10001
#define SQLITEX_TIME 10002
#define SQLITEX_DATETIME 10003
#define SQLITEX_BOOL 10004
typedef struct VirtualPgStruct
{
/* extends the sqlite3_vtab struct: the first three members mirror it */
const sqlite3_module *pModule; /* ptr to sqlite module: USED INTERNALLY BY SQLITE */
int nRef; /* # references: USED INTERNALLY BY SQLITE */
char *zErrMsg; /* error message: USED INTERNALLY BY SQLITE */
sqlite3 *db; /* the sqlite db holding the virtual table */
char *ConnInfo; /* PQ ConnInfo string */
PGconn *pg_conn; /* Postgres connection handle */
char *pg_schema; /* the Postgres schema name */
char *pg_table; /* the Postgres table name */
int isView; /* is a Postgres View or not */
int nColumns; /* number of columns into the table */
char **Column; /* the name for each column */
char **Type; /* the Postgres datatype for each column */
int *Mapping; /* the SQLite datatype for each column (SQLITE_* or SQLITEX_* code) */
int *MaxSize; /* the max size (in bytes) for each column */
int *NotNull; /* NotNull clause for each column */
char *IsPK; /* Y/N for each column ('Y' marks a Primary Key column) */
int newRowid; /* NOTE(review): not referenced in this part of the file; presumably a rowid counter - confirm */
char **PKstrings; /* array of PK string values: row-major, PKrows x PKcols entries, NULL for a NULL value */
int *PKidx; /* array of PK column indices (positions into Column / Mapping) */
int PKrows; /* number of rows in the PK array */
int PKcols; /* number of columns in the PK array */
int readOnly; /* PostgreSQL readOnly mode */
int julianNumbers; /* DATE/TIME as Julian numbers */
int pendingTransaction; /* True/False: if there is PostgreSQL pending transaction */
} VirtualPg;
typedef VirtualPg *VirtualPgPtr;
typedef struct vpgSqliteValueStruct
{
/* a multitype storing a column value; only the member matching Type is meaningful */
int Type; /* current data-type (SQLITE_NULL, SQLITE_INTEGER, SQLITE_FLOAT or SQLITE_TEXT are set by the vpgSet*Value helpers) */
sqlite3_int64 IntValue; /* INT value */
double DoubleValue; /* DOUBLE value */
char *Text; /* TEXT value (heap allocated, released by vpgFreeValue) */
unsigned char *Blob; /* BLOB value (heap allocated, released by vpgFreeValue) */
int Size; /* size (in bytes) for TEXT or BLOB */
} vpgSqliteValue;
typedef vpgSqliteValue *vpgSqliteValuePtr;
typedef struct VirtualPgCursorStruct
{
/* extends the sqlite3_vtab_cursor struct */
VirtualPgPtr pVtab; /* Virtual table of this cursor */
PGresult *resultSet; /* Postgres result set Object */
int nRows; /* num rows into the result set */
int nFields; /* num cols into the result set */
int currentRow; /* the current row index (compared against nRows by vpgReadRow) */
int nColumns; /* number of columns */
vpgSqliteValuePtr *Values; /* the current-row value for each column */
int eof; /* the EOF marker (set to 1 by vpgReadRow once currentRow reaches nRows) */
} VirtualPgCursor;
typedef VirtualPgCursor *VirtualPgCursorPtr;
typedef struct vpgMemBufferStruct
{
/* a struct handling a dynamically growing output buffer (used to assemble SQL text) */
char *Buffer; /* heap allocated storage, NULL until the first append */
size_t WriteOffset; /* offset where the next payload will be written */
size_t BufferSize; /* currently allocated size (in bytes) of Buffer */
int Error; /* set to 1 when an allocation failed */
} vpgMemBuffer;
typedef vpgMemBuffer *vpgMemBufferPtr;
/*************************************************************
/
/ virtualized libPQ methods
/
**************************************************************/
/*
/ the libPQ entry points every vpgPQ* wrapper calls through
/ NOTE(review): how and where this struct gets populated is not visible
/ in this part of the file - confirm before relying on it
*/
static virtualPQ pq;
extern void
vpgPQclear (PGresult * res)
{
/* PQclear: forwards RES to the PQclear entry of the pq struct */
pq.PQclear (res);
}
extern PGconn *
vpgPQconnectdb (const char *conninfo)
{
/* PQconnectdb: forwards CONNINFO to the PQconnectdb entry of the pq struct and returns its result */
return pq.PQconnectdb (conninfo);
}
extern char *
vpgPQerrorMessage (const PGconn * conn)
{
/* PQerrorMessage: forwards CONN to the PQerrorMessage entry of the pq struct and returns its result */
return pq.PQerrorMessage (conn);
}
extern PGresult *
vpgPQexec (PGconn * conn, const char *command)
{
/* PQexec: forwards CONN and COMMAND to the PQexec entry of the pq struct and returns its result */
return pq.PQexec (conn, command);
}
extern void
vpgPQfinish (PGconn * conn)
{
/* PQfinish: forwards CONN to the PQfinish entry of the pq struct */
pq.PQfinish (conn);
}
extern int
vpgPQgetisnull (const PGresult * res, int row_number, int column_number)
{
/* PQgetisnull: forwards its arguments to the PQgetisnull entry of the pq struct and returns its result */
return pq.PQgetisnull (res, row_number, column_number);
}
extern char *
vpgPQgetvalue (const PGresult * res, int row_number, int column_number)
{
/* PQgetvalue: forwards its arguments to the PQgetvalue entry of the pq struct and returns its result */
return pq.PQgetvalue (res, row_number, column_number);
}
extern int
vpgPQlibVersion (void)
{
/* PQlibVersion: returns whatever the PQlibVersion entry of the pq struct returns */
return pq.PQlibVersion ();
}
extern int
vpgPQnfields (const PGresult * res)
{
/* PQnfields: forwards RES to the PQnfields entry of the pq struct and returns its result */
return pq.PQnfields (res);
}
extern int
vpgPQntuples (const PGresult * res)
{
/* PQntuples: forwards RES to the PQntuples entry of the pq struct and returns its result */
return pq.PQntuples (res);
}
extern ExecStatusType
vpgPQresultStatus (const PGresult * res)
{
/* PQresultStatus: forwards RES to the PQresultStatus entry of the pq struct and returns its result */
return pq.PQresultStatus (res);
}
extern ConnStatusType
vpgPQstatus (const PGconn * conn)
{
/* PQstatus: forwards CONN to the PQstatus entry of the pq struct and returns its result */
return pq.PQstatus (conn);
}
/*************************************************************
/
/ VirtualPG internal methods
/
**************************************************************/
static void
vpgResetError (sqlite3 * db)
{
    /*
    / clears the last PostgreSQL error by invoking the SQL function
    / PostgreSql_ResetLastError() on DB; the outcome is deliberately ignored
    */
    sqlite3_exec (db, "SELECT PostgreSql_ResetLastError()", NULL, NULL, NULL);
}
static void
vpgReportError (sqlite3 * db, const char *emsg)
{
    /*
    / records EMSG as the last PostgreSQL error by invoking the SQL function
    / PostgreSql_SetLastError() on DB; %q escapes any single quote in EMSG
    / the outcome of the statement is deliberately ignored
    */
    char *stmt;

    stmt =
        sqlite3_mprintf
        ("SELECT PostgreSql_SetLastError('VirtualPostgres: %q')", emsg);
    sqlite3_exec (db, stmt, NULL, NULL, NULL);
    sqlite3_free (stmt);
}
static void
vpgMemBufferInitialize (vpgMemBufferPtr buf)
{
    /* puts BUF into its pristine state: no storage, nothing written, no error */
    buf->Error = 0;
    buf->WriteOffset = buf->BufferSize = 0;
    buf->Buffer = NULL;
}
static void
vpgMemBufferReset (vpgMemBufferPtr buf)
{
    /* releases the storage owned by BUF and returns it to the pristine state */
    free (buf->Buffer);         /* free(NULL) is a harmless no-op */
    buf->Error = 0;
    buf->WriteOffset = buf->BufferSize = 0;
    buf->Buffer = NULL;
}
static void
vpgMemBufferAppend (vpgMemBufferPtr buf, const char *payload)
{
    /*
    / appends the NUL-terminated string PAYLOAD at the end of BUF, growing
    / the storage when required; the buffer is always kept NUL-terminated
    /
    / on failure (NULL payload or out of memory) buf->Error is set to 1 and
    / the already stored text is left untouched; once Error is set any
    / further append is silently skipped, so callers only need to test
    / buf->Error once before using buf->Buffer
    */
    size_t size;

    if (buf == NULL || buf->Error)
        return;
    if (payload == NULL)
    {
        buf->Error = 1;
        return;
    }
    size = strlen (payload);
    /* ">=" because one extra byte is always needed for the terminator */
    if (buf->Buffer == NULL || size >= buf->BufferSize - buf->WriteOffset)
    {
        /* we must allocate a bigger buffer */
        size_t new_size;
        char *new_buf;

        if (buf->BufferSize == 0)
            new_size = size + 1024;
        else if (buf->BufferSize <= 4196)
            new_size = buf->BufferSize + size + 4196;
        else if (buf->BufferSize <= 65536)
            new_size = buf->BufferSize + size + 65536;
        else
            new_size = buf->BufferSize + size + (1024 * 1024);
        /* realloc keeps the old block valid when it fails */
        new_buf = realloc (buf->Buffer, new_size);
        if (!new_buf)
        {
            buf->Error = 1;
            return;
        }
        buf->Buffer = new_buf;
        buf->BufferSize = new_size;
    }
    /* size + 1: copies the terminator too */
    memcpy (buf->Buffer + buf->WriteOffset, payload, size + 1);
    buf->WriteOffset += size;
}
static vpgSqliteValuePtr
vpgAllocValue (void)
{
    /*
    / allocates and initializes a Value multitype set to SQLITE_NULL
    / returns NULL when out of memory; the vpgSet*Value helpers and
    / vpgFreeValue all accept a NULL pointer
    */
    vpgSqliteValuePtr p = malloc (sizeof (vpgSqliteValue));

    if (p == NULL)
        return NULL;
    p->Type = SQLITE_NULL;
    p->IntValue = 0;
    p->DoubleValue = 0.0;
    p->Text = NULL;
    p->Blob = NULL;
    p->Size = 0;
    return p;
}
static void
vpgFreeValue (vpgSqliteValuePtr p)
{
    /* releases a Value multitype and any payload it owns; NULL is accepted */
    if (p == NULL)
        return;
    free (p->Text);             /* free(NULL) is a harmless no-op */
    free (p->Blob);
    free (p);
}
static void
vpgFreePKstrings (VirtualPgPtr p_vt)
{
    /*
    / releases the PK index array and every cached PK string value,
    / then resets the PK bookkeeping (PKrows / PKcols) to zero
    */
    free (p_vt->PKidx);
    p_vt->PKidx = NULL;
    if (p_vt->PKstrings != NULL)
    {
        /* the cache is a flat array of PKrows x PKcols strings */
        int total = p_vt->PKrows * p_vt->PKcols;
        int i;

        for (i = 0; i < total; i++)
            free (p_vt->PKstrings[i]);
        free (p_vt->PKstrings);
        p_vt->PKstrings = NULL;
    }
    p_vt->PKrows = 0;
    p_vt->PKcols = 0;
}
static int
vpgCountPKcols (VirtualPgPtr p_vt)
{
    /* returns how many columns are flagged 'Y' in the IsPK array */
    int count = 0;
    int col = p_vt->nColumns;

    /* the visiting order is irrelevant: walking backwards */
    while (col-- > 0)
        count += (p_vt->IsPK[col] == 'Y') ? 1 : 0;
    return count;
}
static char *
vpgDoubleQuoted (const char *value)
{
    /*
    / returns VALUE formatted as a double-quoted SQL identifier
    / 1] strips trailing spaces
    / 2] masks any DOUBLE QUOTE inside the string, doubling it
    /
    / an empty (or all-blank) input yields the two-character string ""
    / returns NULL when VALUE is NULL or when out of memory
    / the returned string is malloc'ed: the caller must free() it
    */
    size_t len;                 /* input length, trailing spaces excluded */
    size_t quotes = 0;          /* how many quotes need to be doubled */
    size_t i;
    char *out;
    char *p_out;

    if (!value)
        return NULL;
    len = strlen (value);
    while (len > 0 && value[len - 1] == ' ')
        len--;
    for (i = 0; i < len; i++)
    {
        if (value[i] == '"')
            quotes++;
    }
    /* opening quote + payload + doubled quotes + closing quote + NUL */
    out = malloc (len + quotes + 3);
    if (!out)
        return NULL;
    p_out = out;
    *p_out++ = '"';
    for (i = 0; i < len; i++)
    {
        if (value[i] == '"')
            *p_out++ = '"';
        *p_out++ = value[i];
    }
    *p_out++ = '"';
    *p_out = '\0';
    return out;
}
static char *
vpgBuildPkWhere (VirtualPgPtr p_vt, int nRow)
{
    /*
    / builds the WHERE clause selecting the row whose cached PK values
    / are stored at position NROW of the PKstrings array
    /
    / returns NULL when NROW is outside the cached range; otherwise a
    / string from sqlite3_mprintf() that the caller must sqlite3_free()
    */
    char *clause;
    int base;
    int k;

    if (nRow < 0 || nRow >= p_vt->PKrows)
        return NULL;
    clause = sqlite3_mprintf (" WHERE");
    base = nRow * p_vt->PKcols; /* first cell of the requested PK row */
    for (k = 0; k < p_vt->PKcols; k++)
    {
        char *old = clause;
        int col = p_vt->PKidx[k];
        char *quoted_name = vpgDoubleQuoted (p_vt->Column[col]);
        int mapping = p_vt->Mapping[col];
        const char *pk_value = p_vt->PKstrings[base + k];
        /* text-like types are emitted as SQL string literals (%Q) */
        int as_literal = (mapping == SQLITE_TEXT || mapping == SQLITEX_DATE
                          || mapping == SQLITEX_TIME
                          || mapping == SQLITEX_DATETIME
                          || mapping == SQLITEX_BOOL);

        /* every term after the first one is chained by AND */
        if (pk_value == NULL)
            clause =
                sqlite3_mprintf (k == 0 ? "%s %s IS NULL" :
                                 "%s AND %s IS NULL", old, quoted_name);
        else if (as_literal)
            clause =
                sqlite3_mprintf (k == 0 ? "%s %s = %Q" : "%s AND %s = %Q",
                                 old, quoted_name, pk_value);
        else
            clause =
                sqlite3_mprintf (k == 0 ? "%s %s = %s" : "%s AND %s = %s",
                                 old, quoted_name, pk_value);
        free (quoted_name);
        sqlite3_free (old);
    }
    return clause;
}
static char *
vpgBuildPkOrderBy (VirtualPgPtr p_vt)
{
    /*
    / builds an ORDER BY clause listing every PK column (IsPK == 'Y')
    / in table order; the result comes from sqlite3_mprintf() and must be
    / released by sqlite3_free()
    / NOTE: when no column is flagged the bare text " ORDER BY" is returned
    */
    char *clause = sqlite3_mprintf (" ORDER BY");
    int emitted = 0;            /* becomes 1 after the first PK column */
    int col;

    for (col = 0; col < p_vt->nColumns; col++)
    {
        char *old;
        char *quoted_name;

        if (p_vt->IsPK[col] != 'Y')
            continue;
        old = clause;
        quoted_name = vpgDoubleQuoted (p_vt->Column[col]);
        /* a comma separates every column from the previous one */
        clause =
            sqlite3_mprintf (emitted ? "%s, %s" : "%s %s", old, quoted_name);
        emitted = 1;
        free (quoted_name);
        sqlite3_free (old);
    }
    return clause;
}
static void
vpgSetNullValue (vpgSqliteValuePtr p)
{
    /* turns the multitype into a NULL, dropping any TEXT / BLOB payload */
    if (p == NULL)
        return;
    free (p->Text);             /* free(NULL) is a harmless no-op */
    free (p->Blob);
    p->Text = NULL;
    p->Blob = NULL;
    p->Type = SQLITE_NULL;
}
static void
vpgSetIntValue (vpgSqliteValuePtr p, const char *v)
{
    /*
    / stores into the multitype the INTEGER parsed (atoll) from the text V,
    / dropping any previous TEXT / BLOB payload
    */
    sqlite3_int64 parsed = atoll (v);

    if (p == NULL)
        return;
    free (p->Text);             /* free(NULL) is a harmless no-op */
    free (p->Blob);
    p->Text = NULL;
    p->Blob = NULL;
    p->IntValue = parsed;
    p->Type = SQLITE_INTEGER;
}
static void
vpgSetDoubleValue (vpgSqliteValuePtr p, const char *v)
{
    /*
    / stores into the multitype the DOUBLE parsed (atof) from the text V,
    / dropping any previous TEXT / BLOB payload
    */
    double parsed = atof (v);

    if (p == NULL)
        return;
    free (p->Text);             /* free(NULL) is a harmless no-op */
    free (p->Blob);
    p->Text = NULL;
    p->Blob = NULL;
    p->DoubleValue = parsed;
    p->Type = SQLITE_FLOAT;
}
static void
vpgSetDateValue (vpgSqliteValuePtr p, double v)
{
    /*
    / stores the already computed DOUBLE V [DATE/TIME] into the multitype,
    / dropping any previous TEXT / BLOB payload
    */
    if (p == NULL)
        return;
    free (p->Text);             /* free(NULL) is a harmless no-op */
    free (p->Blob);
    p->Text = NULL;
    p->Blob = NULL;
    p->DoubleValue = v;
    p->Type = SQLITE_FLOAT;
}
static void
vpgSetBoolValue (vpgSqliteValuePtr p, const char *v)
{
    /*
    / stores a BOOL into the multitype as an INTEGER: 0 when V is exactly
    / the text "f", 1 for anything else; any previous TEXT / BLOB payload
    / is dropped
    */
    if (p == NULL)
        return;
    p->Type = SQLITE_INTEGER;
    free (p->Text);             /* free(NULL) is a harmless no-op */
    free (p->Blob);
    p->Text = NULL;
    p->Blob = NULL;
    p->IntValue = (strcmp (v, "f") == 0) ? 0 : 1;
}
static void
vpgSetTextValue (vpgSqliteValuePtr p, const char *value)
{
    /*
    / stores a private copy of the text VALUE into the multitype, dropping
    / any previous TEXT / BLOB payload; Size receives the length in bytes
    / (terminator excluded) and the copy is NUL-terminated as well
    /
    / when VALUE is NULL or memory is exhausted the multitype is set
    / to SQLITE_NULL instead
    */
    size_t size;
    char *copy;

    if (!p)
        return;
    free (p->Text);             /* free(NULL) is a harmless no-op */
    free (p->Blob);
    p->Text = NULL;
    p->Blob = NULL;
    p->Type = SQLITE_NULL;
    if (value == NULL)
        return;
    size = strlen (value);
    /* size + 1 also avoids a zero-sized request for the empty string */
    copy = malloc (size + 1);
    if (copy == NULL)
        return;
    memcpy (copy, value, size + 1);
    p->Text = copy;
    p->Size = (int) size;
    p->Type = SQLITE_TEXT;
}
static double
vpgMakeJulianDay (sqlite3 * db, const char *value)
{
    /*
    / converts the date/time text VALUE into a Julian Day by evaluating
    / the SQL expression JulianDay(?) on DB
    / on any SQLite failure the error is reported through vpgReportError()
    / and 0.0 is returned
    */
    const char *query = "SELECT JulianDay(?)";
    sqlite3_stmt *stmt;
    double julian;
    char *emsg;
    int prepared;

    prepared =
        sqlite3_prepare_v2 (db, query, strlen (query), &stmt,
                            NULL) == SQLITE_OK;
    if (prepared)
    {
        sqlite3_reset (stmt);
        sqlite3_clear_bindings (stmt);
        sqlite3_bind_text (stmt, 1, value, strlen (value), SQLITE_STATIC);
        if (sqlite3_step (stmt) == SQLITE_ROW)
        {
            julian = sqlite3_column_double (stmt, 0);
            sqlite3_finalize (stmt);
            return julian;
        }
    }
    /* failure: the message is captured before finalizing the statement */
    emsg =
        sqlite3_mprintf ("SQLite error (JulianDay): %s\n",
                         sqlite3_errmsg (db));
    if (prepared)
        sqlite3_finalize (stmt);
    vpgReportError (db, emsg);
    sqlite3_free (emsg);
    return 0.0;
}
static char *
vpgFormatJulian (sqlite3 * db, double value, const char *sql,
                 const char *label, const char *fallback)
{
    /*
    / shared implementation for vpgMakeDate / vpgMakeTime / vpgMakeDatetime
    / evaluates SQL (a one-parameter SELECT) on DB binding VALUE as a double
    / and returns a copy of the resulting text
    /
    / on any SQLite failure the error is reported through vpgReportError()
    / using LABEL to identify the conversion, and a copy of FALLBACK is
    / returned instead
    / the returned string always comes from sqlite3_mprintf(): the caller
    / must release it by sqlite3_free()
    */
    sqlite3_stmt *stmt = NULL;
    char *text = NULL;
    char *emsg;
    int done = 0;

    if (sqlite3_prepare_v2 (db, sql, -1, &stmt, NULL) == SQLITE_OK)
    {
        sqlite3_bind_double (stmt, 1, value);
        if (sqlite3_step (stmt) == SQLITE_ROW)
        {
            text =
                sqlite3_mprintf ("%s",
                                 (const char *) sqlite3_column_text (stmt,
                                                                     0));
            done = 1;
        }
    }
    if (done)
    {
        sqlite3_finalize (stmt);
        return text;
    }
    /* the message is captured before finalizing the statement */
    emsg =
        sqlite3_mprintf ("SQLite error (%s): %s\n", label,
                         sqlite3_errmsg (db));
    vpgReportError (db, emsg);
    sqlite3_free (emsg);
    sqlite3_finalize (stmt);    /* harmless no-op when stmt is NULL */
    return sqlite3_mprintf ("%s", fallback);
}

static char *
vpgMakeDate (sqlite3 * db, double value)
{
    /*
    / transforming a Julian Day into a DATE
    / returns "1900-01-01" on failure; release the result by sqlite3_free()
    */
    return vpgFormatJulian (db, value, "SELECT Date(?)", "Date",
                            "1900-01-01");
}

static char *
vpgMakeTime (sqlite3 * db, double value)
{
    /*
    / transforming a Julian Day into a TIME
    / returns "12:00:00.000" on failure; release the result by sqlite3_free()
    */
    return vpgFormatJulian (db, value, "SELECT Time(?)", "Time",
                            "12:00:00.000");
}

static char *
vpgMakeDatetime (sqlite3 * db, double value)
{
    /*
    / transforming a Julian Day into a TIMESTAMP
    / returns "1900-01-01 12:00:00.000" on failure; release the result
    / by sqlite3_free()
    */
    return vpgFormatJulian (db, value, "SELECT Datetime(?)", "Datetime",
                            "1900-01-01 12:00:00.000");
}
static char *
vpgMakeBool (int value)
{
    /*
    / transforming an INTEGER into a BOOL: "f" for zero, "t" otherwise
    / the result comes from sqlite3_mprintf()
    */
    const char flag = (value != 0) ? 't' : 'f';

    return sqlite3_mprintf ("%c", flag);
}
static char *
vpgDequoted (const char *value)
{
    /*
    / returns a well formatted TEXT value
    / 1] if the input string begins and ends with ' single quote will be the target
    / 2] if the input string begins and ends with " double quote will be the target
    / 3] in any other case the string will simply be copied
    /
    / when a target applies the enclosing quotes are removed and every
    / doubled target quote inside is reduced to a single one
    /
    / returns NULL when VALUE is NULL, when out of memory, or when an
    / inner target quote is not immediately followed by another one
    / the returned string is malloc'ed: the caller must free() it
    */
    size_t len;
    size_t i;
    char *clean;
    char *po;
    char target;
    int pending = 0;            /* 1 right after the first quote of a pair */

    if (value == NULL)
        return NULL;
    len = strlen (value);
    /* the output can never be longer than the input */
    clean = malloc (len + 1);
    if (clean == NULL)
        return NULL;
    if (value[0] == '"' && value[len - 1] == '"')
        target = '"';
    else if (value[0] == '\'' && value[len - 1] == '\'')
        target = '\'';
    else
    {
        /* no dequoting; simply copying */
        memcpy (clean, value, len + 1);
        return clean;
    }
    po = clean;
    for (i = 0; i < len; i++)
    {
        char ch = value[i];

        if (pending)
        {
            if (ch != target)
            {
                /* error: mismatching quote */
                free (clean);
                return NULL;
            }
            /* second quote of a pair: emitted once */
            *po++ = ch;
            pending = 0;
            continue;
        }
        if (ch == target)
        {
            /* the first and last chars are the enclosing quotes: dropped */
            if (i != 0 && i != len - 1)
                pending = 1;
            continue;
        }
        *po++ = ch;
    }
    *po = '\0';
    return clean;
}
static int
vpgMapType (const char *type)
{
    /*
    / mapping a PostgreSQL data-type name to the corresponding SQLite
    / (or extended SQLITEX_*) type code; the match is exact and case
    / sensitive, and any name not listed below maps to SQLITE_TEXT
    */
    static const struct
    {
        const char *pg_name;
        int code;
    } known[] = {
        {"int2", SQLITE_INTEGER},
        {"int4", SQLITE_INTEGER},
        {"int8", SQLITE_INTEGER},
        {"float4", SQLITE_FLOAT},
        {"float8", SQLITE_FLOAT},
        {"money", SQLITE_FLOAT},
        {"numeric", SQLITE_FLOAT},
        {"date", SQLITEX_DATE},
        {"time", SQLITEX_TIME},
        {"timestamp", SQLITEX_DATETIME},
        {"bool", SQLITEX_BOOL}
    };
    size_t i;

    for (i = 0; i < sizeof (known) / sizeof (known[0]); i++)
    {
        if (strcmp (type, known[i].pg_name) == 0)
            return known[i].code;
    }
    return SQLITE_TEXT;
}
static void
vpgFreeTable (VirtualPgPtr p_vt)
{
    /*
    / memory cleanup; closes the PostgreSQL connection (if any) and
    / releases the virtual table struct with all its per-column arrays
    / and the cached PK values; a NULL pointer is accepted
    / NOTE(review): ConnInfo is not released here - confirm that it is
    / released elsewhere
    */
    int col;

    if (p_vt == NULL)
        return;
    if (p_vt->pg_conn != NULL)
        vpgPQfinish (p_vt->pg_conn);
    /* sqlite3_free(NULL) is a harmless no-op */
    sqlite3_free (p_vt->pg_schema);
    sqlite3_free (p_vt->pg_table);
    if (p_vt->Column != NULL)
    {
        for (col = 0; col < p_vt->nColumns; col++)
            sqlite3_free (p_vt->Column[col]);
        sqlite3_free (p_vt->Column);
    }
    if (p_vt->Type != NULL)
    {
        for (col = 0; col < p_vt->nColumns; col++)
            sqlite3_free (p_vt->Type[col]);
        sqlite3_free (p_vt->Type);
    }
    sqlite3_free (p_vt->Mapping);
    sqlite3_free (p_vt->MaxSize);
    sqlite3_free (p_vt->NotNull);
    sqlite3_free (p_vt->IsPK);
    vpgFreePKstrings (p_vt);
    sqlite3_free (p_vt);
}
static int
vpgInsertRow (VirtualPgPtr p_vt, int argc, sqlite3_value ** argv)
{
/* trying to insert a row into the PostgreSQL real-table */
vpgMemBuffer sql_statement;
char *sql;
char *schema;
char *table;
char *xname;
int c;
int comma;
PGresult *res;
char dummy[1024];
char *emsg;
vpgResetError (p_vt->db);
/* preparing the PostgreSQL query */
schema = vpgDoubleQuoted (p_vt->pg_schema);
table = vpgDoubleQuoted (p_vt->pg_table);
sql = sqlite3_mprintf ("INSERT INTO %s.%s (", schema, table);
free (schema);
free (table);
vpgMemBufferInitialize (&sql_statement);
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
comma = 0;
for (c = 0; c < p_vt->nColumns; c++)
{
/* column names */
xname = vpgDoubleQuoted (*(p_vt->Column + c));
if (comma)
vpgMemBufferAppend (&sql_statement, ", ");
vpgMemBufferAppend (&sql_statement, xname);
free (xname);
comma = 1;
}
vpgMemBufferAppend (&sql_statement, ") VALUES (");
comma = 0;
for (c = 2; c < argc; c++)
{
/* column values */
int pg_type = *(p_vt->Mapping + c - 2);
if (comma)
vpgMemBufferAppend (&sql_statement, ", ");
switch (pg_type)
{
case SQLITE_INTEGER:
#if defined(_WIN32) || defined(__MINGW32__)
/* CAVEAT - M$ runtime doesn't supports %lld for 64 bits */
sprintf (dummy, "%I64d", sqlite3_value_int64 (argv[c]));
#else
sprintf (dummy, "%lld", sqlite3_value_int64 (argv[c]));
#endif
sql = sqlite3_mprintf ("%s", dummy);
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITE_FLOAT:
sql =
sqlite3_mprintf ("%1.16f", sqlite3_value_double (argv[c]));
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITEX_DATE:
if (p_vt->julianNumbers)
{
xname =
vpgMakeDate (p_vt->db,
sqlite3_value_double (argv[c]));
sql = sqlite3_mprintf ("%Q", xname);
sqlite3_free (xname);
}
else
sql = sqlite3_mprintf ("%Q", sqlite3_value_text (argv[c]));
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITEX_TIME:
if (p_vt->julianNumbers)
{
xname =
vpgMakeTime (p_vt->db,
sqlite3_value_double (argv[c]));
sql = sqlite3_mprintf ("%Q", xname);
sqlite3_free (xname);
}
else
sql = sqlite3_mprintf ("%Q", sqlite3_value_text (argv[c]));
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITEX_DATETIME:
if (p_vt->julianNumbers)
{
xname =
vpgMakeDatetime (p_vt->db,
sqlite3_value_double (argv[c]));
sql = sqlite3_mprintf ("%Q", xname);
sqlite3_free (xname);
}
else
sql = sqlite3_mprintf ("%Q", sqlite3_value_text (argv[c]));
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITEX_BOOL:
xname = vpgMakeBool (sqlite3_value_int (argv[c]));
sql = sqlite3_mprintf ("%Q", xname);
sqlite3_free (xname);
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITE_TEXT:
sql =
sqlite3_mprintf ("%Q",
(const char *)
sqlite3_value_text (argv[c]));
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
default:
vpgMemBufferAppend (&sql_statement, "NULL");
break;
};
comma = 1;
}
vpgMemBufferAppend (&sql_statement, ")");
if (sql_statement.Error == 0 && sql_statement.Buffer != NULL)
{
res = vpgPQexec (p_vt->pg_conn, sql_statement.Buffer);
if (vpgPQresultStatus (res) != PGRES_COMMAND_OK)
goto illegal;
vpgMemBufferReset (&sql_statement);
vpgPQclear (res);
return SQLITE_OK;
}
illegal:
emsg =
sqlite3_mprintf ("Postgres INSERT failed:\n%s",
vpgPQerrorMessage (p_vt->pg_conn));
vpgReportError (p_vt->db, emsg);
sqlite3_free (emsg);
vpgMemBufferReset (&sql_statement);
vpgPQclear (res);
return SQLITE_ERROR;
}
static int
vpgUpdateRow (VirtualPgPtr p_vt, int nRow, int argc, sqlite3_value ** argv)
{
/* trying to update a row into the PostgreSQL real-table */
vpgMemBuffer sql_statement;
char *sql;
char *schema;
char *table;
char *xname;
char *where;
int c;
int comma;
PGresult *res;
char dummy[1024];
char *emsg;
vpgResetError (p_vt->db);
if (p_vt->nColumns + 2 != argc)
{
emsg = sqlite3_mprintf ("UPDATE failed: mismatching argc count\n");
vpgReportError (p_vt->db, emsg);
sqlite3_free (emsg);
return SQLITE_ERROR;
}
/* preparing the PostgreSQL query */
schema = vpgDoubleQuoted (p_vt->pg_schema);
table = vpgDoubleQuoted (p_vt->pg_table);
sql = sqlite3_mprintf ("UPDATE %s.%s SET ", schema, table);
free (schema);
free (table);
vpgMemBufferInitialize (&sql_statement);
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
comma = 0;
for (c = 0; c < p_vt->nColumns; c++)
{
/* column names */
int pg_type = *(p_vt->Mapping + c);
int c2 = c + 2;
xname = vpgDoubleQuoted (*(p_vt->Column + c));
if (comma)
vpgMemBufferAppend (&sql_statement, ", ");
vpgMemBufferAppend (&sql_statement, xname);
free (xname);
vpgMemBufferAppend (&sql_statement, " = ");
switch (pg_type)
{
case SQLITE_INTEGER:
#if defined(_WIN32) || defined(__MINGW32__)
// CAVEAT - M$ runtime doesn't supports %lld for 64 bits
sprintf (dummy, "%I64d", sqlite3_value_int64 (argv[c2]));
#else
sprintf (dummy, "%lld", sqlite3_value_int64 (argv[c2]));
#endif
sql = sqlite3_mprintf ("%s", dummy);
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITE_FLOAT:
sql =
sqlite3_mprintf ("%1.16f", sqlite3_value_double (argv[c2]));
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITEX_DATE:
if (p_vt->julianNumbers)
{
xname =
vpgMakeDate (p_vt->db,
sqlite3_value_double (argv[c2]));
sql = sqlite3_mprintf ("%Q", xname);
sqlite3_free (xname);
}
else
sql = sqlite3_mprintf ("%Q", sqlite3_value_text (argv[c2]));
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITEX_TIME:
if (p_vt->julianNumbers)
{
xname =
vpgMakeTime (p_vt->db,
sqlite3_value_double (argv[c2]));
sql = sqlite3_mprintf ("%Q", xname);
sqlite3_free (xname);
}
else
sql = sqlite3_mprintf ("%Q", sqlite3_value_text (argv[c2]));
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITEX_DATETIME:
if (p_vt->julianNumbers)
{
xname =
vpgMakeDatetime (p_vt->db,
sqlite3_value_double (argv[c2]));
sql = sqlite3_mprintf ("%Q", xname);
sqlite3_free (xname);
}
else
sql = sqlite3_mprintf ("%Q", sqlite3_value_text (argv[c2]));
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITEX_BOOL:
xname = vpgMakeBool (sqlite3_value_int (argv[c2]));
sql = sqlite3_mprintf ("%Q", xname);
sqlite3_free (xname);
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
case SQLITE_TEXT:
sql =
sqlite3_mprintf ("%Q",
(const char *)
sqlite3_value_text (argv[c2]));
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
break;
default:
vpgMemBufferAppend (&sql_statement, "NULL");
break;
};
comma = 1;
}
/* appending the WHERE clause (PK values) */
where = vpgBuildPkWhere (p_vt, nRow);
if (where == NULL)
goto illegal_pk;
vpgMemBufferAppend (&sql_statement, where);
sqlite3_free (where);
if (sql_statement.Error == 0 && sql_statement.Buffer != NULL)
{
res = vpgPQexec (p_vt->pg_conn, sql_statement.Buffer);
if (vpgPQresultStatus (res) != PGRES_COMMAND_OK)
goto illegal;
vpgMemBufferReset (&sql_statement);
vpgPQclear (res);
return SQLITE_OK;
}
illegal_pk:
emsg =
sqlite3_mprintf
("Postgres UPDATE failed: unable to get PK values, sorry");
vpgReportError (p_vt->db, emsg);
sqlite3_free (emsg);
vpgMemBufferReset (&sql_statement);
return SQLITE_ERROR;
illegal:
emsg =
sqlite3_mprintf ("Postgres UPDATE failed:\n%s",
vpgPQerrorMessage (p_vt->pg_conn));
vpgReportError (p_vt->db, emsg);
sqlite3_free (emsg);
vpgMemBufferReset (&sql_statement);
vpgPQclear (res);
return SQLITE_ERROR;
}
static int
vpgDeleteRow (VirtualPgPtr p_vt, int nRow)
{
    /*
    / trying to delete a row from the PostgreSQL real-table
    / NROW selects the cached PK values identifying the row to be deleted
    /
    / returns SQLITE_OK on success; on failure the reason is reported
    / through vpgReportError() and SQLITE_ERROR is returned
    / the statement buffer and the PostgreSQL result are released on
    / every path
    */
    vpgMemBuffer sql_statement;
    char *schema;
    char *table;
    char *where;
    char *sql;
    char *emsg;
    PGresult *res = NULL;

    vpgResetError (p_vt->db);
    vpgMemBufferInitialize (&sql_statement);
    schema = vpgDoubleQuoted (p_vt->pg_schema);
    table = vpgDoubleQuoted (p_vt->pg_table);
    sql = sqlite3_mprintf ("DELETE FROM %s.%s", schema, table);
    free (schema);
    free (table);
    if (sql == NULL)
        sql_statement.Error = 1;
    else
        vpgMemBufferAppend (&sql_statement, sql);
    sqlite3_free (sql);
    /* appending the WHERE clause (PK values) */
    where = vpgBuildPkWhere (p_vt, nRow);
    if (where == NULL)
    {
        emsg =
            sqlite3_mprintf
            ("Postgres DELETE failed: unable to get PK values, sorry");
        goto failure;
    }
    vpgMemBufferAppend (&sql_statement, where);
    sqlite3_free (where);
    if (sql_statement.Error != 0 || sql_statement.Buffer == NULL)
    {
        /* the statement was never sent: there is no PostgreSQL error */
        emsg =
            sqlite3_mprintf
            ("Postgres DELETE failed: unable to build the SQL statement");
        goto failure;
    }
    res = vpgPQexec (p_vt->pg_conn, sql_statement.Buffer);
    if (vpgPQresultStatus (res) == PGRES_COMMAND_OK)
    {
        vpgMemBufferReset (&sql_statement);
        vpgPQclear (res);
        return SQLITE_OK;
    }
    emsg =
        sqlite3_mprintf ("Postgres DELETE failed:\n%s",
                         vpgPQerrorMessage (p_vt->pg_conn));
  failure:
    vpgReportError (p_vt->db, emsg);
    sqlite3_free (emsg);
    vpgMemBufferReset (&sql_statement);
    if (res != NULL)
        vpgPQclear (res);
    return SQLITE_ERROR;
}
static void
vpgReadRow (VirtualPgCursorPtr cursor)
{
    /*
    / trying to read the current row (cursor->currentRow) from the
    / PostgreSQL result set, converting each field into the matching
    / entry of cursor->Values accordingly to the column Mapping
    / when the current row lies past the last one only the EOF marker is set
    */
    PGresult *rs = cursor->resultSet;
    int row;
    int col;

    if (cursor->currentRow >= cursor->nRows)
    {
        /* EOF condition */
        cursor->eof = 1;
        return;
    }
    row = cursor->currentRow;
    for (col = 0; col < cursor->nFields; col++)
    {
        /* extracting/converting column values */
        vpgSqliteValuePtr cell = cursor->Values[col];
        const char *raw;
        int mapping;

        if (vpgPQgetisnull (rs, row, col))
        {
            vpgSetNullValue (cell);
            continue;
        }
        raw = vpgPQgetvalue (rs, row, col);
        mapping = cursor->pVtab->Mapping[col];
        if (mapping == SQLITE_INTEGER)
            vpgSetIntValue (cell, raw);
        else if (mapping == SQLITE_FLOAT)
            vpgSetDoubleValue (cell, raw);
        else if (mapping == SQLITEX_DATE || mapping == SQLITEX_TIME
                 || mapping == SQLITEX_DATETIME)
        {
            /* DATE/TIME: either a Julian Day or the plain text */
            if (cursor->pVtab->julianNumbers)
                vpgSetDateValue (cell,
                                 vpgMakeJulianDay (cursor->pVtab->db, raw));
            else
                vpgSetTextValue (cell, raw);
        }
        else if (mapping == SQLITEX_BOOL)
            vpgSetBoolValue (cell, raw);
        else if (mapping == SQLITE_TEXT)
            vpgSetTextValue (cell, raw);
        else
            vpgSetNullValue (cell);
    }
}
static void
vpgCheckView (PGconn * pg_conn, const char *pg_schema, const char *pg_name,
              VirtualPgPtr p_vt)
{
    /*
    / testing for a Postgres View: queries pg_views for PG_SCHEMA.PG_NAME
    / and sets p_vt->isView to 1 when at least one match is reported
    / isView is left untouched in any other case (query failure included)
    */
    PGresult *res;
    char *query;

    query =
        sqlite3_mprintf
        ("SELECT count(*) FROM pg_views where schemaname = %Q AND viewname = %Q",
         pg_schema, pg_name);
    res = vpgPQexec (pg_conn, query);
    sqlite3_free (query);
    if (vpgPQresultStatus (res) == PGRES_TUPLES_OK)
    {
        int rows = vpgPQntuples (res);
        int fields = vpgPQnfields (res);
        int matches = 0;

        /* a single row with a single field is expected: the count */
        if (rows == 1 && fields == 1)
            matches = atoi (vpgPQgetvalue (res, 0, 0));
        if (matches != 0)
            p_vt->isView = 1;
    }
    /* the result set is released on both success and failure */
    if (res != NULL)
        vpgPQclear (res);
}
static int
vpg_setPKcolumn (VirtualPgPtr p_vt, const char *column)
{
    /*
    / marking a PK column: flags as 'Y' the first column whose name is
    / exactly COLUMN
    / returns 1 when a column was flagged, 0 when no name matches
    */
    int col = 0;

    while (col < p_vt->nColumns)
    {
        if (strcmp (column, p_vt->Column[col]) == 0)
        {
            p_vt->IsPK[col] = 'Y';
            return 1;
        }
        col++;
    }
    return 0;
}
/*************************************************************
/
/ VirtualTable methods
/
**************************************************************/
static int
vpg_create (sqlite3 * db, void *pAux, int argc, const char *const *argv,
sqlite3_vtab ** ppVTab, char **pzErr)
{
/* creates the virtual table connected to some PostgreSQL table */
char *vtable;
char *conninfo;
char *pg_schema;
char *pg_table;
char prefix[64];
char *xname;
const char *col_name;
const char *col_type;
const char *is_not_null;
int max_size;
int not_null;
char *sql;
PGconn *pg_conn;
PGresult *res;
int nRows;
int nFields;
int r;
int len;
int readOnly = 1;
int julianNumbers = 0;
int nPKs = 0;
VirtualPgPtr p_vt = NULL;
vpgMemBuffer sql_statement;
char *emsg;
vpgResetError (db);
if (pAux)
pAux = pAux; /* unused arg warning suppression */
/* checking arguments */
if (argc >= 6 || argc <= 8)
{
vtable = vpgDequoted ((char *) argv[2]);
conninfo = vpgDequoted ((char *) argv[3]);
pg_schema = vpgDequoted ((char *) argv[4]);
pg_table = vpgDequoted ((char *) argv[5]);
if (argc >= 7)
{
/* testing for W */
char *wr = vpgDequoted ((char *) argv[6]);
if (strcmp (wr, "W") == 0)
readOnly = 0;
free (wr);
}
if (argc == 8)
{
/* testing for J */
char *julian = vpgDequoted ((char *) argv[7]);
if (strcmp (julian, "J") == 0)
julianNumbers = 1;
free (julian);
}
}
else
{
*pzErr
=
sqlite3_mprintf
("[VirtualPostgres] CREATE VIRTUAL: illegal arg list {conn_info, schema, table [ , 'W' [ , 'J' ]] }\n");
goto error;
}
/* Make a connection to the PostgreSQL database */
pg_conn = vpgPQconnectdb (conninfo);
if (vpgPQstatus (pg_conn) != CONNECTION_OK)
{
emsg =
sqlite3_mprintf ("Connection to Postgres failed:\n%s",
vpgPQerrorMessage (pg_conn));
vpgReportError (db, emsg);
sqlite3_free (emsg);
vpgPQfinish (pg_conn);
goto create_emergency_default;
}
/* retrieving the PosgreSQL table columns */
sql =
sqlite3_mprintf
("SELECT c.attname, d.typname, c.atttypmod, c.attnotnull "
"FROM pg_attribute AS c " "JOIN pg_class AS t ON (c.attrelid = t.oid) "
"JOIN pg_namespace AS s ON (t.relnamespace = s.oid) "
"JOIN pg_type AS d ON (c.atttypid = d.oid) "
"WHERE s.nspname = %Q AND t.relname = %Q AND c.attnum > 0 "
"ORDER BY c.attnum", pg_schema, pg_table);
res = vpgPQexec (pg_conn, sql);
sqlite3_free (sql);
if (vpgPQresultStatus (res) != PGRES_TUPLES_OK)
goto illegal;
nRows = vpgPQntuples (res);
nFields = vpgPQnfields (res);
if (nRows > 0 && nFields == 4)
{
p_vt = (VirtualPgPtr) sqlite3_malloc (sizeof (VirtualPg));
if (!p_vt)
return SQLITE_NOMEM;
p_vt->db = db;
p_vt->ConnInfo = conninfo;
p_vt->isView = 0;
p_vt->readOnly = readOnly;
p_vt->pendingTransaction = 0;
p_vt->julianNumbers = julianNumbers;
p_vt->pg_conn = pg_conn;
p_vt->nRef = 0;
p_vt->zErrMsg = NULL;
len = strlen (pg_schema);
p_vt->pg_schema = sqlite3_malloc (len + 1);
strcpy (p_vt->pg_schema, pg_schema);
len = strlen (pg_table);
p_vt->pg_table = sqlite3_malloc (len + 1);
strcpy (p_vt->pg_table, pg_table);
p_vt->nColumns = nRows;
p_vt->Column = sqlite3_malloc (sizeof (char *) * nRows);
p_vt->Type = sqlite3_malloc (sizeof (char *) * nRows);
p_vt->Mapping = sqlite3_malloc (sizeof (int) * nRows);
p_vt->MaxSize = sqlite3_malloc (sizeof (int) * nRows);
p_vt->NotNull = sqlite3_malloc (sizeof (int) * nRows);
p_vt->IsPK = sqlite3_malloc (sizeof (char) * nRows);
p_vt->newRowid = 0;
p_vt->PKstrings = NULL;
p_vt->PKidx = NULL;
p_vt->PKrows = 0;
p_vt->PKcols = 0;
for (r = 0; r < p_vt->nColumns; r++)
{
*(p_vt->Column + r) = NULL;
*(p_vt->Type + r) = NULL;
*(p_vt->Mapping + r) = SQLITE_NULL;
*(p_vt->MaxSize + r) = -1;
*(p_vt->NotNull + r) = -1;
*(p_vt->IsPK + r) = 'N';
}
for (r = 0; r < nRows; r++)
{
col_name = vpgPQgetvalue (res, r, 0);
col_type = vpgPQgetvalue (res, r, 1);
max_size = atoi (vpgPQgetvalue (res, r, 2));
if (max_size > 0)
max_size -= 4;
is_not_null = vpgPQgetvalue (res, r, 3);
if (*is_not_null == 't')
not_null = 1;
else
not_null = 0;
len = strlen (col_name);
*(p_vt->Column + r) = sqlite3_malloc (len + 1);
strcpy (*(p_vt->Column + r), col_name);
len = strlen (col_type);
*(p_vt->Type + r) = sqlite3_malloc (len + 1);
strcpy (*(p_vt->Type + r), col_type);
*(p_vt->Mapping + r) = vpgMapType (col_type);
*(p_vt->MaxSize + r) = max_size;
*(p_vt->NotNull + r) = not_null;
}
}
vpgPQclear (res);
res = NULL;
if (p_vt == NULL)
goto illegal;
/* retrieving all PosgreSQL PK columns */
sql =
sqlite3_mprintf
("SELECT a.attname FROM pg_index AS i "
"JOIN pg_attribute AS a ON (a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)) "
"WHERE i.indrelid = '%s.%s'::regclass AND i.indisprimary", pg_schema,
pg_table);
res = vpgPQexec (pg_conn, sql);
sqlite3_free (sql);
if (vpgPQresultStatus (res) != PGRES_TUPLES_OK)
goto illegal;
nRows = vpgPQntuples (res);
nFields = vpgPQnfields (res);
if (nRows > 0 && nFields == 1)
{
for (r = 0; r < nRows; r++)
{
col_name = vpgPQgetvalue (res, r, 0);
if (!vpg_setPKcolumn (p_vt, col_name))
goto illegal;
nPKs++;
}
}
vpgPQclear (res);
res = NULL;
if (nPKs == 0)
{
/* missing PK; defaulting to ReadOnly */
p_vt->readOnly = 1;
}
/* preparing the COLUMNs for this VIRTUAL TABLE */
vpgMemBufferInitialize (&sql_statement);
xname = vpgDoubleQuoted (vtable);
sql = sqlite3_mprintf ("CREATE TABLE %s ", xname);
free (xname);
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
for (r = 0; r < p_vt->nColumns; r++)
{
if (r == 0)
strcpy (prefix, "(");
else
strcpy (prefix, ", ");
xname = vpgDoubleQuoted (*(p_vt->Column + r));
if (*(p_vt->MaxSize + r) > 0)
{
if (*(p_vt->NotNull + r))
sql =
sqlite3_mprintf ("%s%s %s(%d) NOT NULL", prefix, xname,
*(p_vt->Type + r),
*(p_vt->MaxSize + r));
else
sql =
sqlite3_mprintf ("%s%s %s(%d)", prefix, xname,
*(p_vt->Type + r),
*(p_vt->MaxSize + r));
}
else
{
if (*(p_vt->NotNull + r))
sql =
sqlite3_mprintf ("%s%s %s NOT NULL", prefix, xname,
*(p_vt->Type + r));
else
sql =
sqlite3_mprintf ("%s%s %s", prefix, xname,
*(p_vt->Type + r));
}
free (xname);
vpgMemBufferAppend (&sql_statement, sql);
sqlite3_free (sql);
}
vpgMemBufferAppend (&sql_statement, ")");
if (sql_statement.Error == 0 && sql_statement.Buffer != NULL)
{
if (sqlite3_declare_vtab (db, sql_statement.Buffer) != SQLITE_OK)
{
*pzErr
=
sqlite3_mprintf
("[VirtualPostgres] CREATE VIRTUAL: invalid SQL statement \"%s\"",
sql);
goto error;
}
vpgMemBufferReset (&sql_statement);
}
else
goto error;
*ppVTab = (sqlite3_vtab *) p_vt;
free (vtable);
vpgCheckView (pg_conn, pg_schema, pg_table, p_vt);
free (pg_schema);
free (pg_table);
vpgPQfinish (pg_conn);
p_vt->pg_conn = NULL;
return SQLITE_OK;
create_emergency_default:
/* preparing a fake VIRTUAL TABLE */
xname = vpgDoubleQuoted (vtable);
sql = sqlite3_mprintf ("CREATE TABLE %s (dummy integer)", xname);
free (xname);
if (sqlite3_declare_vtab (db, sql) != SQLITE_OK)
{
*pzErr
=
sqlite3_mprintf
("[VirtualPostgres] CREATE VIRTUAL: invalid SQL statement \"%s\"",
sql);
goto error;
}
sqlite3_free (sql);
nRows = 1;
p_vt = (VirtualPgPtr) sqlite3_malloc (sizeof (VirtualPg));
if (!p_vt)
return SQLITE_NOMEM;
p_vt->db = db;
p_vt->ConnInfo = conninfo;
p_vt->isView = 0;
p_vt->readOnly = 1;
p_vt->pendingTransaction = 0;
p_vt->julianNumbers = 0;
p_vt->pg_conn = NULL;
p_vt->nRef = 0;
p_vt->zErrMsg = NULL;
len = strlen (pg_schema);
p_vt->pg_schema = sqlite3_malloc (len + 1);
strcpy (p_vt->pg_schema, pg_schema);
len = strlen (pg_table);
p_vt->pg_table = sqlite3_malloc (len + 1);
strcpy (p_vt->pg_table, pg_table);
p_vt->nColumns = nRows;
p_vt->Column = sqlite3_malloc (sizeof (char *) * nRows);
p_vt->Type = sqlite3_malloc (sizeof (char *) * nRows);
p_vt->Mapping = sqlite3_malloc (sizeof (int) * nRows);
p_vt->MaxSize = sqlite3_malloc (sizeof (int) * nRows);
p_vt->NotNull = sqlite3_malloc (sizeof (int) * nRows);
p_vt->IsPK = sqlite3_malloc (sizeof (char) * nRows);
p_vt->newRowid = 0;
p_vt->PKstrings = NULL;
p_vt->PKidx = NULL;
p_vt->PKrows = 0;
p_vt->PKcols = 0;
for (r = 0; r < p_vt->nColumns; r++)
{
*(p_vt->Column + r) = NULL;
*(p_vt->Type + r) = NULL;
*(p_vt->Mapping + r) = SQLITE_NULL;
*(p_vt->MaxSize + r) = -1;
*(p_vt->NotNull + r) = -1;
*(p_vt->IsPK + r) = 'N';
}
for (r = 0; r < nRows; r++)
{
col_name = "dummy";
col_type = "integer";
not_null = 0;
max_size = 10;
len = strlen (col_name);
*(p_vt->Column + r) = sqlite3_malloc (len + 1);
strcpy (*(p_vt->Column + r), col_name);
len = strlen (col_type);
*(p_vt->Type + r) = sqlite3_malloc (len + 1);
strcpy (*(p_vt->Type + r), col_type);
*(p_vt->Mapping + r) = vpgMapType (col_type);
*(p_vt->MaxSize + r) = max_size;
*(p_vt->NotNull + r) = not_null;
}
*ppVTab = (sqlite3_vtab *) p_vt;
free (vtable);
free (pg_schema);
free (pg_table);
return SQLITE_OK;
illegal:
/* something has gone the wrong way */
if (res != NULL)
vpgPQclear (res);
*pzErr
=
sqlite3_mprintf
("[VirtualPostgres] '%s'.'%s' isn't a valid Postgres table\n",
pg_schema, pg_table);
vpgPQfinish (pg_conn);
p_vt->pg_conn = NULL;
p_vt->ConnInfo = conninfo;
conninfo = NULL;
error:
if (vtable != NULL)
free (vtable);
if (conninfo != NULL)
free (conninfo);
if (pg_schema)
free (pg_schema);
if (pg_table)
free (pg_table);
if (p_vt)
vpgFreeTable (p_vt);
return SQLITE_ERROR;
}
static int
vpg_connect (sqlite3 * db, void *pAux, int argc, const char *const *argv,
	     sqlite3_vtab ** ppVTab, char **pzErr)
{
/*
/ connects to an already declared virtual table: there is nothing
/ specific to do here, so the whole job is delegated to vpg_create()
/ and its return code is handed back unchanged
*/
    int rc = vpg_create (db, pAux, argc, argv, ppVTab, pzErr);
    return rc;
}
static int
vpg_best_index (sqlite3_vtab * pVTab, sqlite3_index_info * pIndex)
{
/*
/ best index selection: pIndex is left untouched and SQLITE_OK is
/ always returned
*/
    (void) pVTab;		/* unused arg warning suppression */
    (void) pIndex;		/* unused arg warning suppression */
    return SQLITE_OK;
}
static int
vpg_disconnect (sqlite3_vtab * pVTab)
{
/*
/ disconnects the virtual table: the connection string is released
/ with free(), then the table itself is handed to vpgFreeTable()
*/
    VirtualPgPtr table = (VirtualPgPtr) pVTab;
    free (table->ConnInfo);	/* free(NULL) is a no-op */
    vpgFreeTable (table);
    return SQLITE_OK;
}
static int
vpg_destroy (sqlite3_vtab * pVTab)
{
/* destroying the virtual table is the same as disconnecting it */
    int rc = vpg_disconnect (pVTab);
    return rc;
}
static int
vpg_open (sqlite3_vtab * pVTab, sqlite3_vtab_cursor ** ppCursor)
{
/*
/ opening a new cursor
/
/ a fresh connection is opened from the table's ConnInfo and the whole
/ PostgreSQL table is fetched by a single SELECT listing every known
/ column; for writable tables the rows are ordered by the clause built
/ by vpgBuildPkOrderBy() and the Primary Key values of every row are
/ saved as strings into the table struct.
/
/ returns SQLITE_OK with the new cursor in *ppCursor (already positioned
/ by vpgReadRow), SQLITE_NOMEM on memory exhaustion, SQLITE_ERROR for
/ any other failure. On failure nothing is left allocated and the
/ connection is closed, because no matching vpg_close() will follow.
*/
    int c;
    int rc = SQLITE_ERROR;
    char *xname;
    char *emsg;
    PGresult *res = NULL;
    int nRows;
    int nFields;
    vpgMemBuffer sql_statement;
    VirtualPgCursorPtr cursor = NULL;
    VirtualPgPtr p_vt = (VirtualPgPtr) pVTab;

    p_vt->pg_conn = vpgPQconnectdb (p_vt->ConnInfo);
    if (vpgPQstatus (p_vt->pg_conn) != CONNECTION_OK)
      {
	  emsg =
	      sqlite3_mprintf ("Connection to Postgres failed:\n%s",
			       vpgPQerrorMessage (p_vt->pg_conn));
	  vpgReportError (p_vt->db, emsg);
	  sqlite3_free (emsg);
	  goto failure;		/* no point in querying a dead connection */
      }

/* preparing the PostgreSQL query */
    vpgMemBufferInitialize (&sql_statement);
    vpgMemBufferAppend (&sql_statement, "SELECT");
    for (c = 0; c < p_vt->nColumns; c++)
      {
	  xname = vpgDoubleQuoted (*(p_vt->Column + c));
	  vpgMemBufferAppend (&sql_statement, (c == 0) ? " " : ", ");
	  vpgMemBufferAppend (&sql_statement, xname);
	  free (xname);
      }
    vpgMemBufferAppend (&sql_statement, " FROM ");
    xname = vpgDoubleQuoted (p_vt->pg_schema);
    vpgMemBufferAppend (&sql_statement, xname);
    free (xname);
    vpgMemBufferAppend (&sql_statement, ".");
    xname = vpgDoubleQuoted (p_vt->pg_table);
    vpgMemBufferAppend (&sql_statement, xname);
    free (xname);
    if (p_vt->readOnly == 0)
      {
	  /* appending the ORDER BY <PK> clause */
	  char *orderBy = vpgBuildPkOrderBy (p_vt);
	  if (orderBy != NULL)
	    {
		vpgMemBufferAppend (&sql_statement, orderBy);
		sqlite3_free (orderBy);
	    }
      }
    if (sql_statement.Error != 0 || sql_statement.Buffer == NULL)
      {
	  vpgMemBufferReset (&sql_statement);
	  goto failure;
      }
    res = vpgPQexec (p_vt->pg_conn, sql_statement.Buffer);
    vpgMemBufferReset (&sql_statement);
    if (vpgPQresultStatus (res) != PGRES_TUPLES_OK)
	goto failure;
    nRows = vpgPQntuples (res);
    nFields = vpgPQnfields (res);
    if (nFields != p_vt->nColumns)
	goto failure;		/* one field per requested column is expected */

/* allocating and initializing the cursor */
    cursor = (VirtualPgCursorPtr) sqlite3_malloc (sizeof (VirtualPgCursor));
    if (cursor == NULL)
      {
	  rc = SQLITE_NOMEM;
	  goto failure;
      }
    cursor->resultSet = res;
    cursor->nRows = nRows;
    cursor->nFields = nFields;
    cursor->nColumns = p_vt->nColumns;
    cursor->pVtab = p_vt;
    cursor->eof = 0;
    cursor->currentRow = 0;
    /* Values is an array of pointers, one for each column */
    cursor->Values =
	sqlite3_malloc (sizeof (*cursor->Values) * cursor->nColumns);
    if (cursor->Values == NULL)
      {
	  rc = SQLITE_NOMEM;
	  goto failure;
      }
    for (c = 0; c < cursor->nColumns; c++)
	*(cursor->Values + c) = NULL;
    for (c = 0; c < cursor->nColumns; c++)
      {
	  *(cursor->Values + c) = vpgAllocValue ();
	  if (*(cursor->Values + c) == NULL)
	    {
		rc = SQLITE_NOMEM;
		goto failure;
	    }
      }
    p_vt->newRowid = nRows;

    if (p_vt->readOnly == 0)
      {
	  /* saving all PK values */
	  vpgFreePKstrings (p_vt);
	  p_vt->PKcols = vpgCountPKcols (p_vt);
	  if (nRows > 0 && p_vt->PKcols > 0)
	    {
		/* allocating and initializing the PK struct */
		int k;
		int r;
		int nPKs = nRows * p_vt->PKcols;
		p_vt->PKstrings = malloc (sizeof (char *) * nPKs);
		p_vt->PKidx = malloc (sizeof (int) * p_vt->PKcols);
		if (p_vt->PKstrings == NULL || p_vt->PKidx == NULL)
		  {
		      free (p_vt->PKstrings);
		      free (p_vt->PKidx);
		      p_vt->PKstrings = NULL;
		      p_vt->PKidx = NULL;
		      p_vt->PKrows = 0;
		      p_vt->PKcols = 0;
		      rc = SQLITE_NOMEM;
		      goto failure;
		  }
		p_vt->PKrows = nRows;
		/* every slot starts as NULL so a partial fill can be freed */
		for (k = 0; k < nPKs; k++)
		    *(p_vt->PKstrings + k) = NULL;
		r = 0;
		for (c = 0; c < p_vt->nColumns && r < p_vt->PKcols; c++)
		  {
		      /* initializing the PK column indices array */
		      if (*(p_vt->IsPK + c) == 'Y')
			{
			    *(p_vt->PKidx + r) = c;
			    r++;
			}
		  }
		k = 0;
		for (r = 0; r < nRows; r++)
		  {
		      for (c = 0; c < p_vt->PKcols; c++)
			{
			    int pk_col = *(p_vt->PKidx + c);
			    if (!vpgPQgetisnull (res, r, pk_col))
			      {
				  const char *value =
				      vpgPQgetvalue (res, r, pk_col);
				  size_t len = strlen (value);
				  char *copy = malloc (len + 1);
				  if (copy == NULL)
				    {
					vpgFreePKstrings (p_vt);
					rc = SQLITE_NOMEM;
					goto failure;
				    }
				  memcpy (copy, value, len + 1);
				  *(p_vt->PKstrings + k) = copy;
			      }
			    k++;
			}
		  }
	    }
      }
    *ppCursor = (sqlite3_vtab_cursor *) cursor;
    vpgReadRow (cursor);
    return SQLITE_OK;

  failure:
/* releasing whatever has been acquired so far */
    if (cursor != NULL)
      {
	  if (cursor->Values != NULL)
	    {
		for (c = 0; c < cursor->nColumns; c++)
		  {
		      if (*(cursor->Values + c) != NULL)
			  vpgFreeValue (*(cursor->Values + c));
		  }
		sqlite3_free (cursor->Values);
	    }
	  sqlite3_free (cursor);
      }
    if (res != NULL)
	vpgPQclear (res);
    if (p_vt->pg_conn != NULL)
	vpgPQfinish (p_vt->pg_conn);
    p_vt->pg_conn = NULL;
    return rc;
}
static int
vpg_close (sqlite3_vtab_cursor * pCursor)
{
/*
/ closing the cursor: the table's PostgreSQL connection is shut down
/ first, then the per-column values, the values array, the result set
/ and finally the cursor itself are released
*/
    VirtualPgCursorPtr cur = (VirtualPgCursorPtr) pCursor;
    int idx = 0;

    vpgPQfinish (cur->pVtab->pg_conn);
    cur->pVtab->pg_conn = NULL;

    while (idx < cur->nColumns)
      {
	  vpgFreeValue (cur->Values[idx]);
	  idx++;
      }
    sqlite3_free (cur->Values);
    vpgPQclear (cur->resultSet);
    sqlite3_free (pCursor);
    return SQLITE_OK;
}
static int
vpg_filter (sqlite3_vtab_cursor * pCursor, int idxNum, const char *idxStr,
	    int argc, sqlite3_value ** argv)
{
/*
/ setting up a cursor filter
/
/ no constraint is ever consumed (vpg_best_index requests none), so a
/ filter is always a full scan. SQLite may invoke xFilter several times
/ on the same cursor (e.g. when this table is the inner loop of a join),
/ so the cursor is rewound to its first row exactly as vpg_open()
/ leaves it; otherwise any scan after the first would start at EOF.
*/
    VirtualPgCursorPtr cursor = (VirtualPgCursorPtr) pCursor;
    (void) idxNum;		/* unused arg warning suppression */
    (void) idxStr;
    (void) argc;
    (void) argv;
    cursor->currentRow = 0;
    cursor->eof = 0;
    vpgReadRow (cursor);
    return SQLITE_OK;
}
static int
vpg_next (sqlite3_vtab_cursor * pCursor)
{
/* advances the cursor by one row, then lets vpgReadRow() load it */
    VirtualPgCursorPtr cur = (VirtualPgCursorPtr) pCursor;
    cur->currentRow += 1;
    vpgReadRow (cur);
    return SQLITE_OK;
}
static int
vpg_eof (sqlite3_vtab_cursor * pCursor)
{
/* reports the EOF flag currently stored in the cursor */
    return ((VirtualPgCursorPtr) pCursor)->eof;
}
static int
vpg_column (sqlite3_vtab_cursor * pCursor, sqlite3_context * pContext,
	    int column)
{
/*
/ fetching value for the Nth column
/
/ the value cached in the cursor for the requested column is handed to
/ SQLite according to its Type; an out-of-range column index or any
/ other Type yields NULL.
/ NOTE(review): TEXT and BLOB are passed as SQLITE_STATIC, i.e. without
/ copying - this presumably relies on the cached buffers outliving
/ SQLite's use of them; confirm against vpgReadRow().
*/
    VirtualPgCursorPtr cur = (VirtualPgCursorPtr) pCursor;
    vpgSqliteValuePtr cell;

    if (column < 0 || column >= cur->nColumns)
      {
	  sqlite3_result_null (pContext);
	  return SQLITE_OK;
      }

    cell = cur->Values[column];
    if (cell->Type == SQLITE_INTEGER)
	sqlite3_result_int64 (pContext, cell->IntValue);
    else if (cell->Type == SQLITE_FLOAT)
	sqlite3_result_double (pContext, cell->DoubleValue);
    else if (cell->Type == SQLITE_TEXT)
	sqlite3_result_text (pContext, cell->Text, cell->Size, SQLITE_STATIC);
    else if (cell->Type == SQLITE_BLOB)
	sqlite3_result_blob (pContext, cell->Blob, cell->Size, SQLITE_STATIC);
    else
	sqlite3_result_null (pContext);
    return SQLITE_OK;
}
static int
vpg_rowid (sqlite3_vtab_cursor * pCursor, sqlite_int64 * pRowid)
{
/* the ROWID is simply the index of the row the cursor is positioned on */
    *pRowid = ((VirtualPgCursorPtr) pCursor)->currentRow;
    return SQLITE_OK;
}
static int
vpg_update (sqlite3_vtab * pVTab, int argc, sqlite3_value ** argv,
	    sqlite_int64 * pRowid)
{
/*
/ generic update [INSERT / UPDATE / DELETE]
/
/ - read-only tables always answer SQLITE_READONLY
/ - a single argument means DELETE of the row numbered argv[0]
/ - a NULL argv[0] means INSERT; on success *pRowid receives the
/   table's newRowid
/ - anything else is an UPDATE of the row numbered argv[0]
/ the return code of the row-level helper is passed through unchanged
*/
    VirtualPgPtr table = (VirtualPgPtr) pVTab;
    int status;

    if (table->readOnly)
	return SQLITE_READONLY;

    if (argc == 1)
	return vpgDeleteRow (table, sqlite3_value_int (argv[0]));

    if (sqlite3_value_type (argv[0]) != SQLITE_NULL)
	return vpgUpdateRow (table, sqlite3_value_int (argv[0]), argc, argv);

    status = vpgInsertRow (table, argc, argv);
    if (status == SQLITE_OK)
	*pRowid = table->newRowid;
    return status;
}
static int
vpg_begin (sqlite3_vtab * pVTab)
{
/*
/ BEGIN TRANSACTION
/
/ writable tables forward a BEGIN to PostgreSQL; pendingTransaction is
/ raised only when PostgreSQL acknowledges it. Read-only tables do
/ nothing. SQLITE_OK is returned in every case.
*/
    VirtualPgPtr p_vt = (VirtualPgPtr) pVTab;
    PGresult *res;

    if (p_vt->readOnly != 0)
	return SQLITE_OK;

    /* Beginning a PostgreSQL Transaction */
    res = vpgPQexec (p_vt->pg_conn, "BEGIN");
    if (vpgPQresultStatus (res) == PGRES_COMMAND_OK)
	p_vt->pendingTransaction = 1;
    /* the result must be released even when the command failed */
    if (res != NULL)
	vpgPQclear (res);
    return SQLITE_OK;
}
static int
vpg_sync (sqlite3_vtab * pVTab)
{
/* SYNC: nothing to do, always succeeds */
    (void) pVTab;		/* unused arg warning suppression */
    return SQLITE_OK;
}
static int
vpg_commit (sqlite3_vtab * pVTab)
{
/*
/ COMMIT TRANSACTION
/
/ when a PostgreSQL transaction is pending a COMMIT is forwarded;
/ pendingTransaction is lowered only when PostgreSQL acknowledges it.
/ SQLITE_OK is returned in every case.
*/
    VirtualPgPtr p_vt = (VirtualPgPtr) pVTab;
    PGresult *res;

    if (!p_vt->pendingTransaction)
	return SQLITE_OK;

    /* Committing a PostgreSQL Transaction */
    res = vpgPQexec (p_vt->pg_conn, "COMMIT");
    if (vpgPQresultStatus (res) == PGRES_COMMAND_OK)
	p_vt->pendingTransaction = 0;
    /* the result must be released even when the command failed */
    if (res != NULL)
	vpgPQclear (res);
    return SQLITE_OK;
}
static int
vpg_rollback (sqlite3_vtab * pVTab)
{
/*
/ ROLLBACK TRANSACTION
/
/ when a PostgreSQL transaction is pending a ROLLBACK is forwarded;
/ pendingTransaction is lowered only when PostgreSQL acknowledges it.
/ SQLITE_OK is returned in every case.
*/
    VirtualPgPtr p_vt = (VirtualPgPtr) pVTab;
    PGresult *res;

    if (!p_vt->pendingTransaction)
	return SQLITE_OK;

    /* Rolling back a PostgreSQL Transaction */
    res = vpgPQexec (p_vt->pg_conn, "ROLLBACK");
    if (vpgPQresultStatus (res) == PGRES_COMMAND_OK)
	p_vt->pendingTransaction = 0;
    /* the result must be released even when the command failed */
    if (res != NULL)
	vpgPQclear (res);
    return SQLITE_OK;
}
static int
vpg_rename (sqlite3_vtab * pVTab, const char *zNew)
{
/* RENAME TABLE: not supported, SQLITE_ERROR is always returned */
    (void) pVTab;		/* unused arg warning suppression */
    (void) zNew;		/* unused arg warning suppression */
    return SQLITE_ERROR;
}
static int
VirtualpgInit (sqlite3 * db)
{
/*
/ initializing the module methods and registering the module under the
/ name "VirtualPostgres" on the given DB connection.
/ Returns the code reported by sqlite3_create_module_v2(), so that a
/ failed registration is no longer silently reported as SQLITE_OK.
*/
    my_pg_module.iVersion = 1;
    my_pg_module.xCreate = &vpg_create;
    my_pg_module.xConnect = &vpg_connect;
    my_pg_module.xBestIndex = &vpg_best_index;
    my_pg_module.xDisconnect = &vpg_disconnect;
    my_pg_module.xDestroy = &vpg_destroy;
    my_pg_module.xOpen = &vpg_open;
    my_pg_module.xClose = &vpg_close;
    my_pg_module.xFilter = &vpg_filter;
    my_pg_module.xNext = &vpg_next;
    my_pg_module.xEof = &vpg_eof;
    my_pg_module.xColumn = &vpg_column;
    my_pg_module.xRowid = &vpg_rowid;
    my_pg_module.xUpdate = &vpg_update;
    my_pg_module.xBegin = &vpg_begin;
    my_pg_module.xSync = &vpg_sync;
    my_pg_module.xCommit = &vpg_commit;
    my_pg_module.xRollback = &vpg_rollback;
    my_pg_module.xFindFunction = NULL;
    my_pg_module.xRename = &vpg_rename;
    /* no client data and no destructor are attached to the module */
    return sqlite3_create_module_v2 (db, "VirtualPostgres", &my_pg_module,
				     NULL, 0);
}
#ifndef LOADABLE_EXTENSION
VIRTUALPG_DECLARE int
virtualpg_extension_init (sqlite3 * db, virtualPQptr virtual_api)
{
/*
/ registering the virtual table - classic library
/
/ the library is always based on a virtualized LibPQ expected to be
/ correctly initialized by the caller: every method pointer found in
/ virtual_api is copied, in order, into the module-wide "pq" table.
/ SQLITE_ERROR is returned as soon as virtual_api itself or one of its
/ methods is NULL (methods copied before that point stay set);
/ otherwise the result of VirtualpgInit() is returned.
*/

/* copies one LibPQ method from the caller's table, bailing out if missing */
#define VPG_IMPORT_PQ(method) \
    do \
      { \
	  if (virtual_api->method == NULL) \
	      return SQLITE_ERROR; \
	  pq.method = virtual_api->method; \
      } \
    while (0)

    if (virtual_api == NULL)
	return SQLITE_ERROR;

    /* starting from a fully reset table of virtual LibPQ methods */
    pq.PQclear = NULL;
    pq.PQconnectdb = NULL;
    pq.PQerrorMessage = NULL;
    pq.PQexec = NULL;
    pq.PQfinish = NULL;
    pq.PQgetisnull = NULL;
    pq.PQgetvalue = NULL;
    pq.PQlibVersion = NULL;
    pq.PQnfields = NULL;
    pq.PQntuples = NULL;
    pq.PQresultStatus = NULL;
    pq.PQstatus = NULL;

    VPG_IMPORT_PQ (PQclear);
    VPG_IMPORT_PQ (PQconnectdb);
    VPG_IMPORT_PQ (PQerrorMessage);
    VPG_IMPORT_PQ (PQexec);
    VPG_IMPORT_PQ (PQfinish);
    VPG_IMPORT_PQ (PQgetisnull);
    VPG_IMPORT_PQ (PQgetvalue);
    VPG_IMPORT_PQ (PQlibVersion);
    VPG_IMPORT_PQ (PQnfields);
    VPG_IMPORT_PQ (PQntuples);
    VPG_IMPORT_PQ (PQresultStatus);
    VPG_IMPORT_PQ (PQstatus);

#undef VPG_IMPORT_PQ

    /* registering the VirtualTable */
    return VirtualpgInit (db);
}
VIRTUALPG_DECLARE const char *
virtualpg_version (void)
{
/*
/ returning the current Version string (the VERSION macro).
/ The parameter list is spelled (void) so the definition is a real
/ prototype and calls passing arguments are rejected at compile time.
*/
    return VERSION;
}
#else /* built as LOADABLE EXTENSION only */
VIRTUALPG_DECLARE int
sqlite3_modvirtualpg_init (sqlite3 * db, char **pzErrMsg,
			   const sqlite3_api_routines * pApi)
{
/*
/ entry point of the loadable extension: registers the virtual table
/
/ the loadable module is always based on hard-linked LibPQ, so the
/ module-wide "pq" table is bound directly to the real LibPQ functions
/ before VirtualpgInit() is invoked; its result is returned as is.
*/
    SQLITE_EXTENSION_INIT2 (pApi);
    (void) pzErrMsg;		/* no error message is ever produced here */

    /* connection management */
    pq.PQconnectdb = PQconnectdb;
    pq.PQstatus = PQstatus;
    pq.PQerrorMessage = PQerrorMessage;
    pq.PQfinish = PQfinish;
    pq.PQlibVersion = PQlibVersion;

    /* statement execution and result-set inspection */
    pq.PQexec = PQexec;
    pq.PQresultStatus = PQresultStatus;
    pq.PQntuples = PQntuples;
    pq.PQnfields = PQnfields;
    pq.PQgetisnull = PQgetisnull;
    pq.PQgetvalue = PQgetvalue;
    pq.PQclear = PQclear;

    return VirtualpgInit (db);
}
#endif