View Javadoc
1   package com.atlassian.dbexporter.importer;
2   
3   import com.atlassian.dbexporter.BatchMode;
4   import com.atlassian.dbexporter.Context;
5   import com.atlassian.dbexporter.DatabaseInformation;
6   import com.atlassian.dbexporter.DatabaseInformations;
7   import com.atlassian.dbexporter.EntityNameProcessor;
8   import com.atlassian.dbexporter.ImportExportErrorService;
9   import com.atlassian.dbexporter.jdbc.JdbcUtils;
10  import com.atlassian.dbexporter.node.NodeParser;
11  import com.atlassian.dbexporter.progress.ProgressMonitor;
12  import com.google.common.collect.Maps;
13  
14  import java.math.BigDecimal;
15  import java.math.BigInteger;
16  import java.sql.Connection;
17  import java.sql.PreparedStatement;
18  import java.sql.ResultSet;
19  import java.sql.SQLException;
20  import java.sql.Statement;
21  import java.sql.Timestamp;
22  import java.sql.Types;
23  import java.util.ArrayList;
24  import java.util.Date;
25  import java.util.List;
26  import java.util.Map;
27  
28  import static com.atlassian.dbexporter.importer.ImporterUtils.isNodeNotClosed;
29  import static com.atlassian.dbexporter.jdbc.JdbcUtils.closeQuietly;
30  import static com.atlassian.dbexporter.jdbc.JdbcUtils.metadata;
31  import static com.atlassian.dbexporter.jdbc.JdbcUtils.preparedStatement;
32  import static com.atlassian.dbexporter.jdbc.JdbcUtils.quote;
33  import static com.atlassian.dbexporter.jdbc.JdbcUtils.withConnection;
34  import static com.atlassian.dbexporter.node.NodeBackup.ColumnDataNode;
35  import static com.atlassian.dbexporter.node.NodeBackup.RowDataNode;
36  import static com.atlassian.dbexporter.node.NodeBackup.TableDataNode;
37  import static com.atlassian.dbexporter.progress.ProgressMonitor.Task;
38  import static com.google.common.base.Preconditions.checkNotNull;
39  import static com.google.common.collect.Lists.newArrayList;
40  
41  public final class DataImporter extends AbstractSingleNodeImporter {
42      private final String schema;
43      private final AroundTableImporter aroundTable;
44  
45      public DataImporter(ImportExportErrorService errorService, String schema, AroundTableImporter aroundTableImporter, List<AroundImporter> arounds) {
46          super(errorService, arounds);
47          this.schema = isBlank(schema) ? null : schema;
48          this.aroundTable = checkNotNull(aroundTableImporter);
49      }
50  
51      public DataImporter(ImportExportErrorService errorService, String schema, AroundTableImporter aroundTableImporter, AroundImporter... arounds) {
52          this(errorService, schema, aroundTableImporter, newArrayList(checkNotNull(arounds)));
53      }
54  
55      @Override
56      protected String getNodeName() {
57          return TableDataNode.NAME;
58      }
59  
60      @Override
61      protected void doImportNode(final NodeParser node, final ImportConfiguration configuration, final Context context) {
62          final ProgressMonitor monitor = configuration.getProgressMonitor();
63          monitor.begin(Task.TABLES_DATA);
64          withConnection(errorService, configuration.getConnectionProvider(), new JdbcUtils.JdbcCallable<Void>() {
65              public Void call(Connection connection) {
66                  try {
67                      final boolean autoCommit = connection.getAutoCommit();
68                      try {
69                          connection.setAutoCommit(false);
70                          for (; TableDataNode.NAME.equals(node.getName()) && !node.isClosed(); node.getNextNode()) {
71                              importTable(node, configuration, context, connection, configuration.getDatabaseInformation());
72                          }
73                          connection.commit();
74                      } finally {
75                          connection.setAutoCommit(autoCommit);   // restore autocommit
76                      }
77                  } catch (SQLException e) {
78                      throw errorService.newImportExportSqlException(null, "", e);
79                  }
80                  return null;
81              }
82          });
83          monitor.end(Task.TABLES_DATA);
84      }
85  
86      private NodeParser importTable(NodeParser node, ImportConfiguration configuration, Context context, Connection connection, DatabaseInformation databaseInformation) {
87          final ProgressMonitor monitor = configuration.getProgressMonitor();
88          final EntityNameProcessor entityNameProcessor = configuration.getEntityNameProcessor();
89  
90          final String currentTable = entityNameProcessor.tableName(TableDataNode.getName(node));
91  
92          monitor.begin(Task.TABLE_DATA, currentTable);
93  
94          final InserterBuilder builder = new InserterBuilder(errorService, schema, currentTable, configuration.getBatchMode());
95  
96          node = node.getNextNode();
97          for (; isNodeNotClosed(node, ColumnDataNode.NAME); node = node.getNextNode()) {
98              final String column = ColumnDataNode.getName(node);
99              builder.addColumn(entityNameProcessor.columnName(column));
100             node = node.getNextNode();  // close column node
101         }
102 
103         final Inserter inserter = builder.build(connection);
104 
105         long rowNum = 0L;
106         try {
107             aroundTable.before(configuration, context, currentTable, connection);
108 
109             for (; isNodeNotClosed(node, RowDataNode.NAME); node = node.getNextNode()) {
110                 node = node.getNextNode();  // read the first field node
111                 for (; !node.isClosed(); node = node.getNextNode()) {
112                     inserter.setValue(node, databaseInformation);
113                 }
114                 inserter.execute();
115                 rowNum++;
116             }
117         } catch (SQLException e) {
118             throw errorService.newRowImportSqlException(currentTable, rowNum, e);
119         } finally {
120             inserter.close();
121             aroundTable.after(configuration, context, currentTable, connection);
122         }
123 
124         monitor.end(Task.TABLE_DATA, currentTable);
125 
126         return node;
127     }
128 
129     private static interface Inserter {
130         void setValue(NodeParser node, DatabaseInformation databaseInformation) throws SQLException;
131 
132         void execute() throws SQLException;
133 
134         void close();
135     }
136 
137 
138     private static class InserterBuilder {
139         public static final int UNLIMITED_COLUMN_SIZE = -1;
140 
141         private final ImportExportErrorService errorService;
142         private final String schema;
143         private final String table;
144         private final BatchMode batch;
145         private final List<String> columns;
146 
147         public InserterBuilder(ImportExportErrorService errorService, String schema, String table, BatchMode batch) {
148             this.errorService = checkNotNull(errorService);
149             this.schema = schema;
150             this.table = table;
151             this.batch = batch;
152             columns = new ArrayList<String>();
153         }
154 
155         public String getTable() {
156             return table;
157         }
158 
159         public void addColumn(String column) {
160             columns.add(column);
161         }
162 
163         public Inserter build(Connection connection) {
164             final StringBuilder query = new StringBuilder("INSERT INTO ")
165                     .append(tableName(connection))
166                     .append(" (");
167 
168             for (int i = 0; i < columns.size(); i++) {
169                 query.append(quote(errorService, table, connection, columns.get(i)));
170                 if (i < columns.size() - 1) {
171                     query.append(", ");
172                 }
173             }
174 
175             query.append(") VALUES (");
176             for (int i = 0; i < columns.size(); i++) {
177                 query.append("?");
178                 if (i < columns.size() - 1) {
179                     query.append(", ");
180                 }
181             }
182             query.append(")");
183 
184             final List<Integer> maxColumnSizes = calculateColumnSizes(connection, columns);
185             final PreparedStatement ps = preparedStatement(errorService, table, connection, query.toString());
186             return newInserter(maxColumnSizes, ps);
187         }
188 
189         private String tableName(Connection connection) {
190             final String quoted = quote(errorService, table, connection, table);
191             return schema != null ? schema + "." + quoted : quoted;
192         }
193 
194         private Inserter newInserter(List<Integer> maxColumnSizes, PreparedStatement ps) {
195             return batch.equals(BatchMode.ON) ?
196                     new BatchInserter(errorService, getTable(), columns, ps, maxColumnSizes) :
197                     new ImmediateInserter(errorService, getTable(), columns, ps, maxColumnSizes);
198         }
199 
200         /**
201          * Get the column size for all columns in the table -- only the sizes for String columns will be used
202          *
203          * @param connection
204          * @param columns
205          * @return list of column sizes
206          */
207         private List<Integer> calculateColumnSizes(Connection connection, List<String> columns) {
208             Map<String, Integer> columnSizeMap = Maps.newHashMap();
209             ResultSet rs = null;
210             try {
211                 rs = getColumnsResultSet(connection);
212                 ColumnNameAndSize columnNameAndSize = getColumnNameAndSize(rs);
213                 while (columnNameAndSize != ColumnNameAndSize.NULL) {
214                     columnSizeMap.put(columnNameAndSize.name, columnNameAndSize.size);
215                     columnNameAndSize = getColumnNameAndSize(rs);
216                 }
217 
218                 final List<Integer> sizes = newArrayList(0);
219                 for (String column : columns) {
220                     final Integer size = columnSizeMap.get(column);
221                     sizes.add(size != null ? size : UNLIMITED_COLUMN_SIZE);
222                 }
223                 return sizes;
224             } finally {
225                 closeQuietly(rs);
226             }
227         }
228 
229         private ColumnNameAndSize getColumnNameAndSize(ResultSet rs) {
230             try {
231                 if (rs.next()) {
232                     final String name = rs.getString("COLUMN_NAME");
233                     final int size = rs.getInt("COLUMN_SIZE");
234                     final int type = rs.getInt("DATA_TYPE");
235                     return new ColumnNameAndSize(name, type == Types.CLOB ? UNLIMITED_COLUMN_SIZE : size);
236                 } else {
237                     return ColumnNameAndSize.NULL;
238                 }
239             } catch (SQLException e) {
240                 throw errorService.newImportExportSqlException(table, "", e);
241             }
242         }
243 
244         private ResultSet getColumnsResultSet(Connection connection) {
245             try {
246                 return metadata(errorService, connection).getColumns(null, null, table, null);
247             } catch (SQLException e) {
248                 throw errorService.newImportExportSqlException(table, "", e);
249             }
250         }
251     }
252 
253     private static class ColumnNameAndSize {
254         private static ColumnNameAndSize NULL = new ColumnNameAndSize();
255 
256         public final String name;
257         public final int size;
258 
259         private ColumnNameAndSize() {
260             this.name = null;
261             this.size = InserterBuilder.UNLIMITED_COLUMN_SIZE;
262         }
263 
264         public ColumnNameAndSize(String name, int size) {
265             this.name = checkNotNull(name);
266             this.size = size <= 0 ? InserterBuilder.UNLIMITED_COLUMN_SIZE : size;
267         }
268     }
269 
270     private static abstract class BaseInserter implements Inserter {
271         protected final ImportExportErrorService errorService;
272         protected final String tableName;
273 
274         private int col;
275         // this list is zero based
276         private final List<String> columnNames;
277         protected final PreparedStatement ps;
278         // indices into this list are 1 based -- values of -1 indicate that we don't know the max length and assume there is no limit
279         // e.g. HSQL doesn't provide sizes
280         private final List<Integer> maxColumnSize;
281 
282         public BaseInserter(ImportExportErrorService errorService, String tableName, List<String> columnNames, PreparedStatement ps, List<Integer> maxColumnSize) {
283             this.errorService = checkNotNull(errorService);
284             this.tableName = tableName;
285             this.columnNames = columnNames;
286             this.ps = ps;
287             this.maxColumnSize = maxColumnSize;
288             col = 1;
289         }
290 
291         private void setBoolean(Boolean value, DatabaseInformations.Database.Type databaseType) throws SQLException {
292             if (value == null) {
293                 if (databaseType == DatabaseInformations.Database.Type.ORACLE) {
294                     // Oracle stores booleans as NUMERICs with a precision of 1
295                     ps.setNull(col, Types.NUMERIC);
296                 } else if (databaseType == DatabaseInformations.Database.Type.MSSQL) {
297                     // SQL Server stores booleans as BITs
298                     ps.setNull(col, Types.BIT);
299                 } else {
300                     ps.setNull(col, Types.BOOLEAN);
301                 }
302                 // Derby stores booleans as SMALLINTs with a precision of 1 but is currently not being supported
303             } else {
304                 if (databaseType == DatabaseInformations.Database.Type.ORACLE) {
305                     // Oracle stores booleans as NUMERICs with a precision of 1
306                     ps.setObject(col, value, Types.NUMERIC, 1);
307                 } else {
308                     // setBoolean also handles BITs which are used by SQL Server
309                     ps.setBoolean(col, value);
310                 }
311             }
312         }
313 
314         private void setString(String value) throws SQLException {
315             if (value == null) {
316                 ps.setNull(col, Types.VARCHAR);
317             } else {
318                 int maxSize = maxColumnSize.get(col);
319                 if (maxSize != -1 && value.length() > maxSize) {
320                     throw errorService.newImportExportException(tableName, "Could not import data in table '" + tableName + "' column #" + col + ", value is too big for column which size limit is " + maxSize + ", value is:\n" + value + "\n");
321                 }
322                 ps.setString(col, value);
323             }
324         }
325 
326         private void setDate(Date value) throws SQLException {
327             if (value == null) {
328                 ps.setNull(col, Types.TIMESTAMP);
329             } else {
330                 ps.setTimestamp(col, new Timestamp(value.getTime()));
331             }
332         }
333 
334         private void setBigInteger(BigInteger value) throws SQLException {
335             if (value == null) {
336                 ps.setNull(col, Types.BIGINT);
337             } else {
338                 ps.setBigDecimal(col, new BigDecimal(value));
339             }
340         }
341 
342         private void setBigDecimal(BigDecimal value) throws SQLException {
343             if (value == null) {
344                 ps.setNull(col, Types.DOUBLE);
345             } else {
346                 ps.setBigDecimal(col, value);
347             }
348         }
349 
350         public void setValue(NodeParser node, DatabaseInformation databaseInformation) throws SQLException {
351             DatabaseInformations.Database.Type databaseType = DatabaseInformations.database(databaseInformation).getType();
352 
353             if (RowDataNode.isString(node)) {
354                 setString(node.getContentAsString());
355             } else if (RowDataNode.isBoolean(node)) {
356                 setBoolean(node.getContentAsBoolean(), databaseType);
357             } else if (RowDataNode.isInteger(node)) {
358                 final BigInteger bigInt = node.getContentAsBigInteger();
359                 if (bigInt != null && maxColumnSize.get(col) == 1)
360                 // this is actually a boolean that was stored as an Integer!
361                 // Happens with legacy Oracle.
362                 {
363                     setBoolean((bigInt.intValue() == 1), databaseType);
364                 } else {
365                     setBigInteger(bigInt);
366                 }
367             } else if (RowDataNode.isDouble(node)) {
368                 setBigDecimal(node.getContentAsBigDecimal());
369             } else if (RowDataNode.isDate(node)) {
370                 setDate(node.getContentAsDate());
371             } else {
372                 throw new IllegalArgumentException("Unsupported field encountered: " + node.getName());
373             }
374             col++;
375         }
376 
377         public final void execute() throws SQLException {
378             executePS();
379             col = 1;
380         }
381 
382         protected abstract void executePS() throws SQLException;
383     }
384 
385     private static class ImmediateInserter extends BaseInserter {
386         private ImmediateInserter(ImportExportErrorService errorService, String table, List<String> columns, PreparedStatement ps, List<Integer> maxColumnSize) {
387             super(errorService, table, columns, ps, maxColumnSize);
388         }
389 
390         protected void executePS() throws SQLException {
391             ps.execute();
392         }
393 
394         public void close() {
395             closeQuietly(ps);
396         }
397     }
398 
399     private static class BatchInserter extends BaseInserter {
400         private final int batchSize;
401         private int batch;
402 
403         private BatchInserter(ImportExportErrorService errorService, String table, List<String> columns, PreparedStatement ps, List<Integer> maxColumnSize) {
404             super(errorService, table, columns, ps, maxColumnSize);
405             batchSize = 5000;
406             batch = 0;
407         }
408 
409         protected void executePS() throws SQLException {
410             ps.addBatch();
411             if ((++batch) % batchSize == 0) {
412                 flush();
413                 batch = 0;
414             }
415         }
416 
417         private void flush() {
418             if (batch == 0) {
419                 return;
420             }
421             try {
422                 for (int result : ps.executeBatch()) {
423                     if (result == Statement.EXECUTE_FAILED) {
424                         throw new SQLException("SQL batch insert failed.");
425                     }
426                 }
427                 ps.getConnection().commit();
428             } catch (SQLException e) {
429                 throw errorService.newImportExportSqlException(tableName, "", e);
430             }
431         }
432 
433         public void close() {
434             flush();
435             closeQuietly(ps);
436         }
437     }
438 
439     public static interface AroundTableImporter {
440         void before(ImportConfiguration configuration, Context context, String table, Connection connection);
441 
442         void after(ImportConfiguration configuration, Context context, String table, Connection connection);
443     }
444 
445     private static boolean isBlank(String str) {
446         int strLen;
447         if (str == null || (strLen = str.length()) == 0) {
448             return true;
449         }
450         for (int i = 0; i < strLen; i++) {
451             if (!Character.isWhitespace(str.charAt(i))) {
452                 return false;
453             }
454         }
455         return true;
456     }
457 }