Querying your Database Data with JdbcRDDs

Spark's JdbcRDD is an RDD implementation for reading the rows of a database table over JDBC. Wiring it up from Java can be confusing at first, so in this post we walk step by step through reading the data from a database table and distributing it across the Spark workers.
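JdbcRDD does not pull the whole table through a single connection. Its constructor takes a SQL query containing two `?` placeholders, plus `lowerBound`, `upperBound` and `numPartitions`. The inclusive key range is split into `numPartitions` sub-ranges, and each partition binds its own pair of bounds to the placeholders and runs the query on a worker.

The sketch below reproduces that splitting arithmetic in plain Java so it can be run without Spark. It follows our reading of `JdbcRDD.getPartitions`. The class and method names are hypothetical, and where Spark uses arbitrary-precision arithmetic this sketch uses `long`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring how JdbcRDD divides an inclusive key range
// [lowerBound, upperBound] into numPartitions sub-ranges.
class JdbcPartitionPlanner {

    // Inclusive bounds for one partition; these values fill the two '?'
    // placeholders of the query when that partition is computed.
    static final class Range {
        final int index;
        final long lower;
        final long upper;

        Range(int index, long lower, long upper) {
            this.index = index;
            this.lower = lower;
            this.upper = upper;
        }

        @Override
        public String toString() {
            return "partition " + index + ": [" + lower + ", " + upper + "]";
        }
    }

    static List<Range> split(long lowerBound, long upperBound, int numPartitions) {
        if (numPartitions <= 0 || upperBound < lowerBound) {
            throw new IllegalArgumentException("invalid bounds or partition count");
        }
        // Bounds are inclusive, hence the + 1 here and the - 1 on each end.
        // long arithmetic is fine for demo-sized ranges but can overflow near Long.MAX_VALUE.
        long length = 1 + upperBound - lowerBound;
        List<Range> ranges = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            long start = lowerBound + (i * length) / numPartitions;
            long end = lowerBound + ((i + 1) * length) / numPartitions - 1;
            ranges.add(new Range(i, start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Key range 1..10 spread over 3 partitions
        for (Range r : split(1, 10, 3)) {
            System.out.println(r);
        }
    }
}
```

Running `main` prints the ranges [1, 3], [4, 6] and [7, 10]. For the single demo record below (user_id 1024), `split(1024, 1024, 1)` yields one partition covering just that key. The matching query would have the shape `select user_id, user_name, user_email from users where ? <= user_id and user_id <= ?`.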

We are going to use a PostgreSQL database for this example, with one demo table and one demo record. Installing PostgreSQL and creating a database are out of scope, but the following script creates the table and inserts the record:

create table users
	(user_id numeric,
	 user_name varchar(100),
	 user_email varchar(100),
	 constraint pk_user primary key (user_id));
 
insert into users
	values (1024, 'john_doe', 'john_doe@mail.com');

After creating the database table, switch to the Spark code. To read the table data into a Spark RDD we first need two classes. The first is DBConnection, which extends AbstractFunction0 and opens the database connection. The second is a result mapper, MapResult, which extends AbstractFunction1 and maps each row of the ResultSet to an RDD element. Both classes also implement Serializable so that they can be distributed to the workers.
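Spark serializes these function objects on the driver and deserializes them on each worker. The connection factory should therefore carry only plain settings (driver class, URL, credentials) and open the actual `java.sql.Connection` later, on the worker; `Connection` itself is not serializable. The JDK-only sketch below roughly imitates that shipping step with a Java serialization round trip. `ConnectionSettings` and `ShipToWorker` are hypothetical names used for illustration, not Spark classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.function.Supplier;

// Hypothetical serializable connection factory: it holds settings only and
// opens a Connection lazily when get() is called (on the worker, in Spark's case).
class ConnectionSettings implements Supplier<Connection>, Serializable {
    private static final long serialVersionUID = 1L;

    final String url;
    final String user;
    final String password;

    ConnectionSettings(String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
    }

    @Override
    public Connection get() {
        try {
            return DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            throw new IllegalStateException("Connection failed: " + url, e);
        }
    }
}

class ShipToWorker {
    // Serialize on the "driver" side, then deserialize a fresh copy on the "worker" side.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

Calling `get()` on the deserialized copy opens a new connection inside the receiving JVM. DBConnection's `apply()` plays that role below.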

DBConnection.java
package spark_jdbc;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import scala.runtime.AbstractFunction0;

// Connection factory handed to JdbcRDD. Only the settings below are
// serialized and shipped to the workers; each worker calls apply()
// to open its own Connection.
public class DBConnection extends AbstractFunction0<Connection>
    implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String driverClassName;
    private final String connectionUrl;
    private final String userName;
    private final String password;

    public DBConnection(String driverClassName,
                        String connectionUrl,
                        String userName,
                        String password) {
        this.driverClassName = driverClassName;
        this.connectionUrl = connectionUrl;
        this.userName = userName;
        this.password = password;
    }

    // Defaults for the local demo database
    public DBConnection() {
        this("org.postgresql.Driver",
             "jdbc:postgresql://localhost/xennio",
             "postgres",
             "postgres");
    }

    @Override
    public Connection apply() {
        try {
            // Make sure the JDBC driver is loaded in this (worker) JVM
            Class.forName(driverClassName);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Failed to load driver class " + driverClassName, e);
        }

        Properties properties = new Properties();
        properties.setProperty("user", userName);
        properties.setProperty("password", password);

        try {
            return DriverManager.getConnection(connectionUrl, properties);
        } catch (SQLException e) {
            // Fail fast rather than returning a null Connection to JdbcRDD
            throw new IllegalStateException("Connection failed: " + connectionUrl, e);
        }
    }
}
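MapResult, the row mapper mentioned above, is called by JdbcRDD once per row, with the ResultSet already positioned on that row. As a rough, JDK-only illustration, the sketch below copies every column of the current row into an `Object[]`. To our understanding, this is also what JdbcRDD's default mapper (`JdbcRDD.resultSetToObjectArray`) does. `RowToArray` is a hypothetical name. In the setup described here, the same body would go into the `apply(ResultSet)` method of a class extending `AbstractFunction1<ResultSet, Object[]>`.

```java
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.function.Function;

// Hypothetical stand-in for a row mapper: turns the current row into Object[].
class RowToArray implements Function<ResultSet, Object[]>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public Object[] apply(ResultSet rs) {
        try {
            ResultSetMetaData meta = rs.getMetaData();
            int columnCount = meta.getColumnCount();
            Object[] row = new Object[columnCount];
            for (int i = 0; i < columnCount; i++) {
                row[i] = rs.getObject(i + 1); // JDBC column indexes start at 1
            }
            return row;
        } catch (SQLException e) {
            throw new IllegalStateException("Failed to map row", e);
        }
    }
}
```

Mapping to `Object[]` keeps the mapper independent of the table layout. When the row shape is fixed, a typed mapper that reads `user_id`, `user_name` and `user_email` by name is a common alternative.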
