Migrating Data From Couchbase to Postgres using Apache Spark


Introduction

Apache Spark is a well-known distributed data processing platform and framework. Because Spark can act as both a source and a target for many different data stores, most data platforms provide their own Spark connectors.

For instance, the Couchbase NoSQL database ships its own Spark connector for Spark RDDs and Spark SQL DataFrames, which makes it quite easy to read bulk data from Couchbase and create Spark RDDs and/or Spark SQL DataFrames from it.

This multi-source capability can also be used to read from one kind of source and write the data to another type of target. If no data processing, enrichment, or any other kind of data alteration happens between the read and the write, Apache Spark effectively becomes a distributed, fast, and efficient data migration tool.

The most important part of this process is usually handling the structural differences between the data sources. NoSQL and relational databases are a good example: while NoSQL databases store data in a semi-structured way, relational databases store data in strictly defined structures.

With the flexibility of the Spark framework, both the metadata and the data transformations can be handled with the correct data modeling.

In this article, I am going to give a simple example of copying JSON-structured data from the Couchbase NoSQL database to a PostgreSQL relational database.

Installing Travel-Sample sample bucket on Couchbase

First we need to create a DataFrame from a Couchbase bucket. As a sample dataset, I used Couchbase's travel-sample bucket. Since this article is not a tutorial on Couchbase installation or sample buckets, simply install the travel-sample bucket after installing Couchbase. To do this, go to the Settings > Sample Buckets tab.
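To give an idea of the source structure, an airline document in travel-sample looks roughly like the following (the field values here are illustrative):

{
  "id": 10,
  "type": "airline",
  "name": "40-Mile Air",
  "iata": "Q5",
  "icao": "MLA",
  "callsign": "MILE-AIR",
  "country": "United States"
}

This semi-structured JSON is what we will later flatten into a fixed set of relational columns.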

Including Maven Dependencies

We are going to migrate data from Couchbase to PostgreSQL, so we need both the Couchbase Spark connector libraries and the PostgreSQL JDBC driver. I am using a Maven project, so I add the following dependencies to my pom.xml file.

        <!-- couchbase libraries -->
        <dependency>
            <groupId>com.couchbase.client</groupId>
            <artifactId>spark-connector_2.11</artifactId>
            <version>2.3.0</version>
            <exclusions>
                <exclusion>
                    <groupId>com.couchbase.client</groupId>
                    <artifactId>java-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.couchbase.client</groupId>
            <artifactId>java-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <!-- couchbase libraries -->

        <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.8</version>
        </dependency>

The Spark version used in this article is 2.3.3, so the following Spark dependencies should also be added to the pom.xml file.

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

Creating Spark DataFrame from Couchbase Bucket

Suppress Logging

To make Apache Spark's log output more readable, I usually set the logging level to ERROR only.

        // Requires: import org.apache.log4j.Level; and import org.apache.log4j.Logger;
        Logger log = Logger.getRootLogger();
        log.setLevel(Level.ERROR);

        Logger.getLogger("org").setLevel(Level.ERROR);
        Logger.getLogger("akka").setLevel(Level.OFF);

Creating Spark Session

A Spark session is the entry point of a Spark program. To enable the Couchbase connection from the Spark session, some Couchbase-specific configuration entries must be added to the session parameters. Detailed explanations of these configurations can be found in the official Couchbase Spark connector documentation.

        SparkSession spark = SparkSession
                .builder()
                .appName("MigrateCouchbase2Postgres")
                .master("local[*]")
                .config("spark.couchbase.nodes", "localhost")
                .config("spark.couchbase.username", "Administrator")
                .config("spark.couchbase.password", "Administrator")
                .config("spark.couchbase.bucket." + "travel-sample", "")
                .config("com.couchbase.socketConnect", 300000)
                .config("com.couchbase.connectTimeout", 300000)
                .config("com.couchbase.queryTimeout", 300000)
                .config("com.couchbase.maxRequestLifetime", 300000)
                .config("spark.driver.allowMultipleContext", "true")
                .getOrCreate();

Creating Spark SQL DataFrame

This is the part where we load the data in the Couchbase bucket into a DataFrame. Running the project with only the following lines will display how many entries in the travel-sample bucket have the type airline.

Dataset<Row> travelDS = couchbaseReader(spark.read()).couchbase(new EqualTo("type", "airline"));
System.out.println(travelDS.count());

The important thing to remember is to filter the bucket data down to documents that share the same structure. Since Couchbase is a NoSQL database where differently structured documents can live in the same bucket, make sure to load only the relevant documents that share the same JSON structure.
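As a quick sanity check, you can print the schema Spark inferred from the filtered documents and verify that the columns line up with the target table. The commented output below is a rough sketch of what to expect, not verbatim output:

travelDS.printSchema();
// Expected, roughly:
// root
//  |-- META_ID: string    (document key added by the Couchbase connector)
//  |-- callsign: string
//  |-- country: string
//  |-- iata: string
//  |-- icao: string
//  |-- id: long
//  |-- name: string
//  |-- type: string

Note the META_ID column the connector adds for the document key; it will map to the meta_id column of the target table, and the positional setters in the insert loop below rely on this column order.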

Creating table in Postgres Database

Since we know the structure of the airline JSON documents in the Couchbase bucket, a corresponding table must exist in the PostgreSQL database to receive the data loaded from Couchbase.

create table travel_sample
(
    meta_id   varchar(50),
    call_sign varchar(100),
    country   varchar(50),
    iata      varchar(50),
    icao      varchar(50),
    id        integer,
    name      varchar(50),
    type      varchar(50)
);

alter table travel_sample
    owner to postgres;

Repartition and Load Data to Postgres Database

travelDS.repartition(4).foreachPartition(each -> {
    Connection conn = null;
    try {
        // Load the PostgreSQL JDBC driver on the executor.
        Class.forName("org.postgresql.Driver");

        //jdbc:postgresql://host:port/database
        String url = "jdbc:postgresql://localhost:5432/postgres";
        Properties props = new Properties();
        props.setProperty("user", "postgres");
        props.setProperty("password", "postgres");
        conn = DriverManager.getConnection(url, props);

        // Prepare the insert once per partition; it is re-executed for every row.
        PreparedStatement pStmt = conn.prepareStatement(
                "insert into travel_sample values (?,?,?,?,?,?,?,?)");

        while (each.hasNext()) {
            Row row = each.next();

            pStmt.setString(1, row.getString(0)); // meta_id
            pStmt.setString(2, row.getString(1)); // call_sign
            pStmt.setString(3, row.getString(2)); // country
            pStmt.setString(4, row.getString(3)); // iata
            pStmt.setString(5, row.getString(4)); // icao
            pStmt.setLong(6, row.getLong(5));     // id
            pStmt.setString(7, row.getString(6)); // name
            pStmt.setString(8, row.getString(7)); // type

            pStmt.executeUpdate();
        }
    } catch (ClassNotFoundException cnfe) {
        cnfe.printStackTrace();
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        // Close the per-partition connection so connections are not leaked.
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException ignored) {
            }
        }
    }
});

Explaining the code: after reading and distributing the data within Spark, it is time to repartition it and load it into the previously created PostgreSQL table.

First, I repartition the data to control the number of parallel threads ingesting into the Postgres database. I find four parallel threads more than enough for this example, so the DataFrame is split into four roughly equal partitions.

For every partition only one JDBC connection is created, which means at most four connections are open on the Postgres database at the same time, and all the data is ingested through these connections.

The insert statement is also prepared once per partition, so PostgreSQL does not have to parse the statement for every inserted row; instead, the cached statement is parsed once and executed many times, once per row.
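If round trips to the database become the bottleneck, a variant of the loop could use JDBC batching instead of calling executeUpdate() per row. This is a minimal sketch of the idea, not part of the original code; it assumes the pStmt and each variables from the snippet above, and the batch size of 1000 is an arbitrary assumption to tune for your workload:

int batchSize = 1000; // assumed value; tune for your workload
int count = 0;
while (each.hasNext()) {
    Row row = each.next();
    pStmt.setString(1, row.getString(0)); // meta_id
    pStmt.setString(2, row.getString(1)); // call_sign
    pStmt.setString(3, row.getString(2)); // country
    pStmt.setString(4, row.getString(3)); // iata
    pStmt.setString(5, row.getString(4)); // icao
    pStmt.setLong(6, row.getLong(5));     // id
    pStmt.setString(7, row.getString(6)); // name
    pStmt.setString(8, row.getString(7)); // type
    pStmt.addBatch();                     // queue the row instead of executing it
    if (++count % batchSize == 0) {
        pStmt.executeBatch();             // one round trip per batch
    }
}
pStmt.executeBatch();                     // flush any remaining rows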

foreachPartition hands each partition an iterator over Spark SQL Row objects. Each partition iterates over its rows and inserts them using the already opened JDBC connection and prepared statement.

After the execution, the data should be in PostgreSQL. When you select from the table, you should see the migrated airline rows.
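A quick way to verify the migration (a suggested check, adapt as needed) is to compare the row count with the count printed by the Spark job earlier and to eyeball a few rows:

-- should match the travelDS.count() printed by the Spark job
select count(*) from travel_sample;

-- sample a few migrated airlines
select meta_id, name, iata, icao, country
from travel_sample
limit 5;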
