PulsarFunction例子-创新互联

在单机环境下实现字符串追加函数(Pulsar 2.4.2版本)

创新互联公司主营云南网站建设的网络公司,主营网站建设方案,成都app开发,云南h5微信小程序开发搭建,云南网站营销推广欢迎云南等地区企业咨询

1 启动单机Pulsar

$ bin/pulsar-daemon start standalone

2 创建函数

1) 准备环境

项目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'

2) 创建JAVA函数(此函数用于数据源来的topic schema是string,输出的tiopic schema是string)

Pulsar Function 例子

导出jar包,放到pulsar服务器目录下,本例子放在 /data/jar/下

3)使用命令行工具加载函数到Pulsar,                     

bin/pulsar-admin functions create \

--classname test.AppStrFunction \

--jar /data/jar/pf.jar \

--inputs persistent://public/default/tlstest \

--output persistent://public/default/teststr \

--tenant public \

--namespace default \

--name appStrFunction

参数说明:

参数
说明
functions通知 pulsar broker,函数操作
create创建函数,默认创建成功后启动
classname函数类名称,需要加上包名
jar指定 jar 包的运行路径
inputs指定 函数 数据的来源在哪里,支持多个 topics 作为输入
output如果该 函数 有输出(有些情况下,function 没有输出),指定 function 输出的 topic,只能有一个输出
tenant指定该 函数 运行的租户名
namespace指定该 函数 运行的命名空间
name指定该 函数 运行的名称
以下是函数相关其他操作

停止函数

bin/pulsar-admin functions stop \

--tenant public \

--namespace default \

--name appStrFunction

启动函数

bin/pulsar-admin functions start \

--tenant public \

--namespace default \

--name appStrFunction

删除函数

bin/pulsar-admin functions delete \

--tenant public \

--namespace default \

--name appStrFunction

函数的日志在 pulsar安装目录 /logs/functions下

3 测试函数

根据前边函数已成功加载启动

1)向tlstest主题发送消息   

import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; public class SendMsgTest{   public static void main(String[] args){       String url="pulsar://192.168.1.48:6650";   try{      PulsarClient client =PulsarClient.builder()            .serviceUrl(url)            .connectionTimeout(10,TimeUnit.SECONDS)            .build();      Producer<String> producer=client.newProducer(Schema.STRING)            .topic("tlstest")            .sendTimeout(10,TimeUnit.SECONDS)            .producerName("senduser")            .create();            producer.send("this is a book");            System.out.print("send ok");            client.close();       }catch(Exception e){         e.printStackTrace();       }   } }

2)读取teststr主题消息

import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.schema.JSONSchema; import schema.OrderModel; import com.alibaba.fastjson.JSON; public class RecFunTest { public static void main(String[] args) { String url = "http://192.168.1.48:8080"; try{   PulsarClient client =PulsarClient.builder()     .serviceUrl(url)     .build();  Consumer<String> consumer=client.newConsumer(Schema.STRING)     .topic("teststr")     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)     .subscriptionType(SubscriptionType.Exclusive)//订阅模式  Exclusive(独占,默认模式) Failover(灾备)Shared(共享)     .subscriptionName("wbq")//订阅者名称     .subscribe();  while (true) {    Message<String> mondmsg = consumer.receive();    String msg=mondmsg.getValue();                 System.out.println("receive message=:"+msg);              }   }catch(Exception e){      e.printStackTrace();   }  } }

另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。

网站栏目:PulsarFunction例子-创新互联
本文来源:https://www.cdcxhl.com/article16/cddggg.html

成都网站建设公司_创新互联,为您提供手机网站建设做网站网站营销服务器托管品牌网站设计关键词优化

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

网站托管运营