Hiveの環境構築

Hive環境構築

インストール

1.javaのインストール

7系のjavaをインストールしてパスを通しておきます。

export JAVA_HOME=/usr/local/jdk1.7.0_71
export PATH=$PATH:$JAVA_HOME/bin

2.Hadoopのインストール

hadoop version

パスの設定

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native export
PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

core-site.xmlにNameNodeの情報を設定する

<configuration>

   <property>
      <name>fs.default.name</name>
      <value>hdfs://localhost:9000</value>
   </property>

</configuration>

hdfs-site.xmlを編集してNameNode,DataNodeのデータ保存先を設定する

<configuration>

   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
   <property>
      <name>dfs.namenode.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value>
   </property>
   <property>
      <name>dfs.namenode.data.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode</value >
   </property>

</configuration>

yarn-site.xmlを編集yarn.nodemanager.aux-servicesにmapreduce_shuffleを設定する

<configuration>

   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>

</configuration>

map-red-site.xmlを変数しmapreduce.framework.nameにyarnを設定する

<configuration>

   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>

</configuration>

NameNodeをフォーマットする

hdfs namenode -format

3.Hiveのインストール

Hiveをダウンロードする。インストール済みのhadoopにあっているバージョンを選びます。 https://hive.apache.org/downloads.html ダウンロード後に解凍します。それから、パスを通します。

