《數(shù)據(jù)實時處理flink》課件-第四章 DataStream API的介紹和使用_第1頁
《數(shù)據(jù)實時處理flink》課件-第四章 DataStream API的介紹和使用_第2頁
《數(shù)據(jù)實時處理flink》課件-第四章 DataStream API的介紹和使用_第3頁
《數(shù)據(jù)實時處理flink》課件-第四章 DataStream API的介紹和使用_第4頁
《數(shù)據(jù)實時處理flink》課件-第四章 DataStream API的介紹和使用_第5頁
已閱讀5頁,還剩42頁未讀 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

DataStreamAPI的介紹和使用本章將詳細介紹DataStreamAPI中各函數(shù)的使用方法。Flink處理程序應該包含三部分:數(shù)據(jù)源(Source)、轉換操作(Transformation)、結果接收(Sink)。我們從這三部分來介紹DataStreamAPI的相關內容。主要包括DataStreamAPI介紹和示例使用、應用技巧、基本知識點總結和需要注意事項。通過本節(jié)學習您將可以:熟悉Flink程序的骨架結構。熟悉各函數(shù)的功能和使用方法。了解Flink的數(shù)據(jù)類型和序列化。學會用戶自定義函數(shù)。Flink程序的骨架結構常見Transformation的使用方法數(shù)據(jù)類型和序列化用戶自定義函數(shù)

Flink程序的骨架結構初始化運行環(huán)境讀取一到多個Source數(shù)據(jù)源根據(jù)業(yè)務邏輯對數(shù)據(jù)流進行Transformation轉換將結果輸出到Sink調用作業(yè)執(zhí)行函數(shù)執(zhí)行環(huán)境是作業(yè)與集群交互的入口設置并行度關閉算子鏈時間、Checkpoint…流處理和批處理的執(zhí)行環(huán)境不一樣Java、Scala兩套API設置執(zhí)行環(huán)境//創(chuàng)建Flink執(zhí)行環(huán)境

StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.disableOperatorChaining();

Source、Transformation和SinkSource讀取數(shù)據(jù)源統(tǒng)稱為Source文件系統(tǒng)、消息隊列、數(shù)據(jù)庫等Transformation使用Flink提供的各類函數(shù),進行有狀態(tài)的計算數(shù)據(jù)流的分組、窗口和聚合操作等Sink將計算結果輸出到外部系統(tǒng),統(tǒng)稱為Sink目的地可以是文件系統(tǒng)、消息隊列、數(shù)據(jù)庫等Flink是延遲執(zhí)行(LazyEvaluation)的調用execute()方法,F(xiàn)link才會真正執(zhí)行否則無法得到計算結果字符串參數(shù)為當前作業(yè)名執(zhí)行//execute

env.execute("kafkastreamingwordcount");Flink程序的骨架結構常見Transformation的使用方法數(shù)據(jù)類型和序列化用戶自定義函數(shù)數(shù)據(jù)傳輸、持久化序列化:將內存對象轉換成二進制串、網(wǎng)絡可傳輸或可持久化反序列化:將二進制串轉換為內存對象,可直接在編程語言中讀寫和操作常見序列化方式:JSONJava、Kryo、Avro、Thrift、ProtobufFlink開發(fā)了自己的序列化框架更早地完成類型檢查節(jié)省數(shù)據(jù)存儲空間序列化和反序列化基礎類型Java、Scala基礎數(shù)據(jù)類型數(shù)組復合類型Scala

case

classJava

POJOTuple輔助類型Option、List、Map泛型和其他類型GenericFlink支持的數(shù)據(jù)類型TypeInformaton用來表示數(shù)據(jù)類型,創(chuàng)建序列化器每種數(shù)據(jù)類型都對應一個TypeInfomationTupleTypeInfo、PojoTypeInfo

…TypeInformationFlink會自動推斷類型,調用對應的序列化器,對數(shù)據(jù)進行序列化和反序列化類型推斷和序列化packagemon.typeinfo;public

class

