Spark客户端配置

[TOC]

maven依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.tools.version}</artifactId>
    <version>1.6.0</version>
</dependency>

初始化spark

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

应用提交

./bin/spark-submit \
  --class <main-class>
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

Run application locally on 8 cores

Run on a Spark standalone cluster in client deploy mode

Run on a Spark standalone cluster in cluster deploy mode with supervise

Run on a YARN cluster

Run a Python application on a Spark standalone cluster

Run on a Mesos cluster in cluster deploy mode with supervise

关闭任务

关闭spark应用可以调用SparkContext的stop() 方法或者直接退出应用 System.exit(0) 或者 sys.exit()

spark操作类型

RDD的操作可以分为转化操作和行为操作, 转化操作生成新的RDD,行为操作会真正的执行计算

数据持久化

默认情况下,spark的RDD会在每次对他们进行行为操作的时候重新计算,如果想在多个行为操作中重用同一个RDD,可以通过RDD.persist() 让Spark把这个RDD缓存起来。默认的在第一次对持久化的RDD计算之后,spark会把RDD的内容保存在内存中。我们也可以把RDD保存在其他存储中。 cache() 与使用默认存储级别调用persist()是一样的,即保存在内存中。

parallelize数据加载

parallelize() 把程序中一个已有的集合传给SparkContext

常用的行为操作

行为操作中, count() 返回计算结果数, take() 来收集RDD的一些元素, collection() 可以用来获取整数个RDD的元素, 只有当你的整个集群数据集能在单台机器的内存中放的下的时候,才能使用collect()。 除此之外,可以使用saveAsTExtFile() saveAsSequenceFile() 或者其他行为操作把数据保存下来。

转化操作

  • map(func) 接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中的对应元素的值

  • filter(func) 接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回

  • flatmap(func) 接收一个函数,将这个函数用于RDD的每个元素上,不过返回的不是一个元素,而是一个返回值序列的迭代器, 将返回的迭代器拍扁

  • mapPartitions(func) 与map相同,接收一个函数,将这个函数用于RDD的每个分区上, 函数应该是Iterator<T> => Iterator<U>

  • mapPartitionsWithIndex(func) 与mapPartitions相同, 接收一个函数用于RDD的每个分区上,函数应该是 (Int, Iterator<T>) => Iterator<U>

  • distinct([numTasks]) 生成一个只包含不同元素的新RDD, 开销较大,因为需要将所有数据通过网络进行混洗

  • union(other) 返回一个包含俩个RDD的所有元素的新的RDD, 会包含重复的数据

  • intersection(other) 只返回俩个RDD都有的元素,会去掉所有重复的元素, 性能较差,因为也需要通过网络来混洗数据

  • subtract(other) 返回只存在于第一个RDD而不存在第二个RDD的所有元素的新的RDD, 性能较差,也需要通过网络混洗数据

  • cartesian(other) 返回笛卡尔集

  • sample(withReplacement, fraction, seed) 对RDD采样。

  • groupByKey([numTasks]) 用于数据集 (K, V)上,返回数据集(K, Iterable<V>)

  • reduceByKey(func, [numTasks]) 对数据集 (K, V) 进行计算,返回(K, Iterable<V>)

  • aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

  • sortByKey([ascending], [numTasks])

  • join(otherDataset, [numTasks])

  • cogroup(otherDataset, [numTasks])

  • pipe(command, [envVars])

  • coalesce(numPartitions)

  • repartition(numPartitions)

  • repartitionAndSortWithinPartitions(partitioner)

行为操作

  • reduce(func)

  • colect() 返回RDD中的所有元素

  • count() 返回RDD中元素的个数

  • countByKey()

  • countByValue() 返回各个元素在RDD中出现的次数

  • take(n) 从RDD中返回n个元素

  • top(n) 从RDD中返回前面n个元素

  • first()

  • takeOrderd(n)(ordering) 从RDD中按照提供的殊勋返回最前面的n个元素

  • takeSample 从RDD中返回任意一些元素

  • reduce(func) 接收一个函数作为参数,这个函数要操作俩个相同元素类型的RDD数据并返回一个同样类型的新元素,可以求元素的总和,元素个数以及其他类型的聚合操作

  • fold(zero)(func) 与reduce一样,但需要提供初始值

  • aggregate() 与reduce一样,但是返回不同类型的函数

  • foreach(func) 对RDD中的每个元素使用给定的函数

  • saveAsTextFile(path)

  • saveAsSequenceFile(path)

  • saveAsObjectFile(path)

持久化

persist()调用不会触发强制求值 scala和java 默认会把数据以序列化的形式缓存在jvm的堆空间中。 当我们让spark持久化存储一个RDD的时候,计算出RDD的节点会分别保存他们所求的分区数据,如果一个有持久化数据的节点发生故障,spark会在需要用到混粗的数据时重新计算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据被飞到多个节点上。

org.apache.spark.storage.StorageLevel

  • MEMORY_ONLY

  • MEMORY_ONLY_SER

  • MEMORY_AND_DISK

  • MEMORY_AND_DISK_SER

  • DISK_ONLY

    如果缓存的数据太多,内存中放不下,spark自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存的缓存级别,下一次要用到已经被移除的分区时,这些分区就需要重新计算。 但是对于使用使用内存与磁盘的缓存级别分区来说,被移除的分区都会写入磁盘,不论哪一种情况都不必担心你的作业因为缓存了太多数据而被打断。不过缓存不要的数据会导致有用的数据被移除内存,带来更多重算的时间开销。

upersist()

手动把持久化的RDD从缓存移除

stats()

打印RDD的总数,平均数,最大值,最小值等

span()

用第一个特定的符号将一行拆分为俩部分

lookup()

查找数据

spark 参数

实际命令

任务提交

Last updated

Was this helpful?