Codebase list golang-github-confluentinc-confluent-kafka-go / dc34b3e
Merge tag 'v0.11.6' into debian/sid Mpampis Kostas 5 years ago
23 changed file(s) with 2498 addition(s) and 54 deletion(s).
1818 # workaround for now is to skip exec for those.
1919
2020 before_install:
21 - if [[ $TRAVIS_OS_NAME == linux ]]; then wget -qO - https://packages.confluent.io/deb/5.0/archive.key | sudo apt-key add - ; fi
22 - if [[ $TRAVIS_OS_NAME == linux ]]; then sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.0 stable main" -y ; fi
23 - if [[ $TRAVIS_OS_NAME == linux ]]; then sudo apt-get update -q ; fi
24 - if [[ $TRAVIS_OS_NAME == linux ]]; then sudo apt-get install confluent-librdkafka-plugins -y ; fi
2125 - rm -rf tmp-build
2226 - bash mk/bootstrap-librdkafka.sh ${LIBRDKAFKA_VERSION} tmp-build
23 - go get -u github.com/golang/lint/golint
27 # golint requires Go >= 1.9
28 - if [[ ! $TRAVIS_GO_VERSION =~ ^1\.[78] ]] ; then go get -u golang.org/x/lint/golint && touch .do_lint ; fi
2429 - if [[ $TRAVIS_OS_NAME == osx && $TRAVIS_GO_VERSION =~ ^1\.[78] ]] ; then touch .no_exec ; fi
2530
2631 install:
2833 - go install -tags static ./...
2934
3035 script:
31 - golint -set_exit_status ./...
36 - if [[ -f .do_lint ]]; then golint -set_exit_status ./... ; fi
3237 - if [[ ! -f .no_exec ]]; then go test -timeout 60s -v -tags static ./... ; fi
3338 - if [[ ! -f .no_exec ]]; then go-kafkacat --help ; fi
6161 if err == nil {
6262 fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
6363 } else {
64 // The client will automatically try to recover from all errors.
6465 fmt.Printf("Consumer error: %v (%v)\n", err, msg)
65 break
6666 }
6767 }
6868
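With `break` gone, this README loop now runs until the process exits. Applications that still want an orderly shutdown typically terminate on a signal instead of on consumer errors, as the repository's consumer examples do; a minimal sketch, with illustrative broker, group, and topic names:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	c.SubscribeTopics([]string{"myTopic"}, nil)

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	for run := true; run; {
		select {
		case sig := <-sigchan:
			// Terminate on SIGINT/SIGTERM rather than on consumer errors.
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			msg, err := c.ReadMessage(100 * time.Millisecond)
			if err == nil {
				fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
			} else if kerr, ok := err.(kafka.Error); !ok || kerr.Code() != kafka.ErrTimedOut {
				// The client recovers from transient errors on its own;
				// log and keep polling.
				fmt.Printf("Consumer error: %v (%v)\n", err, msg)
			}
		}
	}
}
```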
8484 if err != nil {
8585 panic(err)
8686 }
87
88 defer p.Close()
8789
8890 // Delivery report handler for produced messages
8991 go func() {
108110 }, nil)
109111 }
110112
111 // Wait for message deliveries
113 // Wait for message deliveries before shutting down
112114 p.Flush(15 * 1000)
113115 }
114116 ```
115117
116 More elaborate examples are available in the [examples](examples) directory.
118 More elaborate examples are available in the [examples](examples) directory,
119 including [how to configure](examples/confluent_cloud_example) the Go client
120 for use with [Confluent Cloud](https://www.confluent.io/confluent-cloud/).
117121
118122
119123 Getting Started
122126 Installing librdkafka
123127 ---------------------
124128
125 This client for Go depends on librdkafka v0.11.4 or later, so you either need to install librdkafka through your OS/distributions package manager,
126 or download and build it from source.
129 This client for Go depends on librdkafka v0.11.5 or later, so you either need to install librdkafka
130 through your OS/distribution's package manager, or download and build it from source.
127131
128132 - For Debian and Ubuntu based distros, install `librdkafka-dev` from the standard
129133 repositories or using [Confluent's Deb repository](http://docs.confluent.io/current/installation.html#installation-apt).
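To confirm at runtime which librdkafka a binary actually linked against, the package's `kafka.LibraryVersion()` (used internally by the version check in `00version.go` below) can be printed. A small sketch:

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// LibraryVersion reports the librdkafka version this binary is
	// linked against, as a hex-encoded int (e.g. 0x000b0500) and a
	// human-readable string (e.g. "0.11.5").
	ver, str := kafka.LibraryVersion()
	fmt.Printf("linked against librdkafka 0x%x (%s)\n", ver, str)
}
```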
22 producer_channel_example/producer_channel_example
33 producer_example/producer_example
44 go-kafkacat/go-kafkacat
5 admin_describe_config/admin_describe_config
6 admin_delete_topics/admin_delete_topics
7 admin_create_topic/admin_create_topic
0 // Create topic
1 package main
2
3 /**
4 * Copyright 2018 Confluent Inc.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 import (
20 "context"
21 "fmt"
22 "github.com/confluentinc/confluent-kafka-go/kafka"
23 "os"
24 "strconv"
25 "time"
26 )
27
28 func main() {
29
30 if len(os.Args) != 5 {
31 fmt.Fprintf(os.Stderr,
32 "Usage: %s <broker> <topic> <partition-count> <replication-factor>\n",
33 os.Args[0])
34 os.Exit(1)
35 }
36
37 broker := os.Args[1]
38 topic := os.Args[2]
39 numParts, err := strconv.Atoi(os.Args[3])
40 if err != nil {
41 fmt.Printf("Invalid partition count: %s: %v\n", os.Args[3], err)
42 os.Exit(1)
43 }
44 replicationFactor, err := strconv.Atoi(os.Args[4])
45 if err != nil {
46 fmt.Printf("Invalid replication factor: %s: %v\n", os.Args[4], err)
47 os.Exit(1)
48 }
49
50 // Create a new AdminClient.
51 // AdminClient can also be instantiated using an existing
52 // Producer or Consumer instance, see NewAdminClientFromProducer and
53 // NewAdminClientFromConsumer.
54 a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
55 if err != nil {
56 fmt.Printf("Failed to create Admin client: %s\n", err)
57 os.Exit(1)
58 }
59
60 // Contexts are used to abort or limit the amount of time
61 // the Admin call blocks waiting for a result.
62 ctx, cancel := context.WithCancel(context.Background())
63 defer cancel()
64
65 // Create topics on cluster.
66 // Set Admin options to wait for the operation to finish (or at most 60s)
67 maxDur, err := time.ParseDuration("60s")
68 if err != nil {
69 panic("ParseDuration(60s)")
70 }
71 results, err := a.CreateTopics(
72 ctx,
73 // Multiple topics can be created simultaneously
74 // by providing more TopicSpecification structs here.
75 []kafka.TopicSpecification{{
76 Topic: topic,
77 NumPartitions: numParts,
78 ReplicationFactor: replicationFactor}},
79 // Admin options
80 kafka.SetAdminOperationTimeout(maxDur))
81 if err != nil {
82 fmt.Printf("Failed to create topic: %v\n", err)
83 os.Exit(1)
84 }
85
86 // Print results
87 for _, result := range results {
88 fmt.Printf("%s\n", result)
89 }
90
91 a.Close()
92 }
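The example prints every TopicResult uniformly. A caller wanting create-if-not-exists semantics could filter the per-topic error codes instead; a hypothetical helper (not part of this change), reusing the example's `kafka` and `fmt` imports:

```go
// ensureCreated accepts "topic already exists" as success and fails on
// any other per-topic error. TopicResult.Error is always set; compare
// its code rather than checking for nil.
func ensureCreated(results []kafka.TopicResult) error {
	for _, r := range results {
		if c := r.Error.Code(); c != kafka.ErrNoError && c != kafka.ErrTopicAlreadyExists {
			return fmt.Errorf("topic %s: %v", r.Topic, r.Error)
		}
	}
	return nil
}
```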
0 // Delete topics
1 package main
2
3 /**
4 * Copyright 2018 Confluent Inc.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 import (
20 "context"
21 "fmt"
22 "github.com/confluentinc/confluent-kafka-go/kafka"
23 "os"
24 "time"
25 )
26
27 func main() {
28
29 if len(os.Args) < 3 {
30 fmt.Fprintf(os.Stderr,
31 "Usage: %s <broker> <topic1> <topic2> ..\n",
32 os.Args[0])
33 os.Exit(1)
34 }
35
36 broker := os.Args[1]
37 topics := os.Args[2:]
38
39 // Create a new AdminClient.
40 // AdminClient can also be instantiated using an existing
41 // Producer or Consumer instance, see NewAdminClientFromProducer and
42 // NewAdminClientFromConsumer.
43 a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
44 if err != nil {
45 fmt.Printf("Failed to create Admin client: %s\n", err)
46 os.Exit(1)
47 }
48
49 // Contexts are used to abort or limit the amount of time
50 // the Admin call blocks waiting for a result.
51 ctx, cancel := context.WithCancel(context.Background())
52 defer cancel()
53
54 // Delete topics on cluster
55 // Set Admin options to wait for the operation to finish (or at most 60s)
56 maxDur, err := time.ParseDuration("60s")
57 if err != nil {
58 panic("ParseDuration(60s)")
59 }
60
61 results, err := a.DeleteTopics(ctx, topics, kafka.SetAdminOperationTimeout(maxDur))
62 if err != nil {
63 fmt.Printf("Failed to delete topics: %v\n", err)
64 os.Exit(1)
65 }
66
67 // Print results
68 for _, result := range results {
69 fmt.Printf("%s\n", result)
70 }
71
72 a.Close()
73 }
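Here the broker-side SetAdminOperationTimeout is the only time bound, since the context is merely cancellable. A context deadline can additionally cap the client-side wait, as the tests later in this diff do; a hypothetical wrapper combining the two (durations are illustrative), reusing the example's imports:

```go
// deleteWithDeadline bounds the whole client-side call with a context
// deadline, while SetAdminOperationTimeout bounds how long the brokers
// wait for the deletion to propagate. Keeping the operation timeout
// below the context deadline means broker results are still collected
// before the context expires.
func deleteWithDeadline(a *kafka.AdminClient, topics []string) ([]kafka.TopicResult, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 70*time.Second)
	defer cancel()
	return a.DeleteTopics(ctx, topics, kafka.SetAdminOperationTimeout(60*time.Second))
}
```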
0 // List current configuration for a cluster resource
1 package main
2
3 /**
4 * Copyright 2018 Confluent Inc.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 import (
20 "context"
21 "fmt"
22 "github.com/confluentinc/confluent-kafka-go/kafka"
23 "os"
24 "time"
25 )
26
27 func main() {
28
29 if len(os.Args) != 4 {
30 fmt.Fprintf(os.Stderr,
31 "Usage: %s <broker> <resource-type> <resource-name>\n"+
32 "\n"+
33 " <broker> - CSV list of bootstrap brokers\n"+
34 " <resource-type> - any, broker, topic, group\n"+
35 " <resource-name> - broker id or topic name\n",
36 os.Args[0])
37 os.Exit(1)
38 }
39
40 broker := os.Args[1]
41 resourceType, err := kafka.ResourceTypeFromString(os.Args[2])
42 if err != nil {
43 fmt.Printf("Invalid resource type: %s\n", os.Args[2])
44 os.Exit(1)
45 }
46 resourceName := os.Args[3]
47
48 // Create a new AdminClient.
49 // AdminClient can also be instantiated using an existing
50 // Producer or Consumer instance, see NewAdminClientFromProducer and
51 // NewAdminClientFromConsumer.
52 a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": broker})
53 if err != nil {
54 fmt.Printf("Failed to create Admin client: %s\n", err)
55 os.Exit(1)
56 }
57
58 // Contexts are used to abort or limit the amount of time
59 // the Admin call blocks waiting for a result.
60 ctx, cancel := context.WithCancel(context.Background())
61 defer cancel()
62
63 dur, _ := time.ParseDuration("20s")
64 // Ask cluster for the resource's current configuration
65 results, err := a.DescribeConfigs(ctx,
66 []kafka.ConfigResource{{Type: resourceType, Name: resourceName}},
67 kafka.SetAdminRequestTimeout(dur))
68 if err != nil {
69 fmt.Printf("Failed to DescribeConfigs(%s, %s): %s\n",
70 resourceType, resourceName, err)
71 os.Exit(1)
72 }
73
74 // Print results
75 for _, result := range results {
76 fmt.Printf("%s %s: %s:\n", result.Type, result.Name, result.Error)
77 for _, entry := range result.Config {
78 // Truncate the value to 60 chars, if needed, for nicer formatting.
79 fmt.Printf("%60s = %-60.60s %-20s Read-only:%v Sensitive:%v\n",
80 entry.Name, entry.Value, entry.Source,
81 entry.IsReadOnly, entry.IsSensitive)
82 }
83 }
84
85 a.Close()
86 }
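DescribeConfigs output is also the safe starting point for AlterConfigs, which (per the adminapi.go docs later in this diff) atomically replaces a resource's entire configuration. A hypothetical read-modify-write sketch, reusing the example's imports:

```go
// setTopicConfig changes one topic config value while preserving the
// topic's other explicitly-set values, since anything omitted from the
// AlterConfigs request is reverted to its default by the broker.
func setTopicConfig(a *kafka.AdminClient, topic, name, value string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	res, err := a.DescribeConfigs(ctx,
		[]kafka.ConfigResource{{Type: kafka.ResourceTopic, Name: topic}})
	if err != nil {
		return err
	}
	if res[0].Error.Code() != kafka.ErrNoError {
		return res[0].Error
	}

	current := make(map[string]string)
	for _, entry := range res[0].Config {
		// Re-send only values explicitly set on the topic; defaults are
		// restored by the broker for everything omitted.
		if entry.Source == kafka.ConfigSourceDynamicTopic {
			current[entry.Name] = entry.Value
		}
	}
	current[name] = value

	_, err = a.AlterConfigs(ctx,
		[]kafka.ConfigResource{{
			Type:   kafka.ResourceTopic,
			Name:   topic,
			Config: kafka.StringMapToConfigEntries(current, kafka.AlterOperationSet),
		}})
	return err
}
```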
0 // This is a simple example demonstrating how to produce a message to
1 // Confluent Cloud then read it back again.
2 //
3 // https://www.confluent.io/confluent-cloud/
4 //
5 // Auto-creation of topics is disabled in Confluent Cloud. You will need to
6 // use the ccloud cli to create the go-test-topic topic before running this
7 // example.
8 //
9 // $ ccloud topic create go-test-topic
10 //
11 // The <ccloud bootstrap servers>, <ccloud key> and <ccloud secret> parameters
12 // are available via the Confluent Cloud web interface. For more information,
13 // refer to the quick-start:
14 //
15 // https://docs.confluent.io/current/cloud-quickstart.html
16 package main
17
18 /**
19 * Copyright 2018 Confluent Inc.
20 *
21 * Licensed under the Apache License, Version 2.0 (the "License");
22 * you may not use this file except in compliance with the License.
23 * You may obtain a copy of the License at
24 *
25 * http://www.apache.org/licenses/LICENSE-2.0
26 *
27 * Unless required by applicable law or agreed to in writing, software
28 * distributed under the License is distributed on an "AS IS" BASIS,
29 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30 * See the License for the specific language governing permissions and
31 * limitations under the License.
32 */
33
34 import (
35 "time"
36 "fmt"
37 "github.com/confluentinc/confluent-kafka-go/kafka"
38 )
39
40 func main() {
41
42 p, err := kafka.NewProducer(&kafka.ConfigMap{
43 "bootstrap.servers": "<ccloud bootstrap servers>",
44 "broker.version.fallback": "0.10.0.0",
45 "api.version.fallback.ms": 0,
46 "sasl.mechanisms": "PLAIN",
47 "security.protocol": "SASL_SSL",
48 "sasl.username": "<ccloud key>",
49 "sasl.password": "<ccloud secret>",})
50
51 if err != nil {
52 panic(fmt.Sprintf("Failed to create producer: %s", err))
53 }
54
55 value := "golang test value"
56 topic := "go-test-topic"
57 p.Produce(&kafka.Message{
58 TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
59 Value: []byte(value),
60 }, nil)
61
62 // Wait for delivery report
63 e := <-p.Events()
64
65 m := e.(*kafka.Message)
66 if m.TopicPartition.Error != nil {
67 fmt.Printf("failed to deliver message: %v\n", m.TopicPartition)
68 } else {
69 fmt.Printf("delivered to topic %s [%d] at offset %v\n",
70 *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
71 }
72
73 p.Close()
74
75
76 c, err := kafka.NewConsumer(&kafka.ConfigMap{
77 "bootstrap.servers": "<ccloud bootstrap servers>",
78 "broker.version.fallback": "0.10.0.0",
79 "api.version.fallback.ms": 0,
80 "sasl.mechanisms": "PLAIN",
81 "security.protocol": "SASL_SSL",
82 "sasl.username": "<ccloud key>",
83 "sasl.password": "<ccloud secret>",
84 "session.timeout.ms": 6000,
85 "group.id": "my-group",
86 "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},})
87
88 if err != nil {
89 panic(fmt.Sprintf("Failed to create consumer: %s", err))
90 }
91
92 topics := []string{topic}
93 c.SubscribeTopics(topics, nil)
94
95 for {
96 msg, err := c.ReadMessage(100 * time.Millisecond)
97 if err == nil {
98 fmt.Printf("consumed: %s: %s\n", msg.TopicPartition, string(msg.Value))
99 } else if kerr, ok := err.(kafka.Error); !ok || kerr.Code() != kafka.ErrTimedOut {
100 // Stop on any error other than a read timeout; otherwise the
101 // c.Close() below would be unreachable.
102 fmt.Printf("consumer error: %v\n", err)
103 break
104 }
105 }
106
107 c.Close()
108 }
5050 err = p.Produce(&kafka.Message{
5151 TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
5252 Value: []byte(value),
53 Headers: []kafka.Header{{"myTestHeader", []byte("header values are binary")}},
53 Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
5454 }, deliveryChan)
5555
5656 e := <-deliveryChan
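The named-field form also mirrors what consumers get back: `Message.Headers` carries the same `Key`/`Value` pairs. A tiny hypothetical inspection helper (assumes `fmt` and the `kafka` import):

```go
// printHeaders dumps a consumed message's headers; Header.Key is a
// string and Header.Value is raw bytes, exactly as produced above.
func printHeaders(msg *kafka.Message) {
	for _, h := range msg.Headers {
		fmt.Printf("%s: %q\n", h.Key, string(h.Value))
	}
}
```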
00 package kafka
11
22 /**
3 * Copyright 2016 Confluent Inc.
3 * Copyright 2016-2018 Confluent Inc.
44 *
55 * Licensed under the Apache License, Version 2.0 (the "License");
66 * you may not use this file except in compliance with the License.
1919 "fmt"
2020 )
2121
22
2322 /*
2423 #include <librdkafka/rdkafka.h>
2524
2928 //defines and strings in sync.
3029 //
3130
32 #define MIN_RD_KAFKA_VERSION 0x0000b0400
31 #define MIN_RD_KAFKA_VERSION 0x0000b0500
3332
3433 #ifdef __APPLE__
35 #define MIN_VER_ERRSTR "confluent-kafka-go requires librdkafka v0.11.4 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
34 #define MIN_VER_ERRSTR "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
3635 #else
37 #define MIN_VER_ERRSTR "confluent-kafka-go requires librdkafka v0.11.4 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
36 #define MIN_VER_ERRSTR "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
3837 #endif
3938
4039 #if RD_KAFKA_VERSION < MIN_RD_KAFKA_VERSION
4140 #ifdef __APPLE__
42 #error "confluent-kafka-go requires librdkafka v0.11.4 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
41 #error "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
4342 #else
44 #error "confluent-kafka-go requires librdkafka v0.11.4 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
43 #error "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
4544 #endif
4645 #endif
4746 */
4847 import "C"
4948
50
51 func versionCheck () error {
49 func versionCheck() error {
5250 ver, verstr := LibraryVersion()
5351 if ver < C.MIN_RD_KAFKA_VERSION {
5452 return newErrorFromString(ErrNotImplemented,
0 /**
1 * Copyright 2018 Confluent Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 package kafka
17
18 import (
19 "context"
20 "fmt"
21 "strings"
22 "time"
23 "unsafe"
24 )
25
26 /*
27 #include <librdkafka/rdkafka.h>
28 #include <stdlib.h>
29
30 static const rd_kafka_topic_result_t *
31 topic_result_by_idx (const rd_kafka_topic_result_t **topics, size_t cnt, size_t idx) {
32 if (idx >= cnt)
33 return NULL;
34 return topics[idx];
35 }
36
37 static const rd_kafka_ConfigResource_t *
38 ConfigResource_by_idx (const rd_kafka_ConfigResource_t **res, size_t cnt, size_t idx) {
39 if (idx >= cnt)
40 return NULL;
41 return res[idx];
42 }
43
44 static const rd_kafka_ConfigEntry_t *
45 ConfigEntry_by_idx (const rd_kafka_ConfigEntry_t **entries, size_t cnt, size_t idx) {
46 if (idx >= cnt)
47 return NULL;
48 return entries[idx];
49 }
50 */
51 import "C"
52
53 // AdminClient is an Apache Kafka admin client; it can be created standalone or derived from an existing Producer or Consumer
54 type AdminClient struct {
55 handle *handle
56 isDerived bool // Derived from existing client handle
57 }
58
59 func durationToMilliseconds(t time.Duration) int {
60 if t > 0 {
61 return (int)(t.Seconds() * 1000.0)
62 }
63 return (int)(t) // non-positive durations pass through unchanged
64 }
65
66 // TopicResult provides per-topic operation result (error) information.
67 type TopicResult struct {
68 // Topic name
69 Topic string
70 // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
71 Error Error
72 }
73
74 // String returns a human-readable representation of a TopicResult.
75 func (t TopicResult) String() string {
76 if t.Error.code == 0 {
77 return t.Topic
78 }
79 return fmt.Sprintf("%s (%s)", t.Topic, t.Error.str)
80 }
81
82 // TopicSpecification holds parameters for creating a new topic.
83 // TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
84 type TopicSpecification struct {
85 // Topic name to create.
86 Topic string
87 // Number of partitions in topic.
88 NumPartitions int
89 // Default replication factor for the topic's partitions, or zero
90 // if an explicit ReplicaAssignment is set.
91 ReplicationFactor int
92 // (Optional) Explicit replica assignment. The outer array is
93 // indexed by the partition number, while the inner per-partition array
94 // contains the replica broker ids. The first broker in each
95 // broker id list will be the preferred replica.
96 ReplicaAssignment [][]int32
97 // Topic configuration.
98 Config map[string]string
99 }
100
101 // PartitionsSpecification holds parameters for creating additional partitions for a topic.
102 // PartitionsSpecification is analogous to NewPartitions in the Java Topic Admin API.
103 type PartitionsSpecification struct {
104 // Topic to create more partitions for.
105 Topic string
106 // New partition count for topic, must be higher than current partition count.
107 IncreaseTo int
108 // (Optional) Explicit replica assignment. The outer array is
109 // indexed by the new partition index (i.e., 0 for the first added
110 // partition), while the inner per-partition array
111 // contains the replica broker ids. The first broker in each
112 // broker id list will be the preferred replica.
113 ReplicaAssignment [][]int32
114 }
115
116 // ResourceType represents an Apache Kafka resource type
117 type ResourceType int
118
119 const (
120 // ResourceUnknown - Unknown
121 ResourceUnknown = ResourceType(C.RD_KAFKA_RESOURCE_UNKNOWN)
122 // ResourceAny - match any resource type (DescribeConfigs)
123 ResourceAny = ResourceType(C.RD_KAFKA_RESOURCE_ANY)
124 // ResourceTopic - Topic
125 ResourceTopic = ResourceType(C.RD_KAFKA_RESOURCE_TOPIC)
126 // ResourceGroup - Group
127 ResourceGroup = ResourceType(C.RD_KAFKA_RESOURCE_GROUP)
128 // ResourceBroker - Broker
129 ResourceBroker = ResourceType(C.RD_KAFKA_RESOURCE_BROKER)
130 )
131
132 // String returns the human-readable representation of a ResourceType
133 func (t ResourceType) String() string {
134 return C.GoString(C.rd_kafka_ResourceType_name(C.rd_kafka_ResourceType_t(t)))
135 }
136
137 // ResourceTypeFromString translates a resource type name/string to
138 // a ResourceType value.
139 func ResourceTypeFromString(typeString string) (ResourceType, error) {
140 switch strings.ToUpper(typeString) {
141 case "ANY":
142 return ResourceAny, nil
143 case "TOPIC":
144 return ResourceTopic, nil
145 case "GROUP":
146 return ResourceGroup, nil
147 case "BROKER":
148 return ResourceBroker, nil
149 default:
150 return ResourceUnknown, newGoError(ErrInvalidArg)
151 }
152 }
153
154 // ConfigSource represents an Apache Kafka config source
155 type ConfigSource int
156
157 const (
158 // ConfigSourceUnknown is the default value
159 ConfigSourceUnknown = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG)
160 // ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic
161 ConfigSourceDynamicTopic = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG)
162 // ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker
163 ConfigSourceDynamicBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG)
164 // ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster
165 ConfigSourceDynamicDefaultBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG)
166 // ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file)
167 ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG)
168 // ConfigSourceDefault is built-in default configuration for configs that have a default value
169 ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
170 )
171
172 // String returns the human-readable representation of a ConfigSource type
173 func (t ConfigSource) String() string {
174 return C.GoString(C.rd_kafka_ConfigSource_name(C.rd_kafka_ConfigSource_t(t)))
175 }
176
177 // ConfigResource holds parameters for altering an Apache Kafka configuration resource
178 type ConfigResource struct {
179 // Type of resource to set.
180 Type ResourceType
181 // Name of resource to set.
182 Name string
183 // Config entries to set.
184 // Configuration updates are atomic; any configuration property not provided
185 // here will be reverted (by the broker) to its default value.
186 // Use DescribeConfigs to retrieve the list of current configuration entry values.
187 Config []ConfigEntry
188 }
189
190 // String returns a human-readable representation of a ConfigResource
191 func (c ConfigResource) String() string {
192 return fmt.Sprintf("Resource(%s, %s)", c.Type, c.Name)
193 }
194
195 // AlterOperation specifies the operation to perform on the ConfigEntry.
196 // Currently only AlterOperationSet is supported.
197 type AlterOperation int
198
199 const (
200 // AlterOperationSet sets/overwrites the configuration setting.
201 AlterOperationSet = iota
202 )
203
204 // String returns the human-readable representation of an AlterOperation
205 func (o AlterOperation) String() string {
206 switch o {
207 case AlterOperationSet:
208 return "Set"
209 default:
210 return fmt.Sprintf("Unknown%d?", int(o))
211 }
212 }
213
214 // ConfigEntry holds parameters for altering a resource's configuration.
215 type ConfigEntry struct {
216 // Name of configuration entry, e.g., topic configuration property name.
217 Name string
218 // Value of configuration entry.
219 Value string
220 // Operation to perform on the entry.
221 Operation AlterOperation
222 }
223
224 // StringMapToConfigEntries creates a new map of ConfigEntry objects from the
225 // provided string map. The AlterOperation is set on each created entry.
226 func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry {
227 var ceList []ConfigEntry
228
229 for k, v := range stringMap {
230 ceList = append(ceList, ConfigEntry{Name: k, Value: v, Operation: operation})
231 }
232
233 return ceList
234 }
235
236 // String returns a human-readable representation of a ConfigEntry.
237 func (c ConfigEntry) String() string {
238 return fmt.Sprintf("%v %s=\"%s\"", c.Operation, c.Name, c.Value)
239 }
240
241 // ConfigEntryResult contains the result of a single configuration entry from a
242 // DescribeConfigs request.
243 type ConfigEntryResult struct {
244 // Name of configuration entry, e.g., topic configuration property name.
245 Name string
246 // Value of configuration entry.
247 Value string
248 // Source indicates the configuration source.
249 Source ConfigSource
250 // IsReadOnly indicates whether the configuration entry can be altered.
251 IsReadOnly bool
252 // IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset.
253 IsSensitive bool
254 // IsSynonym indicates whether the configuration entry is a synonym for another configuration property.
255 IsSynonym bool
256 // Synonyms contains a map of configuration entries that are synonyms to this configuration entry.
257 Synonyms map[string]ConfigEntryResult
258 }
259
260 // String returns a human-readable representation of a ConfigEntryResult.
261 func (c ConfigEntryResult) String() string {
262 return fmt.Sprintf("%s=\"%s\"", c.Name, c.Value)
263 }
264
265 // setFromC sets up a ConfigEntryResult from a C ConfigEntry
266 func configEntryResultFromC(cEntry *C.rd_kafka_ConfigEntry_t) (entry ConfigEntryResult) {
267 entry.Name = C.GoString(C.rd_kafka_ConfigEntry_name(cEntry))
268 cValue := C.rd_kafka_ConfigEntry_value(cEntry)
269 if cValue != nil {
270 entry.Value = C.GoString(cValue)
271 }
272 entry.Source = ConfigSource(C.rd_kafka_ConfigEntry_source(cEntry))
273 entry.IsReadOnly = cint2bool(C.rd_kafka_ConfigEntry_is_read_only(cEntry))
274 entry.IsSensitive = cint2bool(C.rd_kafka_ConfigEntry_is_sensitive(cEntry))
275 entry.IsSynonym = cint2bool(C.rd_kafka_ConfigEntry_is_synonym(cEntry))
276
277 var cSynCnt C.size_t
278 cSyns := C.rd_kafka_ConfigEntry_synonyms(cEntry, &cSynCnt)
279 if cSynCnt > 0 {
280 entry.Synonyms = make(map[string]ConfigEntryResult)
281 }
282
283 for si := 0; si < int(cSynCnt); si++ {
284 cSyn := C.ConfigEntry_by_idx(cSyns, cSynCnt, C.size_t(si))
285 Syn := configEntryResultFromC(cSyn)
286 entry.Synonyms[Syn.Name] = Syn
287 }
288
289 return entry
290 }
291
292 // ConfigResourceResult provides the result for a resource from an AlterConfigs or
293 // DescribeConfigs request.
294 type ConfigResourceResult struct {
295 // Type of returned result resource.
296 Type ResourceType
297 // Name of returned result resource.
298 Name string
299 // Error, if any, of returned result resource.
300 Error Error
301 // Config entries, if any, of returned result resource.
302 Config map[string]ConfigEntryResult
303 }
304
305 // String returns a human-readable representation of a ConfigResourceResult.
306 func (c ConfigResourceResult) String() string {
307 if c.Error.Code() != 0 {
308 return fmt.Sprintf("ResourceResult(%s, %s, \"%v\")", c.Type, c.Name, c.Error)
309
310 }
311 return fmt.Sprintf("ResourceResult(%s, %s, %d config(s))", c.Type, c.Name, len(c.Config))
312 }
313
314 // waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens
315 // first.
316 // The returned result event is checked for errors, and its error is returned if set.
317 func (a *AdminClient) waitResult(ctx context.Context, cQueue *C.rd_kafka_queue_t, cEventType C.rd_kafka_event_type_t) (rkev *C.rd_kafka_event_t, err error) {
318
319 resultChan := make(chan *C.rd_kafka_event_t)
320 closeChan := make(chan bool) // never written to, just closed
321
322 go func() {
323 for {
324 select {
325 case _, ok := <-closeChan:
326 if !ok {
327 // Context cancelled/timed out
328 close(resultChan)
329 return
330 }
331
332 default:
333 // Wait for result event for at most 50ms
334 // to avoid blocking for too long if
335 // context is cancelled.
336 rkev := C.rd_kafka_queue_poll(cQueue, 50)
337 if rkev != nil {
338 resultChan <- rkev
339 close(resultChan)
340 return
341 }
342 }
343 }
344 }()
345
346 select {
347 case rkev = <-resultChan:
348 // Result type check
349 if cEventType != C.rd_kafka_event_type(rkev) {
350 err = newErrorFromString(ErrInvalidType,
351 fmt.Sprintf("Expected %d result event, not %d", (int)(cEventType), (int)(C.rd_kafka_event_type(rkev))))
352 C.rd_kafka_event_destroy(rkev)
353 return nil, err
354 }
355
356 // Generic error handling
357 cErr := C.rd_kafka_event_error(rkev)
358 if cErr != 0 {
359 err = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
360 C.rd_kafka_event_destroy(rkev)
361 return nil, err
362 }
363 close(closeChan)
364 return rkev, nil
365 case <-ctx.Done():
366 // signal close to go-routine
367 close(closeChan)
368 // wait for close from go-routine to make sure it is done
369 // using cQueue before we return.
370 rkev, ok := <-resultChan
371 if ok {
372 // throw away result since context was cancelled
373 C.rd_kafka_event_destroy(rkev)
374 }
375 return nil, ctx.Err()
376 }
377 }
378
379 // cToTopicResults converts a C topic_result_t array to a Go TopicResult list.
380 func (a *AdminClient) cToTopicResults(cTopicRes **C.rd_kafka_topic_result_t, cCnt C.size_t) (result []TopicResult, err error) {
381
382 result = make([]TopicResult, int(cCnt))
383
384 for i := 0; i < int(cCnt); i++ {
385 cTopic := C.topic_result_by_idx(cTopicRes, cCnt, C.size_t(i))
386 result[i].Topic = C.GoString(C.rd_kafka_topic_result_name(cTopic))
387 result[i].Error = newErrorFromCString(
388 C.rd_kafka_topic_result_error(cTopic),
389 C.rd_kafka_topic_result_error_string(cTopic))
390 }
391
392 return result, nil
393 }
394
395 // cConfigResourceToResult converts a C ConfigResource result array to a Go ConfigResourceResult list
396 func (a *AdminClient) cConfigResourceToResult(cRes **C.rd_kafka_ConfigResource_t, cCnt C.size_t) (result []ConfigResourceResult, err error) {
397
398 result = make([]ConfigResourceResult, int(cCnt))
399
400 for i := 0; i < int(cCnt); i++ {
401 cRes := C.ConfigResource_by_idx(cRes, cCnt, C.size_t(i))
402 result[i].Type = ResourceType(C.rd_kafka_ConfigResource_type(cRes))
403 result[i].Name = C.GoString(C.rd_kafka_ConfigResource_name(cRes))
404 result[i].Error = newErrorFromCString(
405 C.rd_kafka_ConfigResource_error(cRes),
406 C.rd_kafka_ConfigResource_error_string(cRes))
407 var cConfigCnt C.size_t
408 cConfigs := C.rd_kafka_ConfigResource_configs(cRes, &cConfigCnt)
409 if cConfigCnt > 0 {
410 result[i].Config = make(map[string]ConfigEntryResult)
411 }
412 for ci := 0; ci < int(cConfigCnt); ci++ {
413 cEntry := C.ConfigEntry_by_idx(cConfigs, cConfigCnt, C.size_t(ci))
414 entry := configEntryResultFromC(cEntry)
415 result[i].Config[entry.Name] = entry
416 }
417 }
418
419 return result, nil
420 }
421
422 // CreateTopics creates topics in cluster.
423 //
424 // The list of TopicSpecification objects define the per-topic partition count, replicas, etc.
425 //
426 // Topic creation is non-atomic and may succeed for some topics but fail for others,
427 // so make sure to check the result for topic-specific errors.
428 //
429 // Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
430 func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error) {
431 cTopics := make([]*C.rd_kafka_NewTopic_t, len(topics))
432
433 cErrstrSize := C.size_t(512)
434 cErrstr := (*C.char)(C.malloc(cErrstrSize))
435 defer C.free(unsafe.Pointer(cErrstr))
436
437 // Convert Go TopicSpecifications to C TopicSpecifications
438 for i, topic := range topics {
439
440 var cReplicationFactor C.int
441 if topic.ReplicationFactor == 0 {
442 cReplicationFactor = -1
443 } else {
444 cReplicationFactor = C.int(topic.ReplicationFactor)
445 }
446 if topic.ReplicaAssignment != nil {
447 if cReplicationFactor != -1 {
448 return nil, newErrorFromString(ErrInvalidArg,
449 "TopicSpecification.ReplicationFactor and TopicSpecification.ReplicaAssignment are mutually exclusive")
450 }
451
452 if len(topic.ReplicaAssignment) != topic.NumPartitions {
453 return nil, newErrorFromString(ErrInvalidArg,
454 "TopicSpecification.ReplicaAssignment must contain exactly TopicSpecification.NumPartitions partitions")
455 }
456
457 } else if cReplicationFactor == -1 {
458 return nil, newErrorFromString(ErrInvalidArg,
459 "TopicSpecification.ReplicationFactor or TopicSpecification.ReplicaAssignment must be specified")
460 }
461
462 cTopics[i] = C.rd_kafka_NewTopic_new(
463 C.CString(topic.Topic),
464 C.int(topic.NumPartitions),
465 cReplicationFactor,
466 cErrstr, cErrstrSize)
467 if cTopics[i] == nil {
468 return nil, newErrorFromString(ErrInvalidArg,
469 fmt.Sprintf("Topic %s: %s", topic.Topic, C.GoString(cErrstr)))
470 }
471
472 defer C.rd_kafka_NewTopic_destroy(cTopics[i])
473
474 for p, replicas := range topic.ReplicaAssignment {
475 cReplicas := make([]C.int32_t, len(replicas))
476 for ri, replica := range replicas {
477 cReplicas[ri] = C.int32_t(replica)
478 }
479 cErr := C.rd_kafka_NewTopic_set_replica_assignment(
480 cTopics[i], C.int32_t(p),
481 (*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)),
482 cErrstr, cErrstrSize)
483 if cErr != 0 {
484 return nil, newCErrorFromString(cErr,
485 fmt.Sprintf("Failed to set replica assignment for topic %s partition %d: %s", topic.Topic, p, C.GoString(cErrstr)))
486 }
487 }
488
489 for key, value := range topic.Config {
490 cErr := C.rd_kafka_NewTopic_set_config(
491 cTopics[i],
492 C.CString(key), C.CString(value))
493 if cErr != 0 {
494 return nil, newCErrorFromString(cErr,
495 fmt.Sprintf("Failed to set config %s=%s for topic %s", key, value, topic.Topic))
496 }
497 }
498 }
499
500 // Convert Go AdminOptions (if any) to C AdminOptions
501 genericOptions := make([]AdminOption, len(options))
502 for i := range options {
503 genericOptions[i] = options[i]
504 }
505 cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATETOPICS, genericOptions)
506 if err != nil {
507 return nil, err
508 }
509 defer C.rd_kafka_AdminOptions_destroy(cOptions)
510
511 // Create temporary queue for async operation
512 cQueue := C.rd_kafka_queue_new(a.handle.rk)
513 defer C.rd_kafka_queue_destroy(cQueue)
514
515 // Asynchronous call
516 C.rd_kafka_CreateTopics(
517 a.handle.rk,
518 (**C.rd_kafka_NewTopic_t)(&cTopics[0]),
519 C.size_t(len(cTopics)),
520 cOptions,
521 cQueue)
522
523 // Wait for result, error or context timeout
524 rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATETOPICS_RESULT)
525 if err != nil {
526 return nil, err
527 }
528 defer C.rd_kafka_event_destroy(rkev)
529
530 cRes := C.rd_kafka_event_CreateTopics_result(rkev)
531
532 // Convert result from C to Go
533 var cCnt C.size_t
534 cTopicRes := C.rd_kafka_CreateTopics_result_topics(cRes, &cCnt)
535
536 return a.cToTopicResults(cTopicRes, cCnt)
537 }
538
539 // DeleteTopics deletes a batch of topics.
540 //
541 // This operation is not transactional and may succeed for a subset of topics while
542 // failing others.
543 // It may take several seconds after the DeleteTopics result returns success for
544 // all the brokers to become aware that the topics are gone. During this time,
545 // topic metadata and configuration may continue to return information about deleted topics.
546 //
547 // Requires broker version >= 0.10.1.0
548 func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error) {
549 cTopics := make([]*C.rd_kafka_DeleteTopic_t, len(topics))
550
551 cErrstrSize := C.size_t(512)
552 cErrstr := (*C.char)(C.malloc(cErrstrSize))
553 defer C.free(unsafe.Pointer(cErrstr))
554
555 // Convert Go DeleteTopics to C DeleteTopics
556 for i, topic := range topics {
557 cTopics[i] = C.rd_kafka_DeleteTopic_new(C.CString(topic))
558 if cTopics[i] == nil {
559 return nil, newErrorFromString(ErrInvalidArg,
560 fmt.Sprintf("Invalid arguments for topic %s", topic))
561 }
562
563 defer C.rd_kafka_DeleteTopic_destroy(cTopics[i])
564 }
565
566 // Convert Go AdminOptions (if any) to C AdminOptions
567 genericOptions := make([]AdminOption, len(options))
568 for i := range options {
569 genericOptions[i] = options[i]
570 }
571 cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DELETETOPICS, genericOptions)
572 if err != nil {
573 return nil, err
574 }
575 defer C.rd_kafka_AdminOptions_destroy(cOptions)
576
577 // Create temporary queue for async operation
578 cQueue := C.rd_kafka_queue_new(a.handle.rk)
579 defer C.rd_kafka_queue_destroy(cQueue)
580
581 // Asynchronous call
582 C.rd_kafka_DeleteTopics(
583 a.handle.rk,
584 (**C.rd_kafka_DeleteTopic_t)(&cTopics[0]),
585 C.size_t(len(cTopics)),
586 cOptions,
587 cQueue)
588
589 // Wait for result, error or context timeout
590 rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DELETETOPICS_RESULT)
591 if err != nil {
592 return nil, err
593 }
594 defer C.rd_kafka_event_destroy(rkev)
595
596 cRes := C.rd_kafka_event_DeleteTopics_result(rkev)
597
598 // Convert result from C to Go
599 var cCnt C.size_t
600 cTopicRes := C.rd_kafka_DeleteTopics_result_topics(cRes, &cCnt)
601
602 return a.cToTopicResults(cTopicRes, cCnt)
603 }
604
605 // CreatePartitions creates additional partitions for topics.
606 func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error) {
607 cParts := make([]*C.rd_kafka_NewPartitions_t, len(partitions))
608
609 cErrstrSize := C.size_t(512)
610 cErrstr := (*C.char)(C.malloc(cErrstrSize))
611 defer C.free(unsafe.Pointer(cErrstr))
612
613 // Convert Go PartitionsSpecification to C NewPartitions
614 for i, part := range partitions {
615 cParts[i] = C.rd_kafka_NewPartitions_new(C.CString(part.Topic), C.size_t(part.IncreaseTo), cErrstr, cErrstrSize)
616 if cParts[i] == nil {
617 return nil, newErrorFromString(ErrInvalidArg,
618 fmt.Sprintf("Topic %s: %s", part.Topic, C.GoString(cErrstr)))
619 }
620
621 defer C.rd_kafka_NewPartitions_destroy(cParts[i])
622
623 for pidx, replicas := range part.ReplicaAssignment {
624 cReplicas := make([]C.int32_t, len(replicas))
625 for ri, replica := range replicas {
626 cReplicas[ri] = C.int32_t(replica)
627 }
628 cErr := C.rd_kafka_NewPartitions_set_replica_assignment(
629 cParts[i], C.int32_t(pidx),
630 (*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)),
631 cErrstr, cErrstrSize)
632 if cErr != 0 {
633 return nil, newCErrorFromString(cErr,
634 fmt.Sprintf("Failed to set replica assignment for topic %s new partition index %d: %s", part.Topic, pidx, C.GoString(cErrstr)))
635 }
636 }
637
638 }
639
640 // Convert Go AdminOptions (if any) to C AdminOptions
641 genericOptions := make([]AdminOption, len(options))
642 for i := range options {
643 genericOptions[i] = options[i]
644 }
645 cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, genericOptions)
646 if err != nil {
647 return nil, err
648 }
649 defer C.rd_kafka_AdminOptions_destroy(cOptions)
650
651 // Create temporary queue for async operation
652 cQueue := C.rd_kafka_queue_new(a.handle.rk)
653 defer C.rd_kafka_queue_destroy(cQueue)
654
655 // Asynchronous call
656 C.rd_kafka_CreatePartitions(
657 a.handle.rk,
658 (**C.rd_kafka_NewPartitions_t)(&cParts[0]),
659 C.size_t(len(cParts)),
660 cOptions,
661 cQueue)
662
663 // Wait for result, error or context timeout
664 rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT)
665 if err != nil {
666 return nil, err
667 }
668 defer C.rd_kafka_event_destroy(rkev)
669
670 cRes := C.rd_kafka_event_CreatePartitions_result(rkev)
671
672 // Convert result from C to Go
673 var cCnt C.size_t
674 cTopicRes := C.rd_kafka_CreatePartitions_result_topics(cRes, &cCnt)
675
676 return a.cToTopicResults(cTopicRes, cCnt)
677 }
678
679 // AlterConfigs alters/updates cluster resource configuration.
680 //
681 // Updates are not transactional so they may succeed for a subset
682 // of the provided resources while others fail.
683 // The configuration for a particular resource is updated atomically,
684 // replacing values using the provided ConfigEntry values and reverting
685 // unspecified entries to their default values.
686 //
687 // Requires broker version >=0.11.0.0
688 //
689 // AlterConfigs will replace all existing configuration for
690 // the provided resources with the new configuration given,
691 // reverting all other configuration to their default values.
692 //
693 // Multiple resources and resource types may be set, but at most one
694 // resource of type ResourceBroker is allowed per call since these
695 // resource requests must be sent to the broker specified in the resource.
696 func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error) {
697 cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources))
698
699 cErrstrSize := C.size_t(512)
700 cErrstr := (*C.char)(C.malloc(cErrstrSize))
701 defer C.free(unsafe.Pointer(cErrstr))
702
703 // Convert Go ConfigResources to C ConfigResources
704 for i, res := range resources {
705 cRes[i] = C.rd_kafka_ConfigResource_new(
706 C.rd_kafka_ResourceType_t(res.Type), C.CString(res.Name))
707 if cRes[i] == nil {
708 return nil, newErrorFromString(ErrInvalidArg,
709 fmt.Sprintf("Invalid arguments for resource %v", res))
710 }
711
712 defer C.rd_kafka_ConfigResource_destroy(cRes[i])
713
714 for _, entry := range res.Config {
715 var cErr C.rd_kafka_resp_err_t
716 switch entry.Operation {
717 case AlterOperationSet:
718 cErr = C.rd_kafka_ConfigResource_set_config(
719 cRes[i], C.CString(entry.Name), C.CString(entry.Value))
720 default:
721 panic(fmt.Sprintf("Invalid ConfigEntry.Operation: %v", entry.Operation))
722 }
723
724 if cErr != 0 {
725 return nil,
726 newCErrorFromString(cErr,
727 fmt.Sprintf("Failed to add configuration %s: %s",
728 entry, C.GoString(C.rd_kafka_err2str(cErr))))
729 }
730 }
731 }
732
733 // Convert Go AdminOptions (if any) to C AdminOptions
734 genericOptions := make([]AdminOption, len(options))
735 for i := range options {
736 genericOptions[i] = options[i]
737 }
738 cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_ALTERCONFIGS, genericOptions)
739 if err != nil {
740 return nil, err
741 }
742 defer C.rd_kafka_AdminOptions_destroy(cOptions)
743
744 // Create temporary queue for async operation
745 cQueue := C.rd_kafka_queue_new(a.handle.rk)
746 defer C.rd_kafka_queue_destroy(cQueue)
747
748 // Asynchronous call
749 C.rd_kafka_AlterConfigs(
750 a.handle.rk,
751 (**C.rd_kafka_ConfigResource_t)(&cRes[0]),
752 C.size_t(len(cRes)),
753 cOptions,
754 cQueue)
755
756 // Wait for result, error or context timeout
757 rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_ALTERCONFIGS_RESULT)
758 if err != nil {
759 return nil, err
760 }
761 defer C.rd_kafka_event_destroy(rkev)
762
763 cResult := C.rd_kafka_event_AlterConfigs_result(rkev)
764
765 // Convert results from C to Go
766 var cCnt C.size_t
767 cResults := C.rd_kafka_AlterConfigs_result_resources(cResult, &cCnt)
768
769 return a.cConfigResourceToResult(cResults, cCnt)
770 }
771
772 // DescribeConfigs retrieves configuration for cluster resources.
773 //
774 // The returned configuration includes default values; use
775 // ConfigEntryResult.Source to distinguish
776 // default values from manually configured settings.
777 //
778 // The value of config entries where .IsSensitive is true
779 // will always be unset (an empty string) to avoid disclosing sensitive
780 // information, such as security settings.
781 //
782 // Configuration entries where .IsReadOnly is true can't be modified
783 // (with AlterConfigs).
784 //
785 // Synonym configuration entries are returned if the broker supports
786 // it (broker version >= 1.1.0). See .Synonyms.
787 //
788 // Requires broker version >=0.11.0.0
789 //
790 // Multiple resources and resource types may be requested, but at most
791 // one resource of type ResourceBroker is allowed per call
792 // since these resource requests must be sent to the broker specified
793 // in the resource.
794 func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error) {
795 cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources))
796
797 cErrstrSize := C.size_t(512)
798 cErrstr := (*C.char)(C.malloc(cErrstrSize))
799 defer C.free(unsafe.Pointer(cErrstr))
800
801 // Convert Go ConfigResources to C ConfigResources
802 for i, res := range resources {
803 cRes[i] = C.rd_kafka_ConfigResource_new(
804 C.rd_kafka_ResourceType_t(res.Type), C.CString(res.Name))
805 if cRes[i] == nil {
806 return nil, newErrorFromString(ErrInvalidArg,
807 fmt.Sprintf("Invalid arguments for resource %v", res))
808 }
809
810 defer C.rd_kafka_ConfigResource_destroy(cRes[i])
811 }
812
813 // Convert Go AdminOptions (if any) to C AdminOptions
814 genericOptions := make([]AdminOption, len(options))
815 for i := range options {
816 genericOptions[i] = options[i]
817 }
818 cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, genericOptions)
819 if err != nil {
820 return nil, err
821 }
822 defer C.rd_kafka_AdminOptions_destroy(cOptions)
823
824 // Create temporary queue for async operation
825 cQueue := C.rd_kafka_queue_new(a.handle.rk)
826 defer C.rd_kafka_queue_destroy(cQueue)
827
828 // Asynchronous call
829 C.rd_kafka_DescribeConfigs(
830 a.handle.rk,
831 (**C.rd_kafka_ConfigResource_t)(&cRes[0]),
832 C.size_t(len(cRes)),
833 cOptions,
834 cQueue)
835
836 // Wait for result, error or context timeout
837 rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT)
838 if err != nil {
839 return nil, err
840 }
841 defer C.rd_kafka_event_destroy(rkev)
842
843 cResult := C.rd_kafka_event_DescribeConfigs_result(rkev)
844
845 // Convert results from C to Go
846 var cCnt C.size_t
847 cResults := C.rd_kafka_DescribeConfigs_result_resources(cResult, &cCnt)
848
849 return a.cConfigResourceToResult(cResults, cCnt)
850 }
851
852 // GetMetadata queries broker for cluster and topic metadata.
853 // If topic is non-nil only information about that topic is returned, else if
854 // allTopics is false only information about locally used topics is returned,
855 // else information about all topics is returned.
856 // GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
857 func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
858 return getMetadata(a, topic, allTopics, timeoutMs)
859 }
860
861 // String returns a human-readable name for an AdminClient instance
862 func (a *AdminClient) String() string {
863 return fmt.Sprintf("admin-%s", a.handle.String())
864 }
865
866 // gethandle implements the Handle interface
867 func (a *AdminClient) gethandle() *handle {
868 return a.handle
869 }
870
871 // Close an AdminClient instance.
872 func (a *AdminClient) Close() {
873 if a.isDerived {
874 // Derived AdminClient needs no cleanup.
875 a.handle = &handle{}
876 return
877 }
878
879 a.handle.cleanup()
880
881 C.rd_kafka_destroy(a.handle.rk)
882 }
883
884 // NewAdminClient creates a new AdminClient instance with a new underlying client instance
885 func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {
886
887 err := versionCheck()
888 if err != nil {
889 return nil, err
890 }
891
892 a := &AdminClient{}
893 a.handle = &handle{}
894
895 // Convert ConfigMap to librdkafka conf_t
896 cConf, err := conf.convert()
897 if err != nil {
898 return nil, err
899 }
900
901 cErrstr := (*C.char)(C.malloc(C.size_t(256)))
902 defer C.free(unsafe.Pointer(cErrstr))
903
904 // Create a librdkafka producer instance. The Producer is somewhat cheaper than
905 // the consumer, but any instance type can be used for Admin APIs.
906 a.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
907 if a.handle.rk == nil {
908 return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
909 }
910
911 a.isDerived = false
912 a.handle.setup()
913
914 return a, nil
915 }
916
917 // NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance.
918 // The AdminClient will use the same configuration and connections as the parent instance.
919 func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error) {
920 if p.handle.rk == nil {
921 return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed producer")
922 }
923
924 a = &AdminClient{}
925 a.handle = &p.handle
926 a.isDerived = true
927 return a, nil
928 }
929
930 // NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance.
931 // The AdminClient will use the same configuration and connections as the parent instance.
932 func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) {
933 if c.handle.rk == nil {
934 return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed consumer")
935 }
936
937 a = &AdminClient{}
938 a.handle = &c.handle
939 a.isDerived = true
940 return a, nil
941 }
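A minimal usage sketch for the derived-client constructors added above (broker address illustrative; note that, per Close() above, closing a derived AdminClient leaves the parent instance untouched):

```go
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// The derived AdminClient shares the producer's handle and
	// configuration; Close() on it does not tear down the producer.
	a, err := kafka.NewAdminClientFromProducer(p)
	if err != nil {
		panic(err)
	}
	defer a.Close()

	// nil topic with allTopics=true fetches metadata for all topics,
	// waiting at most 5s.
	md, err := a.GetMetadata(nil, true, 5000)
	if err != nil {
		panic(err)
	}
	fmt.Printf("cluster has %d topic(s)\n", len(md.Topics))
}
```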
0 /**
1 * Copyright 2018 Confluent Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 package kafka
17
18 import (
19 "context"
20 "strings"
21 "testing"
22 "time"
23 )
24
25 func testAdminAPIs(what string, a *AdminClient, t *testing.T) {
26 t.Logf("AdminClient API testing on %s: %s", a, what)
27
28 expDuration, err := time.ParseDuration("0.1s")
29 if err != nil {
30 t.Fatalf("%s", err)
31 }
32
33 confStrings := map[string]string{
34 "some.topic.config": "unchecked",
35 "these.are.verified": "on the broker",
36 "and.this.is": "just",
37 "a": "unit test"}
38
39 // Correct input, fail with timeout
40 ctx, cancel := context.WithTimeout(context.Background(), expDuration)
41 defer cancel()
42 res, err := a.CreateTopics(
43 ctx,
44 []TopicSpecification{
45 {
46 Topic: "mytopic",
47 NumPartitions: 7,
48 ReplicationFactor: 3,
49 },
50 {
51 Topic: "mytopic2",
52 NumPartitions: 2,
53 ReplicaAssignment: [][]int32{
54 []int32{1, 2, 3},
55 []int32{3, 2, 1},
56 },
57 },
58 {
59 Topic: "mytopic3",
60 NumPartitions: 10000,
61 ReplicationFactor: 90,
62 Config: confStrings,
63 },
64 })
65 if res != nil || err == nil {
66 t.Fatalf("Expected CreateTopics to fail, but got result: %v, err: %v", res, err)
67 }
68 if ctx.Err() != context.DeadlineExceeded {
69 t.Fatalf("Expected DeadlineExceeded, not %v, %v", ctx.Err(), err)
70 }
71
72 // Incorrect input, fail with ErrInvalidArg
73 ctx, cancel = context.WithTimeout(context.Background(), expDuration)
74 defer cancel()
75 res, err = a.CreateTopics(
76 ctx,
77 []TopicSpecification{
78 {
79 // Must not specify both ReplicationFactor and ReplicaAssignment
80 Topic: "mytopic",
81 NumPartitions: 2,
82 ReplicationFactor: 3,
83 ReplicaAssignment: [][]int32{
84 []int32{1, 2, 3},
85 []int32{3, 2, 1},
86 },
87 },
88 })
89 if res != nil || err == nil {
90 t.Fatalf("Expected CreateTopics to fail, but got result: %v, err: %v", res, err)
91 }
92 if ctx.Err() != nil {
93 t.Fatalf("Did not expect context to fail: %v", ctx.Err())
94 }
95 if err.(Error).Code() != ErrInvalidArg {
96 t.Fatalf("Expected ErrInvalidArg, not %v", err)
97 }
98
99 // Incorrect input, fail with ErrInvalidArg
100 ctx, cancel = context.WithTimeout(context.Background(), expDuration)
101 defer cancel()
102 res, err = a.CreateTopics(
103 ctx,
104 []TopicSpecification{
105 {
106 // ReplicaAssignment must be the same length as NumPartitions
107 Topic: "mytopic",
108 NumPartitions: 7,
109 ReplicaAssignment: [][]int32{
110 []int32{1, 2, 3},
111 []int32{3, 2, 1},
112 },
113 },
114 })
115 if res != nil || err == nil {
116 t.Fatalf("Expected CreateTopics to fail, but got result: %v, err: %v", res, err)
117 }
118 if ctx.Err() != nil {
119 t.Fatalf("Did not expect context to fail: %v", ctx.Err())
120 }
121 if err.(Error).Code() != ErrInvalidArg {
122 t.Fatalf("Expected ErrInvalidArg, not %v", err)
123 }
124
125 // Correct input, using options
126 ctx, cancel = context.WithTimeout(context.Background(), expDuration)
127 defer cancel()
128 res, err = a.CreateTopics(
129 ctx,
130 []TopicSpecification{
131 {
132 Topic: "mytopic4",
133 NumPartitions: 9,
134 ReplicaAssignment: [][]int32{
135 []int32{1},
136 []int32{2},
137 []int32{3},
138 []int32{4},
139 []int32{1},
140 []int32{2},
141 []int32{3},
142 []int32{4},
143 []int32{1},
144 },
145 Config: map[string]string{
146 "some.topic.config": "unchecked",
147 "these.are.verified": "on the broker",
148 "and.this.is": "just",
149 "a": "unit test",
150 },
151 },
152 },
153 SetAdminValidateOnly(false))
154 if res != nil || err == nil {
155 t.Fatalf("Expected CreateTopics to fail, but got result: %v, err: %v", res, err)
156 }
157 if ctx.Err() != context.DeadlineExceeded {
158 t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err())
159 }
160
161 //
162 // Remaining APIs
163 // Timeout code is identical for all APIs, no need to test
164 // them for each API.
165 //
166
167 ctx, cancel = context.WithTimeout(context.Background(), expDuration)
168 defer cancel()
169 res, err = a.CreatePartitions(
170 ctx,
171 []PartitionsSpecification{
172 {
173 Topic: "topic",
174 IncreaseTo: 19,
175 ReplicaAssignment: [][]int32{
176 []int32{3234522},
177 []int32{99999},
178 },
179 },
180 {
181 Topic: "topic2",
182 IncreaseTo: 2,
183 ReplicaAssignment: [][]int32{
184 []int32{99999},
185 },
186 },
187 })
188 if res != nil || err == nil {
189 t.Fatalf("Expected CreatePartitions to fail, but got result: %v, err: %v", res, err)
190 }
191 if ctx.Err() != context.DeadlineExceeded {
192 t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err())
193 }
194
195 ctx, cancel = context.WithTimeout(context.Background(), expDuration)
196 defer cancel()
197 res, err = a.DeleteTopics(
198 ctx,
199 []string{"topic1", "topic2"})
200 if res != nil || err == nil {
201 t.Fatalf("Expected DeleteTopics to fail, but got result: %v, err: %v", res, err)
202 }
203 if ctx.Err() != context.DeadlineExceeded {
204 t.Fatalf("Expected DeadlineExceeded, not %v for error %v", ctx.Err(), err)
205 }
206
207 ctx, cancel = context.WithTimeout(context.Background(), expDuration)
208 defer cancel()
209 cres, err := a.AlterConfigs(
210 ctx,
211 []ConfigResource{{Type: ResourceTopic, Name: "topic"}})
212 if cres != nil || err == nil {
213 t.Fatalf("Expected AlterConfigs to fail, but got result: %v, err: %v", cres, err)
214 }
215 if ctx.Err() != context.DeadlineExceeded {
216 t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err())
217 }
218
219 ctx, cancel = context.WithTimeout(context.Background(), expDuration)
220 defer cancel()
221 cres, err = a.DescribeConfigs(
222 ctx,
223 []ConfigResource{{Type: ResourceTopic, Name: "topic"}})
224 if cres != nil || err == nil {
225 t.Fatalf("Expected DescribeConfigs to fail, but got result: %v, err: %v", cres, err)
226 }
227 if ctx.Err() != context.DeadlineExceeded {
228 t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err())
229 }
230 }
231
232 // TestAdminAPIs dry-tests most Admin APIs, no broker is needed.
233 func TestAdminAPIs(t *testing.T) {
234
235 a, err := NewAdminClient(&ConfigMap{})
236 if err != nil {
237 t.Fatalf("%s", err)
238 }
239
240 testAdminAPIs("Non-derived, no config", a, t)
241 a.Close()
242
243 a, err = NewAdminClient(&ConfigMap{"retries": 1234})
244 if err != nil {
245 t.Fatalf("%s", err)
246 }
247
248 testAdminAPIs("Non-derived, config", a, t)
249 a.Close()
250
251 // Test derived clients
252 c, err := NewConsumer(&ConfigMap{"group.id": "test"})
253 if err != nil {
254 t.Fatalf("%s", err)
255 }
256 defer c.Close()
257
258 a, err = NewAdminClientFromConsumer(c)
259 if err != nil {
260 t.Fatalf("%s", err)
261 }
262
263 if !strings.Contains(a.String(), c.String()) {
264 t.Fatalf("Expected derived client %s to have similar name to parent %s", a, c)
265 }
266
267 testAdminAPIs("Derived from consumer", a, t)
268 a.Close()
269
270 a, err = NewAdminClientFromConsumer(c)
271 if err != nil {
272 t.Fatalf("%s", err)
273 }
274
275 if !strings.Contains(a.String(), c.String()) {
276 t.Fatalf("Expected derived client %s to have similar name to parent %s", a, c)
277 }
278
279 testAdminAPIs("Derived from same consumer", a, t)
280 a.Close()
281
282 p, err := NewProducer(&ConfigMap{})
283 if err != nil {
284 t.Fatalf("%s", err)
285 }
286 defer p.Close()
287
288 a, err = NewAdminClientFromProducer(p)
289 if err != nil {
290 t.Fatalf("%s", err)
291 }
292
293 if !strings.Contains(a.String(), p.String()) {
294 t.Fatalf("Expected derived client %s to have similar name to parent %s", a, p)
295 }
296
297 testAdminAPIs("Derived from Producer", a, t)
298 a.Close()
299
300 a, err = NewAdminClientFromProducer(p)
301 if err != nil {
302 t.Fatalf("%s", err)
303 }
304
305 if !strings.Contains(a.String(), p.String()) {
306 t.Fatalf("Expected derived client %s to have similar name to parent %s", a, p)
307 }
308
309 testAdminAPIs("Derived from same Producer", a, t)
310 a.Close()
311 }
0 /**
1 * Copyright 2018 Confluent Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 package kafka
17
18 import (
19 "fmt"
20 "time"
21 "unsafe"
22 )
23
24 /*
25 #include <librdkafka/rdkafka.h>
26 #include <stdlib.h>
27 */
28 import "C"
29
30 // AdminOptionOperationTimeout sets the broker's operation timeout, such as the
31 // timeout for CreateTopics to complete the creation of topics on the controller
32 // before returning a result to the application.
33 //
34 // CreateTopics, DeleteTopics, CreatePartitions:
35 // a value of 0 returns immediately after triggering topic
36 // creation, while a value > 0 waits this long for topic creation
37 // to propagate in the cluster.
38 //
39 // Default: 0 (return immediately).
40 //
41 // Valid for CreateTopics, DeleteTopics, CreatePartitions.
42 type AdminOptionOperationTimeout struct {
43 isSet bool
44 val time.Duration
45 }
46
47 func (ao AdminOptionOperationTimeout) supportsCreateTopics() {
48 }
49 func (ao AdminOptionOperationTimeout) supportsDeleteTopics() {
50 }
51 func (ao AdminOptionOperationTimeout) supportsCreatePartitions() {
52 }
53
54 func (ao AdminOptionOperationTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
55 if !ao.isSet {
56 return nil
57 }
58
59 cErrstrSize := C.size_t(512)
60 cErrstr := (*C.char)(C.malloc(cErrstrSize))
61 defer C.free(unsafe.Pointer(cErrstr))
62
63 cErr := C.rd_kafka_AdminOptions_set_operation_timeout(
64 cOptions, C.int(durationToMilliseconds(ao.val)),
65 cErrstr, cErrstrSize)
66 if cErr != 0 {
67 C.rd_kafka_AdminOptions_destroy(cOptions)
68 return newCErrorFromString(cErr,
69 fmt.Sprintf("Failed to set operation timeout: %s", C.GoString(cErrstr)))
70
71 }
72
73 return nil
74 }
75
76 // SetAdminOperationTimeout sets the broker's operation timeout, such as the
77 // timeout for CreateTopics to complete the creation of topics on the controller
78 // before returning a result to the application.
79 //
80 // CreateTopics, DeleteTopics, CreatePartitions:
81 // a value of 0 returns immediately after triggering topic
82 // creation, while a value > 0 waits this long for topic creation
83 // to propagate in the cluster.
84 //
85 // Default: 0 (return immediately).
86 //
87 // Valid for CreateTopics, DeleteTopics, CreatePartitions.
88 func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout) {
89 ao.isSet = true
90 ao.val = t
91 return ao
92 }
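// A minimal usage sketch (editor's illustration, not part of the diff):
// the option is passed variadically to an Admin API call, here assuming
// an existing *AdminClient `a` and a context.Context `ctx`:
//
//	results, err := a.CreateTopics(ctx,
//		[]TopicSpecification{{Topic: "mytopic", NumPartitions: 3, ReplicationFactor: 1}},
//		SetAdminOperationTimeout(30*time.Second))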
93
94 // AdminOptionRequestTimeout sets the overall request timeout, including broker
95 // lookup, request transmission, operation time on broker, and response.
96 //
97 // Default: `socket.timeout.ms`.
98 //
99 // Valid for all Admin API methods.
100 type AdminOptionRequestTimeout struct {
101 isSet bool
102 val time.Duration
103 }
104
105 func (ao AdminOptionRequestTimeout) supportsCreateTopics() {
106 }
107 func (ao AdminOptionRequestTimeout) supportsDeleteTopics() {
108 }
109 func (ao AdminOptionRequestTimeout) supportsCreatePartitions() {
110 }
111 func (ao AdminOptionRequestTimeout) supportsAlterConfigs() {
112 }
113 func (ao AdminOptionRequestTimeout) supportsDescribeConfigs() {
114 }
115
116 func (ao AdminOptionRequestTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
117 if !ao.isSet {
118 return nil
119 }
120
121 cErrstrSize := C.size_t(512)
122 cErrstr := (*C.char)(C.malloc(cErrstrSize))
123 defer C.free(unsafe.Pointer(cErrstr))
124
125 cErr := C.rd_kafka_AdminOptions_set_request_timeout(
126 cOptions, C.int(durationToMilliseconds(ao.val)),
127 cErrstr, cErrstrSize)
128 if cErr != 0 {
129 C.rd_kafka_AdminOptions_destroy(cOptions)
130 return newCErrorFromString(cErr,
131 fmt.Sprintf("%s", C.GoString(cErrstr)))
132
133 }
134
135 return nil
136 }
137
138 // SetAdminRequestTimeout sets the overall request timeout, including broker
139 // lookup, request transmission, operation time on broker, and response.
140 //
141 // Default: `socket.timeout.ms`.
142 //
143 // Valid for all Admin API methods.
144 func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout) {
145 ao.isSet = true
146 ao.val = t
147 return ao
148 }
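// Usage sketch (editor's illustration): bound the complete request
// round-trip, e.g. for DescribeConfigs, again assuming an existing
// *AdminClient `a` and a context.Context `ctx`:
//
//	res, err := a.DescribeConfigs(ctx,
//		[]ConfigResource{{Type: ResourceTopic, Name: "mytopic"}},
//		SetAdminRequestTimeout(10*time.Second))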
149
150 // AdminOptionValidateOnly tells the broker to only validate the request,
151 // without performing the requested operation (create topics, etc).
152 //
153 // Default: false.
154 //
155 // Valid for CreateTopics, CreatePartitions, AlterConfigs
156 type AdminOptionValidateOnly struct {
157 isSet bool
158 val bool
159 }
160
161 func (ao AdminOptionValidateOnly) supportsCreateTopics() {
162 }
163 func (ao AdminOptionValidateOnly) supportsCreatePartitions() {
164 }
165 func (ao AdminOptionValidateOnly) supportsAlterConfigs() {
166 }
167
168 func (ao AdminOptionValidateOnly) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
169 if !ao.isSet {
170 return nil
171 }
172
173 cErrstrSize := C.size_t(512)
174 cErrstr := (*C.char)(C.malloc(cErrstrSize))
175 defer C.free(unsafe.Pointer(cErrstr))
176
177 cErr := C.rd_kafka_AdminOptions_set_validate_only(
178 cOptions, bool2cint(ao.val),
179 cErrstr, cErrstrSize)
180 if cErr != 0 {
181 C.rd_kafka_AdminOptions_destroy(cOptions)
182 return newCErrorFromString(cErr,
183 fmt.Sprintf("%s", C.GoString(cErrstr)))
184
185 }
186
187 return nil
188 }
189
190 // SetAdminValidateOnly tells the broker to only validate the request,
191 // without performing the requested operation (create topics, etc).
192 //
193 // Default: false.
194 //
195 // Valid for CreateTopics, CreatePartitions, AlterConfigs.
196 func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly) {
197 ao.isSet = true
198 ao.val = validateOnly
199 return ao
200 }
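// Usage sketch (editor's illustration): dry-run a topic creation so the
// broker validates the request without creating anything; `newTopics` is
// assumed to be a []TopicSpecification:
//
//	result, err := a.CreateTopics(ctx, newTopics, SetAdminValidateOnly(true))
//	// The per-topic errors in result mirror what a real CreateTopics
//	// call would have returned.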
201
202 // CreateTopicsAdminOption - see setters.
203 //
204 // See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
205 type CreateTopicsAdminOption interface {
206 supportsCreateTopics()
207 apply(cOptions *C.rd_kafka_AdminOptions_t) error
208 }
209
210 // DeleteTopicsAdminOption - see setters.
211 //
212 // See SetAdminRequestTimeout, SetAdminOperationTimeout.
213 type DeleteTopicsAdminOption interface {
214 supportsDeleteTopics()
215 apply(cOptions *C.rd_kafka_AdminOptions_t) error
216 }
217
218 // CreatePartitionsAdminOption - see setters.
219 //
220 // See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
221 type CreatePartitionsAdminOption interface {
222 supportsCreatePartitions()
223 apply(cOptions *C.rd_kafka_AdminOptions_t) error
224 }
225
226 // AlterConfigsAdminOption - see setters.
227 //
228 // See SetAdminRequestTimeout, SetAdminValidateOnly.
229 type AlterConfigsAdminOption interface {
230 supportsAlterConfigs()
231 apply(cOptions *C.rd_kafka_AdminOptions_t) error
232 }
233
234 // DescribeConfigsAdminOption - see setters.
235 //
236 // See SetAdminRequestTimeout.
237 type DescribeConfigsAdminOption interface {
238 supportsDescribeConfigs()
239 apply(cOptions *C.rd_kafka_AdminOptions_t) error
240 }
241
242 // AdminOption is a generic type not to be used directly.
243 //
244 // See CreateTopicsAdminOption et al.
245 type AdminOption interface {
246 apply(cOptions *C.rd_kafka_AdminOptions_t) error
247 }
248
249 func adminOptionsSetup(h *handle, opType C.rd_kafka_admin_op_t, options []AdminOption) (*C.rd_kafka_AdminOptions_t, error) {
250
251 cOptions := C.rd_kafka_AdminOptions_new(h.rk, opType)
252 for _, opt := range options {
253 if opt == nil {
254 continue
255 }
256 err := opt.apply(cOptions)
257 if err != nil {
258 return nil, err
259 }
260 }
261
262 return cOptions, nil
263 }
9898 set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t
9999 }
100100
101 func anyconfSet(anyconf rdkAnyconf, key string, value string) (err error) {
101 func anyconfSet(anyconf rdkAnyconf, key string, val ConfigValue) (err error) {
102 value, errstr := value2string(val)
103 if errstr != "" {
104 return Error{ErrInvalidArg, fmt.Sprintf("%s for key %s (expected string,bool,int,ConfigMap)", errstr, key)}
105 }
102106 cKey := C.CString(key)
103107 cVal := C.CString(value)
104108 cErrstr := (*C.char)(C.malloc(C.size_t(128)))
127131 }
128132
129133 func configConvertAnyconf(m ConfigMap, anyconf rdkAnyconf) (err error) {
130
134 // Set plugin.library.paths first, since any plugin-specific
135 // configuration depends on the plugin having already been loaded.
136 pluginPaths, ok := m["plugin.library.paths"]
137 if ok {
138 err = anyconfSet(anyconf, "plugin.library.paths", pluginPaths)
139 if err != nil {
140 return err
141 }
142 }
131143 for k, v := range m {
144 if k == "plugin.library.paths" {
145 continue
146 }
132147 switch v.(type) {
133148 case ConfigMap:
134149 /* Special sub-ConfigMap, only used for default.topic.config */
151166 (*C.rd_kafka_topic_conf_t)((*rdkTopicConf)(cTopicConf)))
152167
153168 default:
154 val, errstr := value2string(v)
155 if errstr != "" {
156 return Error{ErrInvalidArg, fmt.Sprintf("%s for key %s (expected string,bool,int,ConfigMap)", errstr, k)}
157 }
158
159 err = anyconfSet(anyconf, k, val)
169 err = anyconfSet(anyconf, k, v)
160170 if err != nil {
161171 return err
162172 }
215225 return v, nil
216226 }
217227
228 func (m ConfigMap) clone() ConfigMap {
229 m2 := make(ConfigMap)
230 for k, v := range m {
231 m2[k] = v
232 }
233 return m2
234 }
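// Behavior sketch (editor's illustration): clone() is a shallow copy, so
// mutating the copy leaves the original untouched (nested ConfigMap values,
// however, remain shared):
//
//	orig := ConfigMap{"group.id": "test"}
//	cp := orig.clone()
//	cp["group.id"] = "other"
//	// orig["group.id"] is still "test"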
235
218236 // Get finds the given key in the ConfigMap and returns its value.
219237 // If the key is not found `defval` is returned.
220238 // If the key is found but the type does not match that of `defval` (unless nil)
123123 }
124124
125125 }
126
127 // Test that plugins will always be configured before their config options
128 func TestConfigPluginPaths(t *testing.T) {
129 config := &ConfigMap{
130 "plugin.library.paths": "monitoring-interceptor",
131 }
132 _, err := config.convert()
133 if err != nil {
134 t.Skipf("Missing monitoring-interceptor: %s", err)
135 }
136
137 config = &ConfigMap{
138 "plugin.library.paths": "monitoring-interceptor",
139 "confluent.monitoring.interceptor.icdebug": true,
140 }
141
142 // convert() used to fail randomly because Go randomizes map key
143 // iteration order: a single convert() gave this test a ~50% chance of
144 // catching the bug, so run it 100 times to make detection near-certain.
145 for i := 1; i <= 100; i++ {
146 _, err := config.convert()
147 if err != nil {
148 t.Fatalf("Failed to convert. Error: %s\n", err)
149 }
150 }
151 }
369369 return nil, err
370370 }
371371
372 groupid, _ := conf.get("group.id", nil)
372 // before we do anything with the configuration, create a copy such that
373 // the original is not mutated.
374 confCopy := conf.clone()
375
376 groupid, _ := confCopy.get("group.id", nil)
373377 if groupid == nil {
374378 // without a group.id the underlying cgrp subsystem in librdkafka won't get started
375379 // and without it there is no way to consume assigned partitions.
379383
380384 c := &Consumer{}
381385
382 v, err := conf.extract("go.application.rebalance.enable", false)
386 v, err := confCopy.extract("go.application.rebalance.enable", false)
383387 if err != nil {
384388 return nil, err
385389 }
386390 c.appRebalanceEnable = v.(bool)
387391
388 v, err = conf.extract("go.events.channel.enable", false)
392 v, err = confCopy.extract("go.events.channel.enable", false)
389393 if err != nil {
390394 return nil, err
391395 }
392396 c.eventsChanEnable = v.(bool)
393397
394 v, err = conf.extract("go.events.channel.size", 1000)
398 v, err = confCopy.extract("go.events.channel.size", 1000)
395399 if err != nil {
396400 return nil, err
397401 }
398402 eventsChanSize := v.(int)
399403
400 cConf, err := conf.convert()
404 cConf, err := confCopy.convert()
401405 if err != nil {
402406 return nil, err
403407 }
404408 cErrstr := (*C.char)(C.malloc(C.size_t(256)))
405409 defer C.free(unsafe.Pointer(cErrstr))
406410
407 C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS)
411 C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)
408412
409413 c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)
410414 if c.handle.rk == nil {
472476 // If topic is non-nil only information about that topic is returned, else if
473477 // allTopics is false only information about locally used topics is returned,
474478 // else information about all topics is returned.
479 // GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
475480 func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
476481 return getMetadata(c, topic, allTopics, timeoutMs)
477482 }
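// Usage sketch (editor's illustration), assuming an existing *Consumer `c`:
//
//	md, err := c.GetMetadata(nil, true, 5*1000) // all topics, 5s timeout
//	if err == nil {
//		fmt.Printf("%d broker(s), %d topic(s)\n", len(md.Brokers), len(md.Topics))
//	}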
3434 return Error{ErrorCode(code), ""}
3535 }
3636
37 func newGoError(code ErrorCode) (err Error) {
38 return Error{code, ""}
39 }
40
3741 func newErrorFromString(code ErrorCode, str string) (err Error) {
3842 return Error{code, str}
3943 }
4650 str = ""
4751 }
4852 return Error{ErrorCode(code), str}
53 }
54
55 func newCErrorFromString(code C.rd_kafka_resp_err_t, str string) (err Error) {
56 return newErrorFromString(ErrorCode(code), str)
4957 }
5058
5159 // Error returns a human readable representation of an Error
00 package kafka
11 // Copyright 2016 Confluent Inc.
2 // AUTOMATICALLY GENERATED BY /Users/magnus/gocode/bin/go_rdkafka_generr ON 2018-01-30 09:13:09.376879939 +0100 CET m=+0.002362671 USING librdkafka 0.11.3-CI1-7-g3fe870-dirty
2 // AUTOMATICALLY GENERATED BY /home/maglun/gocode/bin/go_rdkafka_generr ON 2018-10-11 09:26:58.938371378 +0200 CEST m=+0.001256618 USING librdkafka 0.11.5
33
44 /*
55 #include <librdkafka/rdkafka.h>
105105 ErrNoent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOENT)
106106 // ErrUnderflow Local: Read underflow
107107 ErrUnderflow ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNDERFLOW)
108 // ErrInvalidType Local: Invalid type
109 ErrInvalidType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_TYPE)
108110 // ErrUnknown Unknown broker error
109111 ErrUnknown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN)
110112 // ErrNoError Success
1616 package kafka
1717
1818 import (
19 "context"
1920 "encoding/binary"
2021 "fmt"
22 "math/rand"
23 "path"
2124 "reflect"
25 "runtime"
2226 "testing"
2327 "time"
2428 )
502506
503507 config := &ConfigMap{"bootstrap.servers": testconf.Brokers,
504508 "group.id": testconf.GroupID}
509 config.updateFromTestconf()
505510
506511 // Create consumer
507512 c, err := NewConsumer(config)
533538 }
534539
535540 config := &ConfigMap{"bootstrap.servers": testconf.Brokers}
541 config.updateFromTestconf()
536542
537543 // Create producer
538544 p, err := NewProducer(config)
576582 }
577583
578584 config := &ConfigMap{"bootstrap.servers": testconf.Brokers}
585 config.updateFromTestconf()
579586
580587 // Create producer
581588 p, err := NewProducer(config)
628635 // test producer with bad messages
629636 func TestProducerWithBadMessages(t *testing.T) {
630637 conf := ConfigMap{"bootstrap.servers": testconf.Brokers}
638 conf.updateFromTestconf()
639
631640 p, err := NewProducer(&conf)
632641 if err != nil {
633642 panic(err)
832841 t.Skipf("Missing testconf.json")
833842 }
834843
835 conf := ConfigMap{"bootstrap.servers": testconf.Brokers,
836 "api.version.request": true,
844 consumerConf := ConfigMap{"bootstrap.servers": testconf.Brokers,
837845 "go.events.channel.enable": true,
838846 "group.id": testconf.Topic,
839847 }
840848
841 conf.updateFromTestconf()
849 consumerConf.updateFromTestconf()
842850
843851 /* Create consumer and find recognizable message, verify timestamp.
844852 * The consumer is started before the producer to make sure
845853 * the message isn't missed. */
846854 t.Logf("Creating consumer")
847 c, err := NewConsumer(&conf)
855 c, err := NewConsumer(&consumerConf)
848856 if err != nil {
849857 t.Fatalf("NewConsumer: %v", err)
850858 }
868876 /*
869877 * Create producer and produce one recognizable message with timestamp
870878 */
879 producerConf := ConfigMap{"bootstrap.servers": testconf.Brokers}
880 producerConf.updateFromTestconf()
881
871882 t.Logf("Creating producer")
872 conf.SetKey("{topic}.produce.offset.report", true)
873 p, err := NewProducer(&conf)
883 p, err := NewProducer(&producerConf)
874884 if err != nil {
875885 t.Fatalf("NewProducer: %v", err)
876886 }
10831093
10841094 c.Close()
10851095 }
1096
1097 func createAdminClient(t *testing.T) (a *AdminClient) {
1098 numver, strver := LibraryVersion()
1099 if numver < 0x000b0500 {
1100 t.Skipf("Requires librdkafka >=0.11.5 (currently on %s, 0x%x)", strver, numver)
1101 }
1102
1103 if !testconfRead() {
1104 t.Skipf("Missing testconf.json")
1105 }
1106
1107 conf := ConfigMap{"bootstrap.servers": testconf.Brokers}
1108 conf.updateFromTestconf()
1109
1110 /*
1111 * Create an AdminClient using the test configuration.
1113 */
1114 a, err := NewAdminClient(&conf)
1115 if err != nil {
1116 t.Fatalf("NewAdminClient: %v", err)
1117 }
1118
1119 return a
1120 }
1121
1122 func validateTopicResult(t *testing.T, result []TopicResult, expError map[string]Error) {
1123 for _, res := range result {
1124 exp, ok := expError[res.Topic]
1125 if !ok {
1126 t.Errorf("Result for unexpected topic %s", res)
1127 continue
1128 }
1129
1130 if res.Error.Code() != exp.Code() {
1131 t.Errorf("Topic %s: expected \"%s\", got \"%s\"",
1132 res.Topic, exp, res.Error)
1133 continue
1134 }
1135
1136 t.Logf("Topic %s: matched expected \"%s\"", res.Topic, res.Error)
1137 }
1138 }
1139
1140 func TestAdminTopics(t *testing.T) {
1141 rand.Seed(time.Now().Unix())
1142
1143 a := createAdminClient(t)
1144 defer a.Close()
1145
1146 brokerList, err := getBrokerList(a)
1147 if err != nil {
1148 t.Fatalf("Failed to retrieve broker list: %v", err)
1149 }
1150
1151 // Few and Many replica sets used in these tests
1152 var fewReplicas []int32
1153 if len(brokerList) < 2 {
1154 fewReplicas = brokerList
1155 } else {
1156 fewReplicas = brokerList[0:2]
1157 }
1158
1159 var manyReplicas []int32
1160 if len(brokerList) < 5 {
1161 manyReplicas = brokerList
1162 } else {
1163 manyReplicas = brokerList[0:5]
1164 }
1165
1166 const topicCnt = 7
1167 newTopics := make([]TopicSpecification, topicCnt)
1168
1169 expError := map[string]Error{}
1170
1171 for i := 0; i < topicCnt; i++ {
1172 topic := fmt.Sprintf("%s-create-%d-%d", testconf.Topic, i, rand.Intn(100000))
1173 newTopics[i] = TopicSpecification{
1174 Topic: topic,
1175 NumPartitions: 1 + i*2,
1176 }
1177
1178 if (i % 2) == 0 {
1179 newTopics[i].ReplicationFactor = len(fewReplicas)
1180 } else {
1181 newTopics[i].ReplicationFactor = len(manyReplicas)
1182 }
1183
1184 expError[newTopics[i].Topic] = Error{} // No error
1185
1186 var useReplicas []int32
1187 if i == 2 {
1188 useReplicas = fewReplicas
1189 } else if i == 3 {
1190 useReplicas = manyReplicas
1191 } else if i == topicCnt-1 {
1192 newTopics[i].ReplicationFactor = len(brokerList) + 10
1193 expError[newTopics[i].Topic] = Error{code: ErrInvalidReplicationFactor}
1194 }
1195
1196 if len(useReplicas) > 0 {
1197 newTopics[i].ReplicaAssignment = make([][]int32, newTopics[i].NumPartitions)
1198 newTopics[i].ReplicationFactor = 0
1199 for p := 0; p < newTopics[i].NumPartitions; p++ {
1200 newTopics[i].ReplicaAssignment[p] = useReplicas
1201 }
1202 }
1203 }
1204
1205 maxDuration, err := time.ParseDuration("30s")
1206 if err != nil {
1207 t.Fatalf("%s", err)
1208 }
1209
1210 // First just validate the topics, don't create
1211 t.Logf("Validating topics before creation\n")
1212 ctx, cancel := context.WithTimeout(context.Background(), maxDuration)
1213 defer cancel()
1214 result, err := a.CreateTopics(ctx, newTopics,
1215 SetAdminValidateOnly(true))
1216 if err != nil {
1217 t.Fatalf("CreateTopics(ValidateOnly) failed: %s", err)
1218 }
1219
1220 validateTopicResult(t, result, expError)
1221
1222 // Now create the topics
1223 t.Logf("Creating topics\n")
1224 ctx, cancel = context.WithTimeout(context.Background(), maxDuration)
1225 defer cancel()
1226 result, err = a.CreateTopics(ctx, newTopics, SetAdminValidateOnly(false))
1227 if err != nil {
1228 t.Fatalf("CreateTopics() failed: %s", err)
1229 }
1230
1231 validateTopicResult(t, result, expError)
1232
1233 // Attempt to create the topics again, should all fail.
1234 t.Logf("Attempt to re-create topics, should all fail\n")
1235 for k := range expError {
1236 if expError[k].code == ErrNoError {
1237 expError[k] = Error{code: ErrTopicAlreadyExists}
1238 }
1239 }
1240 ctx, cancel = context.WithTimeout(context.Background(), maxDuration)
1241 defer cancel()
1242 result, err = a.CreateTopics(ctx, newTopics)
1243 if err != nil {
1244 t.Fatalf("CreateTopics#2() failed: %s", err)
1245 }
1246
1247 validateTopicResult(t, result, expError)
1248
1249 // Add partitions to some of the topics
1250 t.Logf("Create new partitions for a subset of topics\n")
1251 newParts := make([]PartitionsSpecification, topicCnt/2)
1252 expError = map[string]Error{}
1253 for i := 0; i < topicCnt/2; i++ {
1254 topic := newTopics[i].Topic
1255 newParts[i] = PartitionsSpecification{
1256 Topic: topic,
1257 IncreaseTo: newTopics[i].NumPartitions + 3,
1258 }
1259 if i == 1 {
1260 // Invalid partition count (less than current)
1261 newParts[i].IncreaseTo = newTopics[i].NumPartitions - 1
1262 expError[topic] = Error{code: ErrInvalidPartitions}
1263 } else {
1264 expError[topic] = Error{}
1265 }
1266 t.Logf("Creating new partitions for %s: %d -> %d: expecting %v\n",
1267 topic, newTopics[i].NumPartitions, newParts[i].IncreaseTo, expError[topic])
1268 }
1269
1270 ctx, cancel = context.WithTimeout(context.Background(), maxDuration)
1271 defer cancel()
1272 result, err = a.CreatePartitions(ctx, newParts)
1273 if err != nil {
1274 t.Fatalf("CreatePartitions() failed: %s", err)
1275 }
1276
1277 validateTopicResult(t, result, expError)
1278
1279 // FIXME: wait for topics to become available in metadata instead
1280 time.Sleep(5000 * time.Millisecond)
1281
1282 // Delete the topics
1283 deleteTopics := make([]string, topicCnt)
1284 for i := 0; i < topicCnt; i++ {
1285 deleteTopics[i] = newTopics[i].Topic
1286 if i == topicCnt-1 {
1287 expError[deleteTopics[i]] = Error{code: ErrUnknownTopicOrPart}
1288 } else {
1289 expError[deleteTopics[i]] = Error{}
1290 }
1291 }
1292
1293 ctx, cancel = context.WithTimeout(context.Background(), maxDuration)
1294 defer cancel()
1295 result2, err := a.DeleteTopics(ctx, deleteTopics)
1296 if err != nil {
1297 t.Fatalf("DeleteTopics() failed: %s", err)
1298 }
1299
1300 validateTopicResult(t, result2, expError)
1301 }
1302
1303 func validateConfig(t *testing.T, results []ConfigResourceResult, expResults []ConfigResourceResult, checkConfigEntries bool) {
1304
1305 _, file, line, _ := runtime.Caller(1)
1306 caller := fmt.Sprintf("%s:%d", path.Base(file), line)
1307
1308 if len(results) != len(expResults) {
1309 t.Fatalf("%s: Expected %d results, got %d: %v", caller, len(expResults), len(results), results)
1310 }
1311
1312 for i, result := range results {
1313 expResult := expResults[i]
1314
1315 if result.Error.Code() != expResult.Error.Code() {
1316 t.Errorf("%s: %v: Expected %v, got %v", caller, result, expResult.Error.Code(), result.Error.Code())
1317 continue
1318 }
1319
1320 if !checkConfigEntries {
1321 continue
1322 }
1323
1324 matchCnt := 0
1325 for _, expEntry := range expResult.Config {
1326
1327 entry, ok := result.Config[expEntry.Name]
1328 if !ok {
1329 t.Errorf("%s: %v: expected config %s not found in result", caller, result, expEntry.Name)
1330 continue
1331 }
1332
1333 if entry.Value != expEntry.Value {
1334 t.Errorf("%s: %v: expected config %s to have value \"%s\", not \"%s\"", caller, result, expEntry.Name, expEntry.Value, entry.Value)
1335 continue
1336 }
1337
1338 matchCnt++
1339 }
1340
1341 if matchCnt != len(expResult.Config) {
1342 t.Errorf("%s: %v: only %d/%d expected configs matched", caller, result, matchCnt, len(expResult.Config))
1343 }
1344 }
1345
1346 if t.Failed() {
1347 t.Fatalf("%s: ConfigResourceResult validation failed: see previous errors", caller)
1348 }
1349 }
1350
1351 func TestAdminConfig(t *testing.T) {
1352 rand.Seed(time.Now().Unix())
1353
1354 a := createAdminClient(t)
1355 defer a.Close()
1356
1357 // Steps:
1358 // 1) Create a topic, providing initial non-default configuration
1359 // 2) Read back config to verify
1360 // 3) Alter config
1361 // 4) Read back config to verify
1362 // 5) Delete the topic
1363
1364 topic := fmt.Sprintf("%s-config-%d", testconf.Topic, rand.Intn(100000))
1365
1366 // Expected config
1367 expResources := []ConfigResourceResult{
1368 {
1369 Type: ResourceTopic,
1370 Name: topic,
1371 Config: map[string]ConfigEntryResult{
1372 "compression.type": ConfigEntryResult{
1373 Name: "compression.type",
1374 Value: "snappy",
1375 },
1376 },
1377 },
1378 }
1379 // Create topic
1380 newTopics := []TopicSpecification{{
1381 Topic: topic,
1382 NumPartitions: 1,
1383 ReplicationFactor: 1,
1384 Config: map[string]string{"compression.type": "snappy"},
1385 }}
1386
1387 ctx, cancel := context.WithCancel(context.Background())
1388 defer cancel()
1389 topicResult, err := a.CreateTopics(ctx, newTopics)
1390 if err != nil {
1391 t.Fatalf("Create topic request failed: %v", err)
1392 }
1393
1394 if topicResult[0].Error.Code() != ErrNoError {
1395 t.Fatalf("Failed to create topic %s: %s", topic, topicResult[0].Error)
1396 }
1397
1398 // Wait for topic to show up in metadata before performing
1399 // subsequent operations on it, otherwise DescribeConfigs() risks
1400 // failing with UnknownTopic (this is really a broker issue).
1401 // Sometimes even the metadata is not enough, so we add an
1402 // arbitrary 10s sleep too.
1403 t.Logf("Waiting for new topic %s to show up in metadata and stabilize", topic)
1404 err = waitTopicInMetadata(a, topic, 10*1000) // 10s
1405 if err != nil {
1406 t.Fatalf("%v", err)
1407 }
1408 t.Logf("Topic %s now in metadata, waiting another 10s for stabilization", topic)
1409 time.Sleep(10 * time.Second)
1410
1411 // Read back config to validate
1412 configResources := []ConfigResource{{Type: ResourceTopic, Name: topic}}
1413 describeRes, err := a.DescribeConfigs(ctx, configResources)
1414 if err != nil {
1415 t.Fatalf("Describe configs request failed: %v", err)
1416 }
1417
1418 validateConfig(t, describeRes, expResources, true)
1419
1420 // Alter some configs.
1421 // Configuration alterations are currently atomic: all values
1422 // must be passed, otherwise any omitted values are reverted
1423 // to their defaults.
1424 // Future versions will allow incremental updates:
1425 // https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API
1426 newConfig := make(map[string]string)
1427 for _, entry := range describeRes[0].Config {
1428 newConfig[entry.Name] = entry.Value
1429 }
1430
1431 // Change something
1432 newConfig["retention.ms"] = "86400000"
1433 newConfig["message.timestamp.type"] = "LogAppendTime"
1434
1435 for k, v := range newConfig {
1436 expResources[0].Config[k] = ConfigEntryResult{Name: k, Value: v}
1437 }
1438
1439 configResources = []ConfigResource{{Type: ResourceTopic, Name: topic, Config: StringMapToConfigEntries(newConfig, AlterOperationSet)}}
1440 alterRes, err := a.AlterConfigs(ctx, configResources)
1441 if err != nil {
1442 t.Fatalf("Alter configs request failed: %v", err)
1443 }
1444
1445 validateConfig(t, alterRes, expResources, false)
1446
1447 // Read back config to validate
1448 configResources = []ConfigResource{{Type: ResourceTopic, Name: topic}}
1449 describeRes, err = a.DescribeConfigs(ctx, configResources)
1450 if err != nil {
1451 t.Fatalf("Describe configs request failed: %v", err)
1452 }
1453
1454 validateConfig(t, describeRes, expResources, true)
1455
1456 // Delete the topic
1457 // FIXME: wait for topics to become available in metadata instead
1458 time.Sleep(5000 * time.Millisecond)
1459
1460 topicResult, err = a.DeleteTopics(ctx, []string{topic})
1461 if err != nil {
1462 t.Fatalf("DeleteTopics() failed: %s", err)
1463 }
1464
1465 if topicResult[0].Error.Code() != ErrNoError {
1466 t.Fatalf("Failed to delete topic %s: %s", topic, topicResult[0].Error)
1467 }
1468
1469 }
1470
1471 // Test AdminClient GetMetadata API
1472 func TestAdminGetMetadata(t *testing.T) {
1473 if !testconfRead() {
1474 t.Skipf("Missing testconf.json")
1475 }
1476
1477 config := &ConfigMap{"bootstrap.servers": testconf.Brokers}
1478 config.updateFromTestconf()
1479
1480 // Create Admin client
1481 a, err := NewAdminClient(config)
1482 if err != nil {
1483 t.Errorf("Failed to create Admin client: %s\n", err)
1484 return
1485 }
1486 defer a.Close()
1487
1488 metaData, err := a.GetMetadata(&testconf.Topic, false, 5*1000)
1489 if err != nil {
1490 t.Errorf("Failed to get metadata for topic %s. Error: %s\n", testconf.Topic, err)
1491 return
1492 }
1493 t.Logf("Metadata for topic %s: %v\n", testconf.Topic, metaData)
1494
1495 metaData, err = a.GetMetadata(nil, true, 5*1000)
1496 if err != nil {
1497 t.Errorf("Failed to get metadata, Error: %s\n", err)
1498 return
1499 }
1500 t.Logf("Metadata for admin client: %v\n", metaData)
1501
1502 }
9393 }
9494
9595 m := Metadata{}
96 defer C.rd_kafka_metadata_destroy(cMd)
9697
9798 m.Brokers = make([]BrokerMetadata, cMd.broker_cnt)
9899 for i := 0; i < int(cMd.broker_cnt); i++ {
2424 }
2525 return 0
2626 }
27
28 // cint2bool converts a C.int to a bool
29 func cint2bool(v C.int) bool {
30 if v == 0 {
31 return false
32 }
33 return true
34 }
3434 // tmphdr.size == 0: value is considered empty (ignored)
3535 // tmphdr.size > 0: value is considered non-empty
3636 //
37 // WARNING: The header values will be freed by this function.
37 // WARNING: The header keys and values will be freed by this function.
3838 void tmphdrs_to_chdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt,
3939 rd_kafka_headers_t **chdrs) {
4040 size_t i;
4949 tmphdrs[i].size == -1 ? 0 : tmphdrs[i].size);
5050 if (tmphdrs[i].size > 0)
5151 free((void *)tmphdrs[i].val);
52 free((void *)tmphdrs[i].key);
5253 }
5354 }
5455
5859 for (i = 0 ; i < tmphdrsCnt ; i++) {
5960 if (tmphdrs[i].size > 0)
6061 free((void *)tmphdrs[i].val);
62 free((void *)tmphdrs[i].key);
6163 }
6264 }
6365 #endif
7375 uintptr_t cgoid) {
7476 void *valp = valIsNull ? NULL : val;
7577 void *keyp = keyIsNull ? NULL : key;
78 #ifdef RD_KAFKA_V_TIMESTAMP
79 rd_kafka_resp_err_t err;
7680 #ifdef RD_KAFKA_V_HEADERS
7781 rd_kafka_headers_t *hdrs = NULL;
82 #endif
7883 #endif
7984
8085
8994
9095
9196 #ifdef RD_KAFKA_V_TIMESTAMP
92 return rd_kafka_producev(rk,
97 err = rd_kafka_producev(rk,
9398 RD_KAFKA_V_RKT(rkt),
9499 RD_KAFKA_V_PARTITION(partition),
95100 RD_KAFKA_V_MSGFLAGS(msgflags),
101106 #endif
102107 RD_KAFKA_V_OPAQUE((void *)cgoid),
103108 RD_KAFKA_V_END);
109 #ifdef RD_KAFKA_V_HEADERS
110 if (err && hdrs)
111 rd_kafka_headers_destroy(hdrs);
112 #endif
113 return err;
104114 #else
105115 if (timestamp)
106116 return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
217227 tmphdrs = make([]C.tmphdr_t, tmphdrsCnt)
218228
219229 for n, hdr := range msg.Headers {
230 // Make a copy of the key to avoid a runtime panic
231 // from passing foreign Go pointers through cgo.
220233 tmphdrs[n].key = C.CString(hdr.Key)
221234 if hdr.Value != nil {
222235 tmphdrs[n].size = C.ssize_t(len(hdr.Value))
372385
373386 p := &Producer{}
374387
375 v, err := conf.extract("go.batch.producer", false)
388 // before we do anything with the configuration, create a copy such that
389 // the original is not mutated.
390 confCopy := conf.clone()
391
392 v, err := confCopy.extract("go.batch.producer", false)
376393 if err != nil {
377394 return nil, err
378395 }
379396 batchProducer := v.(bool)
380397
381 v, err = conf.extract("go.delivery.reports", true)
398 v, err = confCopy.extract("go.delivery.reports", true)
382399 if err != nil {
383400 return nil, err
384401 }
385402 p.handle.fwdDr = v.(bool)
386403
387 v, err = conf.extract("go.events.channel.size", 1000000)
404 v, err = confCopy.extract("go.events.channel.size", 1000000)
388405 if err != nil {
389406 return nil, err
390407 }
391408 eventsChanSize := v.(int)
392409
393 v, err = conf.extract("go.produce.channel.size", 1000000)
410 v, err = confCopy.extract("go.produce.channel.size", 1000000)
394411 if err != nil {
395412 return nil, err
396413 }
397414 produceChannelSize := v.(int)
398415
399 v, _ = conf.extract("{topic}.produce.offset.report", nil)
400 if v == nil {
401 // Enable offset reporting by default, unless overridden.
402 conf.SetKey("{topic}.produce.offset.report", true)
416 if int(C.rd_kafka_version()) < 0x01000000 {
417 // produce.offset.report is no longer used in librdkafka >= v1.0.0
418 v, _ = confCopy.extract("{topic}.produce.offset.report", nil)
419 if v == nil {
420 // Enable offset reporting by default, unless overridden.
421 confCopy.SetKey("{topic}.produce.offset.report", true)
422 }
403423 }
404424
405425 // Convert ConfigMap to librdkafka conf_t
406 cConf, err := conf.convert()
426 cConf, err := confCopy.convert()
407427 if err != nil {
408428 return nil, err
409429 }
411431 cErrstr := (*C.char)(C.malloc(C.size_t(256)))
412432 defer C.free(unsafe.Pointer(cErrstr))
413433
414 C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS)
434 C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)
415435
416436 // Create librdkafka producer instance
417437 p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
532552 // If topic is non-nil only information about that topic is returned, else if
533553 // allTopics is false only information about locally used topics is returned,
534554 // else information about all topics is returned.
555 // GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
535556 func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
536557 return getMetadata(p, topic, allTopics, timeoutMs)
537558 }
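// Usage sketch (editor's illustration), assuming an existing *Producer `p`:
//
//	md, err := p.GetMetadata(nil, false, 5*1000) // locally known topics only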
1919 "encoding/json"
2020 "fmt"
2121 "os"
22 "time"
2223 )
2324
2425 /*
9596 func getMessageCountInTopic(topic string) (int, error) {
9697
9798 // Create consumer
98 c, err := NewConsumer(&ConfigMap{"bootstrap.servers": testconf.Brokers,
99 "group.id": testconf.GroupID})
99 config := &ConfigMap{"bootstrap.servers": testconf.Brokers,
100 "group.id": testconf.GroupID}
101 config.updateFromTestconf()
102
103 c, err := NewConsumer(config)
100104 if err != nil {
101105 return 0, err
102106 }
124128
125129 return cnt, nil
126130 }
131
132 // getBrokerList returns a list of brokers (ids) in the cluster
133 func getBrokerList(H Handle) (brokers []int32, err error) {
134 md, err := getMetadata(H, nil, true, 15*1000)
135 if err != nil {
136 return nil, err
137 }
138
139 brokers = make([]int32, len(md.Brokers))
140 for i, mdBroker := range md.Brokers {
141 brokers[i] = mdBroker.ID
142 }
143
144 return brokers, nil
145 }
146
147 // waitTopicInMetadata waits for the given topic to show up in metadata
148 func waitTopicInMetadata(H Handle, topic string, timeoutMs int) error {
149 d, _ := time.ParseDuration(fmt.Sprintf("%dms", timeoutMs))
150 tEnd := time.Now().Add(d)
151
152 for {
153 remain := tEnd.Sub(time.Now()).Seconds()
154 if remain < 0.0 {
155 return newErrorFromString(ErrTimedOut,
156 fmt.Sprintf("Timed out waiting for topic %s to appear in metadata", topic))
157 }
158
159 md, err := getMetadata(H, nil, true, int(remain*1000))
160 if err != nil {
161 return err
162 }
163
164 for _, t := range md.Topics {
165 if t.Topic != topic {
166 continue
167 }
168 if t.Error.Code() != ErrNoError || len(t.Partitions) < 1 {
169 continue
170 }
171 // Proper topic found in metadata
172 return nil
173 }
174
175 time.Sleep(500 * time.Millisecond)
176 }
177
178 }
2727
2828 # Use golang.org for external resources (such as CSS and JS)
2929 for t in soup.find_all(href=re.compile(r'^/')):
30 t['href'] = 'http://golang.org' + t['href']
30 t['href'] = '//golang.org' + t['href']
3131
3232 for t in soup.find_all(src=re.compile(r'^/')):
33 t['src'] = 'http://golang.org' + t['src']
33 t['src'] = '//golang.org' + t['src']
3434
3535 # Write updated HTML to stdout
3636 print(soup.prettify().encode('utf-8'))