package main
import ( "github.com/micro/go-micro/v2" "github.com/micro/go-micro/v2/metadata" "github.com/micro/go-micro/v2/util/log" proto "go14/micro/cmds/pubsub/push/proto"
"context" "github.com/micro/go-micro/v2/broker" "github.com/micro/go-plugins/broker/kafka/v2" )
// Sub is a field-less type whose Process method is registered in main as the
// handler for the "example.topic.pubsub.1" topic.
type Sub struct{}
func (s *Sub) Process(ctx context.Context, event *proto.Event) error { md, _ := metadata.FromContext(ctx) log.Logf("[pubsub.1] Received event %+v with metadata %+v\n", event, md) return nil }
func subEv(ctx context.Context, event *proto.Event) error { md, _ := metadata.FromContext(ctx) log.Logf("[pubsub.2] Received event %+v with metadata %+v\n", event, md) return nil }
func main() {
broker.DefaultBroker = kafka.NewBroker() if err := broker.Init(broker.Addrs("192.168.163.121:9092"),); err != nil { log.Fatalf("Broker Init error: %v", err) }
service := micro.NewService( micro.Name("go.micro.srv.pubsub"), micro.Broker(broker.DefaultBroker), )
if err := micro.RegisterSubscriber("example.topic.pubsub.1", service.Server(), new(Sub));err !=nil{ log.Fatalf("RegisterSubscriber Init error: %v", err) }
if err := micro.RegisterSubscriber("example.topic.pubsub.2", service.Server(), subEv);err !=nil{ log.Fatalf("RegisterSubscriber Init error: %v", err) }
if err := service.Run(); err != nil { log.Fatal(err) } }
|