回答重点
题干没有告知峰值的统计单位,可以直接询问面试官,原理都是一样的。本答案以秒作为单位,统计每秒的最大值,
我们要统计每秒钟内的最大并发流量,也就是在某一秒内有多少个事件处于活动状态(即时间段的重叠),可以使用差分数组和扫描线思想来实现。
我们可以通过将每个事件活动的开始时间和结束时间记录为增量(开始时流量 +1,结束时流量-1),并通过扫描线的方式对每一秒进行叠加,最终得到每秒的并发流量
CREATE TABLE events (
id INT,
start_time DATETIME,
end_time DATETIME
);
假设 start_time 和 end_time 记录的时间单位就是秒,使用一个差分数组(增量数组)来记录每秒的流量变化。
具体来说:我们将每个事件的开始和结束时间处理成时间点(例如:start_time 对应 +1增加并发,end_time +1对应 -1 减少并发),并存储在一个列表中,然后我们对这些时间点进行排序,并计算每个时间点的并发变化,最终找出最大并发数
假设我们有两个事件:
事件1:从2024-12-05 10:00:00 到 2024-12-05 10:00:30
事件2:从2024-12-0510:00:01 到 2024-12-05 10:00:50
我们关心的是这些事件在某些时间点的重叠情况(即并发数),记录每个事件的开始和结束时刻,增减并发数。
时间戳变化图
时间点 | 10:00:00 | 10:00:01 | 10:00:30 | 10:00:50 |
事件1 | 开始 -> +1 | | 结束 -> -1 | |
事件2 | | 开始 -> +1 | | 结束 -> -1 |
并发数 | 1 | 2 | 1 | 0 |
代码实现如下:
import java.util.*;
public class EventConcurrency {
public static class Event {
String id;
Date startTime;
Date endTime;
public Event(String id, Date startTime, Date endTime) {
this.id = id;
this.startTime = startTime;
this.endTime = endTime;
}
}
public static class EventPoint {
Date timestamp;
int change; // +1 for start, -1 for end + 1 second
public EventPoint(Date timestamp, int change) {
this.timestamp = timestamp;
this.change = change;
}
}
public static void main(String[] args) {
// mock 数据
List<Event> events = Arrays.asList(
new Event("1", parseDate("2024-12-05 10:00:00"), parseDate("2024-12-05 10:00:30")),
new Event("2", parseDate("2024-12-05 10:00:01"), parseDate("2024-12-05 10:00:50"))
);
// 获取最大并发量和时间点
Map.Entry<Date, Integer> result = getMaxConcurrency(events);
if (result != null) {
System.out.println("Max concurrency: " + result.getValue() + " at time: " + formatDate(result.getKey()));
} else {
System.out.println("No events found.");
}
}
public static Map.Entry<Date, Integer> getMaxConcurrency(List<Event> events) {
List<EventPoint> eventPoints = new ArrayList<>();
for (Event event : events) {
eventPoints.add(new EventPoint(event.startTime, 1)); // Start time: +1 concurrency
eventPoints.add(new EventPoint(addSecond(event.endTime), -1)); // End time + 1: -1 concurrency
}
// 通过时间排序
eventPoints.sort(Comparator.comparing((EventPoint p) -> p.timestamp));
int currentConcurrency = 0;
int maxConcurrency = 0;
Date maxConcurrencyTime = null;
// 遍历已排序的事件点以查找最大并发性
for (EventPoint point : eventPoints) {
currentConcurrency += point.change;
if (currentConcurrency > maxConcurrency) {
maxConcurrency = currentConcurrency;
maxConcurrencyTime = point.timestamp;
}
}
return new AbstractMap.SimpleEntry<>(maxConcurrencyTime, maxConcurrency);
}
public static Date parseDate(String dateStr) {
try {
return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(dateStr);
} catch (java.text.ParseException e) {
e.printStackTrace();
return null;
}
}
public static String formatDate(Date date) {
return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
}
// +1 秒
public static Date addSecond(Date date) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.add(Calendar.SECOND, 1);
return cal.getTime();
}
}
理解上面的思路后,我们再来看看题干,表中有 5000w 数据,所以需要性能优化,而不是一次性加载所有的数据到内存中。
性能优化
对于 5000万条数据的规模,直接查询和处理可能会非常慢。为了提高效率,可以考虑以下优化
- 索引:在 start_time 和 end_time 字段上创建索引,以加速查询。
- 按时间范围查询:如果事件按时间分布较均匀,按时间范围(例如每天)分批次查询会更高效。
- 数据据分批加载:除了时间范围,也可以分页加载处理
- 减少内存占用:利用 Map 来存储每个时间戳的增减信息,而不需要一次性存储所有时间点。
- 多线程优化:可以使用多线程并行处理。
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
import java.sql.*;
import java.text.SimpleDateFormat;
public class OptimizedEventConcurrency {
public static class Event {
String id;
Date startTime;
Date endTime;
public Event(String id, Date startTime, Date endTime) {
this.id = id;
this.startTime = startTime;
this.endTime = endTime;
}
}
public static void main(String[] args) {
// 数据库连接(示例中为模拟)
String url = "jdbc:mysql://localhost:3306/events_db";
String user = "root";
String password = "password";
// 每次查询的分页大小(每次加载的记录数)
int pageSize = 10000;
int currentPage = 1;
// 使用 ConcurrentSkipListMap 来保证时间戳的线程安全排序
Map<Long, Integer> timestampChanges = new ConcurrentSkipListMap<>();
// 数据库连接(用于读取事件数据)
try (Connection conn = DriverManager.getConnection(url, user, password)) {
// 假设总记录有 5000w 条,分页加载
while (true) {
List<Event> events = fetchEventsFromDatabase(conn, currentPage, pageSize);
if (events.isEmpty()) break; // 没有更多的事件数据,退出循环
// 使用并行流处理事件,减少处理时间
processEventsInParallel(events, timestampChanges);
currentPage++; // 下一页
}
// 计算所有事件的最大并发数
Map.Entry<Long, Integer> result = getMaxConcurrency(timestampChanges);
if (result != null) {
System.out.println("最大并发数: " + result.getValue() + " 发生在时间戳: " + formatTimestamp(result.getKey()));
} else {
System.out.println("没有找到事件数据。");
}
} catch (SQLException e) {
e.printStackTrace();
}
}
// 从数据库中获取事件数据,分页查询
public static List<Event> fetchEventsFromDatabase(Connection conn, int page, int pageSize) throws SQLException {
String query = "SELECT id, start_time, end_time FROM events LIMIT ?, ?";
try (PreparedStatement ps = conn.prepareStatement(query)) {
ps.setInt(1, (page - 1) * pageSize);
ps.setInt(2, pageSize);
try (ResultSet rs = ps.executeQuery()) {
List<Event> events = new ArrayList<>();
while (rs.next()) {
String id = rs.getString("id");
Date startTime = rs.getTimestamp("start_time");
Date endTime = rs.getTimestamp("end_time");
events.add(new Event(id, startTime, endTime));
}
return events;
}
}
}
// 使用并行流处理事件并更新时间戳变化
public static void processEventsInParallel(List<Event> events, Map<Long, Integer> timestampChanges) {
events.parallelStream().forEach(event -> {
// 对开始时间添加 +1
long startTimestamp = event.startTime.getTime() / 1000;
timestampChanges.merge(startTimestamp, 1, Integer::sum);
// 对结束时间(加1秒)添加 -1
long endTimestamp = (event.endTime.getTime() + 1000) / 1000; // 结束时间 + 1 秒
timestampChanges.merge(endTimestamp, -1, Integer::sum);
});
}
// 从时间戳变化映射中计算最大并发数
public static Map.Entry<Long, Integer> getMaxConcurrency(Map<Long, Integer> timestampChanges) {
int currentConcurrency = 0;
int maxConcurrency = 0;
long maxConcurrencyTimestamp = -1;
// 按时间戳排序,计算并发数
for (Map.Entry<Long, Integer> entry : timestampChanges.entrySet()) {
currentConcurrency += entry.getValue();
if (currentConcurrency > maxConcurrency) {
maxConcurrency = currentConcurrency;
maxConcurrencyTimestamp = entry.getKey();
}
}
return maxConcurrency > 0 ? new AbstractMap.SimpleEntry<>(maxConcurrencyTimestamp, maxConcurrency) : null;
}
// 将时间戳格式
public static String formatTimestamp(long timestamp) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date(timestamp * 1000));
}
}
代码的改造如下:
- 分页查询数据库,避免一次性加载所有数据,减少内存压力
- 使用 parallelStream() 来并行处理每批数据,充分利用多核 CPU 提高性能
- ConcurrentSkipListMap 保证线程安全和数据的有序性