This is just a collection of references and working examples. It is neither a self-contained tutorial nor complete documentation, so please forgive any inconsistent explanations.
All code was checked on HDP 2.6 (Docker image).
Disclaimer
- At your own risk.
- See official manual/documentation for precise explanation.
Hortonworks Data Platform (HDP)
- Documentation, Hadoop Tutorial – Getting Started with HDP
- The password of user maria_dev: maria_dev
- The password of the superuser: hadoop (will be changed after the first login). ssh root@localhost -p 2222
- The password of root for MySQL: hadoop
- URL for HDFS: hdfs://sandbox.hortonworks.com:8020/ (instead of hdfs://localhost:9000/)
- admin is the super user for Ambari
Do the following after the first login
- ambari-admin-password-reset to set the password for admin on Ambari
- Prepare the locate command: yum install mlocate and updatedb
- Start HBase manually if it is needed.
Hadoop
Hive
- Apache Hive, Documentation, LanguageManual, Tutorial (tutorialspoint)
- Hive
  - reads files in various formats (CSV, JSON, ORC, etc.) in HDFS,
  - manages the schemas and metadata of tables, and
  - provides an SQL-like language (HiveQL) for retrieving data.
- Add EXPLAIN before a query to see an execution plan.
Configuration
- There are three methods to change a configuration:
  - SET command in CLI/Beeline: SET hive.xxx.yyy=zzz
  - On the command line: $ hive --hiveconf hive.xxx.yyy=zzz
  - In hive-site.xml (under /etc/hive/2.6.0.3-8/0/).
- Configuration variables
  - SET; shows all settings.
  - SET hive.xxx.yyy; shows the value of the variable.
  - SET hive.execution.engine = mr; : execute queries without Tez.
Data Types
Primitive Types
- Manual
- Integers: TINYINT, SMALLINT, INT (4-byte signed: -2147483648 to 2147483647), BIGINT
- BOOLEAN
- Floating point numbers: FLOAT, DOUBLE
- Fixed point numbers: DECIMAL
- String types: STRING, VARCHAR, CHAR
- Date and time types: TIMESTAMP (up to nanosecond precision), DATE
- BINARY
Hive Tables
Hive reads/stores files in HDFS as a table. There are two types of tables, depending on who manages the data.
- Managed table (default): all data are managed by internal Hive processes. The table is stored under /apps/hive/warehouse/. Use managed tables when Hive should manage the lifecycle of the table, or when generating temporary tables.
- External table: describes the metadata/schema on external files. The files can be accessed and managed by processes outside of Hive. Use external tables when files are already present or in remote locations, and the files should remain even if the table is dropped. Conversely, the table will be empty if we delete the external files.
To identify the type of a table use DESCRIBE FORMATTED table_name. (Look at rows Location and Table Type.)
We can create an external table from CSV files in HDFS:
CREATE EXTERNAL TABLE AutoExt (
mpg DOUBLE, cylinders INT, displacement DOUBLE, horsepower INT,
weight INT, acceleration DOUBLE, year INT, origin INT, name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' -- separator
STORED AS TEXTFILE -- CSV
LOCATION '/user/maria_dev/data/Auto' -- directory for the data
tblproperties("skip.header.line.count"="1") -- skip the first line
;
To convert it to a managed table, use CTAS (CREATE TABLE AS SELECT):
CREATE TABLE AutoMgd STORED AS ORC AS SELECT * FROM AutoExt;
Then the values of Location and Table Type in DESCRIBE FORMATTED AutoMgd differ from those of AutoExt.
- A table defined with CTAS is a managed table without partitions or buckets. See below.
- CREATE TABLE newtable LIKE oldtable; creates an empty table with the same definition as oldtable. No rows are copied.
Storage format
- Manual.
- We should use the ORC format to store a managed table whenever possible.
- CREATE TABLE neworc STORED AS ORC AS SELECT * FROM oldnonorc; makes a copy of a table in the ORC format.
If the separator is ; we need to escape it.
CREATE EXTERNAL TABLE csv_07 (
code STRING, description STRING, total_emp INT, salary INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\;'
STORED AS TEXTFILE
LOCATION '/user/maria_dev/csv_07'
;
INSERT INTO TABLE csv_07 SELECT * FROM sample_07;
If more detailed options for CSV are required, then OpenCSVSerde should be used. But this SerDe treats all columns as type STRING. That is, the following code works, but the data types of all columns are STRING regardless of the declared types.
CREATE EXTERNAL TABLE csv_07a (
code STRING, description STRING, total_emp INT, salary INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = "\;",
"quoteChar" = "\"",
"escapeChar" = "\\"
)
LOCATION '/user/maria_dev/csv_07a'
;
INSERT INTO TABLE csv_07a SELECT * FROM sample_07;
Partitioned Table
- Manual, Tutorial
- A partition is created mostly to make certain kinds of queries faster. It can be regarded as an index for a categorical variable (column).
- HCatalog only supports partition columns of type string.
Example: first create an external table.
CREATE EXTERNAL TABLE irise (
Id INT,
SepalLength FLOAT, SepalWidth FLOAT, PetalLength FLOAT, PetalWidth FLOAT,
Species STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = "\;",
"quoteChar" = "\"",
"escapeChar" = "\\"
)
STORED AS TEXTFILE
LOCATION '/user/maria_dev/iris'
;
Next we define a managed table with a partition:
CREATE TABLE irisp (
Id INT, SepalLength FLOAT, SepalWidth FLOAT, PetalLength FLOAT, PetalWidth FLOAT
)
PARTITIONED BY (Species STRING)
STORED AS ORC
;
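Note that the column Species, by which the table is partitioned, is NOT contained in irisp (...).
Next we insert the rows of the external table into the managed table. (With dynamic partitioning the partition column must come last in the SELECT list; SELECT * works here because Species is the last column of irise.)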
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE irisp PARTITION(Species) SELECT * FROM irise;
The partitioned table is stored in three directories according to the values of the column Species.
$ hadoop fs -ls /apps/hive/warehouse/irisp
Found 3 items
drwxrwxrwx - maria_dev hdfs 0 2017-08-24 14:07 /apps/hive/warehouse/irisp/species=setosa
drwxrwxrwx - maria_dev hdfs 0 2017-08-24 14:07 /apps/hive/warehouse/irisp/species=versicolor
drwxrwxrwx - maria_dev hdfs 0 2017-08-24 14:07 /apps/hive/warehouse/irisp/species=virginica
We can also check the partition on Hive CLI:
hive> show partitions irisp;
OK
species=setosa
species=versicolor
species=virginica
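The directory layout above can be mimicked in a few lines of Python. This is only a sketch of the idea, not Hive's implementation: the function name partition_dirs and the sample rows are made up for illustration. It shows that the partition value ends up in the directory name and is dropped from the stored rows.

```python
from collections import defaultdict

def partition_dirs(rows, partition_col, table_dir):
    """Group rows by the partition column and return {directory: rows}."""
    layout = defaultdict(list)
    for row in rows:
        row = dict(row)                    # copy so the caller's data is untouched
        value = row.pop(partition_col)     # the value lives in the path, not in the file
        layout[f"{table_dir}/{partition_col.lower()}={value}"].append(row)
    return dict(layout)

# Hypothetical sample rows (only two of the measurement columns are kept).
rows = [
    {"Id": 1, "SepalLength": 5.1, "Species": "setosa"},
    {"Id": 51, "SepalLength": 7.0, "Species": "versicolor"},
    {"Id": 101, "SepalLength": 6.3, "Species": "virginica"},
    {"Id": 2, "SepalLength": 4.9, "Species": "setosa"},
]
layout = partition_dirs(rows, "Species", "/apps/hive/warehouse/irisp")
for directory in sorted(layout):
    print(directory, len(layout[directory]))
```

A query with WHERE Species = 'setosa' then only needs to read one of the three directories, which is where the speed-up comes from.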
Bucketed table
- A bucket is a kind of partition for a numerical variable (column). Manual.
Example: we define a table with buckets (and a partition).
CREATE TABLE irisb (
Id INT, SepalLength FLOAT, SepalWidth FLOAT, PetalLength FLOAT, PetalWidth FLOAT
)
PARTITIONED BY (Species STRING)
CLUSTERED BY (Id) SORTED BY(SepalLength) INTO 10 BUCKETS
STORED AS ORC
;
Note that the bucketed column Id is included in irisb (...), unlike a partitioned column.
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE irisb PARTITION(Species) SELECT * FROM irise;
Then the directory /apps/hive/warehouse/irisb contains three directories for the partitions, and each of them contains 10 files (i.e. buckets).
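How a row is assigned to one of the 10 bucket files can be sketched as follows. This is a simplified model and rests on an assumption: that the bucket is the hash of the clustering column modulo the number of buckets, and that the hash of an INT is the value itself. The function name bucket_for is hypothetical; check the Hive manual for the exact hashing used by your version.

```python
from collections import Counter

def bucket_for(int_value, num_buckets):
    """Return the bucket index for an INT value (assumed hashing scheme)."""
    # Assumption: the hash of an INT is the value itself; the sign bit is
    # masked off so that negative values also map to a valid bucket.
    return (int_value & 0x7FFFFFFF) % num_buckets

# Ids 1..150 (the iris rows) spread over 10 buckets.
counts = Counter(bucket_for(i, 10) for i in range(1, 151))
print(sorted(counts.items()))
```

Under this model consecutive Ids are spread evenly, so every bucket receives the same number of rows.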
Load data
- LOAD DATA (LOCAL) is used to insert rows of files into a Hive table. Manual
  - "Load operations are currently pure copy/move operations that move datafiles into locations corresponding to Hive tables."
  - Therefore
    - we must already have a Hive table, and
    - the format of the file(s) must agree with that of the table.
- OVERWRITE INTO TABLE deletes all existing rows.
- The original file in HDFS is moved (so it disappears from its original path), while a local file is copied and remains.
- The LOAD DATA statement automatically detects compression (.gz). Thus the following example also works without any change except the file name when the CSV file is compressed.
- The data types of the table are not checked when loading.
- One of the best practices: we load the data into a temporary table and then insert the temporary table into the target one (which will be stored as ORC).
Example: Prepare a CSV file in local (and in HDFS).
maria_dev$ R -q -e "write.table(MASS::Boston, 'mass/boston.csv', col.names=F, row.names=F, sep=';')"
maria_dev$ hadoop fs -mkdir mass
maria_dev$ hadoop fs -put mass/boston.csv mass
Then define a table (according to the file format, even though the data types will be ignored).
CREATE TABLE Boston(
crim FLOAT, zn FLOAT, indus FLOAT, chas INT, nox FLOAT, rm FLOAT, age FLOAT,
dis FLOAT, rad INT, tax INT, ptratio FLOAT, black FLOAT, lstat FLOAT, medv FLOAT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = "\;",
"quoteChar" = "\"",
"escapeChar" = "\\"
)
STORED AS TEXTFILE;
Then insert the rows of the local file into the Hive table.
LOAD DATA LOCAL INPATH '${env:HOME}/mass' INTO TABLE Boston;
Or load the file from HDFS:
LOAD DATA INPATH '/user/maria_dev/mass' INTO TABLE Boston;
Then /apps/hive/warehouse/boston contains the original CSV: boston.csv.
Update/Delete/Insert
"Row-level" update, delete and insert are available from Hive 0.14. Note that UPDATE and DELETE work only on tables that support ACID transactions.
UPDATE tablename SET column=value [, column=value ...] [WHERE expression]
DELETE FROM tablename [WHERE expression]
INSERT INTO TABLE tablename [PARTITION (col1[=val1], col2[=val2] ...)] VALUES values_row [, values_row ...]
Examples from the manual with partition.
INSERT INTO TABLE pageviews PARTITION (datestamp = '2014-09-23')
VALUES ('jsmith', 'mail.com', 'sports.com'),
('jdoe', 'mail.com', null);
INSERT INTO TABLE pageviews PARTITION (datestamp)
VALUES ('tjohnson', 'sports.com', 'finance.com', '2014-09-23'),
('tlee', 'finance.com', null, '2014-09-21');
Built in operators/functions
Most operators/functions of SQL are available in HQL.
- LIKE is case-sensitive; apply lower() to both sides for a case-insensitive match.
- A RLIKE regex: matching with a regular expression
- regexp_replace(col, regex, replaced_with): $col =~ s/regex/replaced_with/g in Perl
- Window functions are also available.
  - Example: rank() over (partition by abt order by salary desc)
- INSERT OVERWRITE TABLE mytable SELECT ... can be used to replace the contents of a table:
  - Example: INSERT OVERWRITE TABLE user_active SELECT * FROM user WHERE active = 1;
- We can export the result of a query into CSVs: INSERT OVERWRITE LOCAL DIRECTORY '/tmp/csvs' SELECT .... creates the directory /tmp/csvs and stores the output under it.
To change the type of a column use an ALTER TABLE statement:
ALTER TABLE AutoMgd CHANGE origin origin String;
ALTER TABLE statements of HiveQL are slightly different from those of ordinary SQL. Check the usage when using them.
Improve the performance of Hive
- The data format should be ORC.
- Partitions and buckets. See above.
- Tez and cost-based optimization (both are enabled by default).
- Indexes are available for Hive tables:
  - How to create an index: CREATE INDEX AutoNameIndex ON TABLE AutoMgd (name) AS 'COMPACT';
  - But indexes are not supported by the Tez execution engine.
Sqoop
- Official, Documents for Sqoop 1
- Sqoop is a converter between RDBMS and HDFS (Hive tables, CSV in HDFS, etc.)
- Using Sqoop, Sqoop import/export tutorial (Hortonworks), Tutorialspoint
- How to write a database URL for JDBC
Import (RDBMS -> HDFS)
- We can import a table from an RDBMS as a Hive table or as CSV files. Manual
Create the table squares in the database "test" on MySQL with the following commands.
CREATE TABLE squares (x INT, y INT, z TEXT);
INSERT INTO squares (x,y,z) VALUES (1,1,'one');
INSERT INTO squares (x,y,z) VALUES (2,4,'two');
INSERT INTO squares (x,y,z) VALUES (3,9,'three');
INSERT INTO squares (x,y,z) VALUES (4,16,'four');
INSERT INTO squares (x,y,z) VALUES (5,25,'five');
INSERT INTO squares (x,y,z) VALUES (6,36,'six');
INSERT INTO squares (x,y,z) VALUES (7,49,'seven');
INSERT INTO squares (x,y,z) VALUES (8,64,'eight');
INSERT INTO squares (x,y,z) VALUES (9,81,'nine');
INSERT INTO squares (x,y,z) VALUES (10,100,'ten');
Import 1: MySQL -> Hive
We should not create a Hive table or directory for the file in advance (because of the create-hive-table option).
The following command automatically creates a Hive table (in the default database) and inserts all rows of the squares table into it.
sqoop import --connect jdbc:mysql://localhost/test \
--username root --password hadoop --table squares \
--driver com.mysql.jdbc.Driver \
--hive-import --create-hive-table --hive-table default.squares -m 1
- --username, --password, --table: are for the RDBMS.
- --hive-import: imports a table into Hive.
- --create-hive-table: creates the table automatically, and the job fails if the table already exists. If the option is absent, Sqoop loads the rows into the existing table.
- --hive-table: specifies a database and a table in Hive.
- -m 1: performs a sequential import (a single map task). This is necessary if the table has no primary key.
The password for RDBMS should not be included in the command line. We should use a text file containing the password instead.
maria_dev$ echo -n "hadoop" > pw.txt ## create a password file (without LF at the end)
maria_dev$ hadoop fs -put pw.txt ## put it in HDFS (/user/maria_dev/pw.txt)
maria_dev$ hadoop fs -chmod 400 pw.txt ## set the suitable permission
Then we can replace --password hadoop with --password-file /user/maria_dev/pw.txt.
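The local part of the steps above (a password file without a trailing line feed, readable only by its owner) can also be done from a script. The following is a minimal sketch for a POSIX system; the helper name write_password_file is made up, and the upload to HDFS with hadoop fs -put is not covered here.

```python
import os
import stat
import tempfile

def write_password_file(path, password):
    """Write the password without a trailing newline, readable by the owner only."""
    with open(path, "w", newline="") as handle:
        handle.write(password)        # nothing is appended, so no LF at the end
    os.chmod(path, stat.S_IRUSR)      # same effect as chmod 400

# Demonstration in a throw-away directory.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "pw.txt")
    write_password_file(demo_path, "hadoop")
    with open(demo_path, "rb") as handle:
        demo_bytes = handle.read()
    demo_mode = stat.S_IMODE(os.stat(demo_path).st_mode)

print(demo_bytes, oct(demo_mode))
```

The missing line feed matters because every byte of the file is treated as part of the password.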
Import 2: MySQL -> CSV in HDFS
The following command creates CSV files under /user/maria_dev/squared.
sqoop import --connect jdbc:mysql://localhost/test \
--username root --password-file /user/maria_dev/pw.txt --table squares \
--driver com.mysql.jdbc.Driver --target-dir '/user/maria_dev/squared' -m 1
Note that no header is contained in the CSV files.
We can import a result of a query as follows:
sqoop import --connect jdbc:mysql://localhost/test \
--username root --password hadoop \
--query 'SELECT * FROM squares WHERE x % 2 = 0 AND $CONDITIONS' \
--driver com.mysql.jdbc.Driver \
--target-dir '/user/maria_dev/even_squared' -m 1
Then the result of SELECT * FROM squares WHERE x % 2 = 0 is stored in the target directory in HDFS. Here $CONDITIONS is a required token. (In practice we may regard it as a condition that is always true and put it in the WHERE clause.)
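The role of the $CONDITIONS token can be illustrated with plain string substitution. This is a rough sketch, not Sqoop's code: the function expand_conditions, the exact text of the generated predicates and the example ranges are assumptions. The idea is that each map task runs the same query with the token replaced by the range it is responsible for, and with a single map task the token becomes a condition that is always true.

```python
def expand_conditions(query, split_column=None, ranges=None):
    """Return one query per map task with $CONDITIONS filled in."""
    if not ranges:
        # Single map task (-m 1): nothing to split, so the token becomes a
        # condition that is always true.
        return [query.replace("$CONDITIONS", "(1 = 1)")]
    return [
        query.replace(
            "$CONDITIONS",
            f"({split_column} >= {low} AND {split_column} < {high})",
        )
        for low, high in ranges
    ]

query = "SELECT * FROM squares WHERE x % 2 = 0 AND $CONDITIONS"
print(expand_conditions(query))
# Hypothetical split of column x over two map tasks.
print(expand_conditions(query, "x", [(1, 6), (6, 11)]))
```

In Sqoop the column used for such a split is chosen with --split-by when more than one map task is requested.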
The created CSV file is obviously not registered as a Hive table. The following HQL command creates an external Hive table:
CREATE EXTERNAL TABLE squared (x INT, y INT, z STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/maria_dev/squared'
;
Export (HDFS -> RDBMS)
- We can export a Hive table or a CSV file into an RDBMS. Manual.
- We have to create a table on the RDBMS in advance. (Because Sqoop actually executes "INSERT".)
- We cannot assume that the order of rows is kept.
- When exporting a CSV file in HDFS, the CSV file should contain no header and be clean. (This causes a problem when you export the CSV file of "example_07" directly.)
Export 1: Hive -> MySQL
Create a table (in a database "test") in MySQL.
CREATE TABLE export07 (code TEXT, description TEXT, total_emp INT, salary INT);
Then the following command inserts all rows in Hive table "sample_07" into the table "export07" in MySQL
sqoop export --connect jdbc:mysql://localhost/test \
--username root --password hadoop --table export07 \
--hcatalog-table sample_07
- --connect, --username, --password, --table: settings for the RDBMS.
- --hcatalog-table: the Hive table
Export 2: CSV in HDFS -> MySQL
Prepare a CSV file. Note that we write no header and the separator is ;.
maria_dev$ R -q -e "write.table(iris, 'iris.csv', col.name=F, sep=';')"
maria_dev$ hadoop fs -mkdir /user/maria_dev/iris
maria_dev$ hadoop fs -put iris.csv /user/maria_dev/iris
Create a table in MySQL in advance.
CREATE TABLE iris (
Id INT,
SepalLength FLOAT, SepalWidth FLOAT, PetalLength FLOAT, PetalWidth FLOAT,
Species TEXT
);
The following command inserts rows of the CSV file in HDFS into the table in MySQL.
sqoop export --connect jdbc:mysql://localhost/test \
--username root --password hadoop --table iris \
--export-dir hdfs://sandbox.hortonworks.com:8020/user/maria_dev/iris \
--input-fields-terminated-by ";" --input-optionally-enclosed-by '"'
- --export-dir: directory of CSVs in HDFS
- --input-fields-terminated-by: separator of values
- --input-optionally-enclosed-by: Text values are often enclosed by a character such as ". If that is the case, we have to specify it.
Next we try to UPDATE the table. Create the following CSV
50,test1
100,test2
150,test3
and store it in HDFS
maria_dev$ hadoop fs -mkdir /user/maria_dev/conv
maria_dev$ hadoop fs -put conv.csv /user/maria_dev/conv
The command
sqoop export --connect jdbc:mysql://localhost/test \
--username root --password hadoop --table iris \
--export-dir hdfs://sandbox.hortonworks.com:8020/user/maria_dev/conv \
--columns "Id,Species" --update-key Id --input-fields-terminated-by ","
is equivalent to the following SQL commands
UPDATE iris SET Species='test1' WHERE Id=50;
UPDATE iris SET Species='test2' WHERE Id=100;
UPDATE iris SET Species='test3' WHERE Id=150;
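The correspondence between the CSV rows and the UPDATE statements can be reproduced with a short script. It is only an illustration of the mapping, not what Sqoop executes internally; the function update_statements is hypothetical and its quoting is deliberately naive (every non-key value is written as a quoted string).

```python
import csv
import io

def update_statements(csv_text, table, columns, update_key):
    """Turn CSV rows into UPDATE statements keyed on update_key."""
    statements = []
    for record in csv.reader(io.StringIO(csv_text)):
        row = dict(zip(columns, record))
        key_value = row.pop(update_key)   # the key goes into the WHERE clause
        assignments = ", ".join(f"{col}='{val}'" for col, val in row.items())
        statements.append(
            f"UPDATE {table} SET {assignments} WHERE {update_key}={key_value};"
        )
    return statements

conv_csv = "50,test1\n100,test2\n150,test3\n"
for statement in update_statements(conv_csv, "iris", ["Id", "Species"], "Id"):
    print(statement)
```

Here columns plays the role of --columns and update_key the role of --update-key.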
Flume
- Apache Flume, User Guide, tutorial (tutorialspoint)
- Flume collects (streaming) data and stores them in HDFS (or passes them to another service such as Elasticsearch).
- export FLUME_CONF=/etc/flume/conf (Flume configuration directory)
Agent
- An agent is an independent daemon process which fetches data and stores them in a specified way.
- A Flume system is basically just a workflow consisting of agents.
- An agent has several components:
  - source: fetches data and sends them to one or more channels (or interceptors).
  - interceptor (optional): converts received data and passes them to channels.
  - channel: is a data buffer. It sends data to a sink.
  - sink: stores data in HDFS or sends them to a specified service.
- We have to write the configuration of an agent in a file $FLUME_CONF/MyAgent.conf. ("MyAgent" should be replaced with a suitable name.)
- $FLUME_CONF/flume-conf.properties.template is a template file for an agent.
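The flow source -> interceptor -> channel -> sink described above can be modelled in a few lines. This is a toy model for getting the picture, not Flume code: all class and function names are invented, the "sink" is just a Python list, and an event is represented as a dictionary with headers and a byte body.

```python
from collections import deque

class MemoryChannel:
    """A bounded in-memory buffer between a source and a sink."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.events = deque()

    def put(self, event):
        if len(self.events) >= self.capacity:
            raise OverflowError("channel is full")
        self.events.append(event)

    def take(self, max_events):
        batch = []
        while self.events and len(batch) < max_events:
            batch.append(self.events.popleft())
        return batch

def add_length_header(event):
    """Interceptor: annotate the event with the size of its body."""
    event["headers"]["length"] = str(len(event["body"]))
    return event

def run_agent(lines, interceptor, channel, batch_size):
    """Source -> interceptor -> channel -> sink; returns what the sink stored."""
    stored = []                                    # stands in for HDFS
    for line in lines:                             # source: one event per line
        event = {"headers": {}, "body": line.encode("utf-8")}
        channel.put(interceptor(event))
        if len(channel.events) >= batch_size:      # sink drains in batches
            stored.extend(channel.take(batch_size))
    stored.extend(channel.take(len(channel.events)))   # flush the remainder
    return stored

stored = run_agent(["a", "bb", "ccc"], add_length_header,
                   MemoryChannel(capacity=10), batch_size=2)
print([(e["headers"]["length"], e["body"]) for e in stored])
```

The capacity and batch size here loosely correspond to the capacity of a memory channel and the batch size of a sink in a real agent configuration.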
Example of an agent
This section will be replaced with a better example.
- Check with the flume-ng command whether Flume works.
- The easiest agent would be netcat.conf.
- Following this tutorial, we create an agent fetching data from Twitter.
  - The agent gets tweets and stores them in HDFS in Apache Avro format (https://avro.apache.org/).
  - We can create OAuth credentials for Twitter at Twitter Application Management.
First we copy the template file of flume-env.sh. (This has no effect, because the shell script does nothing.)
root# cp flume-env.sh.template flume-env.sh
Next we prepare the directory in which data are stored. (This is different from other applications related to Hadoop.)
root# sudo -u flume hadoop fs -mkdir -p /user/flume/twitter_data
root# sudo -u flume hadoop fs -chown -R flume:hdfs /user/flume
root# sudo -u flume hadoop fs -chmod -R 770 /user/flume
Note that we assume that the agent runs with the privileges of the user flume.
Next we write the configuration of the agent. We save the following configuration in the file $FLUME_CONF/twitter.conf. (This is a modification of the configuration in the tutorial.)
# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = xx_YourConsumerKey_xx
TwitterAgent.sources.Twitter.consumerSecret = xx_YourConsumerSecret_xx
TwitterAgent.sources.Twitter.accessToken = xx_YourAccessToken_xx
TwitterAgent.sources.Twitter.accessTokenSecret = xx_YourAccessSecret_xx
# Describing/Configuring the sink
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://sandbox.hortonworks.com:8020/user/flume/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.serializer = avro_event
# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000
# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel
We start a Flume agent as user "flume".
root# sudo -u flume flume-ng agent \
--conf $FLUME_CONF -f $FLUME_CONF/twitter.conf -n TwitterAgent \
-Dflume.root.logger=DEBUG,console
Here
- --conf $FLUME_CONF: the directory containing flume-env.sh, etc.
- -f $FLUME_CONF/twitter.conf: the configuration file for the agent
- -n TwitterAgent: the name of the agent
- -Dflume.root.logger=DEBUG,console: debugging mode. INFO instead of DEBUG reduces the level of logging.
The format of the saved files is Apache Avro. This is because of the source, not the sink. But I do not know how to read tweets from them. We can convert the stored files into "JSON" with avro-tools, but the result is not readable yet.
Components of an agent
An event is a basic unit of data. It consists of header and data (byte payload).
source
- Manual
- type = spooldir: Spooling Directory Source. The source watches a specified directory for new files and will parse events from them.
  - spoolDir: a local directory to watch
- type = org.apache.flume.source.twitter.TwitterSource: Twitter 1% firehose Source (experimental)
  - See the example above for the required properties.
- type = http: HTTP source
  - port, bind, handler (important!)
channel
- Manual
- type = memory: memory channel
  - capacity = 100: the maximum number of events stored in a channel.
  - transactionCapacity: the maximum number of events per transaction.
sink
- Manual
- type = hdfs: HDFS sink
  - hdfs.path: directory in HDFS where the data will be stored.
  - hdfs.fileType: file format. One of SequenceFile, DataStream and CompressedStream.
- type = hive: Hive sink
  - hive.metastore: Hive metastore URI
  - hive.database: name of the database
  - hive.table: name of the table
  - batchSize = 15000: the maximum number of events for a transaction
  - serializer: how to parse the data. If the data is comma-separated, we may use the following configuration:
    serializer = DELIMITED
    serializer.delimiter = ","
    serializer.serdeSeparator = ','
    serializer.fieldnames = col1,col2,col3
- type = logger: Logger sink (for test)
Pig
- Apache Pig, Tutorial (tutorialspoint)
- There are four execution modes: local/mapreduce, with Tez/without Tez
  - MapReduce mode (default): $ pig script.pig
  - Tez mode: $ pig -x tez script.pig
  - local mode: $ pig -x local script.pig
  - Tez local mode: $ pig -x tez_local script.pig
- The local mode runs without HDFS.
- The Grunt shell starts if we give no script.
  - grunt> sh ls : same as ls
  - grunt> fs -ls : same as hadoop fs -ls
Data Types
- int (32-bit), long (64-bit) / float (32-bit), double (64-bit) / chararray / Boolean (true,false) / Datetime (e.g. 1970-01-01T00:00:00.000+00:00)
- tuple: an ordered set of "fields" (elements). It represents a row of a relation (a table). Ex. (10, "a", 3.14)
- bag: an unordered set of tuples. It represents a relation (table). Ex. {(1,"a",1.4), (2,"b",1.7)}
  - The length of a tuple in a bag can vary.
- map: something like a hash in Perl. [key#value, key2#val2]
Projection operator
Manual.
Let b be the following relation:
(setosa, {(1, 5.1, 3.5, 1.4, 0.2, setosa),
(2, 4.9, 3.0, 1.4, 0.2, setosa), ... })
(virginica, {(150, 5.9, 3.0, 5.1, 1.8, virginica),
(101, 6.3, 3.3, 6.0, 2.5, virginica), ...})
(versicolor, {(76, 6.6, 3.0, 4.4, 1.4, versicolor),
(52, 6.4, 3.2, 4.5, 1.5, versicolor),...})
And its schema is group: chararray, a: {(id: int, x1: float, x2: float, x3: float, x4: float, species: chararray)}.
Then
- $0 and group are the first column consisting of chararray.
- $1 and a are the second column consisting of bags.
- $1.id and a.id are the bags consisting of the first fields (id) of the tuples in those bags.
Data I/O
LOAD
a = LOAD 'basic_types.csv' USING PigStorage(';')
AS (id:int, time:datetime, score:int, prob:float, color:chararray);
- Schema on read. Giving a schema is optional.
- A schema is case-sensitive.
- Load/Store functions
  - csv: a = LOAD 'geolocation.csv' USING PigStorage(',');
    - The quotation mark must be a single quote.
    - Omitting USING is equivalent to USING PigStorage('\t')
  - CSVExcelStorage: enables advanced options for loading a CSV file.
    - REGISTER '/usr/hdp/2.6.0.3-8/pig/piggybank.jar'; is required before using it.
    - USING org.apache.pig.piggybank.storage.CSVExcelStorage(';'); is the standard usage. This removes quotations.
    - USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER'); : skips the first line (the header).
  - Hive table: a = LOAD 'geolocation' USING org.apache.hive.hcatalog.pig.HCatLoader();
    - The option -useHCatalog is required.
    - The schema is automatically imported.
  - JSON Lines
    - JsonLoader() requires the keys in the JSON string to appear in the order of the schema. In other words, keys are ignored.
    - elephant-bird (to be tried)
  - A general Pig relation: BinStorage(). This function requires a careful treatment of schema.
STORE
b = GROUP a BY id;
c = FOREACH b GENERATE group as id, SUM(a.score) AS Total, AVG(a.score) AS Average;
DUMP c;
STORE c INTO 'store-test-dir' USING PigStorage (',');
STORE c INTO 'pigtohive' USING org.apache.hive.hcatalog.pig.HCatStorer();
- The directory where CSV files are stored must not exist in advance.
- But a Hive table must be created in advance and the schema must agree.
Diagnostic Operators
- Manual
- DUMP c : shows the relation
- DESCRIBE c : shows the schema: c: {id: int,Total: long,Average: double}.
- EXPLAIN c : shows an execution plan.
- ILLUSTRATE c : shows a step-by-step execution of a sequence of statements. This is useful to see how the relation is changed in a script.
Relational Algebra
For "rows" (FILTER, ORDER BY, etc)
b = FILTER a BY prob > 0.5; -- pick the tuples satisfying the condition
c = ORDER b BY score DESC; -- arrange the tuples in descending order by score
d = LIMIT c 3; -- pick the top 3
- col1 MATCHES '.*stryoulike.*' is equivalent to col1 LIKE '%stryoulike%' of SQL. But as the name suggests, a regex is applied. Note that we must give a regular expression which matches the whole string.
- Note that == is used to compare two values, unlike in SQL.
- is (not) null is available.
- The standard projection index (such as $2) can be used instead of the field name (such as score).
- LIMIT c 4; picks the "first" 4 tuples in the relation c. You should use it in combination with DUMP and/or ORDER BY.
- DISTINCT c; is equivalent to SELECT DISTINCT * FROM c; in SQL.
- UNION c1, c2; concatenates two relations without removing duplicates, i.e. UNION ALL in SQL.
- SPLIT a INTO b IF score >= 0, c IF score < 0; creates two relations by the given conditions, i.e. a combination of FILTER statements.
- SAMPLE c 0.2; picks a random sample of roughly 20% of the tuples.
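The "whole string" rule of MATCHES is easy to get wrong, so here is a small Python analogue. It is only an analogy: Pig uses Java regular expressions, whose dialect differs from Python's in details, and the helper names pig_matches and sql_like_contains are made up.

```python
import re

def pig_matches(value, pattern):
    """Mimic MATCHES: the pattern has to cover the entire string."""
    return re.fullmatch(pattern, value) is not None

def sql_like_contains(value, needle):
    """Mimic col LIKE '%needle%': a plain substring test."""
    return needle in value

text = "xxstryoulikeyy"
print(pig_matches(text, ".*stryoulike.*"))   # pattern spans the whole string
print(pig_matches(text, "stryoulike"))       # pattern covers only a part
print(sql_like_contains(text, "stryoulike"))
```

The second call fails although the substring occurs, which is why the leading and trailing .* are needed in the Pig expression.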
For "columns" (FOREACH)
b = FOREACH a GENERATE id, score, prob as rate, score * prob as prod;
FOREACH ... GENERATE ... corresponds to SELECT in SQL.
The following functions can be used to transform data. (Manual).
- Maths:
  - well-known functions such as ABS, SQRT, EXP, LOG, LOG10, ...
  - the ceiling function CEIL and the floor function FLOOR.
  - RANDOM() produces a pseudo random number in $[0,1)$.
  - ROUND(field) rounds "field" to an integer.
  - ROUND_TO(field,digits) rounds "field" to a fixed number of decimal digits. E.g. ROUND_TO(3.15,1) gives 3.2
- String:
  - TRIM(): removes leading/trailing white spaces.
  - LOWER(), UPPER() convert strings to lower/upper case.
  - SUBSTRING(string,s,t) is equivalent to SUBSTRING(string,s+1,t-s) in SQL. Namely "t" is the index just after the last letter (not a length), and indices start at 0.
  - EqualsIgnoreCase(str1,str2): compares two strings ignoring case
  - CONCAT() is the same as CONCAT() of MySQL.
  - SPRINTF(format,arg1,arg2,...): formats strings.
  - STRSPLIT(string,regex,limit) splits a string with a regex and returns a tuple. A non-positive limit means no limit. But there is a difference between a negative limit and zero.
    - STRSPLIT('accbc','c', 0) returns (a,,b). (Trailing empty entries are removed.)
    - STRSPLIT('accbc','c',-1) returns (a,,b,). (Namely trailing empty entries remain.)
  - REGEX_EXTRACT(string,regex,index) returns the substring matched by the marked subexpression (i.e. a part of the regular expression in parentheses). If string is "a9aa8aa7a", then REGEX_EXTRACT(string,'aa(\\d)a',1) gives "8". Note that we must escape "\" in the regex, while we use no backslash for the index, i.e. "1" instead of "\1" or "$1".
  - REGEX_EXTRACT_ALL(string,regex) returns the matched substrings as a tuple. Note that the regex must correspond to the whole string. That is, REGEX_EXTRACT_ALL('a1b2c343', '.*?(\\d)(\\w).*') returns (1,b), while REGEX_EXTRACT_ALL('a1b2c343', '(\\d)(\\w)') returns NULL.
  - REPLACE(string,regex,newstr) replaces the matched part of the string with "newstr". Note that we can apply a regular expression, unlike REPLACE() of MySQL. (Thus REGEXP_REPLACE() of Oracle is equivalent to our REPLACE().)
- Datetime:
  - CurrentTime() returns the current time.
  - GetYear(), GetMonth(), GetDay(), GetHour(), GetMinute(), GetSecond()
  - GetWeek(): returns the calendar week
  - YearsBetween(t1,t0), MonthsBetween(t1,t0), WeeksBetween(t1,t0), DaysBetween(t1,t0), HoursBetween(t1,t0), MinutesBetween(t1,t0): they correspond to TIMEDIFF(t1,t0) in SQL. The prefix such as Years is the precision of the difference. (The returned values are always integers.)
  - There is no built-in function for days of the week, but the following statement works instead: (DaysBetween(datetime,ToDate(0L)) + 4L) % 7. This assigns 0 to Sunday. The logic: 1970-01-01 is a Thursday.
- NB: A function name is case-sensitive.
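The day-of-week formula can be checked outside of Pig. The sketch below redoes the same arithmetic in Python (days since 1970-01-01, plus 4, modulo 7) and compares it with the weekday reported by the standard library; the function name day_of_week is an arbitrary choice and only dates on or after 1970-01-01 are considered.

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)   # a Thursday

def day_of_week(day):
    """0 = Sunday, 1 = Monday, ..., 6 = Saturday."""
    days_between = (day - EPOCH).days   # plays the role of DaysBetween(datetime, ToDate(0L))
    return (days_between + 4) % 7

# Cross-check against the standard library (Monday = 0 there, so shift by one).
checked = [EPOCH + timedelta(days=n) for n in range(0, 20000, 37)]
agrees = all(day_of_week(d) == (d.weekday() + 1) % 7 for d in checked)
print(day_of_week(EPOCH), day_of_week(date(1970, 1, 4)), agrees)
```

The offset 4 is simply the index of Thursday when Sunday is counted as 0.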
Grouping
b = GROUP a BY color; -- {group: chararray, a: bag{...}}
c = FOREACH b GENERATE group AS color, AVG(a.score) AS average; -- Aggregation
The schema of b is {group: chararray,a: {(id: int,time: datetime,score: int,prob: float,color: chararray)}}.
- The grouping field (column) becomes group. (not "color" in the above case)
- The second element of a tuple is a bag consisting of the corresponding tuples in the original relation. The field of the bag is a, i.e. the name of the original relation.
- Therefore to reach a field in the bag, we need ".". In the above code the field "score" in the bag "a" can be reached through a.score or a.$2.
GROUP ALL is used to count the number of tuples.
a_all = GROUP a ALL; -- consists of a single tuple: (all, a)
nrow = FOREACH a_all GENERATE COUNT_STAR(a); -- consists only of the number of tuples
We can use two fields for grouping.
a1 = FOREACH a GENERATE color, (score>=0 ? 'pos' : 'neg') AS sign, prob;
b = GROUP a1 BY (color, sign);
c = FOREACH b GENERATE group.$0, group.$1, AVG(a1.prob) AS average;
The field group is a tuple consisting of the grouping fields. Note that the schema of the tuple holds the names of the grouping fields. (Therefore we did not write group.$0 AS color in the above code.)
Aggregate functions
- AVG(), SUM(), MIN() and MAX() ignore NULL values.
- COUNT() and COUNT_STAR() compute the number of tuples in a bag. The difference is that COUNT() ignores NULL while COUNT_STAR() does not.
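The NULL handling described above can be made concrete with a tiny model in which a bag of single-field tuples is a Python list and NULL is None. The function names are lower-case stand-ins chosen for this sketch, not Pig functions.

```python
def count(bag):
    """Like COUNT(): NULL values are not counted."""
    return sum(1 for value in bag if value is not None)

def count_star(bag):
    """Like COUNT_STAR(): every tuple is counted, NULL or not."""
    return len(bag)

def avg(bag):
    """Like AVG(): NULL values are left out of both the sum and the count."""
    values = [value for value in bag if value is not None]
    return sum(values) / len(values) if values else None

scores = [10, None, 20]
print(count(scores), count_star(scores), avg(scores))
```

Note that the average is taken over the two non-NULL scores, not over all three tuples.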
COGROUP
Consider the following two relations l and r.
-- l (id:int, dept:chararray)
(110,HR)
(120,Marketing)
(130,Marketing)
(140,Sales)
-- r (dept:chararray,deptid:int)
(Marketing,13)
(Sales,17)
(Development,20)
COGROUP l by dept, r by dept; applies GROUP BY for l and r and combines them.
(HR, {(110,HR)}, {} )
(Sales, {(140,Sales)}, {(Sales,17)} )
(Marketing, {(120,Marketing),(130,Marketing)}, {(Marketing,13)} )
(Development,{}, {(Development,20)})
- Its schema is {group: chararray, l: {schema of l}, r: {schema of r}}.
- If a is the above relation, then FILTER a BY not IsEmpty(l) and not IsEmpty(r); removes tuples with an empty bag, so that the resulting relation is like an inner join.
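To see what COGROUP produces, the operation can be imitated on plain Python tuples. This is a sketch for understanding only: the function cogroup is made up, bags are represented as lists, and the order of the groups follows first appearance, whereas Pig does not guarantee any order.

```python
def cogroup(left, left_key, right, right_key):
    """Group both relations by their key and put the two bags side by side."""
    groups = {}
    for row in left:
        groups.setdefault(row[left_key], ([], []))[0].append(row)
    for row in right:
        groups.setdefault(row[right_key], ([], []))[1].append(row)
    return [(key, l_bag, r_bag) for key, (l_bag, r_bag) in groups.items()]

l = [(110, "HR"), (120, "Marketing"), (130, "Marketing"), (140, "Sales")]
r = [("Marketing", 13), ("Sales", 17), ("Development", 20)]

a = cogroup(l, 1, r, 0)
for group in a:
    print(group)

# Counterpart of FILTER a BY not IsEmpty(l) and not IsEmpty(r);
inner_like = [group for group in a if group[1] and group[2]]
print([group[0] for group in inner_like])
```

Only Marketing and Sales survive the filter, which are exactly the departments that an inner join would keep.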
JOIN
JOIN l by dept, r by dept;
returns the inner join of the two relations:
(140, Sales, Sales, 17)
(120, Marketing, Marketing, 13)
(130, Marketing, Marketing, 13)
- Its schema is {l::id, l::dept, r::dept, r::deptid}.
- There is no INNER JOIN statement. Just JOIN.
- a = COGROUP l by dept, r by dept;
LEFT JOIN: JOIN l by dept LEFT OUTER, r by dept;
(110, HR, , )
(140, Sales, Sales, 17)
(120, Marketing, Marketing, 13)
(130, Marketing, Marketing, 13)
RIGHT JOIN: JOIN l by dept RIGHT OUTER, r by dept;
(140, Sales, Sales, 17)
(120, Marketing, Marketing, 13)
(130, Marketing, Marketing, 13)
(, , Development, 20)
OUTER JOIN: JOIN l by dept FULL OUTER, r by dept;
(110, HR, , )
(140, Sales, Sales, 17)
(120, Marketing, Marketing, 13)
(130, Marketing, Marketing, 13)
(, , Development, 20)
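The four join variants can be reproduced with a small nested-loop join in Python. It is meant as an executable summary of the outputs above, not as a description of how Pig evaluates joins; the function join and its how argument are invented for this sketch, NULL is represented by None, and both relations are assumed to be non-empty.

```python
def join(left, left_key, right, right_key, how="inner"):
    """Nested-loop join; how is one of 'inner', 'left', 'right', 'full'."""
    rows = []
    matched_right = set()
    for l_row in left:
        matches = [(i, r_row) for i, r_row in enumerate(right)
                   if r_row[right_key] == l_row[left_key]]
        for i, r_row in matches:
            matched_right.add(i)
            rows.append(l_row + r_row)
        if not matches and how in ("left", "full"):
            rows.append(l_row + (None,) * len(right[0]))   # pad the right side
    if how in ("right", "full"):
        for i, r_row in enumerate(right):
            if i not in matched_right:
                rows.append((None,) * len(left[0]) + r_row)  # pad the left side
    return rows

l = [(110, "HR"), (120, "Marketing"), (130, "Marketing"), (140, "Sales")]
r = [("Marketing", 13), ("Sales", 17), ("Development", 20)]

for how in ("inner", "left", "right", "full"):
    print(how, join(l, 1, r, 0, how))
```

The row order differs from the Pig output shown above, which is fine because a relation is an unordered bag.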
Sample codes
Count the number of rows by year:
b = GROUP a BY year;
c = FOREACH b GENERATE group AS year, COUNT_STAR(a.year) AS count;
User defined functions (UDF)
- Piggy Bank is a place to share the Java UDFs: /usr/hdp/2.6.0.3-8/pig/piggybank.jar
Performance
- Performance and Efficiency, Pig Cookbook
- Specify the number of reduce tasks for a Pig MapReduce job
  - SET default_parallel 20; : specifies the number of reducers.
- Replicated join: if one of the relations to be joined is small enough, the small one is copied to all workers.
  - Add USING 'replicated' at the end to perform a replicated join.
  - C = JOIN big BY b1, tiny BY t1, mini BY m1 USING 'replicated';
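The idea behind a replicated join can be sketched as follows: the small relation is loaded into an in-memory lookup table, and the big relation is streamed past it without any shuffling. This is a single-process illustration under that assumption; the function replicated_join and the sample data are hypothetical.

```python
def replicated_join(big_rows, big_key, small_rows, small_key):
    """Stream big_rows and look each key up in an in-memory copy of small_rows."""
    lookup = {}
    for row in small_rows:                 # the small relation fits in memory
        lookup.setdefault(row[small_key], []).append(row)
    for row in big_rows:                   # the big relation is only iterated once
        for match in lookup.get(row[big_key], ()):
            yield row + match

big = [(1, "x"), (2, "y"), (3, "x")]
tiny = [("x", "X-ray"), ("z", "Zulu")]
joined = list(replicated_join(big, 1, tiny, 0))
print(joined)
```

Because every worker would hold its own copy of the lookup table, this only pays off when the small relation really is small.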
Tez
- Apache Tez is a framework for YARN-based applications. It improves on the performance of MapReduce execution.
- Interactive Query for Hadoop with Apache Hive on Apache Tez. (It is better to use CLI because execution time is shown.)
- Do not forget Tez View of Ambari.
Oozie
- Apache Oozie
- Apache Ambari Workflow Manager View for Apache Oozie