export HIVE_HOME=/opt/
export PATH=$PATH:$HIVE_HOME/bin
export CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/*:.
export CLASSPATH=$CLASSPATH:$HIVE_HOME/lib/*:.

hive-env.shを有効にする

cd $HIVE_HOME/conf cp hive-env.sh.template hive-env.sh

hiveのメタ情報保存先の設定

今回はPostgreSQLにhiveのめた情報を保存するようにします。 postgresqlをインストールします。hiveインストール環境からアクセスできるようにしておきます。

yum install postgresql-server postgresql-setup initdb systemctl start postgresql systemctl enable postgresql

ドライバをhiveのlibに移動します。

wget https://jdbc.postgresql.org/download/postgresql-9.3-1103.jdbc4.jar
mv postgresql-9.3-1103.jdbc4.jar /opt/hive-0.12.0/lib/

PostgreSQLにHiveで使うユーザとDBを作成します。

createuser -P hive createdb -O hive hive

メタ情報保存に使スキーマを実行します /opt/hive-0.12.0/scripts/metastore/upgrade/postgres/hive-schema-0.12.0.postgres.sql

hive-site.xmlを編集する

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:postgresql://ポスグレインストール先:5432/hive</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>org.postgresql.Driver</value>
  <description></description>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
  <description>username to use against metastore database</description>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>hive</value>
  <description>password to use against metastore database</description>
</property>

動作確認

# hive

hive> show tables;
OK
Time taken: 3.323 seconds

Hiveserver2を起動してbeelineで接続してみる

hive-site.xmlを変数する 今回は動作確認のため認証を行わなくても接続できるようにする。

<property>
  <name>hive.server2.authentication</name>
  <value>NOSASL</value> <!-- default NONE is for SASLTransport -->
</property>

<property>
  <name>hive.server2.enable.doAs</name>
  <value>false</value> <!-- Execute query as hiveserver2 process user -->
</property>

hiveserver2を起動する

$HIVE_HOME/bin/hiveserver2 &

beelineで接続してみる

# beeline !connect jdbc:hive2://localhost:10000/default;auth=noSasl hive org.apache.hive.jdbc.HiveDriver

※接続先のポートはhive-site.xmlのhive.server2.thrift.portを確認する

hiveqlを実行してみる

データ登録の確認のためまず以下のようなテキストファイルを作成しhdfsに上げておく

1
2
3
4
5

それからbeelineで接続し以下を実行する

create table sample (
  id INT
);

load data inpath 'ファイルパス' into table sample;

select *
from sample;

インサートしたレコードはhdfs上にあるのが以下のコマンドで確認できる

hdfs dfs -ls hive-site.xmlでhive.metastore.warehouse.dirに指定しているパス/DB名

Apache Sparkのアプリをデバッグする

sparkアプリケーションのデバッグ

1.sbt assemblyでjarファイルを生成しspark-submitコマンド実行サーバにアップロードする

2.spark-submitコマンド実行サーバにポートフォワードの設定付きでssh接続する
とりあえず5039ポートを使ってみる

ssh -L 5039:remote:5039 target

3.spark-submitコマンド実行

spark-submit --master local[*] \    
--driver-java-options -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5039 \    
--class 実行対象クラス \    
--name アプリケーション名 jarファイル アプリの引数    

4.ローカルの開発環境でリモートデバッグ

pycharmを使ってpysparkの開発を行った際に"from pyspark.sql.functions import lit"でエラーがでたのを調べて見た

pysparkの開発を行った際に"from pyspark.sql.functions import lit"でimportできないとエラーが出たのを確認した時のメモ 実際は以下のようにpyspark.sql.functions.py内で以下のようにして動的にメソッドを追加している。

def _create_function(name, doc=""):
    """ Create a function for aggregator by name"""
    def _(col):
        sc = SparkContext._active_spark_context
        jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
        return Column(jc)
    _.__name__ = name
    _.__doc__ = doc
    return _


_functions = {
    'lit': _lit_doc,
    'col': 'Returns a :class:`Column` based on the given column name.',
    'column': 'Returns a :class:`Column` based on the given column name.',
    'asc': 'Returns a sort expression based on the ascending order of the given column name.',
    'desc': 'Returns a sort expression based on the descending order of the given column name.',

    'upper': 'Converts a string expression to upper case.',
    'lower': 'Converts a string expression to upper case.',
    'sqrt': 'Computes the square root of the specified float value.',
    'abs': 'Computes the absolute value.',

    'max': 'Aggregate function: returns the maximum value of the expression in a group.',
    'min': 'Aggregate function: returns the minimum value of the expression in a group.',
    'count': 'Aggregate function: returns the number of items in a group.',
    'sum': 'Aggregate function: returns the sum of all values in the expression.',
    'avg': 'Aggregate function: returns the average of the values in a group.',
'mean': 'Aggregate function: returns the average of the values in a group.',
    'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
}

for _name, _doc in _functions.items():
    globals()[_name] = since(1.3)(_create_function(_name, _doc))

ここではcreate_functionでメソッドを生成し、globals()[name]にてname(col)で関数を呼び出せるようにしている。getattrでは"sc.jvm.functions"のnameで指定した関数を呼び出せるようにしており、ここでjvm場で動いているsparkを呼び出すようにしている。pysparkではpythonのコードがjvm場で動くという分けではなくpy4jにより連携するようになっていて、その連携部分が"getattr(sc.jvm.functions, name)(col.jc if isinstance(col, Column) else col)“のようでpyspark自体についてももうちょっと調べたいと思います。

pysparkでの開発時に気になった点のメモでした。

Sparkで単体テストをしてみる

Apache Sparkで単体テストをしてみる

Intelij IDEAでsparkの単体テストを書いてみたのでメモ

build.sbtの設定を変更

まず、build.sbtに以下の設定を追加する。

parallelExecution in Test := false

“build sbt"で複数のテストが同時に動いた場合に発生するSparkContext周りのエラーを防ぐのに必要なようである。

テストを書いてみる

まず、以下のようにcsvをDataFrameとして読み込んでデータを取得するclassのテストを書く場合

package intoroduction.spark.dataframe

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.DataType._
import org.apache.spark.sql.types.IntegerType

case class Dessert(menuId: String, name: String, price: Int, kcal: Int)

class DesertFrame(sc: SparkContext, sqlContext: SQLContext, filePath: String) {
  import sqlContext.implicits._
  lazy val dessertDF = {
    val dessertRDD = sc.textFile(filePath)
    sc.textFile(filePath)
    // データフレームとして読み込む
    dessertRDD.map { record =>
      val splitRecord = record.split(",")
      val menuId = splitRecord(0)
      val name = splitRecord(1)
      val price = splitRecord(2).toInt
      val kcal = splitRecord(3).toInt
      Dessert(menuId, name, price, kcal)
    }.toDF
  }
  dessertDF.createOrReplaceTempView("desert_table")


  def findByMenuId(menuId: String) = {
    dessertDF.where(dessertDF("menuId") === menuId)
  }
}


object DesertFrame {

  def main(args: Array[String]): Unit ={

    val conf = new SparkConf().setAppName("DesertFrame").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val filePath = "src/test/resources/data/dessert-menu.csv"
    val desertFrame = new DesertFrame(sc, sqlContext, filePath)

    val d19DF = desertFrame.findByMenuId("D-19").head
    print(d19DF)

  }
}

上記のDesertFrameのテストを書く場合は以下のようになる。

package intoroduction.spark.dataframe

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class DessertFrameTest extends FunSuite with BeforeAndAfterAll{
  private var sparkConf: SparkConf = _
  private var sc: SparkContext = _
  private var sqlContext: SQLContext = _

  override def beforeAll() {
    print("before...")
    sparkConf = new SparkConf().setAppName("DessertFrameTest").setMaster("local")
    sc = new SparkContext(sparkConf)
    sqlContext = new SQLContext(sc)
  }

  override def afterAll() {
    sc.stop()
    print("after...")
  }

  test("dessert_frame"){
    val filePath = "src/test/resources/data/dessert-menu.csv"
    val desertFrame = new DesertFrame(sc, sqlContext, filePath)

    val d19DF = desertFrame.findByMenuId("D-19").head
    assert(d19DF.get(0) == "D-19")
    assert(d19DF.get(1) == "キャラメルロール")
    assert(d19DF.get(2) == 370)
    assert(d19DF.get(3) == 230)
  }
}

ここでは"SparkConf().setAppName(“DessertFrameTest”).setMaster(“local”)“と指定しており、ローカルの環境で動かすことができるようになりテストで使うデータを"src/test/resources/data/dessert-menu.csv"にしているのでテストデータもそのままgitで管理できるようになっている。

テスト実行

あとは"sbt test"か"sbt test:testOnly クラス指定"でテストを実行できるはずである。

Hadoopについて調べてクラスタを構築してみた

並列分散処理入門

Hadoop,Spark周りについて調べたことをまとめてみる

並列分散処理とは

並列分散処理とは複数のサーバを同時に動かしてデータを処理することである。ビッグデータと呼ばれるような大量のデータを扱う場合はたくさんのサーバを使ってクラスターを構成し、多ければ数千台のサーバを使うということもある。

並列分散処理ツール登場の背景

既存のRDBMSがある中でHadoopなどの代表的な並列分散処理ツールが登場した背景として、RDBMSではトランザクションやACIDなど複雑なデータに不整合が起きないような設計によりそのまま複数台の端末で動かそうとするのであればディスクI/Oの方がボトルネックとなりCPU資源を使い切れず速度が出し切れなくなる問題があった。この問題を解決するためHadoopなどの並列分散処理ツールではRDBMSにあったデータに不整合が発生しないように気にする機能を除外し、Mapreduceという仕組みにより複数台の端末で同時に処理してもディスクI/Oがボトルネックとならないように作られている。

Hadoopについて

概要

Hadoopとは代表的な並列分散処理ツールでGoogleのDoug cuttingにより作られた。Hadoopの名前の由来はDougの子供が持っていたゾウのぬいぐるみ。HadoopではHDFS(Hadoop Distributed File System)により複数のサーバにリソースを分散させて、Mapreduceにより複数の端末で処理を実行するということができるようになっている。
オープンソースで開発が行われておりチケット管理にJiraを使用している。
https://issues.apache.org/jira/projects/HADOOP/summary

HDFS

HDFSとは巨大なデータを扱うとことに特化した分散ファイルシステム。データサイズとして想定しているのは1ファイルが数GB以上に及ぶようなファイルであり、そのような大きいファイルを高速に扱うことに特化している。HDFS自体にレプリケーションの対障害設定があり、基本的にはHDFSの役割を果たすノードにRAID構成は不要とされている。HDFSバッチ処理に的するように作られておりデータの書き込みは一度だけで、それ以降は読み込み及び新しい書き込みが行われ、データの一部更新はできないようになっている。

Mapreduce

多数のサーバを利用して巨大なデータを分散処理するためのフレームワークMapreduceではクラスター上の各ノードに以下の役割がある。
○NameNode
 データがどこに配置されているかなどのメタデータを管理する。各ノードに対してハートビートで死活監視を行う。NameNode自体が死んだ時のためにSecondaryNameNodeを設定することもできる。
○JobTracker
MapReduceのマスターサーバで1つのジョブをタスクと呼ばれる複数の処理に分割し、各スレーブにタスクを割り振る
○DataNode
データを保存する
○TaskTracker
タスクを処理する

既存のMapreduceでは5000ノードからなる40000タスクを実行した時にJobTrackerがボトルネックとなることが確認されており、これは単一のJobTrackerに次の2つの異なる責任が課せられているためとされている。
クラスターでの計算リソースの管理
クラスター上で実行されるすべてのタスクの調整

この問題を解決するために単一のJobTrackerを使用するのではなく、クラスター・マネージャーを導入するようになった。クラスター・マネージャーの役割はクラスター内のライブ・ノードと使用可能なリソースを追跡して、タスクに割り当てることである。スレーブ側のTaskTracker側で短時間存続する JobTrackerが開始されるようになったが、これによりジョブのライフサイクルの調整は、クラスター内で使用可能なすべてのマシンに分散されさらに多くのタスクを分散処理できるようになった。この流れでYARNが登場した。

YARN

JobTrackerが担っていた2つの役割のを分割し、スレーブ側でタスクの調整が行えるよう疎結合になったことでhadoop自体の分散処理エンジン以外を使うことができるようになりSparkで処理を実行することができるようになっている。Mapreduceの時に使っていた用語は以下のように変わっている。

クラスター・マネージャー → ResourceManager
短時間存続する専用の JobTracker → ApplicationMaster
TaskTracker → NodeManager
ジョブ → 分散アプリケーション

YARNの登場により従来のMapreduceはMRv1、YARNがMRv2という扱いになった。Hadoopはversion2からYARNが使えるようになった。

Sparkについて

概要

scalaで書かれた並列分散処理のエンジン部分。HDFS上のデータにアクセスしてデータを処理することができる。Hadoopとの違いとして、Hadoopの分散処理エンジンでは計算結果をメモリ上には保存せずディスクに保存する作りになっている。それに比べてSparkでは計算結果をメモリ上に保存して再利用するという仕組みがあるためHadoopの分散処理エンジンと比べるとディスクI/Oの影響で数倍早いと言われている。このように仕組みが異なっている原因としてはHadoopとSparkの開発時期の違いが考えられ、Hadoopの開発が始まった頃はサーバ1台あたりのメモリを8GBくらいで想定していたのに対しSparkの開発時にはサーバ1台でメモリ100GB以上とか積んでいるのも珍しくないような時代背景の違いが考えられる。

用途

Sparkは以下のような用途で使われている。
バッチ処理
単純にhadoopよりも早いためバッチ処理もsparkが使われることが多いきがする
機械学習処理
機械学習のライブラリ(MLib)がありhadoopで使っていたMahoutより高速化できたという事例がある
○ストリーム処理
HDFS以外でもKafkaから送られてくるストリーミングデータを処理することができ、これによりバッチ処理、ストリーミング処理の両面でリアルタイムな機械学習でレコメンドするといったことが可能になっている

hadoopクラスタ管理

hadoopクラスタでは複数のサーバを扱わなければいけないので管理の面で気をつけなければいけないが、Clouderaが出しているマネジメントツールにより各ノードの管理が行いやすくなるらしい。clouderaのマネジメントツールを使わないにしてもディストリビューションで管理している対象ツールのバージョンを確認することでHadoopクラスタ構築時にどのバージョンを入れれば良いのかの参考になりそう(Hadoop, Spark, Hiveなど)

Hadoopクラスタ構築

実際にHadoopクラスタを構築してみたいと思います。
1.javaのインストール
hadoopはopenJdkではなくOraclejdkを推奨している
2.プログラムのダウンロード
今回はHadoopとSpark,Hiveをインストールする

wget http://ftp.jaist.ac.jp/pub/apache/hadoop/common/hadoop-2.6.5/hadoop-2.6.5.tar.gz
wget https://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz
wget https://archive.apache.org/dist/hive/hive-0.12.0/hive-0.12.0-bin.tar.gz    

3.プログラムの展開及び配置
4.環境変数を追加
vi ~/.bashrc 以下を追加

# Hadoop
export HADOOP_HOME=HADOOPの展開先
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

# spark
export SPARK_HOME=Sparkの展開先
export PATH=$SPARK_HOME/bin:$PATH

# hive
export HIVE_HOME=Hiveの展開先
export PATH=$HIVE_HOME/bin:$PATH

source ~/.bashrc

HADOOP_CONF_DIRを環境変数に追加しているが、これはSparkで使用している。

5.設定ファイルの編集
HADOOP_CONF_DIRのディレクトリの以下の設定ファイルを編集します。

core-site.xml : マスターノードの設定
hdfs-site.xml : データ保存の設定、マスター側とスレーブで設定は異なる
hive-site.xml : hive展開時のconfディレクトリにあったhive-default.xml.templateをリネーム
mapred-site.xml : Mapreduceの設定 yarnを使う場合はここで指定する
yarn-site.xml : リソースマネージャの設定

上記設定ファイルでサーバを指定する場合はIPアドレスではなくFQDNで指定しなければいけないので注意が必要
○core-site.xml

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ネームノードのFQDN:9000</value>
  </property>
</configuration>

hdfs-site.xml(NameNodeの場合)

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/data/1/dfs/nn</value>
  </property>
</configuration>

hdfs-site.xml(DataNodeの場合)

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/data/1/dfs/dn,/data/2/dfs/dn,/data/3/dfs/dn</value>
  </property>
</configuration>

○mapred-site.xml

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

○yarn-site.xml

<configuration>
  <!-- Site specific YARN configuration properties -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>リソースマネージャのFQDN</value>
  </property>
  <property>
    <name>yarn.web-proxy.address</name>
    <value>WebAppProxyのFQDN:9046</value>
  </property>
</configuration>

7.サービスの起動
設定が終わったらサービスを起動させます。NameNodeリソースマネージャが同一で、それ以外にDataノード兼TaskTrackerのスレーブが連なる場合は、以下のようになります。

  • マスターノード
# $HADOOP_HOME/sbin/start-all.sh
# yarn-daemon.sh start proxyserver
# start-yarn.sh
# mr-jobhistory-daemon.sh start historyserver
  • スレーブノード
# $HADOOP_HOME/sbin/start-all.sh
# start-yarn.sh

サービス起動後は各ノードでjpsを実行しサービスが立ち上がっているか確認します。それから以下URLにアクセスします。
http://ネームノードのFQDN:50070/dfshealth.html#tab-datanode
http://ノードマネージャのFQDN:8088/cluster/nodes

8.hdfsコマンドの確認 基本的なhdfsコマンドを実行しファイルのアップロードと参照が可能か確認します。

hdfs dfs -ls /
hdfs dfs -mkdir /input
echo "upload test" > test.txt
hdfs dfs -put test.txt /input
hdfs dfs -cat /input/test.txt

上記実行ご別のノードでもアップロードしたファイルが参照できるか確認してみます。

Hadoopクラスタ上でSparkのプロジェクトを動かしてみる

Sparkの簡単なプロジェクトを作成してHadoopクラスタ上で動かしてみたいと思います。
1.プロジェクト作成
まずsbt newかintelijの新規プロジェクト作成でプロジェクトを作成します。

2.build.sbtの設定
今回はspark-coreだけで十分のはずですが一応spark-sqlとかもlibraryDependenciesに追加しています。今回はjarを生成してsparkのコマンド経由で実行するのですが、jarを生成できるようにassemblyのプラグインを追加しています。またlibraryDependenciesでspark関連をprovidedで指定することも注意してください。

name := "StudySpark"

version := "1.0"

scalaVersion := "2.11.8"
val sparkVersion = "2.1.0"

// Intelijより直接実行する場合はspark関係はcompileにする、sbt assemblyでjarを出力する場合はprovidedにしておく
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-sql_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-mllib_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-graphx_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-hive_2.11" % sparkVersion % "provided",
  "joda-time" % "joda-time" % "2.9.7"
)

// assembly settings
assemblyJarName in assembly := "studyspark.jar"
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
  case "application.conf"                            => MergeStrategy.concat
  case "unwanted.txt"                                => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

3.サンプルのプログラム作成
よく使われるワードカウントのプログラムを作ります。

package intoroduction.spark

import org.apache.spark.{SparkConf, SparkContext}


/**
  * 実行コマンド
  * spark-submit --master yarn \
  * --class intoroduction.spark.WordCount \
  * --name WordCount target/scala-2.11/studyspark.jar \
  * /input/test.txt
  *
  */
