Log in as hduser. Ensure that the SSH, DFS, and YARN services are started in sequence, then verify them with the jps command.
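For example, the startup sequence might look as follows (a sketch, assuming Hadoop's sbin scripts are on your PATH and SSH is managed via the service command):
$ sudo service ssh start
$ start-dfs.sh
$ start-yarn.sh
$ jps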
$ wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3-scala2.13.tgz
$ tar -xvzf spark-3.5.1-bin-hadoop3-scala2.13.tgz
$ mv spark-3.5.1-bin-hadoop3-scala2.13 spark
Add the following lines to ~/.bashrc:
export SPARK_HOME=/home/hduser/spark
export PATH=$SPARK_HOME/bin:$PATH
Then reload the settings and start the PySpark shell:
$ source ~/.bashrc
$ pyspark
To exit the pyspark shell, use the command exit()
$ cp $SPARK_HOME/conf/log4j2.properties.template $SPARK_HOME/conf/log4j2.properties
Edit the newly created log4j2.properties file, replacing every occurrence of INFO with WARN, e.g.:
rootLogger.level = warn
rootLogger.appenderRef.stdout.ref = console
>>> text = sc.textFile("shakespeare.txt")
>>> print(text)
>>> from operator import add
>>> def tokenize(text): return text.split()
...
>>> words = text.flatMap(tokenize)
>>> wc = words.map(lambda x: (x, 1))
>>> print(wc.toDebugString())
>>> counts = wc.reduceByKey(add)
>>> counts.saveAsTextFile("hdfs://localhost:9000/user/hduser/wc")
>>> exit()
Note: your installed Spark may not need the HDFS directory path (hdfs://localhost:9000/user/hduser/) to be explicitly indicated; without it, the output may instead be written to the local path, i.e., /home/hduser. Example: counts.saveAsTextFile("wc")
$ hdfs dfs -ls wc
$ hdfs dfs -head wc/part-00000
$ pip3 install numpy
$ pyspark --executor-cores 4
>>> import numpy as np
>>> num_list = np.random.randint(0, 10, 20)
>>> numbers_RDD = sc.parallelize(num_list)
>>> numbers_RDD
>>> type(numbers_RDD)
>>> numbers_RDD.collect()
>>> numbers_RDD.glom().collect()
>>> numbers_RDD.first()
>>> numbers_RDD.take(3)
>>> numbers_RDD.take(5)
>>> numbers_RDD.take(8)
>>> distinct_numbers_RDD = numbers_RDD.distinct()
>>> distinct_numbers_RDD.collect()
>>> numbers_RDD.collect()
>>> numbers_RDD.sum()
>>> numbers_RDD.reduce(lambda x, y: x+y)
>>> numbers_RDD.reduce(lambda x, y: x if x > y else y)
>>> numbers_RDD.filter(lambda x:x%3==0 and x!=0).collect()
>>> numbers_RDD.collect()
>>> squares_RDD = numbers_RDD.map(lambda x:x*x)
>>> squares_RDD.collect()
>>> squares_RDD.histogram([x for x in range(0, 100, 10)])
>>> def square_if_odd(x):
...     if x%2==1:
...         return x*x
...     else:
...         return x
...
>>> numbers_RDD.map(square_if_odd).collect()
>>> results = numbers_RDD.groupBy(lambda x:x%2).collect()
>>> results
>>> type(results)
>>> sorted([(x, sorted(y)) for (x, y) in results])
>>> squares_RDD.collect()
>>> numbers_RDD.collect()
>>> union_RDD = sc.union([numbers_RDD, squares_RDD])
>>> union_RDD.collect()
>>> numbers_RDD.cartesian(numbers_RDD).collect()
>>> words = 'It is not that I am so smart but I stay with the questions much longer Albert Einstein'.split()
>>> print(words)
>>> words_RDD = sc.parallelize(words)
>>> words_RDD.reduce(lambda w, v: w if len(w) > len(v) else v)
or
>>> def findLongestWord(x, y):
...     if len(x) > len(y):
...         return x
...     else:
...         return y
...
>>> words_RDD.reduce(findLongestWord)
>>> def findLargestWord(x, y):
...     if x > y:
...         return x
...     else:
...         return y
...
>>> words_RDD.reduce(findLargestWord)
>>> words_RDD.reduce(lambda w, v: w if w < v else v)
>>> logFile = "file:///home/hduser/shakespeare.txt"
>>> logData = sc.textFile(logFile).cache()
>>> number_of_w = logData.filter(lambda s:'w' in s).count()
>>> number_of_c = logData.filter(lambda s:'c' in s).count()
>>> print("Lines with 'c':{}, lines with 'w':{}".format(number_of_c, number_of_w))
$ sudo cp -r /mnt/c/de/SparkApp /home/hduser
$ sudo chown -R hduser:hduser /home/hduser/SparkApp
$ cd /home/hduser/SparkApp
$ cat wordcount.py | more
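wordcount.py is not listed in full here; a minimal sketch of what such a script might contain, assuming it takes the input file and the output directory as command-line arguments (matching the spark-submit call below), is:

# wordcount.py -- minimal sketch, not necessarily the exact script provided
import sys
from operator import add
from pyspark import SparkContext

if __name__ == "__main__":
    in_path, out_path = sys.argv[1], sys.argv[2]   # e.g. shakespeare.txt and the output directory
    sc = SparkContext(appName="WordCount")
    counts = sc.textFile(in_path) \
               .flatMap(lambda line: line.split()) \
               .map(lambda word: (word, 1)) \
               .reduceByKey(add)
    counts.saveAsTextFile(out_path)
    sc.stop()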
$ spark-submit wordcount.py shakespeare.txt hdfs://localhost:9000/user/hduser/spark_wc
You may re-submit the same job by using a different output directory name, i.e., give a new name instead of spark_wc (Spark will not overwrite an existing output directory), as in the example below.
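For example (a hypothetical second run writing to a new directory named spark_wc2):
$ spark-submit wordcount.py shakespeare.txt hdfs://localhost:9000/user/hduser/spark_wc2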
$ hdfs dfs -ls spark_wc
$ hdfs dfs -head spark_wc/part-00000
$ cd /home/hduser/SparkApp
$ cat app.py | more
Copy the ontime data directory into HDFS:
$ hdfs dfs -put ~/SparkApp/ontime ontime
$ cp app.py app2.py
In app2.py, change the two input paths from:
airlines = dict(sc.textFile("file:///home/hduser/SparkApp/ontime/airlines.csv").map(split).collect())
flights = sc.textFile("file:///home/hduser/SparkApp/ontime/flights.csv").map(split).map(parse)
to:
airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
$ spark-submit app.py
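If you created app2.py above, you can presumably run it the same way so that it reads airlines.csv and flights.csv from HDFS instead of the local file system:
$ spark-submit app2.py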
Read more on Apache Spark SQL at https://spark.apache.org/sql/ and https://en.wikipedia.org/wiki/Apache_Spark#Spark_SQL
$ sudo apt install jupyter-notebook
$ pip3 install pandas
$ pip3 install reverse_geocoder
$ sudo cp -r /mnt/c/de/sparksql /home/hduser
$ sudo chown -R hduser:hduser /home/hduser/sparksql
$ cd ~/sparksql/data
$ hdfs dfs -mkdir data
$ hdfs dfs -put sf_parking_clean.json /user/hduser/data/
>>> from pyspark.sql import SparkSession
>>> sqlContext = SparkSession.builder.getOrCreate()
>>> parking = sqlContext.read.json("hdfs://localhost:9000/user/hduser/data/sf_parking_clean.json")
>>> parking.printSchema()
>>> parking.first()
>>> parking.createOrReplaceTempView("parking")
>>> parking.show()
>>> aggr_by_type = sqlContext.sql("SELECT primetype, secondtype, count(1) AS count, round(avg(regcap), 0) AS avg_spaces FROM parking GROUP BY primetype, secondtype HAVING trim(primetype) !='' ORDER BY count DESC")
>>> aggr_by_type.show()
>>> parking.describe("regcap", "valetcap", "mccap").show()
>>> parking.stat.crosstab("owner", "primetype").show()
>>> parking = parking.withColumnRenamed('regcap', 'regcap_old')
>>> parking = parking.withColumn('regcap', parking['regcap_old'].cast('int'))
>>> parking = parking.drop('regcap_old')
>>> parking.printSchema()
>>> def convert_column(df, col, new_type):
...     old_col = '%s_old' % col
...     df = df.withColumnRenamed(col, old_col)
...     df = df.withColumn(col, df[old_col].cast(new_type))
...     df = df.drop(old_col)
...     return df
...
>>> parking = convert_column(parking, 'valetcap', 'int')
>>> parking = convert_column(parking, 'mccap', 'int')
>>> parking.printSchema()
>>> import reverse_geocoder as rg
>>> def to_city(location):
...     name = 'N/A'
...     r = rg.search((location.latitude, location.longitude))
...     if r is not None:
...         name = r[0]['name']
...     return name
...
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import StringType
>>> location_to_city=udf(to_city, StringType())
>>> sfmta_parking = parking.filter(parking.owner == 'SFMTA') \
... .select("location_1", "primetype", "landusetyp", "garorlot", "regcap", "valetcap", "mccap") \
... .withColumn("location_1", location_to_city("location_1")) \
... .sort("regcap", ascending=False)
>>> sfmta_parking.show()
>>> sfmta_pandas = sfmta_parking.filter(sfmta_parking.location_1 != 'N/A').toPandas()
>>> sfmta_pandas.groupby(['location_1'])['regcap'].mean().nlargest(20)
Note: your installed Spark may not need the HDFS directory path (hdfs://localhost:9000/user/hduser/) to be explicitly indicated; e.g., it may be run as: parking = sqlContext.read.json("data/sf_parking_clean.json")
Read more on Apache Arrow at https://arrow.apache.org/docs/python/index.html
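As a brief illustration of why pyarrow is installed below: Arrow can speed up conversion between Spark DataFrames and pandas (such as the toPandas() call used above). A minimal sketch, assuming an existing SparkSession named spark (as in the pyspark shell or a notebook):

# Enable Arrow-based conversion for toPandas() (sketch, assumes SparkSession `spark`)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df = spark.range(0, 100000)   # hypothetical example DataFrame
pdf = df.toPandas()           # conversion now goes through Arrow
print(pdf.head())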
$ cd ~
$ pip3 install jupyter
$ pip3 install jupyter-server
$ pip3 install pyspark
$ pip3 install pyarrow
Add the following alias to ~/.bashrc:
alias jupyter-notebook="~/.local/bin/jupyter-notebook --no-browser"
$ source ~/.bashrc
$ sudo cp -r /mnt/c/de/sparksql /home/hduser
$ sudo chown -R hduser:hduser /home/hduser/sparksql
$ cd ~/sparksql
$ jupyter notebook --port=8888 --no-browser
If you get the error: No module named 'jupyter_server.contents', try switching to an older version of traitlets:
$ pip3 uninstall traitlets
$ pip3 install traitlets==5.9.0
To terminate the Jupyter notebook server, press Ctrl-C.
It is assumed that you have learned the essential code in G4 to G7 before attempting G8.