譯者注:Apache Flink是一個面向分布式數據流處理和批量數據處理的開源計算平臺。作者在本文介紹了一些如何優化Flink應用速度的方式。以下為譯文。
Flink框架非常復雜,并提供了許多方法來調整其執行方式。本文我將介紹提高Flink應用程序性能的四種不同方法。
如果不熟悉Flink,你可以閱讀一些介紹性的文章,比如這篇,這篇,還有這篇。但是如果已經非常熟悉Apache Flink了,本文描述的內容可以幫助你如何提高應用程序的運行速度。
使用Flink Tuples
當使用類似于groupBy、join或keyBy這些操作時,Flink提供了多種方式以便用戶在數據集中選擇主鍵。用戶可以使用主鍵選擇函數:
// Join movies and ratings datasets
movies.join(ratings)
// Use movie id as a key in both cases
.where(new KeySelector<Movie, String>() {
@Override
public String getKey(Movie m) throws Exception {
return m.getId();
}
})
.equalTo(new KeySelector<Rating, String>() {
@Override
public String getKey(Rating r) throws Exception {
return r.getMovieId();
}
})
-
1
-
2
-
3
-
4
-
5
-
6
-
7
-
8
-
9
-
10
-
11
-
12
-
13
-
14
-
15
-
16
也可以在POJO類型中指定字段名稱:
movies.join(ratings)
// Use same fields as in the previous example
.where("id")
.equalTo("movieId")
但是如果現在使用的是Flink tuple類型,那么只要簡單地指定字段元組的位置,就可以被用作主鍵了:
DataSet<Tuple2<String, String>> movies = ...
DataSet<Tuple3<String, String, Double>> ratings = ...
movies.join(ratings)
// Specify fields positions in tuples
.where(0)
.equalTo(1)
可見后一種方式的性能是好的,但是可讀性怎么樣呢?代碼現在看起來是不是就像下面這樣?
DataSet<Tuple3<Integer, String, Double>> result = movies.join(ratings)
.where(0)
.equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,Double>, Tuple3<Integer, String, Double>>() {
// What is happening here?
@Override
public Tuple3<Integer, String, Double> join(Tuple2<Integer, String> first, Tuple2<Integer, Double> second) throws Exception {
// Some tuples are joined with some other tuples and some fields are returned???
return new Tuple3<>(first.f0, first.f1, second.f1);
}
});
-
1
-
2
-
3
-
4
-
5
-
6
-
7
-
8
-
9
-
10
-
11
-
12
在本例中,想要提高可讀性,常見的做法就是創建一個類,該類需要繼承TupleX類,并為類里面的這些字段實現getter和setter。下面是Flink Gelly庫的Edge類,繼承了Tuple3類:
public class Edge<K, V> extends Tuple3<K, K, V> {
public Edge(K source, K target, V value) {
this.f0 = source;
this.f1 = target;
this.f2 = value;
}
// Getters and setters for readability
public void setSource(K source) {
this.f0 = source;
}
public K getSource() {
return this.f0;
}
// Also has getters and setters for other fields
...
}
-
1
-
2
-
3
-
4
-
5
-
6
-
7
-
8
-
9
-
10
-
11
-
12
-
13
-
14
-
15
-
16
-
17
-
18
-
19
拒絕使用Flink Objects
另一個可以用來提高Flink應用程序性能的選項是,當從用戶定義的函數返回數據時,好使用可變對象。看看下面這個例子:
stream
.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
@Override
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
long changesCount = ...
// A new Tuple instance is created on every execution
collector.collect(new Tuple2<>(userName, changesCount));
}
}
可以看出,apply函數每執行一次,都會新建一個Tuple2類的實例,因此增加了對垃圾收集器的壓力。解決這個問題的一種方法是反復使用相同的實例:
stream
.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
// Create an instance that we will reuse on every call
private Tuple2<String, Long> result = new Tuple<>();
@Override
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
long changesCount = ...
// Set fields on an existing object instead of creating a new one
result.f0 = userName;
// Auto-boxing!! A new Long value may be created
result.f1 = changesCount;
// Reuse the same Tuple2 object
collector.collect(result);
}
}
-
1
-
2
-
3
-
4
-
5
-
6
-
7
-
8
-
9
-
10
-
11
-
12
-
13
-
14
-
15
-
16
-
17
-
18
-
19
這種做法更好一點。雖然每次調用時都新建一個Tuple2的實例,但是其實還間接創建了Long類的實例。為了解決這個問題,Flink有許多所謂的value class:IntValue、LongValue、StringValue、FloatValue等。下面介紹一下如何使用它們:
stream
.apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
// Create a mutable count instance
private LongValue count = new IntValue();
// Assign mutable count to the tuple
private Tuple2<String, LongValue> result = new Tuple<>("", count);
@Override
// Notice that now we have a different return type
public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, LongValue>> collector) throws Exception {
long changesCount = ...
// Set fields on an existing object instead of creating a new one
result.f0 = userName;
// Update mutable count value
count.setValue(changesCount);
// Reuse the same tuple and the same LongValue instance
collector.collect(result);
}
}
-
1
-
2
-
3
-
4
-
5
-
6
-
7
-
8
-
9
-
10
-
11
-
12
-
13
-
14
-
15
-
16
-
17
-
18
-
19
-
20
-
21
-
22
這種做法經常用在Flink庫里面,如Flink Gelly。
使用注解功能
優化Flink應用程序的另一種方法是提供一些關于用戶自定義的函數會對輸入數據做哪些操作的信息。由于Flink無法解析和理解代碼,所以可以提供一些有利于構建更有效執行計劃的重要信息。可以使用以下三個注解:
-
@ForwardedFields:指定輸入值中哪些字段保持不變,哪些字段是用于輸出的。
-
@NotForwardedFields:指定在輸出中未保留相同位置的字段。
-
@ReadFields:指定用來計算結果值的字段。指定的字段應該只在計算中使用,而不僅僅是復制到輸出參數中。
看一下如何使用ForwardedFields注釋:
// Specify that the first element is copied without any changes
@ForwardedFields("0")
class MyFunction implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
@Override
public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
// Copy first field without change
return new Tuple2<>(value.f0, value.f1 + 123);
}
}
這意味著輸入元組中的個元素沒有被更改,它將返回到相同的位置。
如果不更改字段,但只需將其移動到另一個位置,那么也可以使用ForwardedFields。在下一個示例中,我們在輸入tuple中互換一下字段,并通知Flink:
// 1st element goes into the 2nd position, and 2nd element goes into the 1st position
@ForwardedFields("0->1; 1->0")
class SwapArguments implements MapFunction<Tuple2<Long, Double>, Tuple2<Double, Long>> {
@Override
public Tuple2<Double, Long> map(Tuple2<Long, Double> value) {
// Swap elements in a tuple
return new Tuple2<>(value.f1, value.f0);
}
}
上面提到的注解只能應用于只有一個輸入參數的函數,例如map或flatMap。如果函數有兩個輸入參數,則可以使用ForwardedFieldsFirst和ForwardedFieldsSecond,分別提供關于個參數和第二個參數的信息。
下面是如何在JoinFunction接口的實現中使用這些注釋:
// Two fields from the input tuple are copied to the first and second positions of the output tuple
@ForwardedFieldsFirst("0; 1")
// The third field from the input tuple is copied to the third position of the output tuple
@ForwardedFieldsSecond("2")
class MyJoin implements JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,Double>, Tuple3<Integer, String, Double>>() {
@Override
public Tuple3<Integer, String, Double> join(Tuple2<Integer, String> first, Tuple2<Integer, Double> second) throws Exception {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
})
Flink還提供NotForwardedFieldsFirst、NotForwardedFieldsSecond、ReadFieldsFirst ReadFirldsSecond注釋,這些注釋都可以達到類似目的。
Select Join Type
如果給Flink另一個提示,那么就可以讓joins速度更快,但是在討論它的工作原理之前,先討論一下Flink是如何執行joins的。
當Flink處理批量數據時,集群中的每臺機器都存儲了部分數據。要執行join,Apache Flink需要找到滿足連接條件的兩個數據集。為了做到這一點,Flink首先必須將兩個數據集的項目放在同一臺機器上。這里有兩種策略:
-
Repartition-分配策略:在這種情況下,兩個數據集都被各自的主鍵分離了,并通過網絡發送。這意味著如果數據集很大,可能需要大量的時間才能通過網絡完成復制。
-
廣播轉發策略:在這種情況下,一個數據集不受影響,但是第二個數據集被復制到集群中的每臺機器上,它們都有個數據集的一部分。
如果是將某個小數據集join到更大的數據集,那么可以使用廣播轉發策略,這樣也可以避免個數據集的分區付出的昂貴代價。這很容易做到:
ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST)
這就表示個數據集比第二個數據集小得多。
你也可以使用其他連接提示:
-
BROADCAST_HASH_SECOND:第二個數據集要小得多
-
REPARTITION_HASH_FIRST:個數據集稍微小一些
-
REPARTITION_HASH_SECOND:第二個數據集要小一點
-
REPARTITION_SORT_MERGE:使用排序和合并策略對數據集進行重新分配
-
**OPTIMIZER_CHOOSES:**Flink優化器將決定如何join數據集
閱讀這篇文章可以更加了解Flink是如何在本文中執行join的。
本站文章版權歸原作者及原出處所有 。內容為作者個人觀點, 并不代表本站贊同其觀點和對其真實性負責,本站只提供參考并不構成任何投資及應用建議。本站是一個個人學習交流的平臺,網站上部分文章為轉載,并不用于任何商業目的,我們已經盡可能的對作者和來源進行了通告,但是能力有限或疏忽,造成漏登,請及時聯系我們,我們將根據著作權人的要求,立即更正或者刪除有關內容。本站擁有對此聲明的最終解釋權。