object WordCount {

  def main(args: Array[String]) = {
    require(args.length >= 1, "ファイルを指定してください")

    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    try {
      val filePath = args(0)
      val wordAndCountRDD = sc.textFile(filePath)
                .flatMap(_.split("[ ,.]"))
                .filter(_.matches("""\p{Alnum}+"""))
                .map((_, 1)).reduceByKey(_ + _)

      wordAndCountRDD.collect.foreach(println)

    } finally {
      sc.stop()
    }
  }
}

4.jar生成 以下のコマンドでjarを生成します。
sbt assembly

5.プログラム実行
それから以下のコマンドでプログラムを実行します。

spark-submit --master yarn \
--class intoroduction.spark.rdd.WordCount \
--name WordCount target/scala-2.11/studyspark.jar \
/input/test.txt

うまくいけばコンソールに実行結果が出力されるはずです。 実行ステータスはノードマネージャのweb画面から見ることもできます。 http://ノードマネージャのFQDN:8088/cluster/nodes

デバック実行したい場合は事前に以下の環境変数を追加しておく必要があります。
export SPARK_SUBMIT_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=7777
上記環境変数追加後はport=7777でリモートデバックできます。

Angularでカスタムディレクティブへの双方向バインドに対して調べてみた

Angularでカスタムディレクティブへの双方向バインドに対して調べてみたときのメモ

