Hadoop ecosystem

This is just a collection of references and working examples; it is neither a self-contained tutorial nor complete documentation, so please forgive the inconsistent explanations.

All code has been checked on HDP 2.6 (Docker image).

Disclaimer

  • Use at your own risk.
  • See the official manuals/documentation for precise explanations.

Hortonworks Data Platform (HDP)

  • Documentation, Hadoop Tutorial – Getting Started with HDP
  • The password of user maria_dev : maria_dev
  • The password of the superuser (root) : hadoop (must be changed after the first login).
  • ssh root@localhost -p 2222
  • The password of root for MySQL : hadoop
  • URL for HDFS: hdfs://sandbox.hortonworks.com:8020/ (instead of hdfs://localhost:9000/)
  • admin is the super user for Ambari

Do the following after the first login

  • Run ambari-admin-password-reset to set the password for admin on Ambari.
  • Prepare the locate command: yum install mlocate, then run updatedb.
  • Start HBase manually if it is needed.

Hadoop

Hive

  • Apache Hive, Documentation, LanguageManual, Tutorial (tutorialpoint)
  • Hive
    • reads files in various formats (e.g. CSV, JSON, ORC) stored in HDFS,
    • manages the schemas and metadata of tables, and
    • provides an SQL-like language (HiveQL) for retrieving data.
  • Add EXPLAIN before a query to see an execution plan.

Configuration

  • There are three methods to change a configuration:
    • SET command in CLI/Beeline: SET hive.xxx.yyy=zzz
    • $ hive --hiveconf hive.xxx.yyy=zzz
    • In hive-site.xml (under /etc/hive/2.6.0.3-8/0/).
  • Configuration variables
  • SET; shows all settings.
  • SET hive.xxx.yyy; shows the value of that variable.
  • SET hive.execution.engine = mr; : executes queries with MapReduce instead of Tez (see the example after this list).
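
For example, to check which engine is active and fall back to plain MapReduce for the current session (a small sketch; the first output line assumes the HDP default of Tez):

hive> SET hive.execution.engine;
hive.execution.engine=tez
hive> SET hive.execution.engine=mr;
hive> SET hive.execution.engine;
hive.execution.engine=mr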

Data Types

Primitive Types

  • Manual
  • Integers: TINYINT, SMALLINT, INT (4-byte signed: -2147483648 to 2147483647), BIGINT
  • BOOLEAN
  • Floating point numbers: FLOAT, DOUBLE
  • Fixed-point numbers: DECIMAL
  • String types: STRING, VARCHAR, CHAR
  • Date and time types: TIMESTAMP (up to nanosecond precision), DATE
  • BINARY

Hive Tables

Hive reads/stores files in HDFS as tables. There are two types of tables, depending on how they are managed.

  • Managed table (default): all data are managed by internal Hive processes. The table is stored under /apps/hive/warehouse/. Use managed tables when Hive should manage the lifecycle of the table, or when generating temporary tables.
  • External table: only describes the metadata/schema of external files. The files can be accessed and managed by processes outside of Hive. Use external tables when the files already exist or live in a remote location and should remain even if the table is dropped. Conversely, the table becomes empty if the external files are deleted.

To identify the type of a table, use DESCRIBE FORMATTED table_name and look at the Location and Table Type rows.

We can create an external table from CSV files in HDFS:

CREATE EXTERNAL TABLE AutoExt (
    mpg DOUBLE, cylinders INT, displacement DOUBLE, horsepower INT, 
    weight INT, acceleration DOUBLE, year INT, origin INT, name STRING
) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' -- separator
STORED AS TEXTFILE                            -- CSV
LOCATION '/user/maria_dev/data/Auto'          -- directory for the data
tblproperties("skip.header.line.count"="1")   -- skip the first line 
;

To convert it to a managed table, use CTAS (CREATE TABLE AS SELECT):

CREATE TABLE AutoMgd STORED AS ORC AS SELECT * FROM AutoExt;

Then the values of Location and Table Type shown by DESCRIBE FORMATTED AutoMgd are different.

  • When a table is defined with CTAS, the target table is a managed table without partitions or buckets. See below.
  • CREATE TABLE newtable LIKE oldtable; copies only the table definition (schema), not the data (see the sketch below).
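
For example, a LIKE copy has the same definition but contains no rows (a minimal sketch; AutoCopy is a made-up name):

CREATE TABLE AutoCopy LIKE AutoMgd;
SELECT COUNT(*) FROM AutoCopy;  -- returns 0: only the definition was copied
DESCRIBE FORMATTED AutoCopy;    -- same schema and storage format as AutoMgd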

Storage format

  • Manual.
  • We should use the ORC format for managed tables whenever possible.
  • CREATE TABLE neworc STORED AS ORC AS SELECT * FROM oldnonorc; makes a copy of a table with an ORC format.

If the separator is ; we need to escape it.

CREATE EXTERNAL TABLE csv_07 (
    code STRING, description STRING, total_emp INT, salary INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\;'
STORED AS TEXTFILE
LOCATION '/user/maria_dev/csv_07'
;

INSERT INTO TABLE csv_07 SELECT * FROM sample_07;

If more detailed CSV options are required, OpenCSVSerde should be used. But this SerDe treats every column as type STRING. That is, the following code works, but all columns end up with type STRING.

CREATE EXTERNAL TABLE csv_07a (
    code STRING, description STRING, total_emp INT, salary INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = "\;",
    "quoteChar"     = "\"",
    "escapeChar"    = "\\"
)
LOCATION '/user/maria_dev/csv_07a'
;

INSERT INTO TABLE csv_07a SELECT * FROM sample_07;

Partitioned Table

  • Manual, Tutorial
  • A partition is created mostly to speed up certain kinds of queries. It can be regarded as an index on a categorical variable (column).
  • HCatalog only supports partition columns of type string.

Example: Create an external table at first.

CREATE EXTERNAL TABLE irise (
    Id INT, 
    SepalLength FLOAT, SepalWidth FLOAT, PetalLength FLOAT, PetalWidth FLOAT, 
    Species STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = "\;",
    "quoteChar"     = "\"",
    "escapeChar"    = "\\"
)
STORED AS TEXTFILE
LOCATION '/user/maria_dev/iris'
;

Next we define a managed table with partition

CREATE TABLE irisp (
    Id INT, SepalLength FLOAT, SepalWidth FLOAT, PetalLength FLOAT, PetalWidth FLOAT
)
PARTITIONED BY (Species STRING)
STORED AS ORC
;

Note that the partition column Species is NOT included in the column list of irisp (...).

Next we insert rows of the external table into the managed table

SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE irisp PARTITION(Species) SELECT * FROM irise;

The partitioned table is stored in three directories according to the values of column Species.

$ hadoop fs -ls /apps/hive/warehouse/irisp
Found 3 items
drwxrwxrwx   - maria_dev hdfs          0 2017-08-24 14:07 /apps/hive/warehouse/irisp/species=setosa
drwxrwxrwx   - maria_dev hdfs          0 2017-08-24 14:07 /apps/hive/warehouse/irisp/species=versicolor
drwxrwxrwx   - maria_dev hdfs          0 2017-08-24 14:07 /apps/hive/warehouse/irisp/species=virginica

We can also check the partition on Hive CLI:

hive> show partitions irisp;
OK
species=setosa
species=versicolor
species=virginica
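
A query that filters on the partition column reads only the matching directory (partition pruning). A minimal sketch; prefix it with EXPLAIN to check that only the species=setosa directory appears as input:

SELECT AVG(SepalLength) FROM irisp WHERE Species = 'setosa';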

Bucketed table

  • A bucket is a kind of partition for a numerical variable (column). Manual.

Example: we define a table with a bucket (and a partition).

CREATE TABLE irisb (
    Id INT, SepalLength FLOAT, SepalWidth FLOAT, PetalLength FLOAT, PetalWidth FLOAT
)
PARTITIONED BY (Species STRING)
CLUSTERED BY (Id) SORTED BY(SepalLength) INTO 10 BUCKETS
STORED AS ORC
;

Note that the bucketed column Id is included in irisb (...), unlike a partition column.

SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE irisb PARTITION(Species) SELECT * FROM irise;

Then directory /apps/hive/warehouse/irisb contains three directories for the partitions and each of them contains 10 files (i.e. buckets).
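
Bucketing also enables efficient table sampling: TABLESAMPLE can read a single bucket instead of the whole table. A minimal sketch on the table above:

SELECT COUNT(*) FROM irisb TABLESAMPLE(BUCKET 1 OUT OF 10 ON Id) t;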

Load data

  • LOAD DATA (LOCAL) is used to load rows from files into a Hive table. Manual
  • "Load operations are currently pure copy/move operations that move datafiles into locations corresponding to Hive tables."
  • Therefore
    • we must already have a Hive table, and
    • the format of the file(s) must agree with that of the table.
  • OVERWRITE INTO TABLE deletes all existing rows first.
  • A file in HDFS is moved away from its original location, while a local file is only copied.
  • The LOAD DATA statement automatically detects compression (.gz). Thus the following example also works unchanged (apart from the file name) when the CSV file is compressed.
  • The data types declared in the table are not validated during the load.
  • A common best practice: load the data into a temporary (staging) table first, then insert from it into the target table (stored as ORC); see the sketch after the example below.

Example: Prepare a CSV file in local (and in HDFS).

maria_dev$ R -q -e "write.table(MASS::Boston, 'mass/boston.csv', col.name=F, sep=';')"
maria_dev$ hadoop fs -mkdir mass
maria_dev$ hadoop fs -put mass/boston.csv mass

Then define a table (according to the file format, even though the data types will be ignored).

CREATE TABLE Boston(
    crim FLOAT, zn FLOAT, indus FLOAT, chas INT, nox FLOAT, rm FLOAT, age FLOAT,
    dis FLOAT, rad INT, tax INT, ptratio FLOAT, black FLOAT, lstat FLOAT, medv FLOAT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = "\;",
    "quoteChar"     = "\"",
    "escapeChar"    = "\\"
)
STORED AS TEXTFILE;

Then insert the rows of the local file into the Hive table.

LOAD DATA LOCAL INPATH '${env:HOME}/mass' INTO TABLE Boston;

Or of the file in HDFS

LOAD DATA INPATH '/user/maria_dev/mass' INTO TABLE Boston;

Then /apps/hive/warehouse/boston contains the original CSV: boston.csv.
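
Following the best practice above, we can copy the staging table into an ORC-backed managed table. A minimal sketch (BostonOrc is a made-up name); since OpenCSVSerde leaves every column as STRING, we cast while copying, and only three columns are shown for brevity:

CREATE TABLE BostonOrc STORED AS ORC AS
SELECT CAST(crim AS FLOAT) AS crim,
       CAST(chas AS INT)   AS chas,
       CAST(medv AS FLOAT) AS medv
FROM Boston;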

Update/Delete/Insert

"Row-level" update, delete and insert are available from Hive 0.14.

  • UPDATE tablename SET column=value [, column=value ...] [WHERE expression]
  • DELETE FROM tablename [WHERE expression]
  • INSERT INTO TABLE tablename [PARTITION (col1[=val1], col2[=val2] ...)] VALUES values_row [, values_row ...]

Examples from the manual with partition.

INSERT INTO TABLE pageviews PARTITION (datestamp = '2014-09-23')
    VALUES ('jsmith', 'mail.com', 'sports.com'),
           ('jdoe',   'mail.com',  null);

INSERT INTO TABLE pageviews PARTITION (datestamp)
    VALUES ('tjohnson', 'sports.com',  'finance.com', '2014-09-23'), 
           ('tlee',     'finance.com', null,          '2014-09-21');
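
A sketch of row-level UPDATE/DELETE, assuming pageviews was created as a transactional (ACID) table with columns userid, link and came_from as in the manual's example:

UPDATE pageviews SET link = 'news.com' WHERE userid = 'jsmith';
DELETE FROM pageviews WHERE userid = 'jdoe';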

Built-in operators/functions

Most SQL operators/functions are also available in HiveQL.

  • LIKE ignores case.
  • A RLIKE regex : matches against a regular expression.
  • regexp_replace(col, regex, replaced_with) : $col =~ s/regex/replaced_with/g
  • Window functions are also available (see the sketch after this list).
    • Example: rank() over (partition by abt order by salary desc)
  • INSERT OVERWRITE TABLE mytable SELECT ... can be used to rebuild a table's contents:
    • Example: INSERT OVERWRITE TABLE user_active SELECT * FROM user WHERE active = 1;
  • We can export the result of a query into CSV files:
    • INSERT OVERWRITE LOCAL DIRECTORY '/tmp/csvs' SELECT ... creates the directory /tmp/csvs and stores the output under it.
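
For example, a window function on the sandbox table sample_07 (a sketch):

SELECT code, description, salary,
       rank() OVER (ORDER BY salary DESC) AS salary_rank
FROM sample_07
LIMIT 10;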

To change the type of a column use an ALTER TABLE statement:

ALTER TABLE AutoMgd CHANGE origin origin String;

ALTER TABLE statements in HiveQL differ slightly from ordinary SQL. Check the syntax before using them.

Improve the performance of Hive

  • Use the ORC format whenever possible.
  • Partitions and buckets. See above.
  • Tez and cost-based optimization (both enabled by default).
  • Indexes are available for Hive tables:
    • How to create an index: CREATE INDEX AutoNameIndex ON TABLE AutoMgd (name) AS 'COMPACT';
    • But indexes are not supported with the Tez execution engine.

Sqoop

Import (RDBMS -> HDFS)

  • We can import a table from an RDBMS into a Hive table or CSV files in HDFS. Manual

Create a table squares in the database "test" in MySQL with the following commands.

CREATE TABLE squares (x INT, y INT, z TEXT);
INSERT INTO squares (x,y,z) VALUES (1,1,'one');
INSERT INTO squares (x,y,z) VALUES (2,4,'two');
INSERT INTO squares (x,y,z) VALUES (3,9,'three');
INSERT INTO squares (x,y,z) VALUES (4,16,'four');
INSERT INTO squares (x,y,z) VALUES (5,25,'five');
INSERT INTO squares (x,y,z) VALUES (6,36,'six');
INSERT INTO squares (x,y,z) VALUES (7,49,'seven');
INSERT INTO squares (x,y,z) VALUES (8,64,'eight');
INSERT INTO squares (x,y,z) VALUES (9,81,'nine');
INSERT INTO squares (x,y,z) VALUES (10,100,'ten');

Import 1: MySQL -> Hive

We should not create a Hive table or directory for the file in advance (because of the create-hive-table option).

The following command automatically creates a Hive table and inserts all rows of the squares table (into Hive's default database).

sqoop import --connect jdbc:mysql://localhost/test \
--username root --password hadoop --table squares \
--driver com.mysql.jdbc.Driver \
--hive-import --create-hive-table --hive-table default.squares -m 1
  • --username, --password, --table : are for the RDBMS.
  • --hive-import : imports the table into Hive.
  • --create-hive-table : creates the Hive table automatically and fails if it already exists. If it is absent, Sqoop appends to an existing table.
  • --hive-table : specifies the database and table in Hive.
  • -m 1 : performs a sequential import. This is necessary if the table has no primary key.

The password for RDBMS should not be included in the command line. We should use a text file containing the password instead.

maria_dev$ echo -n "hadoop" > pw.txt   ## create a password file (without LF at the end)
maria_dev$ hadoop fs -put pw.txt       ## put it in HDFS (/user/maria_dev/pw.txt)
maria_dev$ hadoop fs -chmod 400 pw.txt ## set the suitable permission

Then we can replace --password hadoop with --password-file /user/maria_dev/pw.txt.

Import 2: MySQL -> CSV in HDFS

The following command creates CSV files under /user/maria_dev/squared.

sqoop import --connect jdbc:mysql://localhost/test \
--username root --password-file /user/maria_dev/pw.txt --table squares \
--driver com.mysql.jdbc.Driver --target-dir '/user/maria_dev/squared' -m 1

Note that no header is contained in the CSV files.

We can import a result of a query as follows:

sqoop import --connect jdbc:mysql://localhost/test \
--username root --password hadoop \
--query 'SELECT * FROM squares WHERE x % 2 = 0 AND $CONDITIONS' \
--driver com.mysql.jdbc.Driver \
--target-dir '/user/maria_dev/even_squared' -m 1

Then the result of SELECT * FROM squares WHERE x % 2 = 0 is stored in the target directory in HDFS. Here $CONDITIONS is a required token that Sqoop replaces with its split conditions. (In practice we may regard it as an always-true condition in the WHERE clause.)

The created CSV files are of course not registered as a Hive table. The following HiveQL command creates an external Hive table:

CREATE EXTERNAL TABLE squared (x INT, y INT, z STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/maria_dev/squared'
;

Export (HDFS -> RDBMS)

  • We can export a Hive table or CSV files in HDFS to an RDBMS. Manual.
  • We have to create the table in the RDBMS in advance (because Sqoop actually issues INSERT statements).
  • We cannot assume that the order of rows is preserved.
  • When exporting a CSV file in HDFS, the CSV file must contain no header and be clean. (This causes a problem if you export the CSV file behind sample_07 directly.)

Export 1: Hive -> MySQL

Create a table (in a database "test") in MySQL.

CREATE TABLE export07 (code TEXT, description TEXT, total_emp INT, salary INT);

Then the following command inserts all rows of the Hive table "sample_07" into the table "export07" in MySQL:

sqoop export --connect jdbc:mysql://localhost/test \
--username root --password hadoop --table export07 \
--hcatalog-table sample_07
  • --connect, --username, --password, --table : setting for RDBMS.
  • --hcatalog-table : Hive table

Export2: CSV in HDFS -> MySQL

Prepare a CSV file. Note that we write no header and the separator is ;.

maria_dev$ R -q -e "write.table(iris, 'iris.csv', col.name=F, sep=';')"
maria_dev$ hadoop fs -mkdir /user/maria_dev/iris
maria_dev$ hadoop fs -put iris.csv /user/maria_dev/iris

Create a table in MySQL in advance.

CREATE TABLE iris (
    Id INT, 
    SepalLength FLOAT, SepalWidth FLOAT, PetalLength FLOAT, PetalWidth FLOAT,
    Species TEXT
);

The following command inserts rows of the CSV file in HDFS into the table in MySQL.

sqoop export --connect jdbc:mysql://localhost/test \
--username root --password hadoop --table iris \
--export-dir hdfs://sandbox.hortonworks.com:8020/user/maria_dev/iris \
--input-fields-terminated-by ";" --input-optionally-enclosed-by '"'
  • --export-dir : directory of CSVs in HDFS
  • --input-fields-terminated-by : separator of values
  • --input-optionally-enclosed-by : Text values are often enclosed by a letter such as ". If it is the case, we have to specify it.

Next we try to UPDATE the table. Create the following CSV file (conv.csv)

50,test1
100,test2
150,test3

and store it in HDFS

maria_dev$ hadoop fs -mkdir /user/maria_dev/conv
maria_dev$ hadoop fs -put conv.csv /user/maria_dev/conv

The command

sqoop export --connect jdbc:mysql://localhost/test \
--username root --password hadoop --table iris \
--export-dir hdfs://sandbox.hortonworks.com:8020/user/maria_dev/conv \
--columns "Id,Species" --update-key Id --input-fields-terminated-by ","

is equivalent to the following SQL commands:

UPDATE iris SET Species='test1' WHERE Id=50;
UPDATE iris SET Species='test2' WHERE Id=100;
UPDATE iris SET Species='test3' WHERE Id=150;

Flume

  • Apache Flume, User Guide, tutorial (tutorial point)
  • Flume collects (streaming) data and stores it in HDFS (or passes it to another service such as Elasticsearch).
  • export FLUME_CONF=/etc/flume/conf (Flume configuration directory)

Agent

  • An agent is an independent daemon process that fetches data and stores them in a specified way.
  • A Flume system is basically just a workflow consisting of agents.
  • An agent has several components:
    • source: fetches data and sends them to one or more channels (or interceptors).
    • interceptor (optional): transforms received data and passes them to channels.
    • channel: a data buffer. It sends data to a sink.
    • sink: stores data in HDFS or sends them to a specified service.
  • We have to write the configuration of an agent in a file $FLUME_CONF/MyAgent.conf. ("MyAgent" should be replaced with a suitable name.)
  • $FLUME_CONF/flume-conf.properties.template is a template file for an agent.

Example of an agent

This section will be replaced with a better example.

  • Check with the flume-ng command whether Flume works.
  • The simplest agent is probably netcat.conf; see the sketch after this list.
  • Following this tutorial, we create an agent that fetches data from Twitter.
    • The agent gets tweets and stores them in HDFS in Apache Avro format (https://avro.apache.org/).
    • We can create OAuth credentials for Twitter at Twitter Application Management.
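
A minimal netcat.conf might look as follows (a sketch adapted from the Flume User Guide; the agent name a1 and the port 44444 are arbitrary choices):

# A single agent a1: netcat source -> memory channel -> logger sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start it with flume-ng agent --conf $FLUME_CONF -f $FLUME_CONF/netcat.conf -n a1 -Dflume.root.logger=INFO,console and send test lines with nc localhost 44444.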

For the Twitter agent, we first copy the template file for flume-env.sh. (This step has no effect here, because the shell script does nothing.)

root# cp flume-env.sh.template flume-env.sh

Next we prepare the directory in which the data will be stored. (This differs from other Hadoop-related applications.)

root# sudo -u flume hadoop fs -mkdir -p /user/flume/twitter_data
root# sudo -u flume hadoop fs -chown -R flume:hdfs /user/flume
root# sudo -u flume hadoop fs -chmod -R 770 /user/flume

Note that we assume that an agent works with the privilege of user flume.

Next we write the configuration of the agent and save it in the file $FLUME_CONF/twitter.conf. (This is a modification of the configuration in the tutorial.)

# Naming the components on the current agent. 
TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS

# Describing/Configuring the source 
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = xx_YourConsumerKey_xx
TwitterAgent.sources.Twitter.consumerSecret = xx_YourConsumerSecret_xx
TwitterAgent.sources.Twitter.accessToken = xx_YourAccessToken_xx
TwitterAgent.sources.Twitter.accessTokenSecret = xx_YourAccessSecret_xx


# Describing/Configuring the sink 
TwitterAgent.sinks.HDFS.type = hdfs 
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/flume/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
TwitterAgent.sinks.HDFS.serializer = avro_event


# Describing/Configuring the channel 
TwitterAgent.channels.MemChannel.type = memory 
TwitterAgent.channels.MemChannel.capacity = 10000 
TwitterAgent.channels.MemChannel.transactionCapacity = 1000

# Binding the source and sink to the channel 
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel 

We start a Flume agent as user "flume".

root# sudo -u flume flume-ng agent \
--conf $FLUME_CONF -f $FLUME_CONF/twitter.conf -n TwitterAgent \
-Dflume.root.logger=DEBUG,console

Here

  • --conf $FLUME_CONF : the directory containing flume-env.sh, etc.
  • -f $FLUME_CONF/twitter.conf : the configuration file for the agent
  • -n TwitterAgent : the name of the agent
  • -Dflume.root.logger=DEBUG,console : debugging mode. INFO instead of DEBUG reduces the level of logging.

The format of the saved files is Apache Avro. This is because of the source, not the sink. I do not yet know how to read the tweets from these files: we can convert the stored files into JSON with avro-tools, but the result is still not readable.
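
A sketch of the conversion with avro-tools (the jar version/location and the FlumeData file name are placeholders; adjust them to your files):

root# sudo -u flume hadoop fs -get /user/flume/twitter_data/FlumeData.1234567890123 /tmp/  ## placeholder file name
root# java -jar avro-tools-1.8.2.jar tojson /tmp/FlumeData.1234567890123 > /tmp/tweets.json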

Components of an agent

An event is the basic unit of data. It consists of headers and a byte payload (the body).

source

  • Manual
  • type = spooldir : Spooling Directory Source. The source watches a specified directory for new files and will parse events from them.
    • spoolDir : a local directory to watch
  • type = org.apache.flume.source.twitter.TwitterSource : Twitter 1% firehose Source (experimental)
    • See the example above for the required properties.
  • type = http : HTTP source
    • port, bind, handler (important!)

channel

  • Manual
  • type = memory : memory channel
    • capacity = 100 : the maximum number of events stored in a channel.
    • transactionCapacity : the maximum number of events per transaction.

sink

  • Manual
  • type = hdfs : HDFS sink
    • hdfs.path : directory in HDFS where the data will be stored.
    • hdfs.fileType : file format. One of SequenceFile, DataStream and CompressedStream.
  • type = hive : Hive sink
    • hive.metastore : Hive metastore URI
    • hive.database : name of the database
    • hive.table : name of the table
    • batchSize = 15000 : the maximum number of events for a transaction
    • serializer : how to parse the data. If the data are comma-separated, we may use the following configuration (assembled into a complete sink in the sketch after this list):
      • serializer = DELIMITED
      • serializer.delimiter = ","
      • serializer.serdeSeparator = ','
      • serializer.fieldnames = col1,col2,col3
  • type = logger : Logger sink (for test)
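
A sketch of a Hive sink assembled from the properties above (the agent/channel names, metastore URI, and table are assumptions):

a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://sandbox.hortonworks.com:9083
a1.sinks.k1.hive.database = default
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.batchSize = 15000
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ","
a1.sinks.k1.serializer.serdeSeparator = ','
a1.sinks.k1.serializer.fieldnames = col1,col2,col3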

Pig

  • Apache Pig, Tutorial (tutorialspoint)
  • There are four execution modes (local vs. cluster, MapReduce vs. Tez):
    • MapReduce mode (default): $ pig script.pig
    • Tez mode: $ pig -x tez script.pig
    • local mode: $ pig -x local script.pig
    • Tez local mode : $ pig -x tez_local script.pig
  • The local mode runs without HDFS.
  • The Grunt shell starts if we give no script.
    • grunt> sh ls : same as ls
    • grunt> fs -ls : same as hadoop fs -ls

Data Types

  • int (32-bit), long (64-bit) / float (32-bit), double (64-bit) / chararray / Boolean (true,false) / Datetime (e.g. 1970-01-01T00:00:00.000+00:00)
  • tuple : an ordered set of "fields" (elements). It represents a row of a relation (a table). Ex. (10, "a", 3.14)
  • bag : an unordered set of tuples. It represents a relation (table). Ex. {(1,"a",1.4), (2,"b",1.7)}
  • The lengths of the tuples in a bag can vary.
  • map : something like a hash in Perl. [key#value, key2#val2]

Projection operator

Manual. Let b be the following relation

(setosa,     {(1,   5.1, 3.5, 1.4, 0.2, setosa),
              (2,   4.9, 3.0, 1.4, 0.2, setosa), ... })
(virginica,  {(150, 5.9, 3.0, 5.1, 1.8, virginica),
              (101, 6.3, 3.3, 6.0, 2.5, virginica), ...})
(versicolor, {(76,  6.6, 3.0, 4.4, 1.4, versicolor),
              (52,  6.4, 3.2, 4.5, 1.5, versicolor),...})

And its schema is group: chararray, a: {(id: int, x1: float, x2: float, x3: float, x4: float, species: chararray)}. Then

  • $0 and group refer to the first field, a chararray.
  • $1 and a refer to the second field, a bag.
  • $1.id and a.id project the field id out of the bags, giving a bag of the Id values (see the sketch after this list).
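
For example, both of the following lines project the species name together with a bag of the Id values (a sketch, assuming b is the relation above):

ids1 = FOREACH b GENERATE group, a.id;   -- by field name
ids2 = FOREACH b GENERATE $0, $1.id;     -- by position, same result
DUMP ids1;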

Data I/O

LOAD

a = LOAD 'basic_types.csv' USING PigStorage(';')
        AS (id:int, time:datetime, score:int, prob:float, color:chararray);
  • Schema on read, but supplying a schema is optional.
  • A schema is case-sensitive.
  • Load/Store functions
  • csv : a = LOAD 'geolocation.csv' USING PigStorage(',');
    • The quotes around the file name must be single quotes.
    • Without USING is equivalent to USING PigStorage('\t')
  • CSVExcelStorage: enables an advanced option for loading a CSV file.
    • REGISTER '/usr/hdp/2.6.0.3-8/pig/piggybank.jar'; is required before using it.
    • USING org.apache.pig.piggybank.storage.CSVExcelStorage(';'); is the standard usage. This removes quotations.
    • USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER');: skips the first line (the header).
  • Hive table: a = LOAD 'geolocation' USING org.apache.hive.hcatalog.pig.HCatLoader();
    • The option -useHCatalog is required.
    • The schema is automatically imported.
  • JSON Lines
    • JsonLoader() relies on the order of the fields in the JSON string; the key names themselves are ignored.
    • elephant-bird (to be tried)
  • A general Pig relation : BinStorage(). This function requires a careful treatment of schema.

STORE

b = GROUP a BY id;
c = FOREACH b GENERATE group as id, SUM(a.score) AS Total, AVG(a.score) AS Average;
DUMP c;
STORE c INTO 'store-test-dir' USING PigStorage (',');
STORE c INTO 'pigtohive' USING org.apache.hive.hcatalog.pig.HCatStorer();
  • The directory where the CSV files are stored must not exist in advance.
  • But a Hive table must be created in advance, and its schema must agree.

Diagnostic Operators

  • Manual
  • DUMP c : shows the relation
  • DESCRIBE c : shows the schema : c: {id: int,Total: long,Average: double}.
  • EXPLAIN c : shows an execution plan.
  • ILLUSTRATE c : shows a step-by-step execution of a sequence of statements. This is useful to see how the relation is changed in a script.

Relational Algebra

Manual

For "rows" (FILTER, ORDER BY, etc)

b = FILTER a BY prob > 0.5; -- pick the tuples satisfying the condition
c = ORDER b BY score DESC;  -- arrange the tuples in descending order by score
d = LIMIT c 3;              -- pick the top 3
  • col1 MATCHES '.*stryoulike.*' is equivalent to col1 LIKE '%stryoulike%' in SQL. But as the name suggests, a regex is applied. Note that we must give a regular expression that matches the whole string.
  • Note that == is used to compare two values, unlike in SQL.
  • is (not) null is available.
  • The positional reference (such as $2) can be used instead of the field name (such as score).
  • LIMIT c 4; picks the "first" 4 tuples of the relation c. Use it in combination with DUMP and/or ORDER BY.
  • DISTINCT c; is equivalent to SELECT DISTINCT * FROM c; in SQL.
  • UNION c1, c2; concatenates two relations, i.e. the union in the sense of relational algebra / SQL.
  • SPLIT a INTO b IF score >= 0, c IF score < 0; creates two relations from the given conditions, i.e. a combination of FILTER statements.
  • SAMPLE c 0.2; draws a random sample of roughly 20% of the tuples (see the sketch after this list).
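
A short sketch combining some of these operators on the relation a loaded in the Data I/O section:

SPLIT a INTO pos IF score >= 0, neg IF score < 0;
both = UNION pos, neg;     -- same tuples as a (order is not preserved)
uniq = DISTINCT a;         -- removes duplicate tuples
smp  = SAMPLE a 0.2;       -- a random sample of roughly 20% of the tuples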

For "columns" (FOREACH)

b = FOREACH a GENERATE id, score, prob as rate, score * prob as prod;

FOREACH ... GENERATE ... corresponds to SELECT in SQL.

The following functions can be used to transform data (Manual).

  • Maths:
    • well-known functions such as ABS, SQRT, EXP, LOG, LOG10, ...
    • the ceiling function CEIL and the floor function FLOOR.
    • RANDOM() produces a pseudo-random number in $[0,1)$.
    • ROUND(field) rounds "field" to an integer.
    • ROUND_TO(field,digits) rounds "field" to a fixed number of decimal digits. E.g. ROUND_TO(3.15,1) gives 3.2.
  • String:
    • TRIM(): removes leading/trailing white spaces.
    • LOWER(), UPPER() converts strings to lower/upper case.
    • SUBSTRING(string,s,t) is equivalent to SUBSTRING(string,s,t-s) in SQL. Namely "t" is the index of the end letter (not a length).
    • EqualsIgnoreCase(str1,str2) : compares two strings ignoring case
    • CONCAT() is the same as CONCAT() of MySQL.
    • SPRINTF(format,arg1,arg2,...) : formats strings. Use it when you need formatted output.
    • STRSPLIT(string,regex,limit) splits a string with a regex and returns a tuple. A non-positive limit means no limit, but a negative limit and zero behave differently:
      • STRSPLIT('accbc','c', 0) returns (a,b).
      • STRSPLIT('accbc','c',-1) returns (a,,b,). (Namely empty entries remain.)
    • REGEX_EXTRACT(string,regex,index) returns the substring matched by the marked subexpression (i.e. the part of the regular expression in parentheses). If string is "a9aa8aa7a", then REGEX_EXTRACT(string,'aa(\\d)a',1) gives "8". Note that we must escape "\" in the regexp, while the index takes no backslash, i.e. "1" instead of "\1" or "$1".
    • REGEX_EXTRACT_ALL(string,regex) returns the matched substrings as a tuple. Note that the regex must match the whole string. That is, REGEX_EXTRACT_ALL('a1b2c343', '.*?(\\d)(\\w).*') returns (1,b), while REGEX_EXTRACT_ALL('a1b2c343', '(\\d)(\\w)') returns NULL.
    • REPLACE(string,regex,newstr) replaces the matched part of the string with "newstr". Note that a regular expression is applied, unlike REPLACE() in MySQL. (Thus REGEXP_REPLACE() of Oracle is the equivalent of this REPLACE().)
  • Datetime:
    • CurrentTime() returns the current time.
    • GetYear(), GetMonth(), GetDay(), GetHour(), GetMinute(), GetSecond()
    • GetWeek() : returns the calendar week.
    • YearsBetween(t1,t0), MonthsBetween(t1,t0), WeeksBetween(t1,t0), DaysBetween(t1,t0), HoursBetween(t1,t0), MinutesBetween(t1,t0) : they correspond to TIMEDIFF(t1,t0) in SQL. The prefix such as Years gives the unit of the difference. (The returned values are always integers.)
    • There is no built-in function for the day of the week, but the following expression works instead: (DaysBetween(datetime,ToDate(0L)) + 4L) % 7. This assigns 0 to Sunday. The logic: 1970-01-01 was a Thursday (see the sketch after this list).
  • NB: A function name is case-sensitive.
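
For example, the day-of-week expression applied to the relation a loaded in the Data I/O section (a sketch; 0 corresponds to Sunday):

dow = FOREACH a GENERATE id, (DaysBetween(time, ToDate(0L)) + 4L) % 7 AS weekday;
DUMP dow;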

Grouping

b = GROUP a BY color;                       -- {group: chararray, a: bag{...}}
c = FOREACH b GENERATE group AS color, AVG(a.score) AS average; -- Aggregation 

The schema of b is {group: chararray,a: {(id: int,time: datetime,score: int,prob: float,color: chararray)}}.

  • The grouping field (column) becomes group (not "color" in the above case).
  • The second element of each tuple is a bag consisting of the corresponding tuples of the original relation. The field name of the bag is a, i.e. the name of the original relation.
  • Therefore, to reach a field inside the bag we need ".". In the above code the field "score" in the bag "a" can be reached as a.score or a.$2.

GROUP ALL is used to count the number of tuples.

a_all = GROUP a ALL;                         -- consists of a single tuple: (all, a) 
nrow = FOREACH a_all GENERATE COUNT_STAR(a); -- consists only of the number of tuples

We can use two fields for grouping.

a1 = FOREACH a GENERATE color, (score>=0 ? 'pos' : 'neg') AS sign, prob;
b = GROUP a1 BY (color, sign);
c = FOREACH b GENERATE group.$0, group.$1, AVG(a1.prob) AS average;

The field group is a tuple consisting of the grouping fields. Note that the schema of the tuple holds the names of the grouping fields. (Therefore we did not write group.$0 AS color in the above code.)

Aggregate functions

  • AVG(), SUM(), MIN() and MAX() ignore NULL values.
  • COUNT() and COUNT_STAR() compute the number of tuples in a bag. The difference: COUNT() ignores NULLs while COUNT_STAR() does not (see the sketch below).
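
A short sketch of the difference, grouping the relation a from the Data I/O section by color:

b2  = GROUP a BY color;
cnt = FOREACH b2 GENERATE group AS color, COUNT(a.score) AS nonnull_rows, COUNT_STAR(a.score) AS all_rows;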

COGROUP

Consider the following two relations l and r.

-- l (id:int, dept:chararray)
(110,HR)
(120,Marketing)
(130,Marketing)
(140,Sales)

-- r (dept:chararray,deptid:int)
(Marketing,13)
(Sales,17)
(Development,20)

COGROUP l by dept, r by dept; applies GROUP BY for l and r and combines them.

(HR,         {(110,HR)},                        {}                )
(Sales,      {(140,Sales)},                     {(Sales,17)}      )
(Marketing,  {(120,Marketing),(130,Marketing)}, {(Marketing,13)}  )
(Development,{},                                {(Development,20)})
  • Its schema is {group: chararray, l: {schema of l}, r: {schema of r}}.
  • If a is the above relation, then FILTER a BY not IsEmpty(l) and not IsEmpty(r); removes the tuples containing an empty bag, so that the result resembles an inner join.

JOIN

JOIN l by dept, r by dept; returns the inner join of the two relations:

(140, Sales,     Sales,     17)
(120, Marketing, Marketing, 13)
(130, Marketing, Marketing, 13)
  • Its schema is {l::id, l::dept, r::dept, r::deptid}.
  • There is no INNER JOIN keyword; just use JOIN.
  • JOIN l by dept, r by dept; is essentially a = COGROUP l by dept, r by dept; followed by flattening the two bags.

LEFT JOIN: JOIN l by dept LEFT OUTER, r by dept;

(110, HR,        ,            )
(140, Sales,     Sales,     17)
(120, Marketing, Marketing, 13)
(130, Marketing, Marketing, 13)

RIGHT JOIN: JOIN l by dept RIGHT OUTER, r by dept;

(140, Sales,     Sales,       17)
(120, Marketing, Marketing,   13)
(130, Marketing, Marketing,   13)
(,             , Development, 20)

OUTER JOIN: JOIN l by dept FULL OUTER, r by dept;

(110, HR,        ,              )
(140, Sales,     Sales,       17)
(120, Marketing, Marketing,   13)
(130, Marketing, Marketing,   13)
(,             , Development, 20)

Sample codes

Count the number of rows by year:

b = GROUP a BY year;
c = FOREACH b GENERATE group AS year, COUNT_STAR(a.year) AS count;

User defined functions (UDF)

  • Piggy Bank is a place to share the Java UDFs.
  • /usr/hdp/2.6.0.3-8/pig/piggybank.jar

Performance

  • Performance and Efficiency, Pig Cookbook
  • Specify the number of reduce tasks for a Pig MapReduce job
  • SET default_parallel 20; : specifies the number of reducers.
  • Replicated join: if one of the relations to be joined is small enough, it is copied to all workers.
    • Add USING 'replicated' at the end to perform a replicated join.
    • C = JOIN big BY b1, tiny BY t1, mini BY m1 USING 'replicated';

Tez

Oozie

  • Apache Oozie
  • Apache Ambari Workflow Manager View for Apache Oozie
