I'm creating a custom logger that logs to stdout and stderr, with the option of also logging to Kafka (the code example is here: https://github.com/roppa/kafka-go). We have multiple topics, so we need multiple loggers, but when we use more than one, strange things happen. When both kafka-go writers are async, I get no consumer messages; when one is async and the other is synchronous, we get something like this:
//consumer topica
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:04.023Z","msg":"topic-a log 1","UID":"abc123","ns":"test-service"}
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:05.078Z","msg":"topic-a log 2","UID":"abc123","ns":"test-service"}
{"level":"\u001b[34mINFO\u001b[0m","timeStamp":"2020-12-09T15:31:06.085Z","msg":"topic-a log 3","UID":"abc123","ns":"test-service"}
//consumer topicb
2020-12-09T15:31:06.085Z INFO topic-a log 3 {"UID": "abc123", "ns": "test-service"}
2","UID":"abc123","ns":"test-service"}
Changing the async/sync settings produces noticeably different results. I'm pretty new to Go.
This is main.go:
package main

import (
	"context"

	"kafka-log/logger"
)

func main() {
	loggerA := logger.Init("test-service", "localhost:9092", "topica", false, false)
	loggerB := logger.Init("test-service", "localhost:9092", "topicb", false, true)

	ctx := context.Background()
	ctx2 := context.WithValue(ctx, logger.UID, "abc123")

	loggerA.CInfo(ctx2, "topic-a log 1")
	loggerB.CInfo(ctx2, "topic-b log 1")

	loggerA.CInfo(ctx2, "topic-a log 2")
	loggerB.CInfo(ctx2, "topic-b log 2")

	loggerA.CInfo(ctx2, "topic-a log 3")
	loggerB.CInfo(ctx2, "topic-b log 3")
}
This is the logger/logger.go:
package logger

import (
	"context"
	"os"

	"github.com/segmentio/kafka-go"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

type (
	key string

	// Logger embeds zap and also contains the current system name (namespace, Ns)
	Logger struct {
		*zap.Logger
		Ns string
	}

	// KConfig type for creating a new Kafka logger. Takes a Namespace,
	// Broker (eg 'localhost:9092'), Topic (eg 'topic-a')
	KConfig struct {
		Namespace string
		Broker    string
		Topic     string
		Async     bool
	}

	producerInterface interface {
		WriteMessages(ctx context.Context, msgs ...kafka.Message) error
	}

	// KafkaProducer contains a kafka writer and Kafka topic
	KafkaProducer struct {
		Producer producerInterface
		Topic    string
	}
)

const (
	// UID - unique request identifier
	UID key = "request_id"
)

var customConfig = zapcore.EncoderConfig{
	TimeKey:        "timeStamp",
	LevelKey:       "level",
	NameKey:        "logger",
	CallerKey:      "caller",
	FunctionKey:    zapcore.OmitKey,
	MessageKey:     "msg",
	StacktraceKey:  "stacktrace",
	LineEnding:     zapcore.DefaultLineEnding,
	EncodeLevel:    zapcore.CapitalColorLevelEncoder,
	EncodeTime:     zapcore.ISO8601TimeEncoder,
	EncodeDuration: zapcore.SecondsDurationEncoder,
}

// CInfo takes a context as its first parameter, extracts specific fields as well as the namespace, and calls zap's Info
func (l *Logger) CInfo(ctx context.Context, msg string, fields ...zap.Field) {
	l.Info(msg, consolidate(ctx, l.Ns, fields...)...)
}

func consolidate(ctx context.Context, namespace string, fields ...zap.Field) []zap.Field {
	return append(append(ctxToZapFields(ctx), fields...), zap.String("ns", namespace))
}

// See advanced config example: https://github.com/uber-go/zap/blob/master/example_test.go#L105
var lowPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
	return lvl < zapcore.ErrorLevel && lvl > zapcore.DebugLevel
})

var debugPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
	return lvl < zapcore.ErrorLevel
})

var kafkaPriority = zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
	return lvl > zapcore.DebugLevel
})

// Init creates a new instance of a logger. Namespace is the name of the module using the logger. Broker and topic are Kafka specific;
// if either of these is not set, a default console logger is created.
func Init(namespace, broker, topic string, debug, async bool) *Logger {
	var kp *KafkaProducer
	if broker != "" && topic != "" {
		kp = NewKafkaProducer(&KConfig{
			Broker: broker,
			Topic:  topic,
			Async:  async,
		})
	}

	logger := getLogger(debug, kp)
	// logger.Info("initiated logger", zap.String("ns", namespace), zap.Bool("kafka", kp != nil), zap.Bool("debug", debug))
	return &Logger{logger, namespace}
}

func getLogger(debug bool, kp *KafkaProducer) *zap.Logger {
	// cores are logger interfaces
	var cores []zapcore.Core

	// optimise message for console output (human readable)
	consoleEncoder := zapcore.NewConsoleEncoder(customConfig)

	// Lock wraps a WriteSyncer in a mutex to make it safe for concurrent use.
	// See https://godoc.org/go.uber.org/zap/zapcore
	cores = append(cores,
		zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), getPriority(debug)),
		zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stderr), zap.ErrorLevel),
	)

	if kp != nil {
		cores = append(cores, zapcore.NewCore(zapcore.NewJSONEncoder(customConfig), zapcore.Lock(zapcore.AddSync(kp)), kafkaPriority))
	}

	// join inputs, encoders, level-handling functions into cores, then "tee" together
	logger := zap.New(zapcore.NewTee(cores...))
	defer logger.Sync()

	return logger
}

func getPriority(debug bool) zap.LevelEnablerFunc {
	if debug {
		return debugPriority
	}
	return lowPriority
}

func ctxToZapFields(ctx context.Context) []zap.Field {
	reqID, _ := ctx.Value(UID).(string)
	return []zap.Field{
		zap.String("UID", reqID),
	}
}

// NewKafkaProducer instantiates a kafka.Writer, saves the topic, and returns a KafkaProducer
func NewKafkaProducer(c *KConfig) *KafkaProducer {
	return &KafkaProducer{
		Producer: kafka.NewWriter(kafka.WriterConfig{
			Brokers:      []string{c.Broker},
			Topic:        c.Topic,
			Balancer:     &kafka.Hash{},
			Async:        c.Async,
			RequiredAcks: -1, // -1 = all
		}),
		Topic: c.Topic,
	}
}

// Write takes a message as a byte slice, wraps it in a kafka.Message and calls WriteMessages
func (kp *KafkaProducer) Write(msg []byte) (int, error) {
	return len(msg), kp.Producer.WriteMessages(context.Background(), kafka.Message{
		Key:   []byte(""),
		Value: msg,
	})
}
I'm using these for consumers:
docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topica
docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topicb
And this is my kafka docker-compose:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    networks:
      - kafka-net
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - 2181:2181
  kafka:
    image: confluentinc/cp-kafka
    networks:
      - kafka-net
    container_name: kafka
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    ports:
      - 9092:9092
      - 29092:29092
    depends_on:
      - zookeeper
    restart: on-failure
networks:
  kafka-net:
    driver: bridge
I imagine your program is exiting before the async messages have time to send (although, if I'm reading your example correctly, it is strange to me that "topic-a log 3" is the only log message that makes it). Unlike something like JavaScript, Go will not wait for all threads/goroutines to terminate before exiting.
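As a minimal illustration (a hypothetical snippet, not from your repo): once main returns, the process exits and any work still in flight in other goroutines is dropped, which is essentially what happens to your queued async Kafka messages.

package main

import (
	"fmt"
	"time"
)

func main() {
	go func() {
		time.Sleep(100 * time.Millisecond) // stands in for an async send still in flight
		fmt.Println("this never prints")   // the process has already exited by now
	}()
	fmt.Println("main done") // main returns immediately; the goroutine is abandoned
}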
I would also highlight the docstring for the Async option in kafka-go:
// Setting this flag to true causes the WriteMessages method to never block.
// It also means that errors are ignored since the caller will not receive
// the returned value. Use this only if you don't care about guarantees of
// whether the messages were written to kafka.
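In practice that means something like this (a hypothetical snippet, not from your repo, using the same WriterConfig style as your NewKafkaProducer):

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "topica",
		Async:   false, // synchronous: WriteMessages blocks until the broker acks (or errors)
	})
	defer w.Close()

	err := w.WriteMessages(context.Background(), kafka.Message{Value: []byte("hello")})
	// With Async: false, err reports delivery failures.
	// With Async: true, WriteMessages returns immediately and delivery errors
	// are not reported back to the caller, so anything lost because the
	// process exits too early disappears silently.
	if err != nil {
		log.Println("write failed:", err)
	}
}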
On the solution front: I think you can solve this by calling Close
on the writer:
https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.Close
Close flushes pending writes, and waits for all writes to complete before returning. Calling Close also prevents new writes from being submitted to the writer, further calls to WriteMessages and the like will fail with io.ErrClosedPipe.
You would need to surface the underlying KafkaProducer.Producer and call KafkaProducer.Producer.Close before exiting.
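One way to wire that up (a rough sketch, not taken from your repo: the extra kp field on Logger, the Close method, and the defer calls in main are my additions, and I'm assuming Producer is backed by a *kafka.Writer):

// logger/logger.go (sketch): keep a reference to the producer so it can be closed.
type Logger struct {
	*zap.Logger
	Ns string
	kp *KafkaProducer // nil when only console logging is configured
}

// ...and in Init, return &Logger{logger, namespace, kp} instead of &Logger{logger, namespace}.

// Close flushes pending writes. kafka.Writer.Close blocks until outstanding
// messages have been delivered (or have failed), so queued async messages are
// not dropped when the program exits.
func (l *Logger) Close() error {
	_ = l.Sync() // flush zap's own buffers first
	if l.kp != nil {
		if w, ok := l.kp.Producer.(*kafka.Writer); ok {
			return w.Close()
		}
	}
	return nil
}

// main.go (sketch): close both loggers before main returns.
func main() {
	loggerA := logger.Init("test-service", "localhost:9092", "topica", false, false)
	defer loggerA.Close()

	loggerB := logger.Init("test-service", "localhost:9092", "topicb", false, true)
	defer loggerB.Close()

	// ... log as before ...
}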
There might be cleverer ways to structure the cleanup, but I can't seem to find an easier way to flush the pending messages than just calling Close on the writer.