1 package com.atlassian.dbexporter.importer;
2
3 import com.atlassian.dbexporter.BatchMode;
4 import com.atlassian.dbexporter.Context;
5 import com.atlassian.dbexporter.DatabaseInformation;
6 import com.atlassian.dbexporter.DatabaseInformations;
7 import com.atlassian.dbexporter.EntityNameProcessor;
8 import com.atlassian.dbexporter.ImportExportErrorService;
9 import com.atlassian.dbexporter.jdbc.JdbcUtils;
10 import com.atlassian.dbexporter.node.NodeParser;
11 import com.atlassian.dbexporter.progress.ProgressMonitor;
12 import com.google.common.collect.Maps;
13
14 import java.math.BigDecimal;
15 import java.math.BigInteger;
16 import java.sql.Connection;
17 import java.sql.PreparedStatement;
18 import java.sql.ResultSet;
19 import java.sql.SQLException;
20 import java.sql.Statement;
21 import java.sql.Timestamp;
22 import java.sql.Types;
23 import java.util.ArrayList;
24 import java.util.Date;
25 import java.util.List;
26 import java.util.Map;
27
28 import static com.atlassian.dbexporter.importer.ImporterUtils.isNodeNotClosed;
29 import static com.atlassian.dbexporter.jdbc.JdbcUtils.closeQuietly;
30 import static com.atlassian.dbexporter.jdbc.JdbcUtils.metadata;
31 import static com.atlassian.dbexporter.jdbc.JdbcUtils.preparedStatement;
32 import static com.atlassian.dbexporter.jdbc.JdbcUtils.quote;
33 import static com.atlassian.dbexporter.jdbc.JdbcUtils.withConnection;
34 import static com.atlassian.dbexporter.node.NodeBackup.ColumnDataNode;
35 import static com.atlassian.dbexporter.node.NodeBackup.RowDataNode;
36 import static com.atlassian.dbexporter.node.NodeBackup.TableDataNode;
37 import static com.atlassian.dbexporter.progress.ProgressMonitor.Task;
38 import static com.google.common.base.Preconditions.checkNotNull;
39 import static com.google.common.collect.Lists.newArrayList;
40
41 public final class DataImporter extends AbstractSingleNodeImporter {
42 private final String schema;
43 private final AroundTableImporter aroundTable;
44
/**
 * Creates a data importer.
 *
 * @param errorService        service handed to the superclass; this class uses it to create the
 *                            exceptions it throws
 * @param schema              schema used to prefix table names in the generated INSERT statements;
 *                            a {@code null}, empty or whitespace-only value is stored as {@code null}
 * @param aroundTableImporter hook called before and after each table is imported, must not be {@code null}
 * @param arounds             importers handed to the superclass constructor
 */
public DataImporter(ImportExportErrorService errorService, String schema, AroundTableImporter aroundTableImporter, List<AroundImporter> arounds) {
    super(errorService, arounds);
    if (isBlank(schema)) {
        // a blank schema means "no schema prefix"
        this.schema = null;
    } else {
        this.schema = schema;
    }
    this.aroundTable = checkNotNull(aroundTableImporter);
}
50
/**
 * Convenience constructor taking the surrounding importers as varargs.
 *
 * @param errorService        see the list-based constructor
 * @param schema              see the list-based constructor
 * @param aroundTableImporter see the list-based constructor
 * @param arounds             importers to wrap into a list, the array itself must not be {@code null}
 */
public DataImporter(ImportExportErrorService errorService,
                    String schema,
                    AroundTableImporter aroundTableImporter,
                    AroundImporter... arounds) {
    this(
            errorService,
            schema,
            aroundTableImporter,
            newArrayList(checkNotNull(arounds)));
}
54
55 @Override
56 protected String getNodeName() {
57 return TableDataNode.NAME;
58 }
59
60 @Override
61 protected void doImportNode(final NodeParser node, final ImportConfiguration configuration, final Context context) {
62 final ProgressMonitor monitor = configuration.getProgressMonitor();
63 monitor.begin(Task.TABLES_DATA);
64 withConnection(errorService, configuration.getConnectionProvider(), new JdbcUtils.JdbcCallable<Void>() {
65 public Void call(Connection connection) {
66 try {
67 final boolean autoCommit = connection.getAutoCommit();
68 try {
69 connection.setAutoCommit(false);
70 for (; TableDataNode.NAME.equals(node.getName()) && !node.isClosed(); node.getNextNode()) {
71 importTable(node, configuration, context, connection, configuration.getDatabaseInformation());
72 }
73 connection.commit();
74 } finally {
75 connection.setAutoCommit(autoCommit);
76 }
77 } catch (SQLException e) {
78 throw errorService.newImportExportSqlException(null, "", e);
79 }
80 return null;
81 }
82 });
83 monitor.end(Task.TABLES_DATA);
84 }
85
86 private NodeParser importTable(NodeParser node, ImportConfiguration configuration, Context context, Connection connection, DatabaseInformation databaseInformation) {
87 final ProgressMonitor monitor = configuration.getProgressMonitor();
88 final EntityNameProcessor entityNameProcessor = configuration.getEntityNameProcessor();
89
90 final String currentTable = entityNameProcessor.tableName(TableDataNode.getName(node));
91
92 monitor.begin(Task.TABLE_DATA, currentTable);
93
94 final InserterBuilder builder = new InserterBuilder(errorService, schema, currentTable, configuration.getBatchMode());
95
96 node = node.getNextNode();
97 for (; isNodeNotClosed(node, ColumnDataNode.NAME); node = node.getNextNode()) {
98 final String column = ColumnDataNode.getName(node);
99 builder.addColumn(entityNameProcessor.columnName(column));
100 node = node.getNextNode();
101 }
102
103 final Inserter inserter = builder.build(connection);
104
105 long rowNum = 0L;
106 try {
107 aroundTable.before(configuration, context, currentTable, connection);
108
109 for (; isNodeNotClosed(node, RowDataNode.NAME); node = node.getNextNode()) {
110 node = node.getNextNode();
111 for (; !node.isClosed(); node = node.getNextNode()) {
112 inserter.setValue(node, databaseInformation);
113 }
114 inserter.execute();
115 rowNum++;
116 }
117 } catch (SQLException e) {
118 throw errorService.newRowImportSqlException(currentTable, rowNum, e);
119 } finally {
120 inserter.close();
121 aroundTable.after(configuration, context, currentTable, connection);
122 }
123
124 monitor.end(Task.TABLE_DATA, currentTable);
125
126 return node;
127 }
128
129 private static interface Inserter {
130 void setValue(NodeParser node, DatabaseInformation databaseInformation) throws SQLException;
131
132 void execute() throws SQLException;
133
134 void close();
135 }
136
137
138 private static class InserterBuilder {
139 public static final int UNLIMITED_COLUMN_SIZE = -1;
140
141 private final ImportExportErrorService errorService;
142 private final String schema;
143 private final String table;
144 private final BatchMode batch;
145 private final List<String> columns;
146
147 public InserterBuilder(ImportExportErrorService errorService, String schema, String table, BatchMode batch) {
148 this.errorService = checkNotNull(errorService);
149 this.schema = schema;
150 this.table = table;
151 this.batch = batch;
152 columns = new ArrayList<String>();
153 }
154
155 public String getTable() {
156 return table;
157 }
158
159 public void addColumn(String column) {
160 columns.add(column);
161 }
162
163 public Inserter build(Connection connection) {
164 final StringBuilder query = new StringBuilder("INSERT INTO ")
165 .append(tableName(connection))
166 .append(" (");
167
168 for (int i = 0; i < columns.size(); i++) {
169 query.append(quote(errorService, table, connection, columns.get(i)));
170 if (i < columns.size() - 1) {
171 query.append(", ");
172 }
173 }
174
175 query.append(") VALUES (");
176 for (int i = 0; i < columns.size(); i++) {
177 query.append("?");
178 if (i < columns.size() - 1) {
179 query.append(", ");
180 }
181 }
182 query.append(")");
183
184 final List<Integer> maxColumnSizes = calculateColumnSizes(connection, columns);
185 final PreparedStatement ps = preparedStatement(errorService, table, connection, query.toString());
186 return newInserter(maxColumnSizes, ps);
187 }
188
189 private String tableName(Connection connection) {
190 final String quoted = quote(errorService, table, connection, table);
191 return schema != null ? schema + "." + quoted : quoted;
192 }
193
194 private Inserter newInserter(List<Integer> maxColumnSizes, PreparedStatement ps) {
195 return batch.equals(BatchMode.ON) ?
196 new BatchInserter(errorService, getTable(), columns, ps, maxColumnSizes) :
197 new ImmediateInserter(errorService, getTable(), columns, ps, maxColumnSizes);
198 }
199
200
201
202
203
204
205
206
207 private List<Integer> calculateColumnSizes(Connection connection, List<String> columns) {
208 Map<String, Integer> columnSizeMap = Maps.newHashMap();
209 ResultSet rs = null;
210 try {
211 rs = getColumnsResultSet(connection);
212 ColumnNameAndSize columnNameAndSize = getColumnNameAndSize(rs);
213 while (columnNameAndSize != ColumnNameAndSize.NULL) {
214 columnSizeMap.put(columnNameAndSize.name, columnNameAndSize.size);
215 columnNameAndSize = getColumnNameAndSize(rs);
216 }
217
218 final List<Integer> sizes = newArrayList(0);
219 for (String column : columns) {
220 final Integer size = columnSizeMap.get(column);
221 sizes.add(size != null ? size : UNLIMITED_COLUMN_SIZE);
222 }
223 return sizes;
224 } finally {
225 closeQuietly(rs);
226 }
227 }
228
229 private ColumnNameAndSize getColumnNameAndSize(ResultSet rs) {
230 try {
231 if (rs.next()) {
232 final String name = rs.getString("COLUMN_NAME");
233 final int size = rs.getInt("COLUMN_SIZE");
234 final int type = rs.getInt("DATA_TYPE");
235 return new ColumnNameAndSize(name, type == Types.CLOB ? UNLIMITED_COLUMN_SIZE : size);
236 } else {
237 return ColumnNameAndSize.NULL;
238 }
239 } catch (SQLException e) {
240 throw errorService.newImportExportSqlException(table, "", e);
241 }
242 }
243
244 private ResultSet getColumnsResultSet(Connection connection) {
245 try {
246 return metadata(errorService, connection).getColumns(null, null, table, null);
247 } catch (SQLException e) {
248 throw errorService.newImportExportSqlException(table, "", e);
249 }
250 }
251 }
252
253 private static class ColumnNameAndSize {
254 private static ColumnNameAndSize NULL = new ColumnNameAndSize();
255
256 public final String name;
257 public final int size;
258
259 private ColumnNameAndSize() {
260 this.name = null;
261 this.size = InserterBuilder.UNLIMITED_COLUMN_SIZE;
262 }
263
264 public ColumnNameAndSize(String name, int size) {
265 this.name = checkNotNull(name);
266 this.size = size <= 0 ? InserterBuilder.UNLIMITED_COLUMN_SIZE : size;
267 }
268 }
269
270 private static abstract class BaseInserter implements Inserter {
271 protected final ImportExportErrorService errorService;
272 protected final String tableName;
273
274 private int col;
275
276 private final List<String> columnNames;
277 protected final PreparedStatement ps;
278
279
280 private final List<Integer> maxColumnSize;
281
282 public BaseInserter(ImportExportErrorService errorService, String tableName, List<String> columnNames, PreparedStatement ps, List<Integer> maxColumnSize) {
283 this.errorService = checkNotNull(errorService);
284 this.tableName = tableName;
285 this.columnNames = columnNames;
286 this.ps = ps;
287 this.maxColumnSize = maxColumnSize;
288 col = 1;
289 }
290
291 private void setBoolean(Boolean value, DatabaseInformations.Database.Type databaseType) throws SQLException {
292 if (value == null) {
293 if (databaseType == DatabaseInformations.Database.Type.ORACLE) {
294
295 ps.setNull(col, Types.NUMERIC);
296 } else if (databaseType == DatabaseInformations.Database.Type.MSSQL) {
297
298 ps.setNull(col, Types.BIT);
299 } else {
300 ps.setNull(col, Types.BOOLEAN);
301 }
302
303 } else {
304 if (databaseType == DatabaseInformations.Database.Type.ORACLE) {
305
306 ps.setObject(col, value, Types.NUMERIC, 1);
307 } else {
308
309 ps.setBoolean(col, value);
310 }
311 }
312 }
313
314 private void setString(String value) throws SQLException {
315 if (value == null) {
316 ps.setNull(col, Types.VARCHAR);
317 } else {
318 int maxSize = maxColumnSize.get(col);
319 if (maxSize != -1 && value.length() > maxSize) {
320 throw errorService.newImportExportException(tableName, "Could not import data in table '" + tableName + "' column #" + col + ", value is too big for column which size limit is " + maxSize + ", value is:\n" + value + "\n");
321 }
322 ps.setString(col, value);
323 }
324 }
325
326 private void setDate(Date value) throws SQLException {
327 if (value == null) {
328 ps.setNull(col, Types.TIMESTAMP);
329 } else {
330 ps.setTimestamp(col, new Timestamp(value.getTime()));
331 }
332 }
333
334 private void setBigInteger(BigInteger value) throws SQLException {
335 if (value == null) {
336 ps.setNull(col, Types.BIGINT);
337 } else {
338 ps.setBigDecimal(col, new BigDecimal(value));
339 }
340 }
341
342 private void setBigDecimal(BigDecimal value) throws SQLException {
343 if (value == null) {
344 ps.setNull(col, Types.DOUBLE);
345 } else {
346 ps.setBigDecimal(col, value);
347 }
348 }
349
350 public void setValue(NodeParser node, DatabaseInformation databaseInformation) throws SQLException {
351 DatabaseInformations.Database.Type databaseType = DatabaseInformations.database(databaseInformation).getType();
352
353 if (RowDataNode.isString(node)) {
354 setString(node.getContentAsString());
355 } else if (RowDataNode.isBoolean(node)) {
356 setBoolean(node.getContentAsBoolean(), databaseType);
357 } else if (RowDataNode.isInteger(node)) {
358 final BigInteger bigInt = node.getContentAsBigInteger();
359 if (bigInt != null && maxColumnSize.get(col) == 1)
360
361
362 {
363 setBoolean((bigInt.intValue() == 1), databaseType);
364 } else {
365 setBigInteger(bigInt);
366 }
367 } else if (RowDataNode.isDouble(node)) {
368 setBigDecimal(node.getContentAsBigDecimal());
369 } else if (RowDataNode.isDate(node)) {
370 setDate(node.getContentAsDate());
371 } else {
372 throw new IllegalArgumentException("Unsupported field encountered: " + node.getName());
373 }
374 col++;
375 }
376
377 public final void execute() throws SQLException {
378 executePS();
379 col = 1;
380 }
381
382 protected abstract void executePS() throws SQLException;
383 }
384
385 private static class ImmediateInserter extends BaseInserter {
386 private ImmediateInserter(ImportExportErrorService errorService, String table, List<String> columns, PreparedStatement ps, List<Integer> maxColumnSize) {
387 super(errorService, table, columns, ps, maxColumnSize);
388 }
389
390 protected void executePS() throws SQLException {
391 ps.execute();
392 }
393
394 public void close() {
395 closeQuietly(ps);
396 }
397 }
398
399 private static class BatchInserter extends BaseInserter {
400 private final int batchSize;
401 private int batch;
402
403 private BatchInserter(ImportExportErrorService errorService, String table, List<String> columns, PreparedStatement ps, List<Integer> maxColumnSize) {
404 super(errorService, table, columns, ps, maxColumnSize);
405 batchSize = 5000;
406 batch = 0;
407 }
408
409 protected void executePS() throws SQLException {
410 ps.addBatch();
411 if ((++batch) % batchSize == 0) {
412 flush();
413 batch = 0;
414 }
415 }
416
417 private void flush() {
418 if (batch == 0) {
419 return;
420 }
421 try {
422 for (int result : ps.executeBatch()) {
423 if (result == Statement.EXECUTE_FAILED) {
424 throw new SQLException("SQL batch insert failed.");
425 }
426 }
427 ps.getConnection().commit();
428 } catch (SQLException e) {
429 throw errorService.newImportExportSqlException(tableName, "", e);
430 }
431 }
432
433 public void close() {
434 flush();
435 closeQuietly(ps);
436 }
437 }
438
439 public static interface AroundTableImporter {
440 void before(ImportConfiguration configuration, Context context, String table, Connection connection);
441
442 void after(ImportConfiguration configuration, Context context, String table, Connection connection);
443 }
444
445 private static boolean isBlank(String str) {
446 int strLen;
447 if (str == null || (strLen = str.length()) == 0) {
448 return true;
449 }
450 for (int i = 0; i < strLen; i++) {
451 if (!Character.isWhitespace(str.charAt(i))) {
452 return false;
453 }
454 }
455 return true;
456 }
457 }