コンポーネント内のメンバ変数の変更を監視する

例えばコンポーネント内に以下のメンバ変数があったとします。

@Input() textInput: String = "";

このメンバ変数は親のコンポーネントから直接値を変更されることがあるため、値が変更されたタイミングに実行したメソッド等ある場合は監視が必要になりそうですが、以下のように@Inputで監視対象の変数を指定することで変更された際に何をするか指定することができます。

@Input('textInput')
set updateInternalVal(externalVal) {
  this.textInput = externalVal;
  this.onEditChange();
}

自作のカスタムディレクティブに対して双方向バインディングしてみる

inputタグなど元からhtmlに存在しているタグであればng-modelでバインドすることでinputへの入力がバインドしている変数に直接反映されるし、バインドしている変数自体を変更することでバインド先へのinputタグの表示が切り替わる双方向バインドが働いてますが、Angular2以降では自作のディレクティブに対しての双方向バインドが廃止されたようでちょっと工夫が必要なようでした。
以下自分が試した方法になります。

Outputを受け取る際に変数を変更する

例えば以下のコンポーネントがあったとします。 myText.html

<input [(ngModel)]="textInput" (ngModelChange)="onEditChange()" placeholder="{{ placeHolder }}"/> {{ textLength }}

myText.ts

import { Component, Input, Output, EventEmitter } from '@angular/core';
import { Router }            from '@angular/router';