Types{//java.lang.Void

public

static

finalTypeInformation<Void>VOID=BasicTypeInfo.VOID_TYPE_INFO;//java.lang.String

public

static

finalTypeInformation<String>STRING=BasicTypeInfo.STRING_TYPE_INFO;//java.lang.Boolean

public

static

finalTypeInformation<Boolean>BOOLEAN=BasicTypeInfo.BOOLEAN_TYPE_INFO;//java.lang.Integer

public

static

finalTypeInformation<Integer>INT=BasicTypeInfo.INT_TYPE_INFO;//java.lang.Long

public

static

finalTypeInformation<Long>LONG=BasicTypeInfo.LONG_TYPE_INFO;...}一些基礎類型的TypeInformation:Types.STRING是用來表示java.lang.String的TypeInformationTypes.STRING被定義為BasicTypeInfo.STRING_TYPE_INFOSTRING_TYPE_INFO:使用何種序列化器和比較器類型推斷和序列化public

static

finalBasicTypeInfo<String> STRING_TYPE_INFO= newBasicTypeInfo<>( String.class, newClass<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);STRING_TYPE_INFO定義使用何種序列化器和比較器:在聲明式文件中定義Schema使用工具將Schema轉換為Java可用的類Avro

Specific生成的類與POJO類似有getter、setter方法在Flink中可以像使用POJO一樣使用Avro

Specific模式Avro

Generic不生成具體的類用GenericRecord封裝所有用戶定義的數(shù)據(jù)結構必須給Flink提供Schema信息Avro{"namespace":"org.apache.flink.tutorials.avro","type":"record","name":"MyPojo","fields":[ {"name":"id","type":"int"}, {"name":"name","type":"string"}]}Avro聲明式文件:Kryo是大數(shù)據(jù)領域經(jīng)常使用的序列化框架Flink無法推斷出數(shù)據(jù)類型時,將該數(shù)據(jù)類型定義為GenericTypeInfo,使用Kryo作為后備選項進行序列化最好實現(xiàn)自己的序列化器,并對數(shù)據(jù)類型和序列化器進行注冊Kryo在有些場景效率不高env.getConfig.disableGenericTypes()禁用Kryo,可以定位到具體哪個類型無法被Flink自動推斷,然后針對該類型創(chuàng)建更高效的序列化器Kryo注冊數(shù)據(jù)類型和序列化器://將MyCustomType類進行注冊

env.getConfig().registerKryoType(MyCustomType.class);//或者使用下面的方式并且實現(xiàn)自定義序列化器

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class,MyCustomSerializer.class);static

class

MyClassSerializer

extends

Serializer<MyCustomType>implements

Serializable

{private

static

final

longserialVersionUID=...@Overridepublic

void

write(Kryokryo,Outputoutput,MyCustomTypemyCustomType)

{...}@OverridepublicMyCustomTyperead(Kryokryo,Inputinput,Class<MyCustomType>type)

{...}}與Avro

Specific模式相似,使用聲明式語言定義Schema,使用工具將聲明式語言轉化為Java類有人已經(jīng)實現(xiàn)好Kryo的序列化器案例:MyCustomType是使用Thrift工具生成的Java類,TBaseSerializer是com.twitter:chill-thrift包中別人實現(xiàn)好的序列化器,該序列化器基于Kryo的Serializer。注意在pom.xml中添加相應的依賴Thrift、Protobuf//GoogleProtobuf

//MyCustomType類是使用Protobuf生成的Java類

//ProtobufSerializer是別人實現(xiàn)好的序列化器

env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class,ProtobufSerializer.class);//ApacheThrift

//MyCustomType是使用Thrift生成的Java類

//TBaseSerializer是別人實現(xiàn)好的序列化器

env.getConfig().addDefaultKryoSerializer(MyCustomType.class,TBaseSerializer.class);Flink的數(shù)據(jù)類型:Java、Scala、Table

