Open dybxin opened 2 years ago
package main
import ( "encoding/json" "fmt"
kingpin "gopkg.in/alecthomas/kingpin.v2" "github.com/Shopify/sarama"
)
// Order is the JSON document that init encodes and main publishes to Kafka.
//
// Struct tags must be wrapped in backticks to be valid Go; they map each
// field to its snake_case JSON key. The field name OrderId is kept as-is
// (rather than the idiomatic OrderID) because other code in this file
// constructs Order using that name.
type Order struct {
	OrderId          string `json:"order_id"`
	ProductName      string `json:"product_name"`
	OrderStatus      string `json:"order_status"`
	StatusChangeTime string `json:"status_change_time"`
	Mobile           string `json:"mobile"`
}
const ( // brokerList = "localhost:9092" // topic = "kafka-test" )
var ( order []byte brokerList = kingpin.Flag("brokerList", "List of brokers to connect").Default("127.0.0.1:9092").Strings() topic = kingpin.Flag("topic", "Topic name").Default("kafka-test").String() maxRetry = kingpin.Flag("maxRetry", "Retry limit").Default("5").Int() )
// init encodes the sample order once at program start and stores the JSON
// bytes in the package-level order variable for main to publish.
//
// The Marshal error is checked instead of being discarded: a failure here
// would otherwise leave order nil and silently publish an empty message.
func init() {
	var err error
	order, err = json.Marshal(&Order{
		OrderId:          "1",
		ProductName:      "牙刷",
		OrderStatus:      "待付款",
		StatusChangeTime: "2020-01-01 00:00:00",
		Mobile:           "1537954106",
	})
	if err != nil {
		panic(fmt.Errorf("marshal sample order: %w", err))
	}
}
func main() { kingpin.Parse() config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = maxRetry config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer(brokerList, config) if err != nil { panic(err) } defer func() { if err := producer.Close(); err != nil { panic(err) } }()
fmt.Println("order", string(order)) msg := &sarama.ProducerMessage{ Topic: *topic, Value: sarama.ByteEncoder(order), } partition, offset, err := producer.SendMessage(msg) if err != nil { panic(err) } fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", *topic, partition, offset)
}
Your issue does not seem to be related to this container. Panics in the Go client (sarama) should be reported at https://github.com/Shopify/sarama/issues
package main
import ( "encoding/json" "fmt"
)
// Order is the JSON document that init encodes for publishing to Kafka.
//
// Struct tags must be wrapped in backticks and separated from the field type
// by a space to be valid Go; they map each field to its snake_case JSON key.
// The field name OrderId is kept as-is because other code in this file
// constructs Order using that name.
type Order struct {
	OrderId          string `json:"order_id"`
	ProductName      string `json:"product_name"`
	OrderStatus      string `json:"order_status"`
	StatusChangeTime string `json:"status_change_time"`
	Mobile           string `json:"mobile"`
}

// The hard-coded connection settings below were replaced by command-line
// flags; they are kept commented out for reference.
const (
// brokerList = "localhost:9092"
// topic = "kafka-test"
)
var ( order []byte brokerList = kingpin.Flag("brokerList", "List of brokers to connect").Default("127.0.0.1:9092").Strings() topic = kingpin.Flag("topic", "Topic name").Default("kafka-test").String() maxRetry = kingpin.Flag("maxRetry", "Retry limit").Default("5").Int() )
// init encodes the sample order once at program start and stores the JSON
// bytes in the package-level order variable.
//
// The Marshal error is checked instead of being discarded: a failure here
// would otherwise leave order nil without any indication.
func init() {
	var err error
	order, err = json.Marshal(&Order{
		OrderId:          "1",
		ProductName:      "牙刷",
		OrderStatus:      "待付款",
		StatusChangeTime: "2020-01-01 00:00:00",
		Mobile:           "1537954106",
	})
	if err != nil {
		panic(fmt.Errorf("marshal sample order: %w", err))
	}
}
func main() { kingpin.Parse() config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = maxRetry config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer(brokerList, config) if err != nil { panic(err) } defer func() { if err := producer.Close(); err != nil { panic(err) } }()
}