Home StrucuredStreaming window
Post
Cancel

StrucuredStreaming window

打印窗口示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object StructuredNetworkWindowShow {

  def main(args: Array[String]) {
    val host = "10.9.22.136"
    val port = "6666"
    val windowSize = 3
    val slideSize = 2
    val triggerTime = 1
    if (slideSize > windowSize) {
      System.err.println("<slide duration> must be less than or equal to <window duration>")
    }
    val windowDuration = s"$windowSize seconds"
    val slideDuration = s"$slideSize seconds"

    val spark = SparkSession
      .builder
      .master("local")
      .appName("StructuredNetworkWindowShow"+String.valueOf(System.currentTimeMillis()))
      .getOrCreate()

    import spark.implicits._
    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", true)
      .load()
    val wordCounts:DataFrame = lines.select(window($"timestamp",windowDuration,slideDuration),$"value")
    // Start running the query that prints the windowed word counts to the console

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .trigger(ProcessingTime(s"$triggerTime seconds"))
      .option("truncate", "false")
      .start()

    query.awaitTermination()

  }

如下window大小为3,滑动步长2,triggerTime=1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+
|window                                    |value|
+------------------------------------------+-----+
|[2019-08-27 14:20:36, 2019-08-27 14:20:39]|11   |
|[2019-08-27 14:20:38, 2019-08-27 14:20:41]|11   |
|[2019-08-27 14:20:38, 2019-08-27 14:20:41]|12   |
|[2019-08-27 14:20:40, 2019-08-27 14:20:43]|12   |
|[2019-08-27 14:20:40, 2019-08-27 14:20:43]|13   |
+------------------------------------------+-----+
-------------------------------------------
Batch: 6
-------------------------------------------
19/08/27 14:26:39 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@37e8844 committed.
+------------------------------------------+-----+
|window                                    |value|
+------------------------------------------+-----+
|[2019-08-27 14:26:32, 2019-08-27 14:26:35]|17   |
|[2019-08-27 14:26:34, 2019-08-27 14:26:37]|18   |
|[2019-08-27 14:26:34, 2019-08-27 14:26:37]|19   |
|[2019-08-27 14:26:36, 2019-08-27 14:26:39]|19   |
+------------------------------------------+-----+
This post is licensed under CC BY 4.0 by the author.