Structured Streaming demos

Below are the results of some Structured Streaming test runs in different scenarios using the CosmosDB Spark Connector.

CosmosDB Configurations

Partitioned Collection: 100,000 RUs, 50 partitions

Apache Spark Configurations

Azure HDI Cluster: 14 Standard_D12_v2 workers (4 cores, 28 GB memory each); 56 cores and 392 GB memory in total.

Streaming scenarios

Interval count

In this scenario, we demonstrate building a streaming source from a CosmosDB collection's change feed and consuming the stream by showing the incoming documents in the console.

  • Set up the CosmosDB collection to receive various insert loads ranging from 10 to 1,500 documents/second (a sketch of such a load generator follows the connector code below).
  • Start a streaming source reading data from the CosmosDB change feed of the collection and use the console as the streaming sink.
  • We should be able to see from the console that data is incrementally streamed from the source in batches.
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.codehaus.jackson.map.ObjectMapper
import com.microsoft.azure.cosmosdb.spark.streaming._

val configMap = Map(
"Endpoint" -> "COSMOSDB ENDPOINT",
"Masterkey" -> "COSMOSDB MASTER KEY",
"Database" -> "DATABASE NAME",
"Collection" -> "COLLECTION NAME",
"ChangeFeedCheckpointLocation" -> "checkpointlocation",
"ChangeFeedQueryName" -> "Structured Stream interval count")

// Start reading the change feed as a stream
var streamData = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(configMap).load()

// Start the streaming query to the console sink.
// The "countcol" column (an empty substring of the id) gives every document the same
// group key, so count() shows the running total of documents received from the change feed.
val query = streamData.withColumn("countcol", streamData.col("id").substr(0, 0)).groupBy("countcol").count().writeStream.outputMode("complete").format("console").start()
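
The insert load referenced in the first bullet is not part of the original snippet. Below is a minimal sketch of a load generator, assuming the connector's batch write path (Config and the cosmosDB extension on DataFrameWriter from the connector's batch examples) and that SaveMode.Append performs plain inserts; docsPerSecond and durationSeconds are hypothetical knobs, and the achieved rate is only approximate because of per-batch job overhead.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Write configuration for the monitored (source) collection
val loadWriteConfig = Config(Map(
"Endpoint" -> "COSMOSDB ENDPOINT",
"Masterkey" -> "COSMOSDB MASTER KEY",
"Database" -> "DATABASE NAME",
"Collection" -> "COLLECTION NAME"))

val docsPerSecond = 100     // hypothetical knob; the test runs used 10 to 1,500
val durationSeconds = 60    // hypothetical test duration

for (second <- 1 to durationSeconds) {
  // Build one small batch of documents; 'id' must be unique within the collection
  val batch = spark.range(docsPerSecond)
    .withColumn("id", concat_ws("-", lit("doc"), lit(second.toString), col("id").cast("string")))
    .withColumn("ts", current_timestamp().cast("string"))
  batch.write.mode(SaveMode.Append).cosmosDB(loadWriteConfig)
  Thread.sleep(1000)
}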

Observations

The change feed documents were received correctly for all insert-load configurations. In the case of node failures, the connector was able to resume the change feed from the last checkpoint. Below are highlights from the test run logs.

# Writing 100k documents
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-----+
|countcol|count|
+--------+-----+
+--------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------+-----+
|countcol|count|
+--------+-----+
+--------+-----+
...

-------------------------------------------
Batch: 5
-------------------------------------------
+--------+-----+
|countcol|count|
+--------+-----+
|        | 8108|
+--------+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+--------+-----+
|countcol|count|
+--------+-----+
|        |21040|
+--------+-----+
...

-------------------------------------------
Batch: 13
-------------------------------------------
+--------+-----+
|countcol|count|
+--------+-----+
|        |99907|
+--------+-----+

-------------------------------------------
Batch: 14
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
|        |100000|
+--------+------+
...
# Writing 200k more documents (300k total)
-------------------------------------------
Batch: 39
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
|        |108095|
+--------+------+
...

-------------------------------------------
Batch: 56
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
|        |292406|
+--------+------+

-------------------------------------------
Batch: 57
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
|        |300000|
+--------+------+
...

# Writing 200k more documents; node failures occurred during this run
-------------------------------------------
Batch: 67
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
|        |300647|
+--------+------+
...
-------------------------------------------
Batch: 76
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
|        |394534|
+--------+------+

# The streaming source was terminated and restarted
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+------+
|countcol| count|
+--------+------+
|        |105466|
+--------+------+
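
To reproduce the restart behaviour in the last excerpt, it should be enough to start the same source again with the unchanged configMap; a minimal sketch, reusing the imports and configMap from the interval count code above:

// Restart the source after a shutdown or failure. Because configMap carries the same
// ChangeFeedCheckpointLocation and change feed query name, the change feed resumes
// from the last checkpoint instead of rereading the collection from the beginning.
// The console aggregation has no checkpointLocation of its own, which is why its
// running count starts over after the restart (Batch 0 above).
var resumedStream = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(configMap).load()
val resumedQuery = resumedStream.withColumn("countcol", resumedStream.col("id").substr(0, 0)).groupBy("countcol").count().writeStream.outputMode("complete").format("console").start()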

Replication streaming to another collection

In this scenario, we simulate data replication between two CosmosDB collections by creating a streaming source from one collection's change feed and a streaming sink that writes the incoming data to another collection.

  • Set up the CosmosDB collection to receive various insert loads ranging from 10 to 1,500 documents/second.
  • Start a streaming source reading data from the CosmosDB change feed of the collection and create a streaming sink writing to another CosmosDB collection.
  • We should see incoming data from the source collection being replicated to the sink collection (a verification sketch follows the code below).
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.codehaus.jackson.map.ObjectMapper
import com.microsoft.azure.cosmosdb.spark.streaming._

val sourceCollectionName = "SOURCE COLLECTION NAME"
val sinkCollectionName = "SINK COLLECTION NAME"

val configMap = Map(
"Endpoint" -> "COSMOSDB ENDPOINT",
"Masterkey" -> "COSMOSDB MASTER KEY",
"Database" -> "DATABASE NAME",
"Collection" -> sourceCollectionName,
"ChangeFeedCheckpointLocation" -> "changefeedcheckpointlocation")

val sourceConfigMap = configMap.+(("changefeedqueryname", "Structured Stream replication streaming test"))

// Start to read the stream
var streamData = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(sourceConfigMap).load()

val sinkConfigMap = configMap.-("Collection").+(("Collection", sinkCollectionName))

// Start the stream writer
val streamingQueryWriter = streamData.writeStream.format(classOf[CosmosDBSinkProvider].getName).outputMode("append").options(sinkConfigMap).option("checkpointLocation", "streamingcheckpointlocation")

var streamingQuery = streamingQueryWriter.start()
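
One way to check that replication keeps up is to batch-read both collections and compare document counts. A minimal sketch, reusing the imports and config maps above and assuming the connector's cosmosDB extension on DataFrameReader from the batch examples:

// Spot-check the replication by counting documents in the source and sink collections.
// While the stream is catching up, the sink count will trail the source count slightly.
val sourceCount = spark.read.cosmosDB(Config(configMap)).count()
val sinkCount = spark.read.cosmosDB(Config(sinkConfigMap)).count()
println(s"source=$sourceCount documents, sink=$sinkCount documents")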

Twitter feeds streaming

In this scenario, we demonstrate running analytics queries on top of a stream of Twitter feeds.

  • Start feeding Twitter data into a Cosmos DB collection as described in the Change Feed demo.
  • Start a streaming source reading data from the CosmosDB change feed of the collection.
  • From there we can run various streaming analytics as shown in the code below.
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.streaming._
import com.microsoft.azure.cosmosdb.spark.config._
// Needed for explode, lit, udf and the $"column" syntax used below
import org.apache.spark.sql.functions._
import spark.implicits._
val configMap = Map(
"Endpoint" -> "COSMOSDB ENDPOINT",
"Masterkey" -> "COSMOSDB KEY",
"Database" -> "DATABASE NAME",
"Collection" -> "COLLECTION NAME",
"ChangeFeedCheckpointLocation" -> "cfcheckpoint1406",
"ChangeFeedQueryName" -> "cfqueryname1406")

var streamData = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(configMap).load()

// Get the stats on the language of the tweets
val query = streamData.groupBy("lang").count().sort($"count".desc).writeStream.outputMode("complete").format("console").start()

/** Console
-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|lang|count|
+----+-----+
|  en| 1201|
| und|   17|
|  es|   12|
|  de|   11|
|  fr|   11|
|  tl|    5|
|  nl|    4|
|  it|    1|
|  ja|    1|
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|lang|count|
+----+-----+
|  en| 1203|
| und|   17|
|  es|   12|
|  de|   11|
|  fr|   11|
|  tl|    5|
|  nl|    4|
|  it|    1|
|  ja|    1|
+----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+
|lang|count|
+----+-----+
|  en| 1206|
| und|   17|
|  es|   12|
|  de|   11|
|  fr|   11|
|  tl|    5|
|  nl|    4|
|  it|    1|
|  ja|    1|
+----+-----+

...
*/

// Get the most popular hashtags table
val query = streamData.select(explode($"entities.hashtags.text").alias("tag")).groupBy("tag").count().sort($"count".desc).writeStream.outputMode("complete").format("console").start()

/** Console
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+-----+
|                 tag|count|
+--------------------+-----+
|                  AI|   34|
|     MachineLearning|   34|
|ArtificialIntelli...|   29|
|          DataMining|   27|
|           Analytics|   27|
|                 CFO|   25|
|             BigData|   11|
|         DataScience|    7|
|         datascience|    6|
|                 IoT|    5|
|     machinelearning|    4|
|                  ML|    4|
|             bigdata|    3|
|    InternetOfThings|    3|
|           analytics|    3|
|                IIoT|    3|
|            mathchat|    2|
|             compsci|    2|
|                stem|    2|
|                math|    2|
+--------------------+-----+
only showing top 20 rows

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----+
|                 tag|count|
+--------------------+-----+
|                  AI|   36|
|     MachineLearning|   36|
|ArtificialIntelli...|   29|
|           Analytics|   27|
|          DataMining|   27|
|                 CFO|   25|
|             BigData|   13|
|         DataScience|    9|
|         datascience|    7|
|                  ML|    6|
|                 IoT|    6|
|             bigdata|    4|
|     machinelearning|    4|
|           analytics|    3|
|    InternetOfThings|    3|
|                IIoT|    3|
|                tech|    3|
|             Bigdata|    2|
|               Cloud|    2|
|             Fintech|    2|
+--------------------+-----+
only showing top 20 rows

...
*/

// Get the most popular hashtags table with a timestamp column
// (lit(Instant.now()) is evaluated once, when the query is defined, so the timestamp stays constant across batches)
import java.time._
val query = streamData.select(explode($"entities.hashtags.text").alias("tag")).groupBy("tag").count().sort($"count".desc).withColumn("timestamp", lit(Instant.now().toString)).writeStream.outputMode("complete").format("console").start()

/** Console
-------------------------------------------
Batch: 0
-------------------------------------------
+---------------+-----+--------------------+
|            tag|count|           timestamp|
+---------------+-----+--------------------+
|        BigData|   65|2017-10-20T00:34:...|
|            IoT|   39|2017-10-20T00:34:...|
|      Analytics|   32|2017-10-20T00:34:...|
|             AI|   30|2017-10-20T00:34:...|
|  CyberSecurity|   29|2017-10-20T00:34:...|
|        sensors|   28|2017-10-20T00:34:...|
|     Industry40|   28|2017-10-20T00:34:...|
|      SmartCity|   27|2017-10-20T00:34:...|
|      smartgrid|   27|2017-10-20T00:34:...|
|             AR|   27|2017-10-20T00:34:...|
|MachineLearning|   26|2017-10-20T00:34:...|
|    DataScience|   15|2017-10-20T00:34:...|
|    datascience|   13|2017-10-20T00:34:...|
|        bigdata|   12|2017-10-20T00:34:...|
|          Cloud|    8|2017-10-20T00:34:...|
|             ML|    7|2017-10-20T00:34:...|
|   DeepLearning|    7|2017-10-20T00:34:...|
|     Blockchain|    7|2017-10-20T00:34:...|
|        Fintech|    6|2017-10-20T00:34:...|
|           tech|    6|2017-10-20T00:34:...|
+---------------+-----+--------------------+
only showing top 20 rows
*/

// Replicate the tweets to another collection
val sinkConfigMap = configMap.-("Collection").+(("Collection", "tweets_shadow"))
val streamQuery = streamData.writeStream.format(classOf[CosmosDBSinkProvider].getName).outputMode("append").options(sinkConfigMap).option("checkpointLocation", "streamingCheckpointLocation20171017").start()

// Get the most popular hashtags table and persist it to another CosmosDB collection
// With this data we can later plot the trends of the tags (see the read-back sketch below)
val sinkConfigMap = configMap.-("Collection").+(("Collection", "tags"))
val query = streamData.select(explode($"entities.hashtags.text").alias("tag")).groupBy("tag").count().sort($"count".desc).writeStream.format(classOf[CosmosDBSinkProvider].getName).outputMode("complete").options(sinkConfigMap).option("checkpointLocation", "populartagsstreaming1507").start()

// To upsert into the sink collection, set the 'Upsert' flag in the configuration
val sinkConfigMap = configMap.-("Collection").+(("Collection", "tags")).+(("Upsert", "true"))
val query = streamData.select(explode($"entities.hashtags.text").alias("tag")).groupBy("tag").count().sort($"count".desc).writeStream.format(classOf[CosmosDBSinkProvider].getName).outputMode("complete").options(sinkConfigMap).option("checkpointLocation", "populartagsstreaming1507").start()
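
To plot the tag trends mentioned above, the persisted documents can be read back as a regular batch DataFrame. A minimal sketch, reusing configMap and assuming the connector's cosmosDB extension on DataFrameReader from the batch examples:

// Batch-read the 'tags' collection populated by the streaming sink above
// and show the current top hashtags.
val tagsReadConfig = Config(configMap.-("Collection").+(("Collection", "tags")))
val tagsDF = spark.read.cosmosDB(tagsReadConfig)
tagsDF.select("tag", "count").sort($"count".desc).show(20)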

// Get tweet counts by the country/region part of the user location, with a timestamp column
val getCountry = udf((location: String) => {
    if (location == null) {
        location
    } else {
        // Locations often look like "City, Country" or "City, State"; keep the part after
        // the first comma (e.g. "Seattle, WA" yields "WA"), otherwise fall back to the whole string
        val parts = location.split(", ")
        if (parts.size > 1) parts(1).trim else location.trim
    }
})
val query = streamData.withColumn("country", getCountry($"user.location")).groupBy($"country").count().sort($"count".desc).withColumn("timestamp", lit(Instant.now().toString)).writeStream.outputMode("complete").format("console").start()