View Javadoc

1   package com.atlassian.sal.core.rdbms;
2   
3   import com.atlassian.fugue.Option;
4   import com.atlassian.sal.api.rdbms.ConnectionCallback;
5   import com.atlassian.sal.api.rdbms.RdbmsException;
6   import com.atlassian.sal.api.rdbms.TransactionalExecutor;
7   import com.atlassian.sal.spi.HostConnectionAccessor;
8   import com.google.common.annotations.VisibleForTesting;
9   import org.slf4j.Logger;
10  import org.slf4j.LoggerFactory;
11  
12  import javax.annotation.Nonnull;
13  import java.sql.Connection;
14  import java.sql.SQLException;
15  
16  /**
17   * Default implementation that invokes {@link com.atlassian.sal.spi.HostConnectionAccessor}.
18   * <p/>
19   * Created by {@link com.atlassian.sal.core.rdbms.DefaultTransactionalExecutorFactory}
20   *
21   * @since 3.0
22   */
23  public class DefaultTransactionalExecutor implements TransactionalExecutor {
24      private static final Logger log = LoggerFactory.getLogger(DefaultTransactionalExecutor.class);
25  
26      private final HostConnectionAccessor hostConnectionAccessor;
27  
28      @VisibleForTesting
29      boolean readOnly;
30  
31      @VisibleForTesting
32      boolean newTransaction;
33  
34      public DefaultTransactionalExecutor(@Nonnull final HostConnectionAccessor hostConnectionAccessor, final boolean readOnly, final boolean newTransaction) {
35          this.hostConnectionAccessor = hostConnectionAccessor;
36          this.readOnly = readOnly;
37          this.newTransaction = newTransaction;
38      }
39  
40      @Override
41      public <A> A execute(@Nonnull final ConnectionCallback<A> callback) {
42          return hostConnectionAccessor.execute(readOnly, newTransaction, new ConnectionCallback<A>() {
43              @Override
44              public A execute(final Connection connection) {
45                  return executeInternal(connection, callback);
46              }
47          });
48      }
49  
50      @Nonnull
51      @Override
52      public Option<String> getSchemaName() {
53          return hostConnectionAccessor.getSchemaName();
54      }
55  
56      @Override
57      @Nonnull
58      public TransactionalExecutor readOnly() {
59          readOnly = true;
60          return this;
61      }
62  
63      @Override
64      @Nonnull
65      public TransactionalExecutor readWrite() {
66          readOnly = false;
67          return this;
68      }
69  
70      @Override
71      @Nonnull
72      public TransactionalExecutor newTransaction() {
73          newTransaction = true;
74          return this;
75      }
76  
77      @Override
78      @Nonnull
79      public TransactionalExecutor existingTransaction() {
80          newTransaction = false;
81          return this;
82      }
83  
84      @VisibleForTesting
85      <A> A executeInternal(@Nonnull final Connection connection, @Nonnull final ConnectionCallback<A> callback) {
86          assertAutoCommitFalse(connection);
87  
88          // give the user the restricted connection
89          try (final WrappedConnection wrappedConnection = new WrappedConnection(connection)) {
90              // execute the user's callback
91              final A result = callback.execute(wrappedConnection);
92              try {
93                  // no exception indicates success
94                  connection.commit();
95              } catch (final Throwable se) {
96                  // failure to commit should be propagated, to halt any further processing
97                  throw new RdbmsException("Unable to commit connection", se);
98              }
99              return result;
100         } catch (final Throwable re) {
101             try {
102                 // any exception indicates failure
103                 connection.rollback();
104             } catch (final Throwable se) {
105                 re.addSuppressed(se);
106             }
107             throw re;
108         }
109     }
110 
111     private void assertAutoCommitFalse(final Connection connection) {
112         try {
113             if (connection.getAutoCommit()) {
114                 throw new IllegalStateException("com.atlassian.sal.spi.HostConnectionAccessor returned connection with autocommit set");
115             }
116         } catch (final SQLException e) {
117             throw new RdbmsException("unable to invoke java.sql.Connection#getAutoCommit", e);
118         }
119     }
120 }