API分別有自己的數(shù)據(jù)類型體系絕大多數(shù)情況下,程序員不需要關心使用何種TypeInformation,只需要使用自己所需的數(shù)據(jù)類型Flink會做類型推斷、選擇對應的序列化器當自動類型推斷失效,用戶需要關注TypeInformation數(shù)據(jù)類型選擇:需要考慮:上下游的數(shù)據(jù)結構、序列化器的性能、狀態(tài)數(shù)據(jù)的持續(xù)迭代能力POJO和Tuple等內置類型性能更好Avro、Thrift和Protobuf對上下游數(shù)據(jù)的兼容性更好,不需要在Flink應用中重新設計一套POJOPOJO和Avro對Flink狀態(tài)數(shù)據(jù)的持續(xù)迭代更友好數(shù)據(jù)類型小結Flink程序的骨架結構常見Transformation的使用方法數(shù)據(jù)類型和序列化用戶自定義函數(shù)單數(shù)據(jù)流轉換基于Key的分組轉換多數(shù)據(jù)流轉換數(shù)據(jù)重分布轉換DataStream<T>泛型T為數(shù)據(jù)流中每個元素的類型四類Tranformation轉換每個輸入元素對應一個輸出元素重寫MapFunction或RichMapFunctionMapFunction<T,O>

T為輸入類型O為輸出類型實現(xiàn)其中的map()虛方法主邏輯中調用該函數(shù)單數(shù)據(jù)流轉換-

map@FunctionalInterfacepublic

interface

MapFunction<T,O>extends

Function,Serializable{//調用這個API就是繼承并實現(xiàn)這個虛函數(shù)

Omap(Tvalue)

throwsException;}//第一個泛型是輸入類型,第二個泛型是輸出類型

public

static

class

DoubleMapFunction

implements

MapFunction<Integer,String>{@OverridepublicStringmap(Integerinput)

{ return

"functioninput:"+input+",output:"+(input*2);}}DataStream<String>functionDataStream=dataStream.map(newDoubleMapFunction());MapFunction源代碼一個MapFunction的實現(xiàn)直接繼承接口類并實現(xiàn)map虛方法上頁所示使用匿名類使用Lambda表達式單數(shù)據(jù)流轉換-

map//匿名類

DataStream<String>anonymousDataStream=dataStream.map(newMapFunction<Integer,String>(){@OverridepublicStringmap(Integerinput)

throwsException{ return

"anonymousfunctioninput:"+input+",output:"+(input*2);}});//使用Lambda表達式

DataStream<String>lambdaStream=dataStream .map(input->"lambdainput:"+input+",output:"+(input*2));匿名類實現(xiàn)MapFunctionLambda表達式實現(xiàn)MapFunction對輸入元素進行過濾繼承并實現(xiàn)FilterFunction或RichFilterFunction重寫filter虛方法True

–保留False

–過濾單數(shù)據(jù)流轉換-

filterDataStream<Integer>dataStream=senv.fromElements(1,2,-3,0,5,-9,8);//使用->構造Lambda表達式

DataStream<Integer>lambda=dataStream.filter(input->input>0);public

static

class

MyFilterFunction

extends

RichFilterFunction<Integer>{//limit參數(shù)可以從外部傳入

privateIntegerlimit;public

MyFilterFunction(Integerlimit)

{this.limit=limit;}@Overridepublic

boolean

filter(Integerinput)

{ returninput>this.limit;}}Lambda表達式實現(xiàn)FilterFunction實現(xiàn)FilterFunction與map()相似輸出零個、一個或多個元素可對列表結果展平單數(shù)據(jù)流轉換-

flatMap{蘋果,梨,香蕉}.map(去皮){去皮蘋果,去皮梨,去皮香蕉}mapflatMap{蘋果,梨,香蕉}.flatMap(切碎){[蘋果碎片1,蘋果碎片2],[梨碎片1,梨碎片2,梨碎片3],[香蕉碎片1]}{蘋果碎片1,蘋果碎片2,梨碎片1,梨碎片2,梨碎片3,香蕉碎片1}使用Lambda表達式Collector用來收集元素flatMap()虛方法中不使用return返回數(shù)據(jù),使用Collector收集返回數(shù)據(jù)Collector<String>中的泛型String為返回數(shù)據(jù)類型將flatMap()看做map()和filter()更一般的形式map()和filter()的語義更明確單數(shù)據(jù)流轉換-

flatMapDataStream<String>dataStream=senv.fromElements("HelloWorld","HellothisisFlink");

//split函數(shù)的輸入為"HelloWorld"輸出為"Hello"和"World"組成的列表["Hello","World"]

//flatMap將列表中每個元素提取出來

//最后輸出為["Hello","World","Hello","this","is","Flink"]