import { Observable }        from 'rxjs/Observable';
import { Subject }           from 'rxjs/Subject';


@Component({
  selector: 'my-text',
  templateUrl: './myText.html',
  styleUrls: [ './myText.css' ]
})
export class MyTextComponent {
  @Input() textInput: String = "";
  @Input() placeHolder: String = "";

  textLength: Number = 0;

  @Output() textChange = new EventEmitter<String>();

  // 親コンポーネントからの値の変更時に実行
  @Input('textInput')
  set updateInternalVal(externalVal) {
    this.textInput = externalVal;
    this.onEditChange();
  }

  onEditChange(): void {
    if(this.textInput !== void 0 && this.textInput.length !== void 0){
      this.textLength = this.textInput.length;
    }
    this.textChange.emit(this.textInput);
  }
}

このコンポーネントはメンバ変数textInputに親からバインドしているものがセットされているのですが、コンポーネント内で変数の値が変更されたことを親へ伝えるのには以下のようにカスタムのイベントを発行しています。

this.textChange.emit(this.textInput);

その場合、呼び出し元の親コンポーネントは以下のようにtextChangeのイベントを受け取った際にバインド先の変数を変更することで双方向バインドの動きになります。

<my-text [(textInput)]="hero.name" (textChange)="hero.name = $event" placeHolder="name"></my-text><br />

