- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
Apache Kafka是一个分布式流处理平台,由LinkedIn开发并开源,后来成为Apache软件基金会的顶级项目。Kafka主要用于构建实时数据管道和流式应用程序.
从下面3张架构图中可以看出Kafka Server 实际扮演的是Broker的角色, 一个Kafka Cluster由多个Broker组成, 或者可以说是多个Topic组成.
图 1 。
图 2 。
图 3 。
一个Kafka集群是一个由多个Kafka代理组成的分布式系统,它们协同工作以处理实时流数据的存储和处理。它为大规模应用程序中高效的数据流和消息传递提供了容错性、可扩展性和高可用性.
Broker是构成Kafka集群的服务器。 每个Broker负责接收、存储和提供数据。 它们处理来自生产者和消费者的读写操作。 Broker还管理数据的复制以确保容错性.
Kafka中的数据被组织成主题(Topics),这些是生产者发送数据和消费者读取数据的逻辑通道。每个主题被划分为分区(partitions),它们是Kafka中并行处理的基本单位。分区允许Kafka通过在多个Broker 之间分布数据来水平扩展.
生产者是发布(写入)数据到Kafka主题的客户端应用程序。它们根据分区策略将记录发送到适当的主题和分区,分区策略可以是基于键(key-based)或轮询(round-robin).
消费者是订阅Kafka主题并处理数据的客户端应用程序。它们从主题中读取记录,并且可以是消费者组的一部分,这允许负载均衡和容错。每个组中的消费者从一组独特的分区中读取数据.
ZooKeeper是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供群组服务。在Kafka中,ZooKeeper用于管理和协调Kafka Broker。ZooKeeper被展示为与Kafka集群交互的独立组件.
偏移量(offsets)是分配给分区中每条消息的唯一标识符。消费者将使用这些偏移量来跟踪他们在消费主题中消息的进度.
本地docker环境启动一个kafka 。
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.4
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:7.4.4
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
使用.NET CORE + Kafka开发一个消息生产者, 一个消息消费者, 客户端需要安装组件** Confluent.Kafka** 。
public class ProducerService
{
private readonly IConfiguration _configuration;
private readonly IProducer<Null, string> _producer;
private readonly ILogger<ProducerService> _logger;
public ProducerService(IConfiguration configuration, ILogger<ProducerService> logger)
{
_configuration = configuration;
_logger = logger;
var config = new ProducerConfig
{
BootstrapServers = _configuration["Kafka:BootstrapServers"],
};
_producer = new ProducerBuilder<Null, string>(config).Build();
}
public async Task ProductAsync(string topic, string message)
{
var orderPlacedMessage = new Message<Null, string>
{
Value = message
};
await _producer.ProduceAsync(topic, orderPlacedMessage);
_logger.LogInformation("Message sent to topic: {Topic}", topic);
}
}
[Route("api/[controller]")]
[ApiController]
public class InventoryController : ControllerBase
{
private readonly ProducerService _producerService;
public InventoryController(ProducerService producerService)
{
_producerService = producerService;
}
[HttpPost]
public async Task<IActionResult> Post([FromBody] InventoryUpdateRequest request)
{
var message = System.Text.Json.JsonSerializer.Serialize(request);
await _producerService.ProductAsync("inventory-update", message);
return Ok("Inventory Updated Successfully...");
}
}
启动项目,查看Swagger 。
消息消费者程序使用.net core BackgroundService开发, 这个类需要在程序启动时注入进去,不要忘记.
public class ConsumerService : BackgroundService
{
private readonly ILogger<ConsumerService> _logger;
private readonly IConfiguration _configuration;
private readonly IConsumer<Ignore, string> _consumer;
public ConsumerService(ILogger<ConsumerService> logger, IConfiguration configuration)
{
_logger = logger;
_configuration = configuration;
var consumerConfig = new ConsumerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
GroupId = "InventoryConsumerGroup",
AutoOffsetReset = AutoOffsetReset.Earliest
};
_consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.Subscribe("inventory-update");
try
{
while (!stoppingToken.IsCancellationRequested)
{
HandleMessage(stoppingToken);
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Consumer service has been cancelled.");
}
catch (Exception ex)
{
_logger.LogError($"Error in consuming messages: {ex.Message}");
}
finally
{
_consumer.Close();
}
}
public void HandleMessage(CancellationToken cancellation)
{
try
{
var consumeResult = _consumer.Consume(cancellation);
var message = consumeResult.Message.Value;
_logger.LogInformation($"Received inventory update: {message}");
}
catch (Exception ex)
{
_logger.LogError($"Error processing Kafka message: {ex.Message}");
}
}
}
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddHostedService<ConsumerService>();
运行程序 。
Publish Message 。
Consume Message 。
Apache Kafka不是消息中间件的一种实现。相反,它只是一种分布式流式系统。 不同于基于队列和交换器的RabbitMQ,Kafka的存储层是使用分区事务日志来实现的。Kafka也提供流式API用于实时的流处理以及连接器API用来更容易的和各种数据源集成.
最后此篇关于.NETCore+Kafka开发指南的文章就讲到这里了,如果你想了解更多关于.NETCore+Kafka开发指南的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我想让我的 NSIS 代码更具可读性。 我需要一些关于明智地编写代码部分的指南(比如 C# 中有 #region #endregion)或任何可以使编写 NSIS 代码变得有趣和容易的信息. 请帮帮我
我正在尝试找出Gherkin中所有可用的语法/格式,例如关于多行参数以及我不知道的所有其他内容。 在挖掘Google搜索结果之后,似乎综合指南位于here中: 我以为那很好,并且it链接到一个页面,该
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 5年前关闭。 Improve thi
当我将 guides 添加到 valueAxesSettings 中时,即使我选择 valueAxesSettings 到 valueAxes 中,它也不起作用。此外,valueAxesSetting
我正在寻找有关如何管理 .NET 程序集的三个不同程序集版本号的指针、建议,甚至是口述。 Product 版本是最简单的,因为这似乎通常由业务决定。然后,文件版本似乎用于部署之间的版本控制,其中实际的
昨晚我脑子里冒出一件事。我想知道为什么我们在项目之间仍然有不同的编码风格。由于风格是个人的东西,我认为最好这样对待它。我们为什么不呢?这有什么技术限制吗? 我举几个例子: // Code sample
我有一个应用程序,用户可以在其中从主 Activity 登录,然后可以使用 ListView 浏览实体的层次结构。因此,Activity 堆栈看起来像这样: A -> B -> B -> B -> .
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 9 年前。 Improve
大家好,上个月我开始学习 CSS。我做的第一件事是阅读我能在 www.w3school.com 上找到的所有内容,之后我开始阅读 CSS Mastery 2nd版本。我已经建立了几个自己的网站并取得了
我希望用户能够上传个人资料图片。 关于如何最好地处理这个问题,是否有任何指导方针?例如 - 在哪里保存图像?和要使用的文件夹结构。- 让用户难以浏览每个人的个人资料照片? 谢谢。 最佳答案 如果你自己
我有兴趣了解有关条件重启系统及其工作原理的更多信息。我不知道从哪里开始。我一直在查看源代码,但想知道是否有更高级别的指南可用。 最佳答案 Kent Pitman:条件系统 http://www.nhp
我想将小型、精简且平均的基于 C 的解析器合并到我的 Android 项目中。我过去做过 JNI 编程,但没有在 Android 上进行任何类型的 native (C) 开发。我的计划是将 C lib
免责声明:我试图搜索类似的问题,但是它返回了关于每个 C++ 问题的信息...此外,我将感谢任何可以提出更好标题的人。 C++ 中有两个著名的循环结构:while 和for。 我故意忽略了 do ..
我一直在尝试批量删除 Wordpress 帖子中的垃圾链接,如下所示: . 它们位于 post_content 列下的 wp_posts 表中。我试图通过在 href 标记中添加 % 的通配符来做到这
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 6 年前。 Improve
我们正在讨论为实体类定义方法的最佳方式 - 作为扩展方法或使用分部类。我们讨论的这类方法不会修改实体的状态,它们是纯粹的“辅助”方法,可以查询状态并返回值。 这两种方法的主要好处是保持实体类干净,同时
您将如何在 Flutter 中在实际屏幕上实现引导层。像这样: 最佳答案 这不是微不足道的。以下是必需的组件: 首先,你必须open a transparent full screen dialog
我们需要通过在 C/C++ 中实现特定算法来解决的大多数科学计算问题都需要远低于 double 的精度。例如,1e-6、1e-7 精度涵盖了 ODE 求解器或数值积分的 99% 情况。即使在我们确实需
我正在研究对专有 UI 框架(用于桌面应用程序)的 RTL 支持,我想知道:是否有关于如何更改小部件渲染的指南? 我正在寻找以下内容的列表: 复选框标签位于复选框左侧,右对齐 工具栏按钮从右到左排列
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引起辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the he
我是一名优秀的程序员,十分优秀!