1 package com.atlassian.sal.core.rdbms;
2
3 import com.atlassian.fugue.Option;
4 import com.atlassian.sal.api.rdbms.ConnectionCallback;
5 import com.atlassian.sal.api.rdbms.RdbmsException;
6 import com.atlassian.sal.api.rdbms.TransactionalExecutor;
7 import com.atlassian.sal.spi.HostConnectionAccessor;
8 import com.google.common.annotations.VisibleForTesting;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 import javax.annotation.Nonnull;
13 import java.sql.Connection;
14 import java.sql.SQLException;
15
16
17
18
19
20
21
22
23 public class DefaultTransactionalExecutor implements TransactionalExecutor {
24 private static final Logger log = LoggerFactory.getLogger(DefaultTransactionalExecutor.class);
25
26 private final HostConnectionAccessor hostConnectionAccessor;
27
28 @VisibleForTesting
29 boolean readOnly;
30
31 @VisibleForTesting
32 boolean newTransaction;
33
34 public DefaultTransactionalExecutor(@Nonnull final HostConnectionAccessor hostConnectionAccessor, final boolean readOnly, final boolean newTransaction) {
35 this.hostConnectionAccessor = hostConnectionAccessor;
36 this.readOnly = readOnly;
37 this.newTransaction = newTransaction;
38 }
39
40 @Override
41 public <A> A execute(@Nonnull final ConnectionCallback<A> callback) {
42 return hostConnectionAccessor.execute(readOnly, newTransaction, new ConnectionCallback<A>() {
43 @Override
44 public A execute(final Connection connection) {
45 return executeInternal(connection, callback);
46 }
47 });
48 }
49
50 @Nonnull
51 @Override
52 public Option<String> getSchemaName() {
53 return hostConnectionAccessor.getSchemaName();
54 }
55
56 @Override
57 @Nonnull
58 public TransactionalExecutor readOnly() {
59 readOnly = true;
60 return this;
61 }
62
63 @Override
64 @Nonnull
65 public TransactionalExecutor readWrite() {
66 readOnly = false;
67 return this;
68 }
69
70 @Override
71 @Nonnull
72 public TransactionalExecutor newTransaction() {
73 newTransaction = true;
74 return this;
75 }
76
77 @Override
78 @Nonnull
79 public TransactionalExecutor existingTransaction() {
80 newTransaction = false;
81 return this;
82 }
83
84 @VisibleForTesting
85 <A> A executeInternal(@Nonnull final Connection connection, @Nonnull final ConnectionCallback<A> callback) {
86 assertAutoCommitFalse(connection);
87
88
89 try (final WrappedConnection wrappedConnection = new WrappedConnection(connection)) {
90
91 final A result = callback.execute(wrappedConnection);
92 try {
93
94 connection.commit();
95 } catch (final Throwable se) {
96
97 throw new RdbmsException("Unable to commit connection", se);
98 }
99 return result;
100 } catch (final Throwable re) {
101 try {
102
103 connection.rollback();
104 } catch (final Throwable se) {
105 re.addSuppressed(se);
106 }
107 throw re;
108 }
109 }
110
111 private void assertAutoCommitFalse(final Connection connection) {
112 try {
113 if (connection.getAutoCommit()) {
114 throw new IllegalStateException("com.atlassian.sal.spi.HostConnectionAccessor returned connection with autocommit set");
115 }
116 } catch (final SQLException e) {
117 throw new RdbmsException("unable to invoke java.sql.Connection#getAutoCommit", e);
118 }
119 }
120 }