Outputのカスタムイベント名を"メンバ変数 + Change"にすることで自動で双方向バインドする

さっきの方法だと親コンポーネント側で双方向バインドするか選べましたが、こっちの方は子コンポーネントの方で直接双方向バインドするように指定できます。 まず子コンポーネント側でカスタムイベントの名称を"メンバ変数 + Change"に指定し、値に変更があったらemitするようにします。 myText.ts

import { Component, Input, Output, EventEmitter } from '@angular/core';
import { Router }            from '@angular/router';

import { Observable }        from 'rxjs/Observable';
import { Subject }           from 'rxjs/Subject';


@Component({
  selector: 'my-text',
  templateUrl: './myText.html',
  styleUrls: [ './myText.css' ]
})
export class MyTextComponent {
  @Input() textInput: String = "";
  @Input() placeHolder: String = "";

  textLength: Number = 0;

  @Output() textInputChange = new EventEmitter<String>();

  // 親コンポーネントからの値の変更時に実行
  @Input('textInput')
  set updateInternalVal(externalVal) {
    this.textInput = externalVal;
    this.onEditChange();
  }

  onEditChange(): void {
    if(this.textInput !== void 0 && this.textInput.length !== void 0){
      this.textLength = this.textInput.length;
    }
    this.textInputChange.emit(this.textInput);
  }
}

