Fixing "Error:java: invalid flag: -version"

After switching to the latest version of IntelliJ IDEA, compiling the project failed with "Error:java: invalid flag: -version".

Digging into it, the cause turned out to be the following configuration in the maven-compiler-plugin section of our company's super POM:

<compilerArgs>
    <arg>-J-Duser.country=US</arg>
    <arg>-version</arg>
</compilerArgs>

This configuration conflicts with the new version of IDEA; removing it from the POM fixes the problem.

git not using my own user

Last night I deleted a few things from my .ssh directory for various reasons, so today git pull started asking me for the password of `git@gitlab.corp.xxx.com`. Clearly GitLab no longer recognized my user.

There are generally two ways to fix this:

  • Use the HTTP protocol
    HTTP requires you to enter your username and password
  • Use the SSH protocol
    Regenerate the key pair and copy the public key into GitLab's SSH keys

Here is the second approach in a bit more detail:

  • This time I ran ssh-keygen -t rsa -C "username" and kept hitting Enter; this generates two files
    in the .ssh directory: id_rsa.pub and id_rsa
  • Copy the contents of id_rsa.pub into GitLab under Profile Settings -> SSH Keys -> Add an SSH key

Spring's AntPathMatcher is a handy tool

We often need to do pattern matching in all kinds of places. Regular expressions are powerful, but Ant-style patterns come up very often, and a regex is not always the most convenient way to express them. Spring's AntPathMatcher can simplify things considerably.

org.springframework.util.AntPathMatcher lives in spring-core and is very easy to use:

import org.junit.Assert;
import org.junit.Test;
import org.springframework.util.AntPathMatcher;

public class AntPathMatcherTest {

    private AntPathMatcher pathMatcher = new AntPathMatcher();

    @Test
    public void test() {
        pathMatcher.setCachePatterns(true);
        pathMatcher.setCaseSensitive(true);
        pathMatcher.setTrimTokens(true);
        pathMatcher.setPathSeparator("/");

        Assert.assertTrue(pathMatcher.match("a", "a"));
        Assert.assertTrue(pathMatcher.match("a*", "ab"));
        Assert.assertTrue(pathMatcher.match("a*/**/a", "ab/asdsa/a"));
        Assert.assertTrue(pathMatcher.match("a*/**/a", "ab/asdsa/asdasd/a"));

        Assert.assertTrue(pathMatcher.match("*", "a"));
        Assert.assertTrue(pathMatcher.match("*/*", "a/a"));
    }
}

HTML-escaping problems caused by SyntaxHighlighter Evolved

My WordPress blog uses the SyntaxHighlighter Evolved plugin for code highlighting, mainly because I like its color scheme.

I have also tried the Crayon Syntax Highlighter plugin. It comes with plenty of highlighting themes and does handle HTML character escaping correctly, but its color schemes are not my style. More importantly, I have been using SyntaxHighlighter Evolved for a long time and my posts are full of shortcode-style tags such as [java]xasda[/java], which Crayon does not seem to handle well (perhaps it does and I simply never dug into it). So I decided to fix the character-escaping problem in SyntaxHighlighter Evolved instead.

The fix

It is actually quite simple:

  • First deactivate the SyntaxHighlighter Evolved plugin
  • Edit line 1046 of syntaxhighlighter/syntaxhighlighter.php
    The original content of that line is:
$code = ( false === strpos( $code, '<' ) && false === strpos( $code, '>' ) && 2 == $this->get_code_format($post) ) ? strip_tags( $code ) : htmlspecialchars( $code );

Change it to:

$code = ( false === strpos( $code, '<' ) && false === strpos( $code, '>' ) ) ? strip_tags( $code ) : htmlspecialchars( $code );

Then save the file and re-activate the plugin; the HTML-escaping problem in code blocks is solved.


Pushing an existing project to GitHub or GitLab

Just run the following commands:

git init
git add .
git commit -m "Initial commit"
git remote add origin <project url>
git push -f origin master

Note that the master branch is often protected on GitHub or GitLab, so you may need to push to another branch first and then merge it into master.


Spark Unified Data Access

##Data abstractions
RDD is the core abstraction in Apache Spark. It is an immutable, fault-tolerant distributed collection of statically typed objects that are usually stored in-memory.
DataFrame abstraction is built on top of RDD and it adds “named” columns. Moreover, the Catalyst optimizer, under the hood, compiles the operations and generates JVM bytecode for efficient execution.


However, the named columns approach gives rise to a new problem. Static type information is no longer available to the compiler, and hence we lose the advantage of compile-time type safety.

Dataset API was introduced to combine the best traits from both RDDs and DataFrames plus some more features of its own. Datasets provide row and column data abstraction similar to the DataFrames, but with a structure defined on top of them. This structure may be defined by a case class in Scala or a class in Java. They provide type safety and lambda functions like RDDs. So, they support both typed methods such as map and groupByKey as well as untyped methods such as select and groupBy. In addition to the Catalyst optimizer, Datasets leverage in-memory encoding provided by the Tungsten execution engine, which improves performance even further.
DataFrames are just untyped Datasets.
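
A minimal sketch (my own, not from the book) may make the typed/untyped distinction concrete. It assumes a Spark 2.x spark-shell where a SparkSession named spark and its implicits are available; the Person class is purely illustrative:

// Assumes a SparkSession named `spark` (as in spark-shell); Person is illustrative only
import spark.implicits._

case class Person(name: String, age: Int)

val ds = Seq(Person("Ann", 32), Person("Bob", 45)).toDS()   // Dataset[Person]
ds.map(p => p.age + 1)          // typed: checked at compile time

val df = ds.toDF()              // DataFrame = Dataset[Row]
df.select("age")                // untyped: a mistyped column name fails only at runtime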

##Unified data access platform
The intention behind this unified platform is that it not only lets you combine the static and streaming data together, but also allows various different kinds of operations on the data in a unified way!

###DataFrames
The DataFrame API brings two features with it:

  • Built-in support for a variety of data formats
  • A more robust and feature-rich DSL with functions designed for common tasks

####RDDs versus DataFrames

  • Similarities
    • Both are fault-tolerant, partitioned data abstractions in Spark
    • Both can handle disparate data sources
    • Both are lazily evaluated (execution happens when an output operation is performed on them), thereby having the ability to take the most optimized execution plan
    • Both APIs are available in all four languages: Scala, Python, Java, and R
  • Differences
    • DataFrames are a higher-level abstraction than RDDs
    • The definition of an RDD implies defining a Directed Acyclic Graph (DAG), whereas defining a DataFrame leads to the creation of an Abstract Syntax Tree (AST). An AST will be utilized and optimized by the Spark SQL Catalyst engine.
    • RDD is a general data structure abstraction, whereas a DataFrame is a specialized data structure to deal with two-dimensional, table-like data.

####DataFrame Example

//Chapter 3 Introduction to DataFrames - Scala example code
//Creating DataFrames from RDDs
//Create a list of colours
Scala> val colors = List("white","green","yellow","red","brown","pink")
//Distribute a local collection to form an RDD
//Apply map function on that RDD to get another RDD containing colour, length tuples
Scala> val color_df = sc.parallelize(colors).map(x
=> (x,x.length)).toDF("color","length")
Scala> color_df
res0: org.apache.spark.sql.DataFrame = [color: string, length: int]
Scala> color_df.dtypes //Note the implicit type inference
res1: Array[(String, String)] = Array((color,StringType), (length,IntegerType))
Scala> color_df.show() //Final output as expected. Order need not be the same as shown
+------+------+
| color|length|
+------+------+
| white| 5|
| green| 5|
|yellow| 6|
| red| 3|
| brown| 5|
| pink| 4|
+------+------+
//Creating DataFrames from JSON
//Pass the source json data file path
//Note: SQLCONTEXT is deprecated in Spark 2+ so use spark as entry point
// or create sqlContext as shown
//val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Scala> val df = spark.read.json("./authors.json")
Scala> df.show() //json parsed; Column names and data types inferred implicitly
+----------+---------+
|first_name|last_name|
+----------+---------+
| Mark| Twain|
| Charles| Dickens|
| Thomas| Hardy|
+----------+---------+
//The following example assumes MYSQL is already running and the required library is imported
//Launch shell with driver-class-path as a command line argument
spark-shell --driver-class-path /usr/share/java/mysql-connector-java.jar
//Pass the connection parameters
scala> val peopleDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://localhost",
"dbtable" -> "test.people",
"user" -> "root",
"password" -> "mysql")).load()
peopleDF: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string, gender: string, dob: date, occupation: string, person_id: int]
//Retrieve table data as a DataFrame
scala> peopleDF.show()
+----------+---------+------+----------+----------+---------+
|first_name|last_name|gender| dob|occupation|person_id|
+----------+---------+------+----------+----------+---------+
| Thomas| Hardy| M|1840-06-02| Writer| 101|
| Emily| Bronte| F|1818-07-30| Writer| 102|
| Charlotte| Bronte| F|1816-04-21| Writer| 103|
| Charles| Dickens| M|1812-02-07| Writer| 104|
+----------+---------+------+----------+----------+---------+
//Creating DataFrames from Apache Parquet
//Write DataFrame contents into Parquet format
scala> peopleDF.write.parquet("writers.parquet")
//Read Parquet data into another DataFrame
scala> val writersDF = sqlContext.read.parquet("writers.parquet")
writersDF: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string, gender: string, dob: date, occupation: string, person_id: int]
//DataFrame operations
//Create a local collection of colors first
Scala> val colors = List("white","green","yellow","red","brown","pink")
//Distribute a local collection to form an RDD
//Apply map function on that RDD to get another RDD containing color, length tuples and convert that RDD to a DataFrame
Scala> val color_df = sc.parallelize(colors).map(x
=> (x,x.length)).toDF("color","length")
//Check the object type
Scala> color_df
res0: org.apache.spark.sql.DataFrame = [color: string, length: int]
//Check the schema
Scala> color_df.dtypes
res1: Array[(String, String)] = Array((color,StringType), (length,IntegerType))
//Check row count
Scala> color_df.count()
res4: Long = 6
//Look at the table contents. You can limit displayed rows by passing parameter to show
Scala> color_df.show()
+------+------+
| color|length|
+------+------+
| white| 5|
| green| 5|
|yellow| 6|
| red| 3|
| brown| 5|
| pink| 4|
+------+------+
//List out column names
Scala> color_df.columns
res5: Array[String] = Array(color, length)
//Drop a column. The source DataFrame color_df remains the same.
//Spark returns a new DataFrame which is being passed to show
Scala> color_df.drop("length").show()
+------+
| color|
+------+
| white|
| green|
|yellow|
| red|
| brown|
| pink|
+------+
//Convert to JSON format
Scala> color_df.toJSON.first()
res9: String = {"color":"white","length":5}
//filter operation is similar to WHERE clause in SQL
//You specify conditions to select only desired columns and rows
//Output of filter operation is another DataFrame object that is usually passed on to some more operations
//The following example selects the colors having a length of four or five only and labels the column as "mid_length"
filter
------
Scala> color_df.filter(color_df("length").between(4,5)).select(
color_df("color").alias("mid_length")).show()
+----------+
|mid_length|
+----------+
| white|
| green|
| brown|
| pink|
+----------+
//This example uses multiple filter criteria. Notice the not equal to operator having double equal to symbols
Scala> color_df.filter(color_df("length") > 4).filter(color_df("color")!== "white").show()
+------+------+
| color|length|
+------+------+
| green| 5|
|yellow| 6|
| brown| 5|
+------+------+
//Sort the data on one or more columns
sort
----
//A simple single column sorting in default (ascending) order
Scala> color_df.sort("color").show()
+------+------+
| color|length|
+------+------+
| brown| 5|
| green| 5|
| pink| 4|
| red| 3|
| white| 5|
|yellow| 6|
+------+------+
//First filter colors of length more than 4 and then sort on multiple columns
//The filtered rows are sorted first on the column length in default ascending order. Rows with same length are sorted on color in descending order
Scala> color_df.filter(color_df("length")>=4).sort($"length", $"color".desc).show()
+------+------+
| color|length|
+------+------+
| pink| 4|
| white| 5|
| green| 5|
| brown| 5|
|yellow| 6|
+------+------+
//You can use orderBy instead, which is an alias to sort.
scala> color_df.orderBy("length","color").take(4)
res19: Array[org.apache.spark.sql.Row] = Array([red,3], [pink,4], [brown,5], [green,5])
//Alternative syntax, for single or multiple columns
scala> color_df.sort(color_df("length").desc, color_df("color").asc).show()
+------+------+
| color|length|
+------+------+
|yellow| 6|
| brown| 5|
| green| 5|
| white| 5|
| pink| 4|
| red| 3|
+------+------+
//All the examples until now have been acting on one row at a time, filtering or transforming or reordering.
//The following example deals with regrouping the data.
//These operations require “wide dependency” and often involve shuffling.
groupBy
-------
Scala> color_df.groupBy("length").count().show()
+------+-----+
|length|count|
+------+-----+
| 3| 1|
| 4| 1|
| 5| 3|
| 6| 1|
+------+-----+
//Data often contains missing information or null values.
//The following json file has names of famous authors. Firstname data is missing in one row.
dropna
------
Scala> val df1 = sqlContext.read.json("./authors_missing.json")
Scala> df1.show()
+----------+---------+
|first_name|last_name|
+----------+---------+
| Mark| Twain|
| Charles| Dickens|
| null| Hardy|
+----------+---------+
//Let us drop the row with incomplete information
Scala> val df2 = df1.na.drop()
Scala> df2.show() //Unwanted row is dropped
+----------+---------+
|first_name|last_name|
+----------+---------+
| Mark| Twain|
| Charles| Dickens|
+----------+---------+

###Datasets
Apache Spark Datasets are an extension of the DataFrame API that provides a type-safe, object-oriented programming interface. A DataFrame is simply a generic, untyped Dataset; conversely, a Dataset is a DataFrame with an added structure.
The unification of Datasets and DataFrames applies to the Scala and Java APIs only.
At the core of Dataset abstraction are the encoders. These encoders translate between JVM objects and Spark’s internal Tungsten binary format. This internal representation bypasses JVM’s memory management and garbage collection. Spark has its own C-style memory access that is specifically written to address the kind of workflows it supports. The resultant internal representations take less memory and have efficient memory management. Compact memory representation leads to reduced network load during shuffle operations. The encoders generate compact byte code that directly operates on serialized objects without de-serializing, thereby enhancing performance. Knowing the schema early on results in a more optimal layout in memory when caching Datasets.
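
As a small illustration (my addition, not from the book), you can ask Spark for the encoder it derives for a case class and inspect the schema it will use for the Tungsten representation; the Author class here is only an example:

import org.apache.spark.sql.{Encoder, Encoders}

case class Author(first_name: String, last_name: String)

val enc: Encoder[Author] = Encoders.product[Author]
println(enc.schema)
// StructType(StructField(first_name,StringType,true), StructField(last_name,StringType,true))
// toDS()/as[Author] pick up the same encoder implicitly via spark.implicits._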

###Creating Datasets from JSON
Datasets can be created from JSON files, similar to DataFrames. Note that a JSON file may contain several records, but each record has to be on a single line. If your source JSON has newlines, you have to remove them programmatically.
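
As a side note (my own addition, not from the book): starting with Spark 2.2 the JSON reader accepts a multiLine option, so pretty-printed JSON can be read without preprocessing; on Spark 2.0, which this chapter targets, you would still have to flatten the records yourself. The file name below is illustrative:

// Spark 2.2+ only; the file name is an assumption
val authors = spark.read.option("multiLine", "true").json("./authors_multiline.json")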

###Datasets API’s limitations

  • While querying a Dataset, the selected fields should be given the specific data types declared in the case class, or else the output falls back to a DataFrame. For example:

auth.select(col("first_name").as[String])

  • Python and R are inherently dynamic in nature, and hence typed Datasets do not fit in.

####Datasets Example

// Chapter 4 Unified Data Access - Scala example code
Datasets
---------
Example 1: Create a Dataset from a simple collection
scala> val ds1 = List.range(1,5).toDS()
ds1: org.apache.spark.sql.Dataset[Int] = [value: int]
//Perform an action
scala> ds1.collect()
res3: Array[Int] = Array(1, 2, 3, 4)
//Create from an RDD
scala> val colors = List("red","orange","blue","green","yellow")
scala> val color_ds = sc.parallelize(colors).map(x =>
(x,x.length)).toDS()
//Add a case class
case class Color(var color: String, var len: Int)
val color_ds = sc.parallelize(colors).map(x =>
Color(x,x.length)).toDS()
//Examine the structure
scala> color_ds.dtypes
res26: Array[(String, String)] = Array((color,StringType), (len,IntegerType))
scala> color_ds.schema
res25: org.apache.spark.sql.types.StructType = StructType(StructField(color,StringType,true),
StructField(len,IntegerType,false))
//Examine the execution plan
scala> color_ds.explain()
== Physical Plan ==
Scan ExistingRDD[color#57,len#58]
//Convert the Dataset to a DataFrame
scala> val color_df = color_ds.toDF()
color_df: org.apache.spark.sql.DataFrame = [color: string, len: int]
Example 2: Convert the dataset to a DataFrame
scala> color_df.show()
+------+---+
| color|len|
+------+---+
| red| 3|
|orange| 6|
| blue| 4|
| green| 5|
|yellow| 6|
+------+---+
Example 3: Convert a DataFrame to a Dataset
//Construct a DataFrame first
scala> val color_df = sc.parallelize(colors).map(x =>
(x,x.length)).toDF("color","len")
color_df: org.apache.spark.sql.DataFrame = [color: string, len: int]
//Convert the DataFrame to a Dataset with a given Structure
scala> val ds_from_df = color_df.as[Color]
ds_from_df: org.apache.spark.sql.Dataset[Color] = [color: string, len: int]
//Check the execution plan
scala> ds_from_df.explain
== Physical Plan ==
WholeStageCodegen
: +- Project [_1#102 AS color#105,_2#103 AS len#106]
: +- INPUT
+- Scan ExistingRDD[_1#102,_2#103]
//Example 4: Create a Dataset from json
//Set filepath
scala> val file_path = "<Your path>/authors.json"
file_path: String = <Your path>/authors.json
//Create case class to match schema
scala> case class Auth(first_name: String, last_name: String,books: Array[String])
defined class Auth
//Create dataset from json using case class
//Note that the json document should have one record per line
scala> val auth = spark.read.json(file_path).as[Auth]
auth: org.apache.spark.sql.Dataset[Auth] = [books: array<string>, firstName: string ... 1 more field]
//Look at the data
scala> auth.show()
+--------------------+----------+---------+
| books|first_name|last_name|
+--------------------+----------+---------+
| null| Mark| Twain|
| null| Charles| Dickens|
|[Jude the Obscure...| Thomas| Hardy|
+--------------------+----------+---------+
//Try explode to see array contents on separate lines
scala> auth.select(explode($"books") as "book",
$"first_name",$"last_name").show(2,false)
+------------------------+----------+---------+
|book |first_name|last_name|
+------------------------+----------+---------+
|Jude the Obscure |Thomas |Hardy |
|The Return of the Native|Thomas |Hardy |
+------------------------+----------+---------+

##Spark SQL
Spark SQL is a Spark module for structured data processing. We'll be focusing on window operations, which were just introduced in Spark 2.0 and address sliding window computations.
For example, if you want to report the average peak temperature every day in the past seven days, then you are operating on a sliding window of seven days until today. Here is an example that computes average sales per month for the past three months. The data file contains 24 observations showing monthly sales for two products, P1 and P2.
The Catalyst optimizer contains libraries for representing trees and applying rules to transform the trees. These tree transformations are applied to create the most optimized logical and physical execution plans. In the final phase, it generates Java bytecode using a special feature of the Scala language called quasiquotes.
The optimizer also enables external developers to extend the optimizer by adding data-source-specific rules that result in pushing operations to external systems, or support for new data types.
The Catalyst optimizer arrives at the most optimized plan to execute the operations on hand. The actual execution and related improvements are provided by the Tungsten engine. The goal of Tungsten is to improve the memory and CPU efficiency of Spark backend execution. The following are some salient features of this engine:

  • Reduces the memory footprint and eliminates garbage collection overhead by bypassing Java memory management (off-heap).
  • Code generation is fused across multiple operators, so excessive virtual function calls are avoided; the generated code looks like hand-optimized code.
  • The in-memory layout is columnar, Parquet-like, which enables vectorized processing and is also closer to usual data access operations.
  • In-memory encoding using encoders: encoders use runtime code generation to build custom bytecode for faster and more compact serialization and deserialization. Many operations can be performed in place, without deserialization, because the data is already in the Tungsten binary format.
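
One simple way to watch Catalyst and Tungsten at work (my own aside, reusing the color_df DataFrame from the earlier example) is to ask for the extended query plan:

// Prints the parsed, analyzed and optimized logical plans plus the physical plan.
// Operators covered by whole-stage code generation show up in the physical plan
// (marked with * or wrapped in WholeStageCodegen, depending on the Spark version).
color_df.filter(color_df("length") > 4).explain(true)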

####Example

//Example 5: Window example with moving average computation
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
//Create a DataFrame containing monthly sales data for two products
scala> val monthlySales = spark.read.options(Map("header" -> "true", "inferSchema" -> "true")).
csv("<Your path>/MonthlySales.csv")
monthlySales: org.apache.spark.sql.DataFrame = [Product: string, Month: int ... 1 more field]
//Prepare WindowSpec to create a 3 month sliding window for a product
//Negative subscript denotes rows above current row
scala> val w = Window.partitionBy(monthlySales("Product")).orderBy(monthlySales("Month")).rangeBetween(-2,0)
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@3cc2f15
//Define compute on the sliding window, a moving average in this case
scala> val f = avg(monthlySales("Sales")).over(w)
f: org.apache.spark.sql.Column = avg(Sales) OVER (PARTITION BY Product ORDER BY Month ASC RANGE BETWEEN 2 PRECEDING AND CURRENT ROW)
//Apply the sliding window and compute. Examine the results
scala> monthlySales.select($"Product",$"Sales",$"Month",bround(f,2).alias("MovingAvg")).
orderBy($"Product",$"Month").show(6)
+-------+-----+-----+---------+
|Product|Sales|Month|MovingAvg|
+-------+-----+-----+---------+
| P1| 66| 1| 66.0|
| P1| 24| 2| 45.0|
| P1| 54| 3| 48.0|
| P1| 0| 4| 26.0|
| P1| 56| 5| 36.67|
| P1| 34| 6| 30.0|
+-------+-----+-----+---------+

##Structured Streaming
Apache Spark 2.0 has the first version of the higher level stream processing API called the Structured Streaming engine. This scalable and fault-tolerant engine leans on the Spark SQL API to simplify the development of real-time, continuous big data applications. It is probably the first successful attempt in unifying the batch and streaming computation.
At a technical level, Structured Streaming leans on the Spark SQL API and extends the DataFrame/Dataset abstractions to streaming data.

##The Spark streaming programming model
The idea is to treat the real-time data stream as a table that is continuously being appended to.
Structured Streaming provides three output modes:

  • Append: In the external storage, only the new rows appended to the result table since the last trigger will be written. This is applicable only on queries where existing rows in the result table cannot change (for example, a map on an input stream).
  • Complete: In the external storage, the entire updated result table will be written as is.
  • Update: In the external storage, only the rows that were updated in the result table since the last trigger will be changed. This mode works for output sinks that can be updated in place, such as a MySQL table.

####Example

//Example 6: Streaming example
//Understand nc
// Netcat or nc is a networking utility that can be used for creating TCP/UDP connections
// -k Forces nc to stay listening for another connection after its current connection is completed.
// -l Used to specify that nc should listen for an incoming connection
// rather than initiate a connection to a remote host.
//Run system activity report to collect memory usage in one terminal window
// The following command shows memory utilization for every 2 seconds, 20 times
// It diverts the output to the given port and you can check raw output from the browser
//sar -r 2 20 | nc -lk 9999
//In spark-shell window, do the following
//Read stream
scala> val myStream = spark.readStream.format("socket").
option("host","localhost").
option("port",9999).load()
myStream: org.apache.spark.sql.DataFrame = [value: string]
//Filter out unwanted lines and then extract free memory part as a float
//Drop missing values, if any
scala> val myDF = myStream.filter($"value".contains("IST")).
select(substring($"value",15,9).cast("float").as("memFree")).
na.drop().select($"memFree")
myDF: org.apache.spark.sql.DataFrame = [memFree: float]
//Define an aggregate function
scala> val avgMemFree = myDF.select(avg("memFree"))
avgMemFree: org.apache.spark.sql.DataFrame = [avg(memFree): double]
//Create StreamingQuery handle that writes on to the console
scala> val query = avgMemFree.writeStream.
outputMode("complete").
format("console").
start()
query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-0 [state = ACTIVE]
Batch: 0
-------------------------------------------
+-----------------+
| avg(memFree)|
+-----------------+
|4116531.380952381|
+-----------------+
....

Understanding the Compressed Sparse Column format (CSC)

I have been reading Spark for Data Science recently, and the CSC storage format for sparse matrices in the Machine Learning chapter left me thoroughly confused, so this post records how I came to understand it.

##Purpose
The goal of the Compressed Sparse Column (CSC) format is to compress a matrix and reduce the space needed to store it. The idea is simply to add some "metadata" describing where the non-zero elements sit (organized by column) and to combine it with the non-zero values themselves to represent the matrix; in many scenarios this takes noticeably less space.

##Spark API

In Spark, the usual API for creating such a sparse matrix is:

package org.apache.spark.ml.linalg

/**
 * Creates a column-major sparse matrix in Compressed Sparse Column (CSC) format.
 *
 * @param numRows number of rows
 * @param numCols number of columns
 * @param colPtrs the index corresponding to the start of a new column
 * @param rowIndices the row index of the entry
 * @param values non-zero matrix entries in column major
 */
@Since("2.0.0")
def sparse(
    numRows: Int,
    numCols: Int,
    colPtrs: Array[Int],
    rowIndices: Array[Int],
    values: Array[Double]): Matrix = {
  new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values)
}

##Representing a sparse matrix in CSC format
Suppose we want to create the following 3x3 sparse matrix:

1 0 4
0 3 5
2 0 6

We can use the API above:

import org.apache.spark.ml.linalg.{Matrix, Matrices}
val sm: Matrix = Matrices.sparse(3, 3, Array(0,2,3,6), Array(0,2,1,0,1,2), Array(1.0,2.0,3.0,4.0,5.0,6.0))
// Output:
sm: org.apache.spark.ml.linalg.Matrix = 3 x 3 CSCMatrix
(0,0) 1.0
(2,0) 2.0
(1,1) 3.0
(0,2) 4.0
(1,2) 5.0
(2,2) 6.0

In other words, the 3x3 matrix above can be represented by the following three arrays:

Array(0, 2, 3, 6)
Array(0, 2, 1, 0, 1, 2)
Array(1, 2, 3, 4, 5, 6)

To be honest, I was a bit lost the first time I saw this API, mainly because I could not figure out how the first array, Array(0, 2, 3, 6), was derived. I also checked some fairly authoritative material (see the references at the bottom of this post), but the explanations felt unclear, so here is how I understand it.

##My understanding
Take the three arrays again (for brevity I write 1 instead of 1.0, and so on):

Array(0, 2, 3, 6)
Array(0, 2, 1, 0, 1, 2)
Array(1, 2, 3, 4, 5, 6)

The third array is easy to understand: it simply records the non-zero values in order, column by column.
The second array is also fairly easy: for each column it records the row index of every non-zero element, with rows numbered from 0. In the matrix above, the first column has the element 1 in row 0 and the element 2 in row 2.
The first array is the one that takes a little more effort. My summary is:

  • The first array has (number of columns + 1) elements; for a 3-column matrix, it has 4 elements.
  • Its first element is always 0, and its second element is the number of non-zero elements in the first column.
  • Every subsequent value is the previous value plus the number of non-zero elements in the next column.

That summary may still look vague, so let me walk through the example above (the code sketch after this list applies the same rule):

  • The matrix is 3x3, so the first array has 4 elements, and its first element is 0. So far we have Array(0).
  • The first column of the matrix has 2 non-zero elements, so the second element of the array is 2, giving Array(0, 2).
  • The second column has 1 non-zero element, so the third element is the current last element plus 1, i.e. 2 + 1 = 3, giving Array(0, 2, 3).
  • The third column has 3 non-zero elements, so the last element is 3 + 3 = 6, giving Array(0, 2, 3, 6).
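
Here is a small Scala sketch (my own helper, not part of any library) that applies exactly this rule to build the three CSC arrays from a dense matrix:

// Hypothetical helper: derive the three CSC arrays from a dense matrix (rows top to bottom)
def toCSC(dense: Array[Array[Double]]): (Array[Int], Array[Int], Array[Double]) = {
  val numRows = dense.length
  val numCols = dense(0).length
  val colPtrs = new Array[Int](numCols + 1)
  val rowIndices = scala.collection.mutable.ArrayBuffer[Int]()
  val values = scala.collection.mutable.ArrayBuffer[Double]()
  for (c <- 0 until numCols) {
    for (r <- 0 until numRows if dense(r)(c) != 0.0) {
      rowIndices += r
      values += dense(r)(c)
    }
    // each entry is the previous entry plus the non-zero count of this column
    colPtrs(c + 1) = values.length
  }
  (colPtrs, rowIndices.toArray, values.toArray)
}

val m = Array(
  Array(1.0, 0.0, 4.0),
  Array(0.0, 3.0, 5.0),
  Array(2.0, 0.0, 6.0))
toCSC(m)
// (Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2), Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))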

##Checking a few more examples
For the following 3x3 matrix:

1 0 2
0 0 3
4 5 6

we get these three arrays:

Array(0, 2, 3, 6)
Array(0, 2, 2, 0, 1, 2)
Array(1, 4, 5, 2, 3, 6)

For the following matrix:

9 0
0 8
0 6

the three arrays that represent it are:

Array(0, 1, 3)
Array(0, 1, 2)
Array(9, 8, 6)

And for the following matrix:

9 0 0 0
0 8 6 5

the representation is:

Array(0, 1, 2, 3, 4)
Array(0, 1, 1, 1)
Array(9, 8, 6, 5)

##Reconstructing the original matrix from its CSC representation
We have seen how to express a sparse matrix in CSC form; how do we go in the other direction?
Suppose we have a 2x4 matrix whose CSC representation is:

Array(0, 1, 2, 3, 4)
Array(0, 1, 1, 1)
Array(9, 8, 6, 5)

The reconstruction goes roughly like this (the sketch after the list does the same thing with the Spark API):

  • We know the matrix is 2x4. The second element of the first array is 1, and every later element is exactly 1 larger than the one before it, so every column contains exactly one non-zero element.
  • From the second array we know that the non-zero element of the first column is in row 0, while the non-zero elements of columns 2, 3 and 4 are all in row 1.
  • With the third array we can then simply write out the original matrix.
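
You can also let Spark do the reconstruction: SparseMatrix in org.apache.spark.ml.linalg has a toDense method, so a quick check of the 2x4 example looks roughly like this (a sketch, assuming the 2.x API behaves as described):

import org.apache.spark.ml.linalg.SparseMatrix

val sm = new SparseMatrix(2, 4,
  Array(0, 1, 2, 3, 4),       // colPtrs
  Array(0, 1, 1, 1),          // rowIndices
  Array(9.0, 8.0, 6.0, 5.0))  // values

println(sm.toDense)
// 9.0  0.0  0.0  0.0
// 0.0  8.0  6.0  5.0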

##References

Basic matrix operations with NumPy

This post shows how to perform basic matrix operations with Python's NumPy library.

##Creating an all-zeros matrix

# Create a 3x5 matrix of zeros
myZero = np.zeros([3, 5])
print myZero

Output:

[[ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0.]]

##Creating an all-ones matrix

# Create a 3x5 matrix of ones
myOnes = np.ones([3, 5])
print myOnes

Output:

[[ 1.  1.  1.  1.  1.]
 [ 1.  1.  1.  1.  1.]
 [ 1.  1.  1.  1.  1.]]

##Creating a random matrix with values between 0 and 1

# 3x4 matrix of random numbers between 0 and 1
myRand = np.random.rand(3, 4)
print myRand

Output:

[[ 0.26845651  0.26713961  0.12632736  0.69840295]
 [ 0.92745819  0.44091417  0.21733213  0.76135785]
 [ 0.97161283  0.13570203  0.07819361  0.72129986]]

##Creating an identity matrix

# 3x3 identity matrix
myEye = np.eye(3)
print myEye

Output:

[[ 1.  0.  0.]
 [ 0.  1.  0.]
 [ 0.  0.  1.]]

##Matrix addition

print myZero + myOnes

Output:

[[ 1.  1.  1.  1.  1.]
 [ 1.  1.  1.  1.  1.]
 [ 1.  1.  1.  1.  1.]]

##Matrix subtraction

print myZero - myOnes

Output:

[[-1. -1. -1. -1. -1.]
 [-1. -1. -1. -1. -1.]
 [-1. -1. -1. -1. -1.]]

##Creating a matrix

myMatrix = np.mat([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print myMatrix

Output:

[[1 2 3]
 [4 5 6]
 [7 8 9]]

##Multiplying a matrix by a scalar

print 10 * myMatrix

Output:

[[10 20 30]
 [40 50 60]
 [70 80 90]]

##Summing all elements of a matrix

print np.sum(myMatrix)

The output is 45.

##Element-wise and matrix multiplication
np.multiply performs element-wise multiplication: when the two matrices have the same shape, each element is multiplied by the corresponding element; when the shapes differ, NumPy's broadcasting rules first expand them to a common shape. The * operator on np.mat objects, in contrast, is true matrix multiplication.

myMatrix2 = 2 * myMatrix
print myMatrix2
print np.multiply(myMatrix, myMatrix2)
myMatrix3 = np.mat([[1], [2], [3]])
print(myMatrix3)
print myMatrix * myMatrix3

Output:

[[ 2  4  6]
 [ 8 10 12]
 [14 16 18]]
[[  2   8  18]
 [ 32  50  72]
 [ 98 128 162]]
[[1]
 [2]
 [3]]
[[14]
 [32]
 [50]]

##Element-wise power of a matrix

print np.power(myMatrix, 2)

Output:

[[ 1  4  9]
 [16 25 36]
 [49 64 81]]

##Matrix transpose

print myMatrix
print myMatrix.T
print myMatrix.transpose()

Output:

[[1 2 3]
 [4 5 6]
 [7 8 9]]
[[1 4 7]
 [2 5 8]
 [3 6 9]]
[[1 4 7]
 [2 5 8]
 [3 6 9]]

##Other operations: shape, slicing, copying, comparison

[m, n] = myMatrix.shape
print "rows and columns:", m, n
# Row slicing (prints a row of the matrix)
print myMatrix[0]
# Column slicing (prints a column of the matrix)
print myMatrix.T[0]
# Copy a matrix
myMatrixCopy = myMatrix.copy()
print myMatrixCopy
# Element-wise comparison
print myMatrix < myMatrix.T

Output:

rows and columns: 3 3
[[1 2 3]]
[[1 4 7]]
[[1 2 3]
 [4 5 6]
 [7 8 9]]
[[False  True  True]
 [False False  True]
 [False False False]]

The complete code for this post:

# -*- coding: utf-8 -*-
import numpy as np
# Create a 3x5 matrix of zeros
myZero = np.zeros([3, 5])
print myZero
# Create a 3x5 matrix of ones
myOnes = np.ones([3, 5])
print myOnes
# 3x4 matrix of random numbers between 0 and 1
myRand = np.random.rand(3, 4)
print myRand
# 3x3 identity matrix
myEye = np.eye(3)
print myEye
# Matrix addition
print myZero + myOnes
# Matrix subtraction
print myZero - myOnes
# Create a matrix
myMatrix = np.mat([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
print myMatrix
# Multiply a matrix by a scalar
print 10 * myMatrix
# Sum of all matrix elements
print np.sum(myMatrix)
# Element-wise multiplication with np.multiply:
# matching shapes multiply element by element; differing shapes follow broadcasting rules.
# The * operator on np.mat objects is true matrix multiplication.
myMatrix2 = 2 * myMatrix
print myMatrix2
print np.multiply(myMatrix, myMatrix2)
# Element-wise power
print np.power(myMatrix, 2)
myMatrix3 = np.mat([[1], [2], [3]])
print(myMatrix3)
print myMatrix * myMatrix3
# Matrix transpose
print myMatrix
print myMatrix.T
print myMatrix.transpose()
# Other operations: shape, slicing, copying, comparison
[m, n] = myMatrix.shape
print "rows and columns:", m, n
# Row slicing (prints a row of the matrix)
print myMatrix[0]
# Column slicing (prints a column of the matrix)
print myMatrix.T[0]
# Copy a matrix
myMatrixCopy = myMatrix.copy()
print myMatrixCopy
# Element-wise comparison
print myMatrix < myMatrix.T

Designing a promotion system

A few words up front

First of all, I have to admit that I have never actually worked on an e-commerce system; what my project team does is completely different from the topic of this article. So this article is merely a write-up of my own observations and ideas. If anything here seems unreasonable, please point it out, and thank you in advance.

What this article covers

On every major e-commerce site you can see promotions running practically around the clock, and most of us have taken part in a few. As the business grows more complex, operations become more fine-grained, and categories, platforms and channels keep multiplying, new forms of promotion appear all the time, spanning the whole flow from product display and search to purchase and payment. The promoted goods vary wildly, but promotions themselves have a lot in common, and this article tries to distill a general approach to modeling a promotion system.

What a promotion system is

If we had to define a promotion, it would be:

Within a certain time window, give users who satisfy certain conditions a certain form of discount on goods that satisfy certain constraints.

A promotion system is the system built to support any number of such promotion activities.
Promotion rules take effect on the shopping cart page and the checkout page. The checkout page additionally handles shipping costs; the rest of its information matches the cart. Only after the customer adds a product to the cart can the system analyze the cart contents and work out the discounted price, gifts and other details, and at checkout time the discounted price, gifts and so on are recalculated based on the cart items the user has actually selected.

Some common examples of promotions (a simplified model sketch follows the list):

  • Spend 100 on books and save 20; spend 200 and save 50
  • Buy a certain product and get another product as a gift
  • Spend 200 and pick any one free gift
  • A special price on a certain product
  • Buy product A together with product B and get a discount
  • Free shipping above a certain amount
  • ...
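
Since I have not built such a system myself, the following is only a hypothetical, heavily simplified Scala sketch of the definition above (time window + condition + benefit); all names, and the choice of Scala, are illustrative:

import java.time.Instant

case class CartItem(sku: String, category: String, price: BigDecimal, quantity: Int)
case class User(id: Long, memberLevel: Int)

trait Condition { def matches(user: User, items: Seq[CartItem]): Boolean }
trait Benefit   { def apply(items: Seq[CartItem]): BigDecimal }   // returns the discount amount

case class Promotion(
    name: String,
    startsAt: Instant,
    endsAt: Instant,
    condition: Condition,
    benefit: Benefit) {

  // A promotion only contributes a discount inside its time window and when its condition holds
  def discountFor(user: User, items: Seq[CartItem], now: Instant): BigDecimal =
    if (!now.isBefore(startsAt) && now.isBefore(endsAt) && condition.matches(user, items))
      benefit(items)
    else BigDecimal(0)
}

// "Spend 100 on books, save 20" expressed in this hypothetical model
val booksOver100 = Promotion(
  name     = "books-100-minus-20",
  startsAt = Instant.parse("2017-01-01T00:00:00Z"),
  endsAt   = Instant.parse("2017-02-01T00:00:00Z"),
  condition = new Condition {
    def matches(u: User, items: Seq[CartItem]) =
      items.filter(_.category == "book").map(i => i.price * i.quantity).sum >= 100
  },
  benefit = new Benefit {
    def apply(items: Seq[CartItem]) = BigDecimal(20)
  })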

Goals of the promotion model

  • Powerful and expressive
  • Easy to extend
  • Loosely coupled with other systems

Designing the promotion model

The basic promotion model

The basic information model

Promotion conditions

Benefits

Limits

Rules


Designing an RBAC permission system

##Role Based Access Control

Role-based access control (RBAC) is a method of regulating access to computer or network resources based on the roles of individual users within an enterprise. In this context, access is the ability of an individual user to perform a specific task, such as view, create, or modify a file.

The NIST (National Institute of Standards and Technology) standard RBAC model consists of four component models:

  • RBAC0 (Core RBAC), the base model
  • RBAC1 (Hierarchical RBAC), which adds role hierarchies
  • RBAC2 (Constrained RBAC), which adds role constraints
  • RBAC3 (Combined RBAC), which unifies the above

For the differences between these four, I suggest reading the second link in the references of this article; it explains the standard four-component RBAC model very clearly.

##My understanding

In this post I just want to briefly describe what I have learned about designing permission systems; corrections are very welcome.
An RBAC-based permission system, put simply, links roles to permissions (or permission groups): a user belongs to one or more roles, and each role holds a number of permissions (or permission groups). This gives us the user-role-permission model.
Of course, systems come in different sizes and levels of complexity, so the requirements on the permission system differ as well, but the designs are largely similar.
The systems I have been involved with generally follow the RBAC0 model.

http://7niucdn.wenchao.ren/16-12-12/48145442-file_1481543097370_d3b6.png

The diagram above does not include a permission-group entity; if you have so many permissions that they need grouping, you can add one.
You can also choose the RBAC variant that best matches your own system's requirements.
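
To make the user-role-permission relationship concrete, here is a tiny, hypothetical Scala sketch of the RBAC0 idea (all names are made up; a real system would back these with database tables):

case class Permission(code: String)            // e.g. "order:view", "order:edit"
case class Role(name: String, permissions: Set[Permission])
case class User(name: String, roles: Set[Role])

// A user may perform an action if any of their roles carries the permission
def can(user: User, permission: String): Boolean =
  user.roles.exists(_.permissions.exists(_.code == permission))

val viewOrder = Permission("order:view")
val editOrder = Permission("order:edit")
val clerk = Role("clerk", Set(viewOrder))
val admin = Role("admin", Set(viewOrder, editOrder))

val alice = User("alice", Set(admin))
can(alice, "order:edit")   // true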

##References
