Read Data From Hadoop in Spark Python

Hadoop with Python (I): PySpark

Big Data Cluster

This is the first part of a serial of posts about how to leverage Hadoop (the Distributed Calculating Framework) using Python.

The goal of this serial of posts is to focus on specific tools and recipes to solve recurrent challenges many Data professionals face, for example:

  • Moving HDFS (Hadoop Distributed File System) files using Python.
  • Loading Data from HDFS into a Information Structure like a Spark or pandas DataFrame in order to make calculations.
  • Write the results of an analysis back to HDFS.

First tool in this series is Spark. A framework which defines itself as a unified analytics engine for large-scale data processing.

Apache Spark

I encourage you to utilize conda virtual environments. If you don't know how to gear up conda, please read this post.

First of all, install findspark, a library that will help yous to integrate Spark into your Python workflow, and besides pyspark in case you are working in a local reckoner and not in a proper Hadoop cluster.

If you are following this tutorial in a Hadoop cluster, tin skip PySpark install.

          conda install -c conda-forge findspark -y          # optional, for local setup
conda install -c conda-forge pyspark openjdk -y

One time you install findspark, it is time to setup Spark for usage in your Python code.

Code for both local and cluster manner is provided here, uncomment the line you lot need and arrange paths depending on your particular infrastructure and library versions (cloudera Spark path should be pretty like to the one provided here):

                      import            findspark                      # Local Spark
findspark.init('/home/cloudera/miniconda3/envs/<your_environment_name>/lib/python3.vii/site-packages/pyspark/')
# Cloudera Cluster Spark
findspark.init(spark_home='/opt/cloudera/parcels/SPARK2–2.iii.0.cloudera4–1.cdh5.13.3.p0.611179/lib/spark2/')

This tutorial accept been written using Cloudera Quickstart VM (a CentOS linux distribution with an username called cloudera), remember to adapt paths to your infrastructure!

Once Spark is initialized, nosotros have to create a Spark application, execute the following lawmaking, and make sure you specify the master you need, like 'yarn' in the instance of a proper Hadoop cluster, or 'local[*]' in the example of a fully local setup:

                      from            pyspark.sql            import            SparkSession
spark = SparkSession.architect.appName('example_app').master('yarn').getOrCreate()

Once we take our working Spark, allow's get-go interacting with Hadoop taking advantage of it with some common employ cases.

Listing Hive databases

Let'south get existing databases. I assume you lot are familiar with Spark DataFrame API and its methods:

          spark.sql("bear witness databases").show()        

You should get something like this:

          +------------+            
|databaseName|
+------------+
| db1|
| default|
| fhadoop|
+------------+

Transform pandas DataFrame into a Spark DataFrame

Outset integration is about how to move data from pandas library, which is Python standard library to perform in-memory data manipulation, to Spark.

Outset, let'due south load a pandas DataFrame. This one is about Air Quality in Madrid (just to satisfy your curiosity, but not important with regards to moving data from one identify to another 1). You tin download it here. Make sure you install the library pytables to read hdf5 formatted data.

                      import            pandas            every bit            pd          air_quality_df = pd.read_hdf('data/air_quality/air-quality-madrid/madrid.h5', primal='28079008')            
air_quality_df.head()

This data is a time series for many well known pollutants similar NOX, Ozone, and more than:

Let'south make some changes to this DataFrame, like resetting datetime alphabetize to avoid losing data when loading into Spark. Datetime column will also exist transformed to string every bit Spark has some bug working with dates (related to system locale, timezones, and so on) unless further configuration depending on your locale.

          air_quality_df.reset_index(inplace=True)            
air_quality_df['date'] = air_quality_df['engagement'].dt.strftime('%Y-%m-%d %H:%M:%Southward')

We tin simply load from pandas to Spark with createDataFrame:

          air_quality_sdf = spark.createDataFrame(air_quality_df)        

In one case DataFrame is loaded into Spark (as air_quality_sdf here), can be manipulated hands using PySpark DataFrame API:

          air_quality_sdf.select('date', 'NOx').bear witness(5)        

Output should look like this:

          +— — — — — — — — — -+ — — — — — — — — — +            
| date| NOx|
+ — — — — — — — — — + — — — — — — — — — +
|2001–07–01 01:00:00| 1017.0|
|2001–07–01 02:00:00| 409.20001220703125|
|2001–07–01 03:00:00| 143.39999389648438|
|2001–07–01 04:00:00| 149.3000030517578|
|2001–07–01 05:00:00| 124.80000305175781|
+ — — — — — — — — — + — — — — — — — — — +
only showing top five rows

Create Hive table from Spark DataFrame

To persist a Spark DataFrame into HDFS, where it tin can be queried using default Hadoop SQL engine (Hive), one straightforward strategy (not the only one) is to create a temporal view from that DataFrame:

          air_quality_sdf.createOrReplaceTempView("air_quality_sdf")        

In one case the temporal view is created, it tin can be used from Spark SQL engine to create a real tabular array using create table as select. Earlier creating this table, I volition create a new database called analytics to store it:

          sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""
result_create_db = spark.sql(sql_create_database)

And so, we can create a new table there:

          sql_create_table = """            
create table if not exists analytics.pandas_spark_hive
using parquet as select
to_timestamp(engagement) as date_parsed, *
from air_quality_sdf
"""
result_create_table = spark.sql(sql_create_table)

Reading information from Hive table using PySpark

Once we have created our Hive table, can check results using Spark SQL engine to load results back, for case to select ozone pollutant concentration over fourth dimension:

          spark.sql("select * from analytics.pandas_spark_hive") \ .select("date_parsed", "O_3").show(5)        

Output :

          + — — — — — — — — — + — — — — — — — — — +            
| date_parsed | O_3|
+ — — — — — — — — — + — — — — — — — — — +
|2001–07–01 01:00:00| 9.010000228881836|
|2001–07–01 02:00:00| 23.81999969482422|
|2001–07–01 03:00:00| 31.059999465942383|
|2001–07–01 04:00:00| 23.780000686645508|
|2001–07–01 05:00:00| 29.530000686645508|
+ — — — — — — — — — + — — — — — — — — — +
only showing top 5 rows

Hope you liked this post. In the next weeks we will release a series of posts with alternative tools you can use to master Hadoop with Python.

hansenphers1963.blogspot.com

Source: https://medium.com/datatau/hadoop-with-python-pyspark-a65a6f97ddf2

0 Response to "Read Data From Hadoop in Spark Python"

Postar um comentário

Iklan Atas Artikel

Iklan Tengah Artikel 1

Iklan Tengah Artikel 2

Iklan Bawah Artikel