この場合は、イベントが発行された時に何をするとか特に指定しなくても双方向バインドが実現されます。以下呼び出し元のサンプルになります。

<my-text [(textInput)]="hero.name" placeHolder="name"></my-text><br />

深層学習(青本) 1章. はじめに

深層学習の1章読書まとめ

研究の歴史

多層ニューラルネットへの期待と失望

ニューラルネットワークはこれまでに2度のブームが訪れていた

1回目のブーム
 1960年代〜1970年代
 パーセプトロン(1層ニューラルネット)

2回目のブーム
 80年代半ばから90年代前半前半
 誤差逆伝搬の誕生(多層ニューラルネット)

パーセプトロンの欠点

 ノイズに弱い
 収束が遅く、学習効率が悪い(誤差逆伝搬でいう損失関数の微分を効率的に使ったパラメータ更新が行えない)  n次元の入力に対して重み付けを行った結果がある値より大きいかどうかで2つのクラスに分類する
 y=WtX (Wt:学習済みの重み、 X:入力、 y:2つのクラスを分類する境界)
 線形分離可能な問題にしかできない(多層化できれば解決できる)
 多層化できない(中間層を出力することができない)

誤差逆伝搬(多層ニューラルネット)の欠点

 入力にノイズがあったら過学習は発生する
 2層程度であれば期待通りだが、それより増やしていくと勾配が急速にに小さくなったり大きくなったりする勾配消失問題が発生する
 ※ニューラルネットの層を増やす動きがあるが、これは中間層を用意することで柔軟なネットワークを構築でき学習精度を高めることができるからである、誤差逆伝搬により多層化は行えるが層が増えると勾配が消失しあまり精度が上がらなくなるという問題がある
 ※この頃はニューラルネットの層数やユニット数(中間層の主力)の設定に対しての理論がなかった

