Skip to content

Commit f9eacfa

Browse files
committed
Add EnableDeliveryReports to KafkaAttribute
1 parent 6c2981b commit f9eacfa

File tree

4 files changed

+42
-0
lines changed

4 files changed

+42
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ The settings exposed here are targeted to more advanced users that want to custo
323323
|LibkafkaDebug|debug|Both
324324
|MetadataMaxAgeMs|metadata.max.age.ms|Both
325325
|SocketKeepaliveEnable|socket.keepalive.enable|Both
326+
|EnableDeliveryReports|Feature of Confluent.Kafka|Output
326327

327328
**NOTE:** `MetadataMaxAgeMs` default is `180000` `SocketKeepaliveEnable` default is `true` otherwise, the default value is the same as the [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). The reason of the default settings, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187).
328329
**NOTE:** `AutoOffsetReset` default is Earliest. Allowed Values are `Earliest` and `Latest`.

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,5 +139,12 @@ public KafkaAttribute()
139139
/// ssl.key.password in librdkafka
140140
/// </summary>
141141
public string SslKeyPassword { get; set; }
142+
143+
/// <summary>
144+
/// Specifies whether to enable notification of delivery reports. Typically you should
145+
/// set this parameter to true. Set it to false for "fire and forget" semantics and
146+
/// a small boost in performance. default: true importance: low
147+
/// </summary>
148+
public bool EnableDeliveryReports { get; set; } = true;
142149
}
143150
}

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,9 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
118118
{
119119
BootstrapServers = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.BrokerList),
120120
BatchNumMessages = entity.Attribute.BatchSize,
121+
EnableDeliveryReports = entity.Attribute.EnableDeliveryReports,
121122
EnableIdempotence = entity.Attribute.EnableIdempotence,
123+
MessageMaxBytes = entity.Attribute.MaxMessageBytes,
122124
MessageSendMaxRetries = entity.Attribute.MaxRetries,
123125
MessageTimeoutMs = entity.Attribute.MessageTimeoutMs,
124126
RequestTimeoutMs = entity.Attribute.RequestTimeoutMs,

test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,5 +304,37 @@ public void GetProducerConfig_When_Ssl_Locations_Resolve_InAzure_Should_Contain_
304304
Assert.Equal(sslCa.FullName, config.SslCaLocation);
305305
Assert.Equal(sslKeyLocation.FullName, config.SslKeyLocation);
306306
}
307+
308+
[Fact]
309+
public void GetProducerConfig_Copies_Properties_From_Attribute()
310+
{
311+
var attribute = new KafkaAttribute("brokers:9092", "myTopic")
312+
{
313+
EnableDeliveryReports = false,
314+
BatchSize = 123,
315+
EnableIdempotence = true,
316+
MaxMessageBytes = 234,
317+
MaxRetries = 345,
318+
MessageTimeoutMs = 456,
319+
RequestTimeoutMs = 567
320+
};
321+
322+
var entity = new KafkaProducerEntity
323+
{
324+
Attribute = attribute
325+
};
326+
327+
var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerProvider.Instance);
328+
var config = factory.GetProducerConfig(entity);
329+
330+
Assert.Equal(attribute.EnableDeliveryReports, config.EnableDeliveryReports);
331+
Assert.Equal(attribute.BatchSize, config.BatchNumMessages);
332+
Assert.Equal(attribute.EnableIdempotence, config.EnableIdempotence);
333+
Assert.Equal(attribute.MaxMessageBytes, config.MessageMaxBytes);
334+
Assert.Equal(attribute.MaxRetries, config.MessageSendMaxRetries);
335+
Assert.Equal(attribute.MessageTimeoutMs, config.MessageTimeoutMs);
336+
Assert.Equal(attribute.RequestTimeoutMs, config.RequestTimeoutMs);
337+
Assert.Equal(attribute.EnableDeliveryReports, config.EnableDeliveryReports);
338+
}
307339
}
308340
}

0 commit comments

Comments
 (0)