The customer had an existing custom PL/SQL concurrent program that looped through a result set and processed each row sequentially. As volume grew, this sequential processing took longer and longer as some rows that took longer to process held up the remaining rows in the result set.
We needed something to process rows in parallel, 5 or 10 at a time. Using DBMS_PARALLEL_EXECUTE in PL/SQL did not yield good results, so we had to look for something else.
A Google search on "Java Multithreading" got me to this wonderful post that provided the direction for us to create a Java concurrent program to enable parallel processing.
The Java concurrent program uses the Executor framework - java.util.concurrent.* - as well as the Inner Class pattern so that the threads can access members of the executor as if they are part of the outer executor class.
public class MyParallelConcProg implements JavaConcurrentProgram {
public class WorkerThread implements Runnable {
Logic:
//1. Define number of parallel threads
String numberOfThreads = parameters.get("p_number_of_threads");
numberOfThreads = (numberOfThreads == null) ? "5" : numberOfThreads;
//2. Create the parallel thread pool
ExecutorService executor = Executors.newFixedThreadPool(new Integer(numberOfThreads)); // parameter then profile
//3. Call PL/SQL function that returns a REF CURSOR for the result set to loop through
String sql = "begin ? := MYPKG.get_cursor;end;" ;
try {
stmt = cpContext.getJDBCConnection().prepareCall(sql); // cpContext.getJDBCConnection().prepareStatement(sql);
i = 1;
stmt.registerOutParameter(i++, OracleTypes.CURSOR);
stmt.execute();
resultSet = ((OracleCallableStatement)stmt).getCursor(1);
//4. Now loop through the result set and process each row
while (resultSet.next()
{
Runnable worker = new MyParallelConcProg .WorkerThread( ....
executor.execute(worker);
}
We needed something to process rows in parallel, 5 or 10 at a time. Using DBMS_PARALLEL_EXECUTE in PL/SQL did not yield good results, so we had to look for something else.
A Google search on "Java Multithreading" got me to this wonderful post that provided the direction for us to create a Java concurrent program to enable parallel processing.
The Java concurrent program uses the Executor framework - java.util.concurrent.* - as well as the Inner Class pattern so that the threads can access members of the executor as if they are part of the outer executor class.
public class MyParallelConcProg implements JavaConcurrentProgram {
Logic:
//1. Define number of parallel threads
String numberOfThreads = parameters.get("p_number_of_threads");
numberOfThreads = (numberOfThreads == null) ? "5" : numberOfThreads;
//2. Create the parallel thread pool
ExecutorService executor = Executors.newFixedThreadPool(new Integer(numberOfThreads)); // parameter then profile
//3. Call PL/SQL function that returns a REF CURSOR for the result set to loop through
String sql = "begin ? := MYPKG.get_cursor;end;" ;
try {
stmt = cpContext.getJDBCConnection().prepareCall(sql); // cpContext.getJDBCConnection().prepareStatement(sql);
i = 1;
stmt.registerOutParameter(i++, OracleTypes.CURSOR);
stmt.execute();
resultSet = ((OracleCallableStatement)stmt).getCursor(1);
//4. Now loop through the result set and process each row
while (resultSet.next()
{
Runnable worker = new MyParallelConcProg .WorkerThread( ....
executor.execute(worker);
}
//5. The run() method in the WorkerThread class processes the row from the result set
public void run() {
String prefix = Thread.currentThread().getName() + " " + <some identifier from the result set>+ ": ";
// get connection from the pool
OracleConnection connection = (OracleConnection)cpContext.getExtraJDBCConnection(this, cpContext.getSessionId());
OracleCallableStatement stmt = null;
stmt = (OracleCallableStatement)connection.prepareCall(
" BEGIN MYPKG.process_row(...); end;"
Each thread uses a separate database connection, so there are always n+1 DB connections while the concurrent program runs. While it is tempting to use a large number of threads, there is a practical limit and we have not seen the need to go over 5 threads for our requirement. We did go as high as 20 threads during stress testing and were very satisfied with the results.
The DB connections for each thread do not inherit the concurrent request settings such as Request Id, Requestor etc. However, since the WorkerThread is an Inner Class, it has access to the cpcontext member of the outer executor class and can get to the concurrent request id by calling cpContext.getReqDetails().getRequestId())
I have skipped details like parameters passed to the PL/SQL statements since your requirements will be different. Also details pertaining to concurrent program log messages can be found in the developer's guide, so I have not mentioned that in this post.