Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use unsafe.SliceData() #1178

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ out:
// or to the global Producer.Events channel, or none.
rkmessages := make([]*C.rd_kafka_message_t, int(C.rd_kafka_event_message_count(rkev)))

cnt := int(C.rd_kafka_event_message_array(rkev, (**C.rd_kafka_message_t)(unsafe.Pointer(&rkmessages[0])), C.size_t(len(rkmessages))))
cnt := int(C.rd_kafka_event_message_array(rkev, unsafe.SliceData(rkmessages), C.size_t(len(rkmessages))))

for _, rkmessage := range rkmessages[:cnt] {
msg := h.newMessageFromC(rkmessage)
Expand Down
9 changes: 2 additions & 7 deletions kafka/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,14 +309,9 @@ func (h *handle) setOAuthBearerToken(oauthBearerToken OAuthBearerToken) error {
extensionSize++
}

var cExtensionsToUse **C.char
if extensionSize > 0 {
cExtensionsToUse = (**C.char)(unsafe.Pointer(&cExtensions[0]))
}

cErr := C.rd_kafka_oauthbearer_set_token(h.rk, cTokenValue,
C.int64_t(oauthBearerToken.Expiration.UnixNano()/(1000*1000)), cPrincipal,
cExtensionsToUse, C.size_t(extensionSize), cErrstr, cErrstrSize)
C.int64_t(oauthBearerToken.Expiration.UnixMilli()), cPrincipal,
unsafe.SliceData(cExtensions), C.size_t(extensionSize), cErrstr, cErrstrSize)
if cErr == C.RD_KAFKA_RESP_ERR_NO_ERROR {
return nil
}
Expand Down
70 changes: 7 additions & 63 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,11 @@ void free_tmphdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt) {
rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
rd_kafka_topic_t *rkt, int32_t partition,
int msgflags,
int valIsNull, void *val, size_t val_len,
int keyIsNull, void *key, size_t key_len,
void *valp, size_t val_len,
void *keyp, size_t key_len,
int64_t timestamp,
tmphdr_t *tmphdrs, size_t tmphdrsCnt,
uintptr_t cgoid) {
void *valp = valIsNull ? NULL : val;
void *keyp = keyIsNull ? NULL : key;
#ifdef RD_KAFKA_V_TIMESTAMP
rd_kafka_resp_err_t err;
#ifdef RD_KAFKA_V_HEADERS
Expand Down Expand Up @@ -179,55 +177,6 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)

crkt := p.handle.getRkt(*msg.TopicPartition.Topic)

// Three problems:
// 1) There's a difference between an empty Value or Key (length 0, proper pointer) and
// a null Value or Key (length 0, null pointer).
// 2) we need to be able to send a null Value or Key, but the unsafe.Pointer(&slice[0])
// dereference can't be performed on a nil slice.
// 3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to be made
// in the call to the C function.
//
// Solution:
// Keep track of whether the Value or Key were nil (1), but let the valp and keyp pointers
// point to a 1-byte slice (but the length to send is still 0) so that the dereference (2)
// works.
// Then perform the unsafe.Pointer() on the valp and keyp pointers (which now either point
// to the original msg.Value and msg.Key or to the 1-byte slices) in the call to C (3).
//
var valp []byte
var keyp []byte
oneByte := []byte{0}
var valIsNull C.int
var keyIsNull C.int
var valLen int
var keyLen int

if msg.Value == nil {
valIsNull = 1
valLen = 0
valp = oneByte
} else {
valLen = len(msg.Value)
if valLen > 0 {
valp = msg.Value
} else {
valp = oneByte
}
}

if msg.Key == nil {
keyIsNull = 1
keyLen = 0
keyp = oneByte
} else {
keyLen = len(msg.Key)
if keyLen > 0 {
keyp = msg.Key
} else {
keyp = oneByte
}
}

var cgoid int

// Per-message state that needs to be retained through the C code:
Expand All @@ -242,7 +191,7 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)

var timestamp int64
if !msg.Timestamp.IsZero() {
timestamp = msg.Timestamp.UnixNano() / 1000000
timestamp = msg.Timestamp.UnixMilli()
}

// Convert headers to C-friendly tmphdrs
Expand Down Expand Up @@ -270,20 +219,15 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)
tmphdrs[n].size = C.ssize_t(-1)
}
}
} else {
// no headers, need a dummy tmphdrs of size 1 to avoid index
// out of bounds panic in do_produce() call below.
// tmphdrsCnt will be 0.
tmphdrs = []C.tmphdr_t{{nil, nil, 0}}
}

cErr := C.do_produce(p.handle.rk, crkt,
C.int32_t(msg.TopicPartition.Partition),
C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
unsafe.Pointer(unsafe.SliceData(msg.Value)), C.size_t(len(msg.Value)),
unsafe.Pointer(unsafe.SliceData(msg.Key)), C.size_t(len(msg.Key)),
C.int64_t(timestamp),
(*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
unsafe.SliceData(tmphdrs), C.size_t(tmphdrsCnt),
(C.uintptr_t)(cgoid))
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
if cgoid != 0 {
Expand Down Expand Up @@ -326,7 +270,7 @@ func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) err
p.handle.messageToC(m, &cmsgs[i])
}
r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
(*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
unsafe.SliceData(cmsgs), C.int(len(cmsgs)))
if r == -1 {
return newError(C.rd_kafka_last_error())
}
Expand Down