畳み込みニューラルネットの誕生

1980年代後半画像を対象としていた畳み込みニューラルネットについてはこの時点で5層からなる多層ネットワークの学習に成功していた
※前の層の畳み込みの出力を粗くリサンプリングするようなイメージとなっており、これにより画像の多少のずれによる見え方の違いを吸収することが可能となり勾配消失問題が発生しなかった
1980年代半ばからのブームの多層ニューラルネットワークでは各層の出力全てを次の層に入力していた(全結合)
多層ニューラルネットワーク自体の研究低下とともに畳み込みニューラルネットの関心も小さくなっていった

多層ネットワークの事前学習

1990年代〜2000年代前半(ニューラルネットの関心が低い時期)
ディープビリーフネットワーク(DBN)の研究
 一つの層を貪欲アルゴリズムで訓練できることでブレークスルーとなった
制約ボルツマンマシン(RBM)の誕生

その後、自己符号化の登場

1990年代〜2000年代前半では事前学習によりノイズ除去などで良い入力を得られるようにする研究が行われた

特徴量の学習

画像や音声などの自然界のデータなど同じものでもちょっとの違いでデータに差が出るものをネットワークの多層構造に取り込むために、どのように特徴量を抽出するか
 例えば猫の画像でも明るさが違ったり、向きが違ったりするだけでデータとして差が出る
 音声の場合は白色ノイズなど、扱うデータの種類によりノイズの種類も様々
 正規化することでそれらの違いを吸収した特徴量の情報が欲しい

画像や音声については事前学習ではなく事前処理による特徴量抽出で良い入力を得られるようにしていた

深層学習の隆盛

音声認識、画像認識のベンチマークで多層ニューラルネットワークの有効性が認められるようになり深層学習の有効性が広く認知されるようになった
 一重に深層学習といっても扱う対象によって学習方法が異なる
 ・音声認識の場合は層間ユニット全結合で事前学習が一般的に行われる
 ・画像認識では畳込みニューラルネットが主流で事前学習は不要
 ・自然言語処理音声認識の特定のタスクでは再帰ニューラルネット

それぞれ学習方法が違う多層ニューラルネットワークで性能を把握できるようになった
 →共通の理由は計算機の計算能力の飛躍的な功により扱えるデータが増えた

GPU Technology ConferenceとかみるとGPUの発達が深層学習に与える影響が大きそうに感じる

計算機の処理能力向上によりニューラルネットワークの多層化をしたら思わぬ結果が出た