DataStream<String>words=dataStream.flatMap((Stringinput,Collector<String>collector)->{

for(Stringword:input.split("")){

collector.collect(word);

}}).returns(Types.STRING);數(shù)據(jù)分組后可進行聚合操作keyBy()將一個DataStream轉化為一個KeyedStream聚合操作將KeyedStream轉化為DataStreamKeyedStream繼承自DataStream基于Key的分組轉換根據(jù)某種屬性或數(shù)據(jù)的某些字段對數(shù)據(jù)進行分組對一個分組內的數(shù)據(jù)進行處理股票:相同股票代號的數(shù)據(jù)分組到一起相同Key的數(shù)據(jù)被分配到同一算子實例上需要指定Key數(shù)字位置字段名KeySelector基于Key的分組轉換-

keyByDataStream<Tuple2<Integer,Double>>dataStream=senv.fromElements( Tuple2.of(1,1.0),Tuple2.of(2,3.2), Tuple2.of(1,5.5),Tuple2.of(3,10.0),Tuple2.of(3,12.5));//使用數(shù)字位置定義Key按照第一個字段進行分組

DataStream<Tuple2<Integer,Double>>keyedStream=dataStream.keyBy(0).sum(1);KeySelector重寫getKey()方法單數(shù)據(jù)流轉換-

keyBy//IN為數(shù)據(jù)流元素,KEY為所選擇的Key

@FunctionalInterfacepublic

interface

KeySelector<IN,KEY>extends

Function,Serializable

{//選擇一個字段作為Key

KEYgetKey(INvalue)

throwsException;}public

class

Word{publicStringword;public

intcount;}//使用KeySelector

DataStream<Word>keySelectorStream=wordStream.keyBy(newKeySelector<Word,String>(){@OverridepublicStringgetKey(Wordin)

{returnin.word;}}).sum("count");KeySelector源碼一個KeySelector的實現(xiàn)sum()、max()、min()等指定字段,對該字段進行聚合KeySelector流數(shù)據(jù)上的聚合實時不斷輸出到下游狀態(tài)存儲中間數(shù)據(jù)單數(shù)據(jù)流轉換–

Aggregations將某個字段加和結果保存到該字段上不關心其他字段的計算結果單數(shù)據(jù)流轉換–

sumDataStream<Tuple3<Integer,Integer,Integer>>tupleStream=

senv.fromElements( Tuple3.of(0,0,0),Tuple3.of(0,1,1), Tuple3.of(0,2,2),Tuple3.of(1,0,6), Tuple3.of(1,1,7),Tuple3.of(1,0,8));DataStream<Tuple3<Integer,Integer,Integer>>sumStream=tupleStream.keyBy(0).sum(1);//按第一個字段分組,對第二個字段求和,打印出來的結果如下:

//(0,0,0)

//(0,1,0)

//(0,3,0)

//(1,0,6)

//(1,1,6)

//(1,1,6)

max()對該字段求最大值結果保存到該字段上不保證其他字段的計算結果maxBy()對該字段求最大值其他字段保留最大值元素的值單數(shù)據(jù)流轉換–

max

/

maxByDataStream<Tuple3<Integer,Integer,Integer>>tupleStream=

senv.fromElements( Tuple3.of(0,0,0),Tuple3.of(0,1,1), Tuple3.of(0,2,2),Tuple3.of(1,0,6), Tuple3.of(1,1,7),Tuple3.of(1,0,8));//按第一個字段分組,對第三個字段求最大值max,打印出來的結果如下:

DataStream<Tuple3<Integer,Integer,Integer>>maxStream=tupleStream.keyBy(0).max(2);//(0,0,0)

//(0,0,1)

//(0,0,2)

//(1,0,6)

//(1,0,7)

//(1,0,8)

//按第一個字段分組,對第三個字段求最大值maxBy,打印出來的結果如下:

DataStream<Tuple3<Integer,Integer,Integer>>maxByStream=tupleStream.keyBy(0).maxBy(2);//(0,0,0)

//(0,1,1)

//(0,2,2)

//(1,0,6)

//(1,1,7)

//(1,0,8)

比Aggregation更通用在KeyedStream上生效接受兩個輸入,生成一個輸出兩兩合一地匯總操作基于Key的分組轉換-

reduce實現(xiàn)ReduceFunction基于Key的分組轉換-

