WSPR Analytics¶
In the early days (March 2008), WSPR spots measured in the hundreds of thousands per month. Today, that number has grown to more than 75 million per month and shows no sign of abating. By any reasonable definition, it is safe to say that WSPR has entered the realm of Big Data.
Features¶
- Full Tool Chain Installation and Environment Setup Guide(s)
- Tutorials, experiments, and tests on large data sets
- Exposure to leading-edge technologies in the realm of Big Data Processing
- Hints, tips, and tricks for keeping your Linux distro running smoothly
- And eventually, useful datasets for the greater Amateur Radio community to consume
Primary Focus¶
The focus of this project is to provide a set of tools to download, manage, transform, and query WSPR datasets using modern Big Data frameworks.
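As a rough sketch of that end state (not code from this repository; the Parquet path is a placeholder and the Band column comes from the spot schema shown later on this page), querying a month of spots with PySpark might look like this:

# Minimal sketch: query a converted month of WSPR spots with PySpark.
# The Parquet path is a placeholder; "Band" matches the spot schema used below.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("wspr-analytics-sketch").getOrCreate()

spots = spark.read.parquet("wsprspots-2020-02.parquet")
spots.groupBy("Band").count().orderBy("Band").show()

spark.stop()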
Folder Descriptions¶
Several frameworks are used in this repository. The following list provides a short description of each folder and its intended purpose.
- docs: Python, MkDocs for repository documentation
- golang: General purpose command line apps and utilities
- java: Java, Maven, and SBT apps for RDD and Avro examples
- notebooks: Jupyter Notebooks for basic tests and visualization
- pyspark: Python, PyArrow scripts that interact with CSV and Parquet files
- spark: Scala programs to perform tasks
- wsprdaemon: Python, Scala, Psql utilities related to the WSPR Daemon project
- wsprana: Python (soon to be retired)
Pay close attention to the README files, as they lay out how to set up the tools needed to run their respective scripts or applications.
Base Tool Requirements¶
You must have Python, Java, PySpark / Spark (Scala) and SBT available from the command line.
- Java OpenJDK version 1.8.0_275 or later
- Python 3.7 or 3.8 (PyArrow has issues with 3.9 at present)
- PySpark from PyPi
- Apache Arrow 2.0+
- Scala 2.12.12 (patch versions 2.12.10 through 2.12.13 also work with Spark 3.0.1 / 3.1.1)
- Spark 3.0.1
- PostgreSQL Database (local, remote, Docker, Vagrant, etc)
- Optional ClickHouse High Performance Database
IMPORTANT: The Spark / Scala combinations are version sensitive. Check the [Spark][] download page for recommended version combinations if you deviate from what is listed here. As of this writing, Spark 3.0.1 and above are built with Scala 2.12.10. For the least amount of frustration, stick with what's known to work (any of the 2.12.x series). A quick check of the Python-side tool versions is sketched below.
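Once the Python-side packages are installed from PyPi, a quick version sanity check (a minimal sketch, not part of the repository) helps catch mismatches before running any scripts:

# Sanity check the Python-side tooling before running any of the scripts.
# Expected: Python 3.7/3.8, PyArrow 2.0+, PySpark 3.0.1 or 3.1.1.
import sys
import pyarrow
import pyspark

print(f"Python  : {sys.version.split()[0]}")
print(f"PyArrow : {pyarrow.__version__}")
print(f"PySpark : {pyspark.__version__}")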
Data Sources and Processing¶
The main data source will be the monthly WSPRNet archives. At present, there is no plan to pull nightly updates, though that could change if a reasonable API is identified; the WSPR Daemon project is one possible source of more frequent data.
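For illustration only, fetching one monthly archive might look like the sketch below. The wsprspots-YYYY-MM.csv.gz naming and archive URL are assumptions here; verify them against the WSPRNet downloads page before relying on this.

# Hypothetical sketch: download a single monthly WSPRNet archive.
# The URL pattern is an assumption; confirm it on the WSPRNet downloads page.
import urllib.request

def download_month(year, month, dest_dir="."):
    """Download one monthly archive and return the local file path."""
    name = f"wsprspots-{year}-{month:02d}.csv.gz"
    url = f"http://wsprnet.org/archive/{name}"
    dest = f"{dest_dir}/{name}"
    urllib.request.urlretrieve(url, dest)
    return dest

print(download_month(2020, 2))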
The tools (apps/scripts) will be used to convert the raw CSV files into a format better suited for parallel processing, namely Parquet. Read speeds, storage footprint, and ingestion times all improve dramatically with this storage format. There is a drawback, however: one cannot simply view a binary file the way one can a raw text file. The original CSV will remain in place, but all bulk processing will be pulled from Parquet or a high-performance database such as ClickHouse. It is during these transformations that PyArrow, PySpark, and [Spark][] will earn their keep.
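As a minimal sketch of that conversion (file names are placeholders, and the column list mirrors the spot schema used elsewhere on this page), a one-shot PyArrow conversion looks roughly like this:

# Sketch: convert a raw (headerless) WSPRNet CSV into Parquet with PyArrow.
# Paths are placeholders; the real conversion scripts live in the pyspark folder.
import pyarrow.csv as pv
import pyarrow.parquet as pq

column_names = ["SpotID", "Timestamp", "Reporter", "RxGrid", "SNR", "Frequency",
                "CallSign", "Grid", "Power", "Drift", "Distance", "Azimuth",
                "Band", "Version", "Code"]

table = pv.read_csv("wsprspots-2020-02.csv",
                    read_options=pv.ReadOptions(column_names=column_names))
pq.write_table(table, "wsprspots-2020-02.parquet")

# Reading the columnar copy back is far faster than re-parsing the CSV.
print(pq.read_table("wsprspots-2020-02.parquet").num_rows)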
Persistent Storage¶
A PostgreSQL database server will be needed. There are many ways to perform this installation (local, remote, Dockerized PostgreSQL, PostgreSQL with Vagrant, etc.).
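However it is installed, a quick connectivity check from Python (a sketch assuming the psycopg2 driver; host, database, user, and password are placeholders) is worth doing before loading any data:

# Hypothetical connectivity check against your PostgreSQL instance.
# All connection parameters below are placeholders for your own setup.
import psycopg2

conn = psycopg2.connect(host="localhost", dbname="wspr", user="wspr", password="change-me")
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])
conn.close()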
High Performance Database¶
While PostgreSQL is a highly capable RDBMS, this project will also use ClickHouse, a database better suited to big data and extremely fast queries. ClickHouse is column-oriented and can generate analytical reports from SQL queries in real time; it is blazingly fast, linearly scalable, feature rich, hardware efficient, fault-tolerant, and highly reliable.
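As a sketch (assuming the clickhouse-driver Python package and a ClickHouse server on localhost), issuing a query from Python looks like this:

# Sketch: run a simple query against a local ClickHouse server.
# Requires the clickhouse-driver package; the host is a placeholder.
from clickhouse_driver import Client

client = Client(host="localhost")
print(client.execute("SELECT version()"))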
Distribution Tabs¶
In many of the installation sections, you will see Tabs for a particular distribution. Clicking on the desired tab will render the command or content relevant to that distribution.
NOTE: These are just examples, and not intended for actual use.
- Update the package list
apk update
- Add a package
apk add openssh openntpd vim
An example of installing a prerequisite package for kernel compiling.
# Update the repositories
pacman -Syu
# Install a package
pacman -S dkms
Upgrade the host System Packages.
# Run the following command
sudo apt-get update && sudo apt-get upgrade
Let's not and say we did!
REM Run the following command
echo Spark runs better on Linux.
echo Please consider running Spark apps in
echo VirtualBox if your host os is Windows!!
Super Fencing¶
In many examples you may see multiple tabs relating to a particular code block. Clicking on each tab shows the syntax for the stated language. This is the same behaviour as with Distribution Tabs.
package main
import (
"fmt"
"os"
"os/exec"
"strings"
"time"
flag "github.com/spf13/pflag"
. "github.com/logrusorgru/aurora"
)
// version flags for the main function
var (
appname string
version string
date string
description string = "Golang App to display time zone information."
)
// CheckError is a function to print out errors
func CheckError(e error) {
if e != nil {
fmt.Println(e)
}
}
// clearScreen simply clears the terminal screen of any existing text
func clearScreen() {
c := exec.Command("clear")
c.Stdout = os.Stdout
c.Run()
}
// main is the primary entry point for the app
func main() {
//clearScreen()
// set the option flags
var ver = flag.BoolP("version", "v", false, "prints app version information")
flag.Parse()
// only print the version information if the user asks for it.
if *ver {
fmt.Println("\nApp Name .....: ", Cyan(appname))
fmt.Println("Version ......: ", Cyan(version))
fmt.Println("Build Date ...: ", Cyan(date))
fmt.Println("Description ..: ", Cyan(description))
fmt.Println()
os.Exit(0)
}
tNow := time.Now()
fmt.Println(Cyan("\nCurrent Time Data In Local Time Zone"))
fmt.Println(strings.Repeat("-", 55))
// Local Time Now in Unix Timestamp format
tUnix := tNow.Unix()
fmt.Printf("Unix.Time:\t%d\n", tUnix)
// Local Time Now derived from the unix timestamp
tLocal := time.Unix(tUnix, 0)
fmt.Printf("Time.Local:\t%s\n", tLocal)
// UTC Time derived from the unix timestamp
tUtc := time.Unix(tUnix, 0).UTC()
fmt.Printf("Time.UTC:\t%s\n", tUtc)
// Location Source: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
fmt.Println(Cyan("\nLocal Time For A Specific Zone"))
fmt.Println(strings.Repeat("-", 58))
eastern, e := time.LoadLocation("US/Eastern")
CheckError(e)
central, e := time.LoadLocation("US/Central")
CheckError(e)
mountain, e := time.LoadLocation("US/Mountain")
CheckError(e)
pacific, e := time.LoadLocation("US/Pacific")
CheckError(e)
alaska, e := time.LoadLocation("US/Alaska")
CheckError(e)
hawaii, e := time.LoadLocation("US/Hawaii")
CheckError(e)
fmt.Println("US/Eastern:\t", tNow.In(eastern))
fmt.Println("US/Central:\t", tNow.In(central))
fmt.Println("US/Mountain:\t", tNow.In(mountain))
fmt.Println("US/Pacific:\t", tNow.In(pacific))
fmt.Println("US/Alaska:\t", tNow.In(alaska))
fmt.Println("US/Hawaii:\t", tNow.In(hawaii))
/*
Golang Time Format Examples
*/
fmt.Println(Cyan("\nGolang Time Package Format Examples"))
fmt.Println(strings.Repeat("-", 55))
fmt.Println("ANSIC: \t", tNow.Format(time.ANSIC))
fmt.Println("UnixDate:\t", tNow.Format(time.UnixDate))
fmt.Println("RubyDate:\t", tNow.Format(time.RubyDate))
fmt.Println("RFC822: \t", tNow.Format(time.RFC822))
fmt.Println("RFC822Z: \t", tNow.Format(time.RFC822Z))
fmt.Println("RFC850: \t", tNow.Format(time.RFC850))
fmt.Println("RFC1123: \t", tNow.Format(time.RFC1123))
fmt.Println("RFC1123Z: \t", tNow.Format(time.RFC1123Z))
fmt.Println("RFC3339: \t", tNow.Format(time.RFC3339))
fmt.Println("Kitchen: \t", tNow.Format(time.Kitchen))
fmt.Println("StampMicro:\t", tNow.Format(time.StampMicro))
fmt.Println("StampMilli:\t", tNow.Format(time.StampMilli))
fmt.Println("StampNano:\t", tNow.Format(time.StampNano))
fmt.Println()
}
/**
*
* Static Method: Unzip a file to a path location
*
*/
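// NOTE: this snippet assumes the usual imports for the classes it uses:
// java.io.File, java.io.FileInputStream, java.io.FileOutputStream,
// java.io.IOException, java.util.zip.ZipEntry, java.util.zip.ZipInputStream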
private static void UnzipFile(String zipFilePath, String destDir) {
File dir = new File(destDir);
if (!dir.exists()) {
dir.mkdirs();
}
FileInputStream fis;
byte[] buffer = new byte[1024];
try {
fis = new FileInputStream(zipFilePath);
ZipInputStream zis = new ZipInputStream(fis);
ZipEntry ze = zis.getNextEntry();
// outer-loop
while (ze != null) {
String fileName = ze.getName();
File newFile = new File(destDir + File.separator + fileName);
System.out.println("* Unzipping to " + newFile.getAbsolutePath());
new File(newFile.getParent()).mkdirs();
FileOutputStream fos = new FileOutputStream(newFile);
int len;
// inner-loop
while ((len = zis.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
fos.close();
//close this ZipEntry
zis.closeEntry();
ze = zis.getNextEntry();
}
// close the ZipEntry
zis.closeEntry();
zis.close();
fis.close();
} catch (IOException e) {
e.printStackTrace();
System.exit(2);
}
} // END - UnzipFile method
import os
import time

import pandas as pd

# NOTE: parquet_types, spot_dtype, column_names, sleep_time, clear() and
# get_file_size() are module-level definitions elsewhere in the original script.
def pandas_convert_csv(csvfile):
    """Convert CSV file using parquet_type compression"""
    file_name = os.path.basename(csvfile)
    clear()
    print("\nPandas CSV Conversion Method")
    print(f"Parquet Compression Types : {parquet_types}")
    print("Sit back and relax, this takes a while!!\n")

    # Read the raw CSV into a DataFrame using the predefined spot schema
    print(f'* Reading file : {file_name}')
    start = time.time()
    df = pd.read_csv(csvfile, dtype=spot_dtype, names=column_names, header=None)
    rc = df.shape[0]
    print(f"* Spot Count : {rc:,}")
    end = time.time()
    print(f"* File Size : {round(get_file_size(csvfile, 'csv'), 2)} MB")
    print(f"* Elapsed Time : {round((end - start), 3)} sec")

    # Write one Parquet copy per compression type and report size / timing
    for f in parquet_types:
        compression_type = str(f.upper())
        file_name = csvfile.replace('csv', f.lower())
        if compression_type == "PARQUET":
            comp_type = "NONE"
        else:
            comp_type = compression_type.upper()
        print(f'\n* Converting CSV to -> {f.lower()}')
        start = time.time()
        df.to_parquet(file_name, compression=str(comp_type.upper()))
        end = time.time()
        time.sleep(sleep_time)  # prevent seg-fault on reads that are too quick
        print(f"* File Size : {round(get_file_size(csvfile, comp_type), 2)} MB")
        print(f"* Elapsed Time : {round((end - start), 3)} sec")
// Convert Epoch Times from WSPRnet CSV files
def main(args: Array[String]): Unit = {
val debug: Boolean = false
// make Java's log4j warnings be quiet
PropertyConfigurator.configure("log4j/log4j.properties")
// IMPORTANT: When converting EPOCH times, you must do so with the
// to_utc_timestamp method. This requires telling the system what Zone
// your computer is in (the one doing the conversion) in order to get
// the correct unix time.
val z = ZoneId.systemDefault()
val zoneId = z.getId
println("Process Steps For Processing A CSV File")
println("- Create a Spark Session")
// Create the Spark Session
val spark: SparkSession = SparkSession.builder()
.appName("Read CSV and Show Schema")
.master("local[16]")
.getOrCreate()
// Add Type-Safe Schema
println("- Create the Spot Schema")
val spotSchema = new StructType()
.add("SpotID", LongType, nullable = false)
.add("Timestamp", IntegerType, nullable = false)
.add("Reporter", StringType, nullable = false)
.add("RxGrid", StringType, nullable = false)
.add("SNR", ByteType, nullable = false)
.add("Frequency", DoubleType, nullable = false)
.add("CallSign", StringType, nullable = false)
.add("Grid", StringType, nullable = false)
.add("Power", ByteType, nullable = false)
.add("Drift", ByteType, nullable = false)
.add("Distance", ShortType, nullable = false)
.add("Azimuth", ByteType, nullable = false)
.add("Band", ByteType, nullable = false)
.add("Version", StringType, nullable = true)
.add("Code", ByteType, nullable = true)
// Create the Spark Dataset (using a small 100K-row CSV)
println("- Read the CSV file into a DataSet")
import spark.implicits._
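// NOTE: RawSpot is a case class mirroring spotSchema, assumed to be defined elsewhere in this project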
val ds = spark.read
.option("delimiter", ",")
.option("header", "false")
.schema(spotSchema)
.csv(path = "data/spots-2020-02-100K.csv")
.as[RawSpot]
println("- Select the column we want to process")
// Filter the data set
val res = ds.select("*")
.withColumn("x_TimeStamp", date_format(col("TimeStamp")
.cast(DataTypes.TimestampType), "yyyy-MM-dd HH:mm:ss"))
// only print the schema in Debug Mode
if (debug) {
res.printSchema()
}
// See note above about ZoneId, it's important!
println("- Setup Epoh Conversion")
val res1 = res.select("*")
.withColumn("x_timestamp", to_utc_timestamp(col("x_TimeStamp"), zoneId))
.withColumn("x_date", to_date(col("x_TimeStamp")))
.withColumn("x_year", year(col("x_TimeStamp")).cast(ShortType))
.withColumn("x_month", month(col("x_TimeStamp")).cast(ByteType))
.withColumn("x_day", dayofmonth(col("x_TimeStamp")).cast(ByteType))
.withColumn("x_hour", hour(col("x_TimeStamp")).cast(ByteType))
.withColumn("x_minute", minute(col("x_TimeStamp")).cast(ByteType))
// only print the schema in Debug Mode
if (debug) {
println("- Print Res1 Schema")
res1.printSchema()
}
// When we call show(x), this is what triggers the run
println("- Execute the Query")
time {
res1.show(5)
}
// Print the final row count
println("\nGetting final row count, please wait...")
time {
val rowcount = res1.count()
println(f"Epoch Conversion Processed : ($rowcount%,d) Spots ")
}
} // END - main method
WSPR Analytics is Apache 2.0 licensed code.