Introduction to and Use of Storm Trident
I. Introduction to Trident
Trident literally means "three-pronged spear." Trident is a higher-level abstraction over the spouts and bolts we studied earlier: the topology/spout/bolt approach works perfectly well, and Trident implements the same functionality, only with more optimization and encapsulation built in. If processing performance is the top priority, it is advisable to use spouts and bolts directly; otherwise Trident is a convenient choice.
One way to picture the name: a topology is inherently branched, and a single spout feeding two bolts looks rather like a trident. (My personal interpretation.)
Because Trident is a higher-level abstraction over Storm, it handles data streams differently from the spouts and bolts covered before: Trident processes data in units of a batch (a group of tuples) rather than one tuple at a time.
II. Trident API Operations
Trident processes data in batches, and its API expresses the processing steps as functions. The available operations include filter, sum, aggregator, and so on.
Function operations act on the tuples in the stream.
The commonly used Trident APIs are introduced below.
partitionBy(Fields fields)
Purpose: redirects tuples to the next processing stage according to the configured fields; tuples with the same values for those fields are guaranteed to be handled by the same task (see the PartitionBy example in section 5.3 below).
III. Common Trident Functions
4. project (projection)
Given an input stream containing the fields ["x","y","z"], applying project(new Fields("y","z")) yields an output stream that contains only the fields ["y","z"].
5. repartition (redirection)
Purpose: repartitioning determines how tuples are routed to the next layer, via one of the following operations:
shuffle: balances tuples across the partitions with a random-assignment algorithm
broadcast: every tuple is replicated to all partitions; this is very useful with DRPC, for example to run a stateQuery on every partition
partitionBy: partitions by the given list of fields; concretely, the hash of those field values is taken modulo the number of partitions, which guarantees that tuples with identical field values end up in the same partition (a sketch of this rule follows the list)
global: all tuples are sent to one and the same partition, which processes the tuples of the entire stream in its own dedicated task
batchGlobal: all tuples of one batch go to the same partition, while different batches may go to different partitions
partition: partitions with a custom partitioning function that implements backtype.storm.grouping.CustomStreamGrouping
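As a rough illustration of the partitionBy rule above, here is a minimal, self-contained sketch. It is illustrative only, not Storm's actual implementation; PartitionByRoutingDemo and partitionFor are names made up for this example.

import java.util.Arrays;
import java.util.List;

// Illustrative only -- not Storm's real code. partitionBy-style routing hashes
// the selected field values and takes the hash modulo the partition count, so
// identical field values always map to the same partition.
public class PartitionByRoutingDemo {
    static int partitionFor(List<?> selectedValues, int numPartitions) {
        int hash = Arrays.deepHashCode(selectedValues.toArray());
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(Arrays.asList("java"), 3));  // same values ->
        System.out.println(partitionFor(Arrays.asList("java"), 3));  // same partition
        System.out.println(partitionFor(Arrays.asList("scala"), 3)); // may differ
    }
}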
6. Aggregation
In Trident, data is processed batch by batch, so aggregation likewise operates on the data within a batch. Tuples that pass through an aggregation do not keep their original form: the stream's tuples are replaced by the aggregated results.
The Aggregator interface has three methods to implement:
init(): executed when a batch starts receiving data; initializes the per-batch state
aggregate(): executed for each tuple received within the batch; folds that tuple into the batch state
complete(): executed when the batch ends; typically emits the aggregated result
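The three callbacks are easiest to see in a minimal custom aggregator. Below is a sketch that counts the tuples of each batch; it assumes the org.apache.storm package layout (older Storm releases used backtype.storm / storm.trident instead), and TupleCounter / CountState are names made up for this example.

import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

// Minimal custom Aggregator sketch: counts the tuples of each batch.
public class TupleCounter extends BaseAggregator<TupleCounter.CountState> {
    static class CountState {
        long count = 0;
    }

    @Override
    public CountState init(Object batchId, TridentCollector collector) {
        // called once when a batch starts: create the per-batch state
        return new CountState();
    }

    @Override
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        // called for every tuple in the batch: fold the tuple into the state
        state.count++;
    }

    @Override
    public void complete(CountState state, TridentCollector collector) {
        // called when the batch ends: emit the aggregated result
        collector.emit(new Values(state.count));
    }
}

Such an aggregator is used the same way as the built-in Count() in the examples below, e.g. .aggregate(new Fields("name"), new TupleCounter(), new Fields("count")).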
6.1 partitionAggregate
Runs the aggregation over each batch on the current partition; it is not a repartitioning operation. In other words, it aggregates the tuples of a batch locally, partition by partition.
6.2 aggregator
Aggregates the tuple data within a batch; it is a repartitioning operation.
6.3 reduceAggregator
An operation applied to the n-th element of the tuples in a batch (the reduceAggregator example below sums field 0).
6.5 persistenceAggregator
A persistent aggregator: the data is first written to a store, and the aggregation is then applied to it.
6.6 chained aggregation
An aggregation chain: applies several aggregation conditions to the tuples of a batch in one pass.
7. GroupBy
GroupBy splits the whole stream into grouped streams according to the given fields. If an aggregation is then applied to a grouped stream, it runs per grouped stream rather than over the whole batch. If groupBy is followed by aggregate, it is an aggregation operation; if it is followed by partitionAggregate, it is not an aggregation over the groups.
IV. Examples of Common Trident Functions
1. FilterFunction
Requirement: from a set of tuples, keep only those where the sum of the first and second values is even.
public class FilterTrident {
    private static final Logger LOG = LoggerFactory.getLogger(FilterTrident.class);

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b", "c", "d"), 3,
                new Values(1, 4, 7, 10),
                new Values(1, 1, 3, 11),
                new Values(2, 2, 7, 1),
                new Values(2, 5, 7, 2));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(false);

        // peek: performs no transformation; its Consumer argument only observes the tuples
        // each: applies the given operation to the specified fields of each tuple
        TridentTopology topology = new TridentTopology();
        topology.newStream("filter", spout).parallelismHint(1)
                .localOrShuffle()
                .peek(input -> LOG.info("peek1 ================{},{},{},{}",
                        input.get(0), input.get(1), input.get(2), input.get(3)))
                .parallelismHint(2)
                .localOrShuffle()
                .each(new Fields("a", "b"), new CheckEvenSumFilter())
                .parallelismHint(2)
                .localOrShuffle()
                .peek(input -> LOG.info("peek2 +++++++++++++++++++{},{},{},{}",
                        input.getIntegerByField("a"), input.getIntegerByField("b"),
                        input.getIntegerByField("c"), input.getIntegerByField("d")))
                .parallelismHint(1);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("FilterTrident", conf, topology.build());
        LOG.warn("==================================================");
        LOG.warn("the LocalCluster topology {} is submitted.", "FilterTrident");
        LOG.warn("==================================================");
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("FilterTrident");
        cluster.shutdown();
    }

    private static class CheckEvenSumFilter extends BaseFilter {
        @Override
        public boolean isKeep(TridentTuple tuple) {
            Integer a = tuple.getIntegerByField("a");
            Integer b = tuple.getIntegerByField("b");
            return (a + b) % 2 == 0;
        }
    }
}
2. SumFunction
Requirement: compute the sum of the first two values of each tuple.
public class SumFunctionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(SumFunctionTrident.class);

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b", "c", "d"), 3,
                new Values(1, 4, 7, 10),
                new Values(1, 1, 3, 11),
                new Values(2, 2, 7, 1),
                new Values(2, 5, 7, 2));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("function", spout).parallelismHint(1)
                .localOrShuffle()
                .peek(input -> LOG.info("peek1 ================{},{},{},{}",
                        input.get(0), input.get(1), input.get(2), input.get(3)))
                .parallelismHint(2)
                .localOrShuffle()
                .each(new Fields("a", "b"), new SumFunction(), new Fields("sum"))
                .parallelismHint(2)
                .localOrShuffle()
                .peek(input -> LOG.info("peek2 ================{},{},{},{},{}",
                        input.getIntegerByField("a"), input.getIntegerByField("b"),
                        input.getIntegerByField("c"), input.getIntegerByField("d"),
                        input.getIntegerByField("sum")))
                .parallelismHint(1);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("SumFunctionTrident", conf, topology.build());
        LOG.warn("==================================================");
        LOG.warn("the LocalCluster topology {} is submitted.", "SumFunctionTrident");
        LOG.warn("==================================================");
        TimeUnit.SECONDS.sleep(30);
        // kill the topology under the name it was submitted with
        cluster.killTopology("SumFunctionTrident");
        cluster.shutdown();
    }

    private static class SumFunction extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            Integer a = tuple.getIntegerByField("a");
            Integer b = tuple.getIntegerByField("b");
            collector.emit(new Values(a + b));
        }
    }
}
3. MapFunction
Requirement: convert the tuples of a batch to upper case.
public class MapFunctionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(MapFunctionTrident.class);

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException, AlreadyAliveException,
            InvalidTopologyException, AuthorizationException {
        boolean isRemoteMode = false;
        if (args.length > 0) {
            isRemoteMode = true;
        }

        FixedBatchSpout spout = new FixedBatchSpout(new Fields("line"), 3,
                new Values("hello stream"),
                new Values("hello kafka"),
                new Values("hello hadoop"),
                new Values("hello scala"),
                new Values("hello java"));
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setDebug(false);

        topology.newStream("hello", spout).parallelismHint(1)
                .localOrShuffle()
                .map(new MyMapFunction(), new Fields("upper"))
                .parallelismHint(2)
                .partition(Grouping.fields(ImmutableList.of("upper")))
                .peek(input -> LOG.warn("================>> peek process value:{}",
                        input.getStringByField("upper")))
                .parallelismHint(3);

        if (isRemoteMode) {
            StormSubmitter.submitTopology("HelloTridentTopology", conf, topology.build());
            LOG.warn("==================================================");
            LOG.warn("the remote topology {} is submitted.", "HelloTridentTopology");
            LOG.warn("==================================================");
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("HelloTridentTopology", conf, topology.build());
            LOG.warn("==================================================");
            LOG.warn("the LocalCluster topology {} is submitted.", "HelloTridentTopology");
            LOG.warn("==================================================");
            TimeUnit.SECONDS.sleep(5);
            cluster.killTopology("HelloTridentTopology");
            cluster.shutdown();
        }
    }

    private static class MyMapFunction implements MapFunction {
        private static final Logger LOG = LoggerFactory.getLogger(MyMapFunction.class);

        @Override
        public Values execute(TridentTuple input) {
            String line = input.getStringByField("line");
            LOG.warn("================>> myMapFunction process execute:value :{}", line);
            return new Values(line.toUpperCase());
        }
    }
}
4. ProjectionFunctionTrident
Requirement: from a set of tuples, take only part of the data.
public class ProjectionFunctionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(ProjectionFunctionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("x", "y", "z"), 3,
                new Values(1, 2, 3),
                new Values(4, 5, 6),
                new Values(7, 8, 9),
                new Values(10, 11, 12));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("ProjectionTrident", spout).parallelismHint(1)
                .localOrShuffle()
                .peek(tridentTuple -> LOG.info("================ {}", tridentTuple))
                .parallelismHint(2)
                .shuffle()
                .project(new Fields("y", "z"))
                .parallelismHint(2)
                .localOrShuffle()
                .peek(tridentTuple -> LOG.info(">>>>>>>>>>>>>>>> {}", tridentTuple))
                .parallelismHint(2);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("ProjectionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("ProjectionTrident");
        cluster.shutdown();
    }
}
5.2 Broadcast
Requirement: send the tuples of a batch to every partition.
public class BroadcastRepartitionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(BroadcastRepartitionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language", "age"), 3,
                new Values("java", 1),
                new Values("scala", 2),
                new Values("haddop", 3),
                new Values("java", 4),
                new Values("haddop", 5));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("BroadcastRepartitionTrident", spout).parallelismHint(1)
                .broadcast()
                .peek(tridentTuple -> LOG.info("================ {}", tridentTuple))
                .parallelismHint(2);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("BroadcastRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("BroadcastRepartitionTrident");
        cluster.shutdown();
    }
}
5.3 PartitionBy
Requirement: route the tuples of a batch that share the configured field values to the same task.
public class PartitionByRepartitionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionByRepartitionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        // FixedBatchSpout() parameters:
        // 1. the spout's output field names
        // 2. how many tuples form one batch
        // 3. the field values
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language", "age"), 3,
                new Values("java", 23),
                new Values("scala", 3),
                new Values("haddop", 10),
                new Values("java", 23),
                new Values("haddop", 10));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("PartitionByRepartitionTrident", spout).parallelismHint(1)
                .partitionBy(new Fields("language"))
                .peek(tridentTuple -> LOG.info("++++++++++++++++ {}", tridentTuple))
                .parallelismHint(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("PartitionByRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("PartitionByRepartitionTrident");
        cluster.shutdown();
    }
}
5.4 Global
Requirement: gather the tuples of a batch into a single global partition for processing.
public class GlobalRepatitionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(GlobalRepatitionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        // FixedBatchSpout() parameters:
        // 1. the spout's output field names
        // 2. how many tuples form one batch
        // 3. the field values
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language", "age"), 3,
                new Values("java", 23),
                new Values("scala", 3),
                new Values("haddop", 10),
                new Values("java", 23),
                new Values("haddop", 10));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("PartitionByRepartitionTrident", spout).parallelismHint(1)
                .partitionBy(new Fields("language"))
                .parallelismHint(3) // whatever parallelism is configured here, it has no effect: global() funnels everything into one task
                .peek(tridentTuple -> LOG.info(" ================= {}", tridentTuple))
                .global()
                .peek(tridentTuple -> LOG.info(" >>>>>>>>>>>>>>>>> {}", tridentTuple));

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("GlobalRepatitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("GlobalRepatitionTrident");
        cluster.shutdown();
    }
}
5.5 batchGlobal
Requirement: send the tuples of different batches to different tasks.
public class BatchGlobalRepatitionTrident2 {
    private static final Logger LOG = LoggerFactory.getLogger(BatchGlobalRepatitionTrident2.class);

    public static void main(String[] args) throws InterruptedException {
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language", "age"), 3,
                new Values("java", 1),
                new Values("scala", 2),
                new Values("scala", 3),
                new Values("haddop", 4),
                new Values("java", 5),
                new Values("haddop", 6));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("BatchGlobalRepatitionTrident2", spout).parallelismHint(1)
                .batchGlobal()
                .peek(tridentTuple -> LOG.info("++++++++++++++++ {}", tridentTuple))
                .parallelismHint(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("BatchGlobalRepatitionTrident2", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("BatchGlobalRepatitionTrident2");
        cluster.shutdown();
    }
}
5.6 partition
Requirement: a custom partition.
public class CustomRepartitionTrident {
    private static final Logger LOG = LoggerFactory.getLogger(CustomRepartitionTrident.class);

    public static void main(String[] args) throws InterruptedException {
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("language", "age"), 3,
                new Values("java", 1),
                new Values("scala", 2),
                new Values("haddop", 3),
                new Values("java", 4),
                new Values("haddop", 5));
        spout.setCycle(false);

        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("CustomRepartitionTrident", spout).parallelismHint(1)
                .partition(new HighTaskIDGrouping())
                .peek(tridentTuple -> LOG.info("++++++++++++++++ {}", tridentTuple))
                .parallelismHint(2);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("CustomRepartitionTrident", conf, topology.build());
        TimeUnit.SECONDS.sleep(30);
        cluster.killTopology("CustomRepartitionTrident");
        cluster.shutdown();
    }
}

/**
 * Custom grouping: always let the task with the highest task ID do the work.
 *
 * @author pengbo.zhao
 */
public class HighTaskIDGrouping implements CustomStreamGrouping {
    private int taskID;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        // targetTasks: the IDs of all downstream tasks
        ArrayList<Integer> tasks = new ArrayList<>(targetTasks);
        Collections.sort(tasks); // ascending order
        this.taskID = tasks.get(tasks.size() - 1);
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        return Arrays.asList(taskID);
    }
}
6.1 partitionAggregate
Requirement: count the number of tuples in a batch.
public class PartitionAggregateTrident {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionAggregateTrident.class);

    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name", "age"), 3,
                new Values("java", 1),
                new Values("scala", 2),
                new Values("scala", 3),
                new Values("haddop", 4),
                new Values("java", 5),
                new Values("haddop", 6));
        this.spout.setCycle(false);
    }

    @Test
    public void testPartitionAggregator() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("PartitionAggregateTrident", spout).parallelismHint(2) // the spout's parallelism is fixed at 1 internally, so this hint of 2 has no effect
                .shuffle()
                .partitionAggregate(new Fields("name", "age"), new Count(), new Fields("count"))
                .parallelismHint(2)
                // .each(new Fields("count"), new Debug());
                .peek(input -> LOG.info(" >>>>>>>>>>>>>>>>> {}", input.getLongByField("count")));
        this.submitTopology("PartitionAggregateTrident", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}
6.2 aggregator
Requirement: aggregate (count) the data in the tuples.
public class AggregateTrident {
    private static final Logger LOG = LoggerFactory.getLogger(AggregateTrident.class);

    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name", "age"), 3,
                new Values("java", 1),
                new Values("scala", 2),
                new Values("scala", 3),
                new Values("haddop", 4),
                new Values("java", 5),
                new Values("haddop", 6));
        this.spout.setCycle(false);
    }

    @Test
    public void testAggregator() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("AggregateTrident", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .aggregate(new Fields("name", "age"), new Count(), new Fields("count"))
                // .aggregate(new Fields("name", "age"), new CountAsAggregator(), new Fields("count"))
                .parallelismHint(2)
                .each(new Fields("count"), new Debug())
                .peek(input -> LOG.info("============> count:{}", input.getLongByField("count")));
        this.submitTopology("AggregateTrident", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}
6.3 reduceAggregator
Requirement: sum element 0 of the tuples in a batch; that is, across however many tuples the batch contains, sum the specified field.
public class ReduceAggregatorTrident {
    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name", "age"), 3,
                new Values("java", 1),
                new Values("scala", 2),
                new Values("scala", 3),
                new Values("haddop", 4),
                new Values("java", 5),
                new Values("haddop", 6));
        this.spout.setCycle(false);
    }

    @Test
    public void testReduceAggregator() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("ReduceAggregator", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .aggregate(new Fields("age", "name"), new MyReduce(), new Fields("sum"))
                .parallelismHint(5)
                .each(new Fields("sum"), new Debug());
        this.submitTopology("ReduceAggregator", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }

    static class MyReduce implements ReducerAggregator<Integer> {
        @Override
        public Integer init() {
            return 0; // the initial value is 0
        }

        @Override
        public Integer reduce(Integer curr, TridentTuple tuple) {
            return curr + tuple.getInteger(0); // field 0 is "age" here
        }
    }
}
6.4 combinerAggregate
Requirement: sum a field across the tuples.
public class CombinerAggregate {
    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name", "age"), 3,
                new Values("java", 1),
                new Values("scala", 2),
                new Values("scala", 3),
                new Values("haddop", 4),
                new Values("java", 5),
                new Values("haddop", 6));
        this.spout.setCycle(false);
    }

    @Test
    public void testCombinerAggregate() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("CombinerAggregate", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .aggregate(new Fields("age"), new MyCount(), new Fields("count"))
                .parallelismHint(5)
                .each(new Fields("count"), new Debug());
        this.submitTopology("CombinerAggregate", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }

    static class MyCount implements CombinerAggregator<Integer> {
        @Override
        public Integer init(TridentTuple tuple) {
            return tuple.getInteger(0);
        }

        @Override
        public Integer combine(Integer val1, Integer val2) {
            return val1 + val2;
        }

        @Override
        public Integer zero() {
            return 0;
        }
    }
}
6.5 persistenceAggregator
Requirement: count the tuples of a batch.
public class PersistenceAggregator {
    private static final Logger LOG = LoggerFactory.getLogger(PersistenceAggregator.class);

    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name", "age"), 3,
                new Values("java", 1),
                new Values("scala", 2),
                new Values("scala", 3),
                new Values("haddop", 4),
                new Values("java", 5),
                new Values("haddop", 6));
        this.spout.setCycle(false);
    }

    @Test
    public void testPersistenceAggregator() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("testPersistenceAggregator", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .persistentAggregate(new MemoryMapState.Factory(), new Fields("name"), new Count(), new Fields("count"))
                .parallelismHint(4)
                .newValuesStream()
                .peek(input -> LOG.info("count:{}", input.getLongByField("count")));
        this.submitTopology("testPersistenceAggregator", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}
6.6 AggregateChina (chained aggregation)
Requirement: run count, sum, and count operations over the tuples of a batch in a single chain.
public class AggregateChina {
    private static final Logger LOG = LoggerFactory.getLogger(AggregateChina.class);

    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name", "age"), 3,
                new Values("java", 1),
                new Values("scala", 2),
                new Values("scala", 3),
                new Values("haddop", 4),
                new Values("java", 5),
                new Values("haddop", 6));
        this.spout.setCycle(false);
    }

    @Test
    public void testAggregateChina() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("AggregateChina", spout).parallelismHint(2)
                .partitionBy(new Fields("name"))
                .chainedAgg()
                .aggregate(new Fields("name"), new Count(), new Fields("count"))
                .aggregate(new Fields("age"), new Sum(), new Fields("sum"))
                .aggregate(new Fields("age"), new Count(), new Fields("count2"))
                .chainEnd()
                .peek(tuple -> LOG.info("{}", tuple));
        this.submitTopology("AggregateChina", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}
7. GroupBy
Requirement: group the tuples of a batch by name, then count the tuples in each group.
public class GroupBy {
    private static final Logger LOG = LoggerFactory.getLogger(GroupBy.class);

    private FixedBatchSpout spout;

    @SuppressWarnings("unchecked")
    @Before
    public void setSpout() {
        this.spout = new FixedBatchSpout(new Fields("name", "age"), 3,
                new Values("java", 1),
                new Values("scala", 2),
                new Values("scala", 3),
                new Values("haddop", 4),
                new Values("java", 5),
                new Values("haddop", 6));
        this.spout.setCycle(false);
    }

    @Test
    public void testGroupBy() {
        TridentTopology topology = new TridentTopology();
        topology.newStream("GroupBy", spout).parallelismHint(1)
                // .partitionBy(new Fields("name"))
                .groupBy(new Fields("name"))
                .aggregate(new Count(), new Fields("count"))
                .peek(tuple -> LOG.info("{},{}", tuple.getFields(), tuple));
        this.submitTopology("GroupBy", topology.build());
    }

    public void submitTopology(String name, StormTopology topology) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, createConf(), topology);
        try {
            TimeUnit.MINUTES.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology(name);
        cluster.shutdown();
    }

    public Config createConf() {
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(false);
        return conf;
    }
}