Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Camel jdbc: How can I reset datasource if mysql connection gets closed

We have developed a Camel bundle (deployed in Karaf), which is expected to pull data from MySQL every 24 hour and push it to S3. But as MySQL internally close the connection if it is idle for 8 hours, hence on next scheduled execution it starts throwing an error. Please see below snippets from our code.

Properties:

MySqlDriver=com.mysql.jdbc.Driver
MySqlDatabaseURL=jdbc:mysql://x.x.x.x/dbname?autoReconnect=true
MySqlUsername=sm*****
MySqlPassword=*******

Activator:

public class Activator implements BundleActivator {

    public CamelContext context = null;

    public void start(BundleContext bundleContext) throws Exception {
        DataSource dataSource = UDMSUtils.createDataSource(UDMSUtils.getProperty(UDMSConstants.MYSQL_DATABASE_URL));

        SimpleRegistry simpleRegistry = new SimpleRegistry();
        simpleRegistry.put(UDMSConstants.UDMS_DATA_SOURCE, dataSource);

        context = new OsgiDefaultCamelContext(bundleContext, simpleRegistry);
        context.addRoutes(new CreativeRoutes());
        context.start();
    }

}

Building Data Source:

public static DataSource createDataSource(String connectURI) {
    BasicDataSource ds = new BasicDataSource();
    ds.setDriverClassName(getProperty(UDMSConstants.MYSQL_DRIVER));
    ds.setUsername(getProperty(UDMSConstants.MYSQL_USERNAME));
    ds.setPassword(getProperty(UDMSConstants.MYSQL_PASSWORD));
    ds.setUrl(connectURI);
    ds.setMaxWait(-1);  // Waits indefinately
    return ds;
}

Routes:

from("timer://Timer?repeatCount=1").to("direct:record_count").end();

from("direct:record_count")
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getIn().setBody(query);
        }
    })    
    .routeId("record_count")
    .to("jdbc:" + UDMSConstants.UDMS_DATA_SOURCE)
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            // ...
        }
    );

Can anyone please suggest, what changes needs to be done in the above code so the connection remains active for as long as we need.

Please Note: We do not have permissions to change the mysql.properties, hence we need to handle this in our code.

like image 450
Hussain Bohra Avatar asked Mar 20 '23 02:03

Hussain Bohra


2 Answers

I had a similar problem a while ago. VikingSteve is also spot on in what he is advising you to do. Since I was using a OSGI Blueprint I did all my configuration in XML so I went about solving it as follows.

1) Add A Apache Commons DBCP dependency to your pom:

<dependency>
    <groupId>commons-dbcp</groupId>
    <artifactId>commons-dbcp</artifactId>
    <version>1.4</version>
</dependency>

2) Declare the connection pool in your camel route/blueprint file as follows:

<bean id="MydataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close" scope="singleton" >
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://DB-001:3306/Customer"/>
    <property name="username" value="sys_ETL"/>
    <property name="password" value="Blah"/>
    <property name="initialSize" value="4"/>
    <property name="maxActive" value="32"/>
    <property name="maxIdle" value="16"/>
    <property name="minIdle" value="8"/>
    <property name="timeBetweenEvictionRunsMillis" value="1800"/>
    <property name="minEvictableIdleTimeMillis" value="1800"/>
    <property name="testOnBorrow" value="true"/>
    <property name="testWhileIdle" value="true"/>
    <property name="testOnReturn" value="true"/>
    <property name="validationQuery" value="SELECT 1"/>
    <property name="maxWait"  value="1000"/>
    <property name="removeAbandoned" value="true"/>
    <property name="logAbandoned" value="true"/>
    <property name="removeAbandonedTimeout" value="30000"/>
</bean>

This step will create a database connection pool as a bean which I can then use in my routes. This bean has the name of Mydatasource I will use this information a little later. Also note the properties I have set for the connection pool in the configuration. These properties allow my connection pool to grow and shrink and also it makes sure that even after going idle the connections are not stale.

3) Create a POJO to use this connection pool:

public class AccountInformationToDatabase {


private BasicDataSource dataSource;
public BasicDataSource getDataSource() {
    return dataSource;
}
public void setDataSource(BasicDataSource dataSource) {
    this.dataSource = dataSource;
}
@Handler
public void PersistRecord
(
        @Body AccountRecordBindy msgBody
        , @Headers Map hdr
        , Exchange exch
) throws Exception
{

    Connection conn = null;
    PreparedStatement stmt=null;



    try 
    {


        conn= dataSource.getConnection();
        stmt =conn.prepareStatement("SOME INSERT STATEMENT");   

        stmt.setString(1,msgBody.getAccountNumber().trim());
        stmt.setString(2,msgBody.getRecordType().trim() );
        stmt.setString(3,msgBody.getSequenceNumber().trim());
        stmt.setString(4,msgBody.getTitle().trim());
        stmt.setString(5,msgBody.getCustomerType().trim());
        stmt.setString(6,msgBody.getName().trim());
        stmt.setString(7,msgBody.getAccountAddress1().trim());


        stmt.executeUpdate();        






    }
    catch (Exception e)
    {

        throw new Exception(e.getMessage());

    }

    finally
    {
        try
        {
                if (stmt!=null)
                {
                    stmt.close();
                    stmt= null;
                }
                if (conn!=null)
                {
                    conn.close();
                    conn= null;
                }
        }
        catch(SQLException e)
        {

            throw new Exception(e.getMessage());

        }

    }


}

}

This POJO has a property called datasource which is of type org.apache.commons.dbcp.BasicDataSource. I can now inject the Mydatasource bean into this POJO so that my class has access to the connection pool.

4) Turning the POJO into a bean and injecting the connection pool:

<bean id="AccountPersist"   class="AccountInformationToDatabase">
    <property name="dataSource" ref="MydataSource"/>
</bean>

This technique is a must have if you are doing text file processing and would like to use concurrent inserts etc.

like image 164
Namphibian Avatar answered Mar 21 '23 14:03

Namphibian


Use a different more advanced JDBC connection pool such as HikariCP. Set the jdbc4ConnectionTest or connectionTestQuery properties to test if the connection is still alive. From the docs about connectionTestQuery:

This is for "legacy" databases that do not support the JDBC4 Connection.isValid() API. This is the query that will be executed just before a connection is given to you from the pool to validate that the connection to the database is still alive. It is database dependent and should be a query that takes very little processing by the database (eg. "VALUES 1")

like image 40
Peter Keller Avatar answered Mar 21 '23 15:03

Peter Keller