reducepublic

static

class

MyReduceFunction

implements

ReduceFunction<Score>{@OverridepublicScorereduce(Scores1,Scores2)

{ returnScore.of(,"Sum",s1.score+s2.score);}}DataStream<Score>dataStream=senv.fromElements( Score.of("Li","English",90),Score.of("Wang","English",88), Score.of("Li","Math",85),Score.of("Wang","Math",92), Score.of("Liu","Math",91),Score.of("Liu","English",87));//實現(xiàn)ReduceFunction

DataStream<Score>sumReduceFunctionStream=dataStream.keyBy("name").reduce(newMyReduceFunction());//使用Lambda表達式

DataStream<Score>sumLambdaStream=dataStream .keyBy("name")

.reduce((s1,s2)->Score.of(,"Sum",s1.score+s2.score));將多個同類型的DataStream<T>合并為一個DataStream<T>數(shù)據(jù)按照先進先出(FIFO)合并多數(shù)據(jù)流轉換-

unionDataStream<StockPrice>shenzhenStockStream=...DataStream<StockPrice>hongkongStockStream=...DataStream<StockPrice>shanghaiStockStream=...DataStream<StockPrice>unionStockStream=shenzhenStockStream.union(hongkongStockStream,shanghaiStockStream);只能連接兩個DataStream數(shù)據(jù)流兩個數(shù)據(jù)流類型可以不一致兩個DataStream經(jīng)過connect()之后轉化為ConnectedStreams,ConnectedStreams會對兩個流的數(shù)據(jù)應用不同的處理方法,且雙流之間可以共享狀態(tài)應用場景為:使用一個控制流對另一個數(shù)據(jù)流進行控制多數(shù)據(jù)流轉換-

connect重寫CoMapFunction或CoFlatMapFunction三個泛型,分別對應第一個輸入流的數(shù)據(jù)類型、第二個輸入流的數(shù)據(jù)類型和輸出流的數(shù)據(jù)類型對于CoFlatMapFunction,flatMap1()方法處理第一個流的數(shù)據(jù),flatMap2()方法處理第二個流的數(shù)據(jù)可以做到類似SQL

Join的效果多數(shù)據(jù)流轉換-

connect//IN1為第一個輸入流的數(shù)據(jù)類型

//IN2為第二個輸入流的數(shù)據(jù)類型

//OUT為輸出類型

public

interface

CoFlatMapFunction<IN1,IN2,OUT>extends

Function,Serializable

{//處理第一個流的數(shù)據(jù)

void

flatMap1(IN1value,Collector<OUT>out)

throwsException;//處理第二個流的數(shù)據(jù)

void

flatMap2(IN2value,Collector<OUT>out)

throwsException;}//CoMapFunction三個泛型分別對應第一個流的輸入、第二個流的輸入,map之后的輸出

public

static

class

MyCoMapFunction

implements

CoMapFunction<Integer,String,String>{@OverridepublicStringmap1(Integerinput1)

{ returninput1.toString();}@OverridepublicStringmap2(Stringinput2)

{ returninput2;}}CoFlatMapFunction源代碼一個CoMapFunction實現(xiàn)并行度邏輯視圖中的算子被切分為多個算子子任務每個算子子任務處理一部分數(shù)據(jù)可以在整個作業(yè)的執(zhí)行環(huán)境層面設置也可以對某個算子單獨設置并行度StreamExecutionEnvironmentsenv=StreamExecutionEnvironment.getExecutionEnvironment();//獲取當前執(zhí)行環(huán)境的默認并行度

intdefaultParalleism=senv.getParallelism();//設置所有算子的并行度為4,表示所有算子的并行執(zhí)行的實例數(shù)為4

senv.setParallelism(4);在執(zhí)行環(huán)境中設置并行度:對某個算子單獨設置:dataStream.map(newMyMapper()).setParallelism(defaultParallelism*2);默認情況下,數(shù)據(jù)自動分布到多個實例(或者稱之為分區(qū))上手動在多個實例上進行數(shù)據(jù)分配避免數(shù)據(jù)傾斜輸入是DataStream,輸出也是DataStream數(shù)據(jù)重分布dataStream.shuffle();基于正態(tài)分布,將數(shù)據(jù)隨機分配到下游各算子實例上:dataStream.broadcast();數(shù)據(jù)會被復制并廣播發(fā)送給下游的所有實例上:dataStream.global();將所有數(shù)據(jù)發(fā)送給下游算子的第一個實例上:

