在单机环境下实现字符串追加函数(Pulsar 2.4.2版本)
创新互联公司主营云南网站建设的网络公司,主营网站建设方案,成都app开发,云南h5微信小程序开发搭建,云南网站营销推广欢迎云南等地区企业咨询1 启动单机Pulsar
$ bin/pulsar-daemon start standalone
2 创建函数
1) 准备环境
项目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'
2) 创建JAVA函数(此函数用于数据源来的topic schema是string,输出的tiopic schema是string)
导出jar包,放到pulsar服务器目录下,本例子放在 /data/jar/下
3)使用命令行工具加载函数到Pulsar,
bin/pulsar-admin functions create \
--classname test.AppStrFunction \
--jar /data/jar/pf.jar \
--inputs persistent://public/default/tlstest \
--output persistent://public/default/teststr \
--tenant public \
--namespace default \
--name appStrFunction
参数说明:
参数 | 说明 |
functions | 通知 pulsar broker,函数操作 |
create | 创建函数,默认创建成功后启动 |
classname | 函数类名称,需要加上包名 |
jar | 指定 jar 包的运行路径 |
inputs | 指定 函数 数据的来源在哪里,支持多个 topics 作为输入 |
output | 如果该 函数 有输出(有些情况下,function 没有输出),指定 function 输出的 topic,只能有一个输出 |
tenant | 指定该 函数 运行的租户名 |
namespace | 指定该 函数 运行的命名空间 |
name | 指定该 函数 运行的名称 |
停止函数
bin/pulsar-admin functions stop \
--tenant public \
--namespace default \
--name appStrFunction
启动函数
bin/pulsar-admin functions start \
--tenant public \
--namespace default \
--name appStrFunction
删除函数
bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name appStrFunction
函数的日志在 pulsar安装目录 /logs/functions下
3 测试函数
根据前边函数已成功加载启动
1)向tlstest主题发送消息
import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; public class SendMsgTest{ public static void main(String[] args){ String url="pulsar://192.168.1.48:6650"; try{ PulsarClient client =PulsarClient.builder() .serviceUrl(url) .connectionTimeout(10,TimeUnit.SECONDS) .build(); Producer<String> producer=client.newProducer(Schema.STRING) .topic("tlstest") .sendTimeout(10,TimeUnit.SECONDS) .producerName("senduser") .create(); producer.send("this is a book"); System.out.print("send ok"); client.close(); }catch(Exception e){ e.printStackTrace(); } } }2)读取teststr主题消息
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.schema.JSONSchema; import schema.OrderModel; import com.alibaba.fastjson.JSON; public class RecFunTest { public static void main(String[] args) { String url = "http://192.168.1.48:8080"; try{ PulsarClient client =PulsarClient.builder() .serviceUrl(url) .build(); Consumer<String> consumer=client.newConsumer(Schema.STRING) .topic("teststr") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive)//订阅模式 Exclusive(独占,默认模式) Failover(灾备)Shared(共享) .subscriptionName("wbq")//订阅者名称 .subscribe(); while (true) { Message<String> mondmsg = consumer.receive(); String msg=mondmsg.getValue(); System.out.println("receive message=:"+msg); } }catch(Exception e){ e.printStackTrace(); } } }另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
网站栏目:PulsarFunction例子-创新互联
本文来源:https://www.cdcxhl.com/article16/cddggg.html
成都网站建设公司_创新互联,为您提供手机网站建设、做网站、网站营销、服务器托管、品牌网站设计、关键词优化
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联