If you've been looking for a use case for a document database, I came to the realization that my favorite dead-simple one is the ability to query a pile of JSON right alongside my other data via SQL, without really doing much. This is the dream realized by the powerful multi-model InterSystems Data Platform, shown here in a simple notebook visualizing the geolocation data my Rivian R1S is emitting.
So here is the two-step approach: ingestion into, and visualization from, InterSystems Cloud Document, using the driver.
InterSystems Cloud Document Deployment
Ingestion
For ingestion, I wanted to get a grip on how to lift a JSON document from the file system and persist it as a collection in the document database over the listener. For this I wrote a standalone Java app. This was more utility than anything, as the fun all happened in the notebook after the data was up there.
package databricks_rivian_irisdocdb;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;

import org.apache.commons.io.IOUtils;

import com.intersystems.document.*;

public class RivianDocDb {

    public static void main(String[] args) {
        String directoryPath =
                "/home/sween/Desktop/POP2/DEEZWATTS/rivian-iris-docdb/databricks_rivian_irisdocdb/in/json/";

        // Point the DataSource at the Cloud Document deployment listener
        DataSource datasrc = DataSource.createDataSource();
        datasrc.setPortNumber(443);
        datasrc.setServerName("k8s-05868f04-a88b7ecb-5c5e41660d-404345a22ba1370c.elb.us-east-1.amazonaws.com");
        datasrc.setDatabaseName("USER");
        datasrc.setUser("SQLAdmin");
        datasrc.setPassword("REDACTED");

        try {
            datasrc.setConnectionSecurityLevel(10);
        } catch (SQLException e) {
            e.printStackTrace();
        }

        System.out.println("\nCreated datasrc\n");
        System.out.println(datasrc);
        datasrc.preStart(2);
        System.out.println("\nDataSource size = " + datasrc.getSize());

        // Creates the collection if it doesn't exist
        Collection collectedDocs = Collection.getCollection(datasrc, "deezwatts2");

        File directory = new File(directoryPath);
        if (directory.isDirectory()) {
            File[] files = directory.listFiles();
            if (files != null) {
                for (File file : files) {
                    if (!file.isFile()) {
                        continue;
                    }
                    try (InputStream is = new FileInputStream(file)) {
                        String jsonTxt = IOUtils.toString(is, "UTF-8");
                        Document doc2 = JSONObject.fromJSONString(jsonTxt);
                        // Top-level key is whip2
                        Document doc3 = new JSONObject().put("whip2", doc2);
                        collectedDocs.insert(doc3);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        System.out.println("\nIngested Documents = " + collectedDocs.size());
    }
}
The above is trash, but it worked; we can see the collection in the collection browser in the deployment.
Databricks
Now this takes a little bit of Databricks setup, but it is well worth it to work with PySpark for the fun part.
I added the two InterSystems drivers to the cluster, and put the certificate in the init script so it gets added to the keystore.
For completeness, the cluster is running Databricks Runtime 16, Spark 3.5.0, and Scala 2.12.
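The init script itself can be a single keytool call; the alias and certificate path below are placeholders for illustration, not the exact ones from my cluster:

```shell
#!/bin/bash
# Hypothetical cluster init script: import the deployment's TLS certificate
# into the default JVM truststore so the driver can verify the listener.
keytool -importcert \
  -keystore "$JAVA_HOME/lib/security/cacerts" \
  -storepass changeit \
  -noprompt \
  -alias iris-cloud-document \
  -file /dbfs/FileStore/certs/iris-deployment.pem
```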
Visualizing with GeoPandas
So we should be set to run a PySpark job and plot where my whip has been in the subset of data I'll drag in.
import geopandas as gpd
import geodatasets
from shapely.geometry import Polygon
dbtablequery = """(SELECT TOP 1000 lat, longitude
    FROM JSON_TABLE(deezwatts2 FORMAT COLLECTION, '$'
        COLUMNS (lat VARCHAR(20) path '$.whip2.data.vehicleState.gnssLocation.latitude',
                 longitude VARCHAR(20) path '$.whip2.data.vehicleState.gnssLocation.longitude'))) AS temp_table"""
Next we set up the connection to the IRIS Document Database deployment and read the query results into a dataframe.
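A minimal sketch of that read, assuming the standard IRIS JDBC driver class and reusing the deployment details from the ingestion step; swap in your own hostname and credentials:

```python
# Sketch: read the JSON_TABLE projection into a Spark DataFrame over JDBC.
# The URL, driver class, and credentials are assumptions based on the
# deployment used during ingestion.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:IRIS://k8s-05868f04-a88b7ecb-5c5e41660d-404345a22ba1370c.elb.us-east-1.amazonaws.com:443/USER")
    .option("driver", "com.intersystems.jdbc.IRISDriver")
    .option("dbtable", dbtablequery)
    .option("user", "SQLAdmin")
    .option("password", "REDACTED")
    .load()
)
df.show(5)
```

Note that `dbtable` accepts a parenthesized subquery with an alias, which is why the query string above is wrapped as `(...) AS temp_table`.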
Next we grab an available map from geodatasets; the sdoh one is great for generic use across the United States.
# sdoh map is fantastic with bounding boxes
michigan = gpd.read_file(geodatasets.get_path("geoda.us_sdoh"))
pdf = df.toPandas()
gdf = gpd.GeoDataFrame(
    pdf,
    geometry=gpd.points_from_xy(pdf['longitude'].astype(float), pdf['lat'].astype(float)),
    crs=michigan.crs  # "EPSG:4326"
)
Now the cool part: we want to zoom in to contain the geo points of where the R1S has driven, and for that we need a bounding box for the state of Michigan.
Now that we have the coordinates array of the bounding box, we need to slap them into a Polygon object.
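A sketch of that step, using rough illustrative corner coordinates for Michigan (not survey-accurate, so substitute your own box):

```python
from shapely.geometry import Polygon

# Rough lon/lat corners boxing Michigan (illustrative values only)
michigan_box = Polygon([
    (-90.5, 41.5),  # southwest
    (-82.0, 41.5),  # southeast
    (-82.0, 48.5),  # northeast
    (-90.5, 48.5),  # northwest
])

# bounds come back as (minx, miny, maxx, maxy)
print(michigan_box.bounds)
```

With the box in hand, `michigan.clip(michigan_box)` trims the basemap to the state, and `gdf.plot(ax=...)` overlays the vehicle's points on top of it.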