Getting Started with Big Data Analytics on OpenStack with Your Own ETL Workflow Process

ETL jobs are very common in IT organisations that handle very large data sets. Executing an ETL workflow on OpenStack infrastructure with just a few clicks is a boon for private cloud operators, compared to setting up Hadoop infrastructure on physical hardware.

The author is a DevOps engineer with CloudEnablers Inc., Chennai, a product-based company targeting multi-cloud orchestration and multi-cloud governance platforms. He has expertise in Linux server administration, OpenStack Cloud and Hadoop administration.

When thinking about a leading open source cloud computing platform, OpenStack comes to mind, as it controls large pools of compute, storage and networking resources throughout data centres, and has gained much popularity in the technology market. It introduces new services and features in every release cycle to address critical IT requirements. A very important requirement for any IT organisation is to build a robust platform for performing data analytics with large data sets. Big Data is the latest buzzword in the IT industry. This article focuses on how OpenStack plays a key role in addressing Big Data use cases.

Big Data on OpenStack

Nowadays, data is generated everywhere and its volume is growing exponentially. Data is generated from Web servers, application servers and database servers in the form of user information, log files and system state information. Apart from this, a huge volume of data is generated from IoT devices like sensors, vehicles and industrial devices. Data generated from scientific simulation models is another example of a Big Data source. It is difficult to store this data and perform analytics with traditional software tools. Hadoop, however, can address this issue.

Let me share my use case with you. I have a large volume of data stored in an RDBMS environment, and it does not perform well as the data grows bigger; I cannot imagine its performance when the data grows even further. I do not feel comfortable adopting the NoSQL culture at this stage, but I need to store and process the bulk of my data in a cost-effective way. I also need to scale the cluster at any time, and require a better dashboard to manage all of its components. Hence, I planned to set up a Hadoop cluster on top of OpenStack and create my ETL job environment.

Hadoop is an industry standard framework for storing and analysing large data sets with its fault-tolerant Hadoop Distributed File System and MapReduce implementation. Scalability is a very common problem in a typical Hadoop cluster. OpenStack has introduced a project called Sahara – Data Processing as a Service. OpenStack Sahara aims to provision and manage data processing frameworks such as Hadoop MapReduce, Spark and Storm in a cluster topology. This project is similar to the data analytics platform provided by the Amazon Elastic MapReduce (EMR) service. Sahara deploys a cluster in a few minutes. Besides, OpenStack Sahara can scale the cluster by adding or removing worker nodes, based on demand.

The benefits of managing a Hadoop cluster with OpenStack Sahara are:

Clusters can be provisioned faster and are easy to configure.

Like other OpenStack services, the Sahara service can be managed through its powerful REST API, CLI and Horizon dashboard (a short CLI sketch follows this list).

Plugins are available to support multiple Hadoop vendors such as Vanilla (Apache Hadoop), HDP (Ambari), CDH (Cloudera), MapR, Spark and Storm.

Cluster size can be scaled up and down, based on demand.

Clusters can be integrated with OpenStack Swift to store the data processed by Hadoop and Spark.

Cluster monitoring can be made simple.
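As a quick illustration of the CLI mentioned in the list above, the python-saharaclient plugin adds a 'dataprocessing' command group to the unified OpenStack client; the cluster name below is purely hypothetical:

openstack dataprocessing cluster list
openstack dataprocessing cluster show my-hadoop-cluster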

Apart from cluster provisioning, Sahara can be used as 'Analytics as a Service' for ad hoc or bursty analytic workloads. Here, we can select the framework and load the job data. It behaves as a transient cluster and will be automatically terminated after job completion.

Architecture

OpenStack Sahara is designed to leverage the core services and other fully managed services of OpenStack. It makes Sahara more reliable and capable of managing the Hadoop cluster efficiently. We can optionally use services such as Trove and Swift in our deployment. Let us look into the internals of the Sahara service.

The Sahara service has an API server which responds to HTTP requests from the end user and interacts with other OpenStack services to perform its functions.

Keystone (Identity as a Service) authenticates users and provides security tokens that are used to work with OpenStack, limiting users' abilities in Sahara to their OpenStack privileges.

Heat (Orchestration as a Service) is used to provision and orchestrate the deployment of data processing clusters.

Glance (Virtual Machine Image as a Service) stores VM images with the operating system and pre-installed Hadoop/Spark software packages to create a data processing cluster.

Nova (Compute as a Service) provisions virtual machines for data processing clusters.

Ironic (Bare Metal as a Service) provisions bare metal nodes for data processing clusters.

Neutron (Networking as a Service) facilitates networking services, from basic to advanced topologies, to access the data processing clusters.

Cinder (Block Storage) provides persistent storage media for cluster nodes.

Swift (Object Storage) provides reliable storage to keep job binaries and the data processed by Hadoop/Spark.

Designate (DNS as a Service) provides a hosted zone to keep DNS records of the cluster instances. Hadoop services communicate with the cluster instances by their host names.

Ceilometer (Telemetry as a Service) collects and stores metrics about the cluster for metering and monitoring purposes.

Manila (File Share as a Service) can be used to store job binaries and data created by the job.

Barbican (Key Management Service) stores sensitive data such as passwords and private keys securely.

Trove (Database as a Service) provides a database instance for the Hive metastore, and stores the states of the Hadoop services and other management services.

Setting up a Sahara cluster

The OpenStack team has provided clear documentation on how to set up Sahara services; follow the steps given in the installation guide (https://docs.openstack.org/sahara/latest/install/index.html) to deploy Sahara in your environment. There are several ways in which the Sahara service can be deployed. To experiment with it, Kolla would be a good choice. You can also manage the Sahara project through the Horizon dashboard.
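If you deploy with kolla-ansible, for instance, Sahara is switched on from the deployment configuration. A minimal sketch, assuming a standard Kolla install layout and an inventory file of your own:

# /etc/kolla/globals.yml
enable_sahara: "yes"

# then (re)deploy the affected services
kolla-ansible -i <inventory> deploy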

ETL (extract, transform and load) or ELT (extract, load and transform) with a Sahara cluster

There are numerous ETL tools available in the market, and traditional data warehouses have their own benefits and limitations; they may also sit in some other location, far away from your data source. The reason I am targeting Hadoop is that it is an ideal platform to run ETL jobs. Data in your data store can come in various forms, such as structured, semi-structured and unstructured data. The Hadoop ecosystem has tools to ingest data from different data sources, including databases, files and other data streams, and store it in a centralised Hadoop Distributed File System (HDFS). As the volume of data grows rapidly, the Hadoop cluster can be scaled, and for that you can leverage OpenStack Sahara.

Apache Hive is the data warehouse project built on top of the Hadoop ecosystem, and it is a proven tool for ETL analysis. Once the data is extracted from the data sources with tools such as Sqoop, Flume, Kafka, etc., it should be cleansed and transformed by Hive or Pig scripts using the MapReduce technique. Later, we can load the data into the tables created during the transformation phase and analyse it. The results can be generated as a report and visualised with any BI tool.

Another advantage of Hive is that it is an interactive query engine and can be accessed via the Hive Query Language, which resembles SQL. So a database operator can execute a job in the Hadoop ecosystem without prior knowledge of Java and MapReduce concepts. The Hive query execution engine parses the Hive query and converts it into a sequence of MapReduce/Spark jobs for the cluster. Hive can be accessed through JDBC/ODBC drivers and Thrift clients.
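For example, with HiveServer2 running on one of the cluster nodes, Beeline can connect over JDBC on the default port 10000; the host and user below are placeholders:

beeline -u jdbc:hive2://<hiveserver2 host>:10000/default -n <username>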

Oozie is a workflow engine available in the Hadoop ecosystem. A workflow is nothing but a set of tasks that needs to be executed as a sequence in a distributed environment. Oozie helps us convert a simple workflow into cascading multiple workflows and create coordinated jobs. It is ideal for creating workflows for complex ETL jobs, although it does not have modules to support every action related to Hadoop. A large organisation might have its own workflow engine to execute its tasks, and we can use any workflow engine to carry out our ETL job; OpenStack Mistral (Workflow as a Service) is a good example, and Apache Oozie resembles Mistral in some aspects. Oozie also acts as a job scheduler and can be triggered at regular intervals of time.

Let us look at a typical ETL job process with Hadoop. An application stores its data in a MySQL server. The data stored in the DB server should be analysed within the shortest time and at minimum cost.

The extract phase

The very first step is to extract data from MySQL and store it in HDFS. Apache Sqoop can be used to import/export data from a structured data source such as an RDBMS data store. If the data to be extracted is semi-structured or unstructured, we can use Apache Flume to ingest the data from a data source such as a Web server log, a Twitter data stream or sensor data.

sqoop import \
--connect jdbc:mysql://<database host>/<database name> \
--driver com.mysql.jdbc.Driver \
--table <table name> \
--username <database user> \
--password <database password> \
--num-mappers 3 \
--target-dir hdfs://Hadoop_sahara_local/user/rkkrishnaa/database
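For the semi-structured and unstructured cases mentioned above, a Flume agent that tails a Web server log into HDFS could be configured roughly as follows; the agent name, log path and HDFS path are assumptions for this sketch:

# flume-agent.conf
agent.sources = weblog
agent.channels = mem
agent.sinks = hdfs-sink

# read new lines from the access log as they arrive
agent.sources.weblog.type = exec
agent.sources.weblog.command = tail -F /var/log/nginx/access.log
agent.sources.weblog.channels = mem

agent.channels.mem.type = memory

# write the events into an HDFS directory for later staging
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = hdfs://Hadoop_sahara_local/user/rkkrishnaa/weblogs
agent.sinks.hdfs-sink.channel = mem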

The transform phase

The data extracted in the above phase is not in a proper format. It is just raw data, and should be cleansed by applying proper filters and by aggregating data from multiple tables. This is our staging and intermediate area for storing data in HDFS.
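The staged files can be listed to confirm that the Sqoop import landed in the expected directory (the path below matches the --target-dir used earlier):

hdfs dfs -ls hdfs://Hadoop_sahara_local/user/rkkrishnaa/database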

Now, we should design a Hive schema for each table and create a database to transform the data stored in the staging area. Typically, the data is in CSV format and each record is delimited by a comma (,).

We don't need to check the HDFS data to know how it is stored. Since we have imported it from MySQL, we are aware of the schema. It is almost compatible with Hive, except for some data types.

Once the database is modelled, we can load the extracted data for cleaning. Still, the data in the table is de-normalised. Aggregate the required columns from the different tables.

Open a Hive or Beeline shell, or if you have HUE (Hadoop User Experience) installed, open the Hive editor and run the commands given below:

CREATE DATABASE <database name>;

USE <database name>;

CREATE TABLE <table name> (
    variable1 DATATYPE,
    variable2 DATATYPE,
    variable3 DATATYPE,
    ...
    variableN DATATYPE
)
PARTITIONED BY (Year INT, Month INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;

LOAD DATA INPATH 'hdfs://Hadoop_sahara_local/user/rkkrishnaa/database/files.csv' OVERWRITE INTO TABLE <table name>;

Similarly, we can aggregate the data from multiple tables with the 'INSERT OVERWRITE TABLE ... SELECT' statement.

Hive supports partitioning tables to improve query performance by distributing the execution load horizontally. We prefer to partition by the columns that store the year and month. Note that a partitioned table sometimes creates more tasks in a MapReduce job.
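As an illustration, aggregating hypothetical staging tables into a partitioned warehouse table might look like the sketch below; every table and column name here is made up:

INSERT OVERWRITE TABLE sales_summary PARTITION (Year=2018, Month=6)
SELECT s.name, s.age, s.department, COUNT(o.order_id) AS orders
FROM sales_staging s
JOIN orders_staging o ON (o.emp_id = s.emp_id)
WHERE o.order_year = 2018 AND o.order_month = 6
GROUP BY s.name, s.age, s.department;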

The load phase

We covered the transform phase above. It is now time to load the transformed data into a data warehouse directory in HDFS, which is the final state of the data. Here, we can apply SQL queries to get the appropriate results.

All DML commands can be used to analyse the warehouse data based on the use case (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML).
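For instance, a simple aggregate over the warehouse table used later in load.q (the table and column names are the same hypothetical ones used there):

SELECT department, COUNT(*) AS headcount, AVG(experience) AS avg_experience
FROM sales_department
GROUP BY department;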

Results can be downloaded as CSV files, graphs or charts for analysis, and can be integrated with other popular BI tools such as Talend Open Studio, Tableau, etc.

What next?

We had a brief look at the benefits of OpenStack Sahara and explored a typical example of an ETL task in the Hadoop ecosystem. It is time to automate the ETL job with the Oozie workflow engine.

If you are experienced in using Mistral, you can stick with it. Most Hadoop users are acquainted with Apache Oozie, so I am using Oozie for this example.

Step 1: Create a job.properties file, as follows:
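A minimal sketch is given below; the NameNode and ResourceManager endpoints are placeholders, and the file simply defines the parameters (nameNode, jobTracker, queueName, date) referenced by the workflow in the next step:

nameNode=hdfs://<namenode host>:8020
jobTracker=<resourcemanager host>:8032
queueName=default
date=20180101
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/rkkrishnaa/oozie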

Step 2: Create a workflow.xml file to define the tasks for extracting data from the MySQL data store to HDFS, as follows:

<workflow-app xmlns="uri:oozie:workflow:0.4" name="my-etl-job">

    <global>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
    </global>

    <start to="extract"/>

    <action name="extract">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://<database host>:3306/<database name> --username <database user> --password <database password> --table <table name> --driver com.mysql.jdbc.Driver --target-dir hdfs://hadoop_sahara_local/user/rkkrishnaa/database${date} -m 3</command>
        </sqoop>
        <ok to="transform"/>
        <error to="fail"/>
    </action>

    <action name="transform">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>oozie.hive.defaults</name>
                    <value>/user/rkkrishnaa/oozie/hive-site.xml</value>
                </property>
            </configuration>
            <script>transform.q</script>
        </hive>
        <ok to="load"/>
        <error to="fail"/>
    </action>

    <action name="load">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>oozie.hive.defaults</name>
                    <value>/user/rkkrishnaa/oozie/hive-site.xml</value>
                </property>
            </configuration>
            <script>load.q</script>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <end name="end"/>

</workflow-app>

Step 3: Create the file transform.q:

LOAD DATA INPATH 'hdfs://hadoop_sahara_local/user/rkkrishnaa/database${date}/files.csv' OVERWRITE INTO TABLE <table name>;

Step 4: Create the file load.q:

SELECT name, age, department FROM sales_department WHERE experience > 5;
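With the workflow, the two Hive scripts and job.properties in place, the job can be submitted to the Oozie server; the host below is a placeholder, and 11000 is the default Oozie port:

oozie job -oozie http://<oozie host>:11000/oozie -config job.properties -run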

The above workflow job will perform the ETL operation. We can customise the workflow job based on our own use case. There are many mechanisms to handle task failure in a workflow job; an email action, for example, helps to track the status of the workflow task. This ETL job can be executed on a daily or weekly basis, at any time. Usually, organisations perform these long-running tasks during weekends or at night.
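For such scheduled runs, an Oozie coordinator can trigger the workflow at a fixed frequency. A minimal sketch is shown below; the name, start/end window and application path are assumptions:

<coordinator-app name="etl-daily" frequency="${coord:days(1)}"
                 start="2018-01-01T01:00Z" end="2018-12-31T01:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
    <action>
        <workflow>
            <app-path>${nameNode}/user/rkkrishnaa/oozie</app-path>
        </workflow>
    </action>
</coordinator-app>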

To know more about workflow actions, check out https://oozie.apache.org/docs/3.3.1/index.html.

OpenStack has integrated a very large Hadoop ecosystem into its universe. Many cloud providers offer the Hadoop service with just a few clicks on their cloud management portals. OpenStack Sahara proves the ability and robustness of OpenStack infrastructure. According to an OpenStack user survey, very few organisations have adopted the Sahara service in their private clouds so far. Sahara supports most of the Hadoop vendor plugins to operate Hadoop workloads effectively. So, go ahead and execute your ETL workflow with OpenStack Sahara.

Figure 1: Hadoop on OpenStack
Figure 2: OpenStack Sahara architecture
Figure 3: ETL workflow in Hadoop
