Skip to content

Commit

Permalink
Use unsafe.SliceData()
Browse files Browse the repository at this point in the history
  • Loading branch information
jdemeyer committed Apr 18, 2024
1 parent f0d709f commit 7b31f25
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 71 deletions.
2 changes: 1 addition & 1 deletion kafka/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ out:
// or to the global Producer.Events channel, or none.
rkmessages := make([]*C.rd_kafka_message_t, int(C.rd_kafka_event_message_count(rkev)))

cnt := int(C.rd_kafka_event_message_array(rkev, (**C.rd_kafka_message_t)(unsafe.Pointer(&rkmessages[0])), C.size_t(len(rkmessages))))
cnt := int(C.rd_kafka_event_message_array(rkev, unsafe.SliceData(rkmessages), C.size_t(len(rkmessages))))

for _, rkmessage := range rkmessages[:cnt] {
msg := h.newMessageFromC(rkmessage)
Expand Down
9 changes: 2 additions & 7 deletions kafka/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,14 +309,9 @@ func (h *handle) setOAuthBearerToken(oauthBearerToken OAuthBearerToken) error {
extensionSize++
}

var cExtensionsToUse **C.char
if extensionSize > 0 {
cExtensionsToUse = (**C.char)(unsafe.Pointer(&cExtensions[0]))
}

cErr := C.rd_kafka_oauthbearer_set_token(h.rk, cTokenValue,
C.int64_t(oauthBearerToken.Expiration.UnixNano()/(1000*1000)), cPrincipal,
cExtensionsToUse, C.size_t(extensionSize), cErrstr, cErrstrSize)
C.int64_t(oauthBearerToken.Expiration.UnixMilli()), cPrincipal,
unsafe.SliceData(cExtensions), C.size_t(extensionSize), cErrstr, cErrstrSize)
if cErr == C.RD_KAFKA_RESP_ERR_NO_ERROR {
return nil
}
Expand Down
70 changes: 7 additions & 63 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,11 @@ void free_tmphdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt) {
rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
rd_kafka_topic_t *rkt, int32_t partition,
int msgflags,
int valIsNull, void *val, size_t val_len,
int keyIsNull, void *key, size_t key_len,
void *valp, size_t val_len,
void *keyp, size_t key_len,
int64_t timestamp,
tmphdr_t *tmphdrs, size_t tmphdrsCnt,
uintptr_t cgoid) {
void *valp = valIsNull ? NULL : val;
void *keyp = keyIsNull ? NULL : key;
#ifdef RD_KAFKA_V_TIMESTAMP
rd_kafka_resp_err_t err;
#ifdef RD_KAFKA_V_HEADERS
Expand Down Expand Up @@ -179,55 +177,6 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)

crkt := p.handle.getRkt(*msg.TopicPartition.Topic)

// Three problems:
// 1) There's a difference between an empty Value or Key (length 0, proper pointer) and
// a null Value or Key (length 0, null pointer).
// 2) we need to be able to send a null Value or Key, but the unsafe.Pointer(&slice[0])
// dereference can't be performed on a nil slice.
// 3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to be made
// in the call to the C function.
//
// Solution:
// Keep track of whether the Value or Key were nil (1), but let the valp and keyp pointers
// point to a 1-byte slice (but the length to send is still 0) so that the dereference (2)
// works.
// Then perform the unsafe.Pointer() on the valp and keyp pointers (which now either point
// to the original msg.Value and msg.Key or to the 1-byte slices) in the call to C (3).
//
var valp []byte
var keyp []byte
oneByte := []byte{0}
var valIsNull C.int
var keyIsNull C.int
var valLen int
var keyLen int

if msg.Value == nil {
valIsNull = 1
valLen = 0
valp = oneByte
} else {
valLen = len(msg.Value)
if valLen > 0 {
valp = msg.Value
} else {
valp = oneByte
}
}

if msg.Key == nil {
keyIsNull = 1
keyLen = 0
keyp = oneByte
} else {
keyLen = len(msg.Key)
if keyLen > 0 {
keyp = msg.Key
} else {
keyp = oneByte
}
}

var cgoid int

// Per-message state that needs to be retained through the C code:
Expand All @@ -242,7 +191,7 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)

var timestamp int64
if !msg.Timestamp.IsZero() {
timestamp = msg.Timestamp.UnixNano() / 1000000
timestamp = msg.Timestamp.UnixMilli()
}

// Convert headers to C-friendly tmphdrs
Expand Down Expand Up @@ -270,20 +219,15 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)
tmphdrs[n].size = C.ssize_t(-1)
}
}
} else {
// no headers, need a dummy tmphdrs of size 1 to avoid index
// out of bounds panic in do_produce() call below.
// tmphdrsCnt will be 0.
tmphdrs = []C.tmphdr_t{{nil, nil, 0}}
}

cErr := C.do_produce(p.handle.rk, crkt,
C.int32_t(msg.TopicPartition.Partition),
C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
unsafe.Pointer(unsafe.SliceData(msg.Value)), C.size_t(len(msg.Value)),
unsafe.Pointer(unsafe.SliceData(msg.Key)), C.size_t(len(msg.Key)),
C.int64_t(timestamp),
(*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
unsafe.SliceData(tmphdrs), C.size_t(tmphdrsCnt),
(C.uintptr_t)(cgoid))
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
if cgoid != 0 {
Expand Down Expand Up @@ -326,7 +270,7 @@ func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) err
p.handle.messageToC(m, &cmsgs[i])
}
r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
(*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
unsafe.SliceData(cmsgs), C.int(len(cmsgs)))
if r == -1 {
return newError(C.rd_kafka_last_error())
}
Expand Down

0 comments on commit 7b31f25

Please sign in to comment.