rebalance()使用Round-Ribon思想將數(shù)據(jù)均勻分配到各實例上rescale()就近發(fā)送給下游每個實例數(shù)據(jù)重分布rebalance()將數(shù)據(jù)輪詢式地分布到下游子任務上

當上游有2個子任務、下游有4個子任務時使用rescale()partitionCustom()自定義數(shù)據(jù)重分布邏輯Partitioner[K]中泛型K為根據(jù)哪個字段進行分區(qū)對一個Score類型數(shù)據(jù)流重分布,希望按照id均勻分配到下游各實例,那么泛型K就為id的數(shù)據(jù)類型Long重寫partition()方法數(shù)據(jù)重分布@FunctionalInterfacepublic

interface

Partitioner<K>extends

java.io.Serializable,Function

{//根據(jù)key決定該數(shù)據(jù)分配到下游第幾個分區(qū)(實例)

int

partition(Kkey,intnumPartitions);}/**

*Partitioner<T>其中泛型T為指定的字段類型*重寫partiton函數(shù),并根據(jù)T字段對數(shù)據(jù)流中的所有元素進行數(shù)據(jù)重分配**/

public

static

class

MyPartitioner

implements

Partitioner<String>{privateRandomrand=newRandom();privatePatternpattern=Ppile(".*\\d+.*");/**

*key泛型T即根據(jù)哪個字段進行數(shù)據(jù)重分配,本例中是Tuple2(Int,String)中的String

*numPartitons為當前有多少個并行實例*函數(shù)返回值是一個Int為該元素將被發(fā)送給下游第幾個實例**/

@Overridepublic

int

partition(Stringkey,intnumPartitions)

{intrandomNum=rand.nextInt(numPartitions/2);Matcherm=pattern.matcher(key);if(m.matches()){returnrandomNum;}else{returnrandomNum+numPartitions/2;}}}//對(Int,String)中的第二個字段使用MyPartitioner中的重分布邏輯

DataStream<Tuple2<Integer,String>>partitioned= dataStream.partitionCustom(newMyPartitioner(),1);Partitioner源碼

一個Partitioner的實現(xiàn)Flink程序的骨架結構常見Transformation的使用方法數(shù)據(jù)類型和序列化用戶自定義函數(shù)用戶自定義函數(shù)的三種方式:繼承并實現(xiàn)函數(shù)類使用Lambda表達式繼承并實現(xiàn)Rich函數(shù)類用戶自定義函數(shù)對于map()、flatMap()、reduce()等函數(shù),我們可以實現(xiàn)MapFunction、FlatMapFunction、ReduceFunction等interface接口。以FlatMapFunction函數(shù)式接口為例:繼承了Flink的Function函數(shù)式接口函數(shù)在運行過程中要發(fā)送到各個實例上,發(fā)送前后要進行序列化和反序列化,一定要保證函數(shù)內的所有內容都可以被序列化兩個泛型T和O,T是輸入,O是輸出,要設置好輸入和輸出數(shù)據(jù)類型,否則會報錯重寫虛方法flatMap()Collector收集輸出數(shù)據(jù)函數(shù)類packagemon.functions;@FunctionalInterfacepublicinterfaceFlatMapFunction<T,O>extendsFunction,Serializable{voidflatMap(Tvalue,Collector<O>out)throwsException;}//使用FlatMapFunction實現(xiàn)過濾邏輯,只對字符串長度大于limit的內容進行詞頻統(tǒng)計

publicstaticclass

WordSplitFlatMap

implements

FlatMapFunction<String,String>

{privateIntegerlimit;publicWordSplitFlatMap(Integerlimit){this.limit=limit;}@OverridepublicvoidflatMap(Stringinput,Collector<String>collector)throwsException{if(input.length()>limit){for(Stringword:input.split(""))

collector.collect(word);}}}DataStream<String>dataStream=senv.fromElements("HelloWorld","HellothisisFlink");DataStream<String>functionStream=dataStream.flatMap(ne

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論