数据库 场景题
如何在附近 100w 的商户中,快速找到离你最近的 5 家商户?
可以使用 MySQL或 MongoDB 的空间索引,或者使用 Redis 提供的 Geo 数据结构来存储商户位置的位置信息来实现根据距离的快速查找
使用空间索引
例如 R-tree,它适用于二维空间的树形数据结构,能够高效地进行范围查询和最近邻查询。将商户位置(经纬度)数据按照地理区域组织成树结构,可以快速找到位于用户当前位置附近
像 MySQL 就支持 R-tree,或者使用 MongoDB 也支持空间索引。
拿 MongoDB 举例,实现步骤就是
- 创建一个地理集合
- 将商家的经纬度导入到集合中
[{name:"商家A", location:{ type:"Point", coordinates:[111,22]}},
{name:"商家B", location:{ type:"Point", coordinates:[22,77]}}
....
]- 创建地理索引:给location 创建 2dsphere 索引
- 根据自己的坐标,利用 $near查询附近距离
{
location: {
$near: {
$geometry: {
type: "Point" ,
coordinates: [ <我的经度> , <我的维度> ]
},
$maxDistance: <distance in meters> //最大距离,单位米
}
}
}这里已经自动按最近的排序了,再结合.1imit(20)即可得到最近的 20家饭店。
使用 Redis 的 Geospatial
Redis 内部使用 Geohash 算法将地理坐标编码为字符串的技术,通过将地球表面划分成网格,来减少坐标的存储空间,并目能够根据前缀进行范围搜索实现快速的区域范围搜索
- 数据存储:
使用 Redis 的 GEOADD命令,将饭店的地理位置信息存储在 Redis 中。每个饭店的经纬度信息作为空间坐标进行存储,并关联到饭店的唯一标识符。
GEOADD restaurants:locations longitude1 latitude1 restaurant_id1
GEOADD restaurants:locations longitude2 latitude2 restaurant_id2
...- 查询附近的饭店:
使用 GEORADIUS 命令,根据用户当前位置(经纬度)查询指定半径内的饭店,并按距离排序,返回最接近的 20 家饭店
GEORADIUS restaurants:locations user_longitude user_latitude radius km WITHDISTANCE COUNT 20 ASC- user longitude 和 user latitude 是用户的经纬度。
- radius 是查询的半径,单位可以是 m(米)、km(千米)等。
- WITHDISTANCE 返回与用户的距离。
- COUNT 20 ASC 返回距离最近的前 20 个结果,并按距离升序排列。
线上数据库连接池爆满问题排查
如果出现了线上连接池爆满的情况,一般情况需要先“止损”,也就是重启服务。
现在一般都会用云上的服务,会有监控,对应监控能查到历史数据库的情况,包括连接池的占用情况、重启后我们快速分析定位事故反生之前一段时间 SOL的执行情况,连接池的占用情况。
查看当时是否有大量的突发清求,例如做了运营活动,那么很可能请求数上来会导致数据车压力过大,处理缓慢使得连接池占用久导致连接数满了。这种情况应该之前预先压测,并且通过限流等手段控制服务的整体压力,。
查看是否有慢 SOL导致长时间连接的占用,需要特别注意近期上线的功能,快速定位查找可能影响的功能点进行代码修复。
查看连接池配置是否合理,大部分默认的连接池的最大连接数可能就只有8或者 10,这种情况下需要适当提高最大连接数,具体连接数需要根据具体业务压测后配置
常见数据库查询性能优化
查看慢查询日志:分析数据库慢查询日志,查看是否有查询性能较差的SQL语句,导致连接长期占用。
优化查询:对于慢查询,可以优化 SOL、增加索引、减少全表扫描等,提升查询效率,减少数据库连接的占用时间。
数据库锁竞争:确认是否有长时间持有锁的 SOL查询,避免长时间的锁竞争导致连接池中的连接无法释放。
面对突发流量应对措施
- 限流:限制每秒请求的数量,避免瞬时流量过大导致连接池耗尽。
- 请求排队:通过消息队列等将请求异步化,减轻数据库直接访问的压力。可以将需要访问数据库的操作转到后台处理,而不是即时返回。(业务上需要做开关,开关打开后就异步处理,正常情况下同步处理)
- 服务降级:对于某些非核心功能,考虑返回默认值或缓存数据,避免直接查询数据库,减轻高峰期的压力
项目上现在需要存储IP 地址,数据库应该用什么类型来存储?
面试时核心要回答清楚两点:IPv4和IPv6的最优存储类型,以及为什么选这个类型
Mysql
- IPv4:优先用无符号整型 (如MySQL的INT UNSIGNED),而不是字符串(如VARCHAR)。
- IPv6:优先用二进制类型(如BINARY(16)或VARBINARY(16)),次选定长字符串(如CHAR(39))。
为什么IPv4不用字符串?
IP地址(如 192.168.1.1)本质是32位二进制数(4字节),用字符串存会浪费空间。
- 字符串存储:需要15字节(如255.255.255.255),且查询时逐字符比较,效率低
- 整型存储:4字节(无符号整型范围0~4294967295,刚好覆盖 IPv4所有可能地址,比较时直接按数值比,更快。还能节省数据库存储空间和IO开销。
举个例子:MySQL有两个函数 INET_ATON()(IP转整型) 和 INET_NTOA()(整型转IP),可以轻松转换。
-- 插入时转整型存储
INSERT INTO user (ip) VALUES (INET_ATON('192.168.1.1'));
-- 查询时转回IP字符串
SELECT INET_NTOA(ip) FROM user;IPv6为什么用二进制?
IPv6地址是128位(16字节),用字符串存储需要39字节(如2801:0080:1F1F:0888:888:0108:11A8:0F),而二进制类型 BINARY(16) 仅需16字节,空间节省一半以上。
部分数据库(如MySQL)也支持 INET6_ATON() 和 INET6_NTOA() 函数转换IPv6,但二进制存储更高效。
反正,面试时要强调“整型/二进制比字符串更省空间、查询更快”,再能说出具体的数据库函数(如INETATON) 或 专用类型(如PostareSQL的inet),体现对底层原理的理解!
PostgreSQl
如果用 PostgreSQl,可以直接用 inet 类型(支持IPV4和IPV6),它会自动验证 iP 格式,且查询时支持范围判断(如 WHERE ip>"192.168.1.0'),更方便
-- 直接存储IP字符串,自动转成inet类型
CREATE TABLE user (ip inet);
INSERT INTO user (ip) VALUES ('192.168.1.1'), ('2001:0000::1');IPv4 转 int 类型转换原理和流程(以 MySQL 为例)
转换原理
IPv4地址(如 192.168.1.1)是4个8位二进制数(0-255)的点分形式,本质是32位无符号整数(范围:0~4294967295)
转换过程:将每个段(如192、168、1、1)按位拼接成一个32位整数。例如:192.168.1.1=192x256^3+168 x 256^2 +1x256^1+ 1 x 256^0 =3232235777
插入和查询流程
步骤1:创建表(字段类型为INT UNSIGNED)
-- 创建表,ip字段用无符号整型存储
CREATE TABLE user_ip (
id INT PRIMARY KEY AUTO_INCREMENT,
ip INT UNSIGNED COMMENT '存储IPv4的整型值'
);步骤2:插入数据(用INET ATON()转整型
MySQL提供 INET_ATON(ip_str)函数,直接将IPv4字符串转为整型
-- 插入IPv4地址 '192.168.1.1'
INSERT INTO user_ip (ip) VALUES (INET_ATON('192.168.1.1'));步骤3:查询数据(用INET NTOA()转回字符串
查询时用 INET_NTOA(int_ip)函数,将整型转回点分格式,
-- 查询所有IP,显示为可读的字符串
SELECT INET_NTOA(ip) AS readable_ip FROM user_ip;IPv6 转二进制类型转换原理和流程(以 MySQL为例)
转换原理
IPv6地址(如 2001:0000:1F1F:0000:0000:0100:11A0:ADDF)是128位二进制数(16字节),用字符串存储需39字节,而二进制仅需16字节
转换过程:将IPv6的16进制字符串(冒号分隔)转为16字节的二进制数据。
插入和查询流程
步骤1:创建表(字段类型为BINARY(16))
-- 创建表,ip字段用16字节的二进制类型存储IPv6
CREATE TABLE user_ipv6 (
id INT PRIMARY KEY AUTO_INCREMENT,
ip BINARY(16) COMMENT '存储IPv6的二进制值'
);步骤2:插入数据(用INET6_ATON()转二进制)
MySQL的 INET6 ATON(ipv6_str)函数可将IPv6字符串转为16字节的二进制数据
-- 插入IPv6地址 '2001:0000:1F1F:0000:0000:0100:11A0:ADDF'
INSERT INTO user_ipv6 (ip) VALUES (INET6_ATON('2001:0000:1F1F:0000:0000:0100:11A0:ADDF'));步骤3:查询数据(用INET6_NTOA()转回字符串)
查询时用 用INET6_NTOA(binary_ip)函数,将二进制数据转回IPv6字符串:
-- 查询所有IPv6,显示为可读的字符串
SELECT INET6_NTOA(ip) AS readable_ipv6 FROM user_ipv6;为什么 IPv6 不能像 IPv4 那样用整数?
核心原因:IPv6 的 128 位长度,超出了主流数据库整数类型的范围,拆成多个整数段又比较复杂和麻烦
IPv4是32位二讲制数(4字节),主流教居库的 无符号整型(QIMSOLINTUNSIGNED) 刚好能存下(4字节=32位,范围0~4294967295),但IPv6是128位(16字节),而主流数居库的整数类型最大只有64位。
MySQL的 BIGINT UNSIGNED是8字节(64位),只能存到18446744873789551615,远小于IPv6的3.4x10^38(128位最大值)。PostgreSQL的 BIGINT 同样是64位,无法存128位的IPv6地址
如果非要用整数存IPv6,需要拆分成多个整数段(比如用2个64位整数拼128位),但会带来3个问题:
- 存储复杂:需要额外字段(如 high_bits 和 low_bits ),增加表结构复杂度。
- 查询低效:查询时需要同时比较两个字段(如 WHERE high_bits= ... AND low_bits=... ),比直接比较二进制字段慢。
- 无原生函数支持:主流数据库没有直接操作128位整数的函数(如MySQL的 INET6_ATON 返回的是二进制,不是整数),转换逻辑需要手动写,容易出错。
为什么 IPv4 优先选整型而非二进制?
IPv4 用整型存储更符合其数值本质,且数据库内置函数和查询操作的便捷性远超二进制。
二进制(如BINARY(4))虽然也能存4字节,但它是“字节序列”,需要额外理解其代表的数值含义,还需要多一步手动转换成二进制的步骤,比较麻烦。
项目中同样的功能,可能是环境或其他要求原因,需要适配多种数据库,如何实现?
本质是因为不同数据库 SQL语法、分页、时间函数等细节可能都不一样,所以需要适配
这个问题的核心思路是:抽象数据库访问层,统一接口、隐藏具体实现,再根据配置注入不同的数据源或 SQL 逻辑。
常见方式可以是 ORM (JPA +Hibernate)+方言机制。或者使用 MyBatis+动态 SQL/自定义 Mapper,或自行实现多数据源配置切换
使用 ORM 框架 + 方言机制
比如JPA/Hibernate、MyBatis Plus、Spring Data,都支持数据库方言(Dialect)
改起来其实很简单,ORM 层用统一的 API(如 findById()、分页等,然后底层会通过方言,自动转换成对应数据库的 SQL 语法
spring:
jpa:
database-platform: org.hibernate.dialect.MySQL8Dialect
## 这是MySQL 8
database-platform: org.hibernate.dialect.PostgreSQLDialect
## 这是PostgreSQL
database-platform: org.hibernate.dialect.H2Dialect
## 这是H2自行封装数据库操作
定义统一接口,不同数据库写各自的实现类,运行时根据配置或环境自动选择
public interface UserRepository {
List<User> getUsers();
}
public class MysqlUserRepository implements UserRepository {
public List<User> getUsers() {
return jdbcTemplate.query("SELECT * FROM user LIMIT 10", ...);
}
}
public class OracleUserRepository implements UserRepository {
public List<User> getUsers() {
return jdbcTemplate.query("SELECT * FROM (SELECT * FROM user) WHERE ROWNUM <= 10", ...);
}
}然后通过 Spring 的@ConditionalOnProperty或工厂模式动态选择实现:
@Bean
public UserRepository userRepository() {
if (env.equals("mysql")) return new MysqlUserRepository();
else return new OracleUserRepository();
}手写 SQL
使用 MyBatis+动态 SQL/自定义 Mapper
因为 MyBatis 本身不依赖具体数据库,支持不同数据库写不同的 Mapper XML
<select id="getUsers" databaseId="mysql">
SELECT * FROM user LIMIT 10
</select>
<select id="getUsers" databaseId="oracle">
SELECT * FROM (SELECT * FROM user) WHERE ROWNUM <= 10
</select>指定数据库类型
mybatis.configuration.database-id=mysql20亿手机号存储选int还是string?varchar还是char?为什么?
怎么设计电商系统的订单数据同步方案(同步到数仓)? 要求数据准确、性能高
可以采用 基于事务日志的增量同步(如CDC)+ 消息队列 +下游服务幂等处理 的方案主要链路架构是:订单主库(如MySQL) → Canal监听订单 Binlog 捕获增删改全变更 → 消息队列(Kafka/RocketMQ)缓冲 → 同步服务异步消费 → 数
仓具体来说,重点有三:
- 数据抓取:用 Canal 对订单数据库的 Binlog 进行监听和解析。这样能避免在业务代码中耦合,保证数据抓取的准确性和高性能,对主库几乎无压力。
- 数据传输:通过消息队列进行异步解耦。将解析后的订单变更事件发送到MQ(如Kafka/RocketMO)。这能消峰填谷,保证系统吞吐量,并实现生产者和消费者的解耦
- 数据消费:下游服务必须实现幂等性。由于网络问题可能导致消息重复,需要通过唯一订单ID或业务流水号来保证重复消息不会导致数据错乱.
为了保证数据的准确性,除了幂等消费外,还需要:
- 定期全量核对主库与数仓,补漏未捕获的事件
- 每天跑批对比主库与数仓的关键指标(订单总数、总金额),确保一致
为了提升性能:
- 同步服务从 Kafka 拉批量事件,批量写入数仓,减少 IO 次数。批量处理,
- 并行消费,按订单ID哈希分区Kafka消息,多实例并行处理,提升吞吐量
设计一个实时数据同步系统,将 MySQL数据实时同步到数据仓库?
设计 MySQL 实时同步到数仓的系统,关键是用 CDC抓变更、靠队列缓冲、按分层处理、保准确提性能
- 变更捕获:用CDC工具(比如Debezium/Canal)监听MySQL的Binlog,实时抓取增/删/改的全量变更
- 异步缓冲:把CDC的事件扔到消息队列(比如Kafka),这样扛住大促的流量洪峰,避免同步服务被压垮
- 同步处理:写个同步服务(用Flink/sparkstreamning或者自研),从队列拉事件,做清洗(过滤无效数据、补全缺失字段)、转换(格式统一,如时间戳转日期)、关联维度(如订单表关联用户信息),再写入数仓。
- 数仓适配:数仓按 ODS-DWD→DWS、 分层,ODS 存原始变更事件,DWD 洗成干净的明细(比如补全商品类目),DWS 聚合成报表指标(比如每日订单量)
准确和性能:
- 准确:用唯一标识做幂等(避免重复处理),每天离线对账补
- 性能:批量写数仓、多实例并行消费、按唯一标识分区分片
如何设计一个高可用的数据同步系统?需要考虑哪些容错机制?
我会用 log-based CDC(如Canal、Debezium)把变更写入 Kafka,在 Kafka 配置副本与 ISR 保证可用性,消费者逻辑做幂等处理并保存消费位点。
遇到写失败靠重试+ DLQ(死信队列)+ 手动回溯来容错。
主要的容错机制:
- 幂等消费,保证重试的正确性
- Kafka 多副本部署,保证消息持久化
- 失败重试 + 死信队列,保证消息不丢失
- 保存消费点位(同步数据的点位)
如何实现数据库的不停服迁移?
迁移想着很简单,不就是把一个库的数据迁移到另一个库吗?
但是实际上有很多细节,在面试中我们可以假装思考下,然后向面试官复述以下几点:
- 首先关注量级,如果是几十万的数据其实直接用代码迁移,简单核对下就结束了。如果数据量大那么才需要好好设计方案。
- 不停服数据迁移需要考虑在线数据的插入和修改,保证数据的一致性。
- 迁移还需要注意回滚,因为一旦发生问题需要及时切换回老库,防止对业务产生影响。
双写方案
大部分数据库迁移都会采用双写方案,例如自建的数据库要迁移到云上的数据库这个场景,双写就是同时写入自建的数据库和云上的数据库。我们来过一遍迁移流程:
- 将云上数据库(新库)作为自建数据库(旧库)的从库,进行数据同步(或者可以利用云上的能力,比如阿里云的 DTS)。
- 改造业务代码,数据写入修改不仅要写入旧库,同时也要写入新库,这就是所谓的双写,注意这个双写需要加开关,即通过修改配置实时打开双写和关闭双写。
- 在业务低峰期,确保数据同步完全一致的时候(即主从不延迟,这个都是有对应的监控的),关闭同步,同时打开双写开关,此时业务代码读取的还是旧数据库
- 进行数据核对,数据量很大的场景只能抽样调查(可以利用定时任务写代码进行抽样核对,一旦不一致就告警和记录。)
- 如果确认数据一致,此时可以进行灰度切流,比如1%的用户切到读新的数据库,如果发现设问题,则可以逐步增加开放的比例
- 继续保留双写,跑个几天(或者更久),确保新库确实没问题了,此时关闭双写,只写新库,这时候迁移就完成了。
flink-cdc 方案
除了主从同步,代码双写的方案,也可以采用第三方工具。例效 finkcdc等工具来进行数据的同步,它的优点方便,且支持异构(比如 mysql 同步到 pg、es 等等)的数据源。

像 fink-cdc 支持先同步全量历史数据,再无缝切到同步增量教据、上图中蓝色小块就是新增的插入数据,会追加到实时一致件快照中;上图中黄色小块是更新的数据,则会在已有历史数据里做更新。
为什么不推荐在 MySQL 中直接存储图片、音频、视频等大容量内容?
MSQL是关系型数据库,它设计的初衷是高效处理结构化和关系型数据,所以存储大容量的内容本身就不是它的职责所在,因此这方面的能力也不够。
应该将大容量文件存储在文件系统或云服务提供的对象存储服务中,仅在数据库中存储文件的路径或 URL即可。
数据库表上新增一个字段,如果这个表正在进行读写操作,如何处理才能不影响现有读写操作?
可以通过 在线DDL操作 或 影子表迁移 来实现。
现在很多数据库都内置在线DDL机制,如MySQL5.6+的 ALGORITHM = INPLACE/ALGORITHM=INSTANT
如果数据库不支持在线 DDL机制,也可以采用应用双写+影子表切换的策略。
主要流程是:先在后台创建新结构的影子表并同步数据,然后在低峰期通过短暂切换(重命名表)完成字段添加,这个过程会短暂的锁表,线上读写基本不受影。可以使用数据同步工具,比如pt-online-schema-chanag、gh-ost来操作。
实际上现在很多云数据库都支持 DDL无锁变更,直接使用即可。
为什么直接加字段会受影响?
比如在 MySQL 5.5 以前版本中执行:
ALTER TABLE user ADD COLUMN age INT;这操作的底层流程是:
- 创建一个新的临时表结构
- 把旧表数据一条条复制到新表
- 删除旧日表
- 重命名新表
整一个过程都是锁表操作的,此时的读写都会卡住。
所以 MySQL从 5.6 开始支持 Online DDL,就是引入了 ALGORITHM=INPLACE 这样的执行策略。简而言之,它告诉 MySQL:“别动老表的数据,不要复制整张表,在原地修改结构。
例如:
ALTER TABLE user ADD COLUMN age INT DEFAULT 0, ALGORITHM=INPLACE, LOCK=NONE;含义是:
- INPLACE:尽量“原地”修改表结构,不复制数据
- LOCK=NONE:尽量不阻塞任何读写操作(只在开始和结束时加一小会元数据锁)
在 MySQL8.0.12 之后,某些简单变更可以使用 ALGORITHM-INSTANT,只改元数据,不改一行数据,几乎是秒级完成! 适用于:
- 新增字段必须是 NULL 或者有默认值
- 没有 NOT NULL 强约束
- 字段位置必须是最后一个
ALTER TABLE orders ADD COLUMN remark VARCHAR(255), ALGORITHM=INSTANT;使用 LIMIT OFFSET 进行分页同步时,发现数据丢失了,可能是什么原因?如何解决?
用 LIMIT OFFSET分页同步丢数据,核心原因是偏移量依赖历史位置,一旦发生数据变动就错乱了。因为 OFFSET N 的本质是先扫描前N条,再跳过它们。
若同步过程中前 N 条数据被删除,后续数据的相对位置会前移,导致实际取到的数据不是预期的下一批。比如原第 11 条因前面删了5条变成第6条,OFFSET 10 会跳过它,直接取第11条,导致漏数据。除此之外,若用 name、create time 等非唯一字段排序,相同排序值的记录顺序不稳定,可能导致某条数据被“跳过”或“重复”
解决方法:
- 优先用游标分页:
不用 OFFSET,而是基于上一页最后一条记录的唯一标识 (如自增ID、时间戳+ID)定位下一页。例如
- 第一页:SELECT* FROM table ORDER BY id ASC LIMIT 10,拿到最后一条id=10
- 第二页:SELECT* FROM table WHERE id>10 ORDER BY id ASC LIMIT 10 。
游标直接定位“具体值”,完全不受中间数据变动影响。
除此之外,不用全量分页,而是就基于日志的增量同步,只同步“变化的数据””,则可以避免分页的偏移问题。
- 约束排序字段唯一性
排序必须用唯一键组合(如id或create time +id),避免因排序字段重复导致游标不稳定。
有一张事件表里面有三个字段,分别是(id,开始时间,结束时间),表中数据量为 5000W,如何统计流量最大的时候有多少条数据?
题干没有告知峰值的统计单位,可以直接询问面试官,原理都是一样的。本答案以秒作为单位,统计每秒的最大值,
我们要统计每秒钟内的最大并发流量,也就是在某一秒内有多少个事件处于活动状态(即时间段的重叠),可以使用差分数组和扫描线思想来实现。
我们可以通过将每个事件活动的开始时间和结束时间记录为增量(开始时流量 +1,结束时流量-1),并通过扫描线的方式对每一秒进行叠加,最终得到每秒的并发流量
CREATE TABLE events (
id INT,
start_time DATETIME,
end_time DATETIME
);假设 start_time 和 end_time 记录的时间单位就是秒,使用一个差分数组(增量数组)来记录每秒的流量变化。
具体来说:我们将每个事件的开始和结束时间处理成时间点(例如:start_time 对应 +1增加并发,end_time +1对应 -1 减少并发),并存储在一个列表中,然后我们对这些时间点进行排序,并计算每个时间点的并发变化,最终找出最大并发数
假设我们有两个事件:
事件1:从2024-12-05 10:00:00 到 2024-12-05 10:00:30
事件2:从2024-12-0510:00:01 到 2024-12-05 10:00:50
我们关心的是这些事件在某些时间点的重叠情况(即并发数),记录每个事件的开始和结束时刻,增减并发数。
时间戳变化图
时间点 | 10:00:00 | 10:00:01 | 10:00:30 | 10:00:50 |
事件1 | 开始 -> +1 | | 结束 -> -1 | |
事件2 | | 开始 -> +1 | | 结束 -> -1 |
并发数 | 1 | 2 | 1 | 0 |代码实现如下:
import java.util.*;
public class EventConcurrency {
public static class Event {
String id;
Date startTime;
Date endTime;
public Event(String id, Date startTime, Date endTime) {
this.id = id;
this.startTime = startTime;
this.endTime = endTime;
}
}
public static class EventPoint {
Date timestamp;
int change; // +1 for start, -1 for end + 1 second
public EventPoint(Date timestamp, int change) {
this.timestamp = timestamp;
this.change = change;
}
}
public static void main(String[] args) {
// mock 数据
List<Event> events = Arrays.asList(
new Event("1", parseDate("2024-12-05 10:00:00"), parseDate("2024-12-05 10:00:30")),
new Event("2", parseDate("2024-12-05 10:00:01"), parseDate("2024-12-05 10:00:50"))
);
// 获取最大并发量和时间点
Map.Entry<Date, Integer> result = getMaxConcurrency(events);
if (result != null) {
System.out.println("Max concurrency: " + result.getValue() + " at time: " + formatDate(result.getKey()));
} else {
System.out.println("No events found.");
}
}
public static Map.Entry<Date, Integer> getMaxConcurrency(List<Event> events) {
List<EventPoint> eventPoints = new ArrayList<>();
for (Event event : events) {
eventPoints.add(new EventPoint(event.startTime, 1)); // Start time: +1 concurrency
eventPoints.add(new EventPoint(addSecond(event.endTime), -1)); // End time + 1: -1 concurrency
}
// 通过时间排序
eventPoints.sort(Comparator.comparing((EventPoint p) -> p.timestamp));
int currentConcurrency = 0;
int maxConcurrency = 0;
Date maxConcurrencyTime = null;
// 遍历已排序的事件点以查找最大并发性
for (EventPoint point : eventPoints) {
currentConcurrency += point.change;
if (currentConcurrency > maxConcurrency) {
maxConcurrency = currentConcurrency;
maxConcurrencyTime = point.timestamp;
}
}
return new AbstractMap.SimpleEntry<>(maxConcurrencyTime, maxConcurrency);
}
public static Date parseDate(String dateStr) {
try {
return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(dateStr);
} catch (java.text.ParseException e) {
e.printStackTrace();
return null;
}
}
public static String formatDate(Date date) {
return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
}
// +1 秒
public static Date addSecond(Date date) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.add(Calendar.SECOND, 1);
return cal.getTime();
}
}理解上面的思路后,我们再来看看题干,表中有 5000w 数据,所以需要性能优化,而不是一次性加载所有的数据到内存中。
性能优化
对于 5000万条数据的规模,直接查询和处理可能会非常慢。为了提高效率,可以考虑以下优化
- 索引:在 start_time 和 end_time 字段上创建索引,以加速查询。
- 按时间范围查询:如果事件按时间分布较均匀,按时间范围(例如每天)分批次查询会更高效。
- 数据据分批加载:除了时间范围,也可以分页加载处理
- 减少内存占用:利用 Map 来存储每个时间戳的增减信息,而不需要一次性存储所有时间点。
- 多线程优化:可以使用多线程并行处理。
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
import java.sql.*;
import java.text.SimpleDateFormat;
public class OptimizedEventConcurrency {
public static class Event {
String id;
Date startTime;
Date endTime;
public Event(String id, Date startTime, Date endTime) {
this.id = id;
this.startTime = startTime;
this.endTime = endTime;
}
}
public static void main(String[] args) {
// 数据库连接(示例中为模拟)
String url = "jdbc:mysql://localhost:3306/events_db";
String user = "root";
String password = "password";
// 每次查询的分页大小(每次加载的记录数)
int pageSize = 10000;
int currentPage = 1;
// 使用 ConcurrentSkipListMap 来保证时间戳的线程安全排序
Map<Long, Integer> timestampChanges = new ConcurrentSkipListMap<>();
// 数据库连接(用于读取事件数据)
try (Connection conn = DriverManager.getConnection(url, user, password)) {
// 假设总记录有 5000w 条,分页加载
while (true) {
List<Event> events = fetchEventsFromDatabase(conn, currentPage, pageSize);
if (events.isEmpty()) break; // 没有更多的事件数据,退出循环
// 使用并行流处理事件,减少处理时间
processEventsInParallel(events, timestampChanges);
currentPage++; // 下一页
}
// 计算所有事件的最大并发数
Map.Entry<Long, Integer> result = getMaxConcurrency(timestampChanges);
if (result != null) {
System.out.println("最大并发数: " + result.getValue() + " 发生在时间戳: " + formatTimestamp(result.getKey()));
} else {
System.out.println("没有找到事件数据。");
}
} catch (SQLException e) {
e.printStackTrace();
}
}
// 从数据库中获取事件数据,分页查询
public static List<Event> fetchEventsFromDatabase(Connection conn, int page, int pageSize) throws SQLException {
String query = "SELECT id, start_time, end_time FROM events LIMIT ?, ?";
try (PreparedStatement ps = conn.prepareStatement(query)) {
ps.setInt(1, (page - 1) * pageSize);
ps.setInt(2, pageSize);
try (ResultSet rs = ps.executeQuery()) {
List<Event> events = new ArrayList<>();
while (rs.next()) {
String id = rs.getString("id");
Date startTime = rs.getTimestamp("start_time");
Date endTime = rs.getTimestamp("end_time");
events.add(new Event(id, startTime, endTime));
}
return events;
}
}
}
// 使用并行流处理事件并更新时间戳变化
public static void processEventsInParallel(List<Event> events, Map<Long, Integer> timestampChanges) {
events.parallelStream().forEach(event -> {
// 对开始时间添加 +1
long startTimestamp = event.startTime.getTime() / 1000;
timestampChanges.merge(startTimestamp, 1, Integer::sum);
// 对结束时间(加1秒)添加 -1
long endTimestamp = (event.endTime.getTime() + 1000) / 1000; // 结束时间 + 1 秒
timestampChanges.merge(endTimestamp, -1, Integer::sum);
});
}
// 从时间戳变化映射中计算最大并发数
public static Map.Entry<Long, Integer> getMaxConcurrency(Map<Long, Integer> timestampChanges) {
int currentConcurrency = 0;
int maxConcurrency = 0;
long maxConcurrencyTimestamp = -1;
// 按时间戳排序,计算并发数
for (Map.Entry<Long, Integer> entry : timestampChanges.entrySet()) {
currentConcurrency += entry.getValue();
if (currentConcurrency > maxConcurrency) {
maxConcurrency = currentConcurrency;
maxConcurrencyTimestamp = entry.getKey();
}
}
return maxConcurrency > 0 ? new AbstractMap.SimpleEntry<>(maxConcurrencyTimestamp, maxConcurrency) : null;
}
// 将时间戳格式
public static String formatTimestamp(long timestamp) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date(timestamp * 1000));
}
}代码的改造如下:
- 分页查询数据库,避免一次性加载所有数据,减少内存压力
- 使用 parallelStream() 来并行处理每批数据,充分利用多核 CPU 提高性能
- ConcurrentSkipListMap 保证线程安全和数据的有序性
类似问题:怎么求出一天 内的最大在线人数?以及维持最大在线人数的最长持续时间?
现在有一天内的大量日志,每条日志记录了用户id,登陆时间,登出时间(userid,login time,logout time;,时间单位是秒。怎么求出一天 内的最大在线人数?以及维持最大在线人数的最长持续时间?
将所有登录时间视为+1 事件,登出时间视为-1事件,放入一个时间线中排序,再依次扫描整个时间轴,记录当前在线人数的变化,即可求出最大在线人数及其持续的最长时间段。
这个方法的时间复杂度为 O(NloqN)。(这叫扫描线算法,这题本质是一个经典的区间重叠统计问题。)
MySQL 中 如果 select from 一个有 1000 万行的表,内存会飙升么?
内存不会飙升。
因为MSQL 在执行简单SELECT * FROM查询时,不会一次性将所有1000万行数据加载到内存中,而是通过逐批次处理的方式来控制内存使用。也就是说 MySQL 是边查边发送数据给客户端,
分批的大小与 net_buffer length 有关,默认16384字节(16 KB)。
所以实际上获取数据和发送数据的流程是这样的:
- 获取一行,写入到 net_buffer 中。
- 重复获取行,直到 net_buffer 写满,调用网络接口发出去。
- 发送成功后 清空 net_buffer 。
- 再继续取下一行,并写入 net_buffer,继续上述的操作。
这里还需要注意一点,发送的数据是需要被客户端读取的,如果客户端读取的慢,导致本地网络栈(socketsend bufer)写满了,那么当前数据的写入会被暂停。
综上,SELECT * FROM一个有1000 万行的表,不会导致内存飙升。
注意 ORDER BY 和 GROUP BY
正常不需要排序和分组的查询不会占用过多的内存和影响 MySQLServer 的执行,但是如果涉及到分组和排序,那么就需要使用额外的内存(或外部空间)来处理全量数据,可能会占用额外的内存并影响查询的效率。
客户端的处理方式
虽然 MVSQL会分批返回数据,但是客户端需要做一定的处理,不能全量保存教据,否则可能会导致内存溢出。
客户端需要使用流式处理或者游标来查询。
- 在编写查询代码时,使用流式读取方式(如JDBC中的 ResultSet.FETCH_SIZE),这会让数据库逐步返回数据,客户端按需处理。
// 获取数据库连接并开启流式处理
try (Connection connection = DriverManager.getConnection(URL, USER, PASSWORD)) {
// 禁用自动提交,避免结果集完全缓存到内存
connection.setAutoCommit(false);
// 创建一个可以进行流式处理的 Statement
try (PreparedStatement statement = connection.prepareStatement(query,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
// 设置流式处理的关键:MySQL 特定配置
statement.setFetchSize(Integer.MIN_VALUE);
// 执行查询
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
// 获取每行数据,这里假设有 "id" 和 "name" 两列
int id = resultSet.getInt("id");
String name = resultSet.getString("name");
// 处理数据(例如打印出来)
System.out.printf("ID: %d, Name: %s%n", id, name);
}
}
}
} catch (SQLException e) {
e.printStackTrace();
}- MyBatis 简单示例,重点就是 session.getconfiguration().setDefaultFetchsize(Integer.MIN_VALUE);
try (SqlSession session = sqlSessionFactory.openSession()) {
// 设置 fetchSize 为 Integer.MIN_VALUE 启用流式查询
session.getConfiguration().setDefaultFetchSize(Integer.MIN_VALUE);
// 获取 Mapper
LargeTableMapper mapper = session.getMapper(LargeTableMapper.class);
// 使用 ResultHandler 处理数据,逐行读取
mapper.selectAll(resultContext -> {
LargeTableRecord record = resultContext.getResultObject();
System.out.printf("ID: %d, Name: %s%n", record.getId(), record.getName());
// 可以在这里对数据进行处理,避免一次性加载到内存
});
}- MyBatis 游标查询示例,重点就是使用cursor接口
try (SqlSession session = sqlSessionFactory.openSession()) {
// 获取 Mapper
LargeTableMapper mapper = session.getMapper(LargeTableMapper.class);
// 使用游标查询数据
try (Cursor<LargeTableRecord> cursor = mapper.selectAllWithCursor()) {
for (LargeTableRecord record : cursor) {
System.out.printf("ID: %d, Name: %s%n", record.getId(), record.getName());
// 可以在这里对数据进行处理,避免一次性加载到内存
}
}
}