【SQL周周练】:利用行车轨迹分析犯罪分子作案地点
大家好,我是“蒋点数分”,多年以来一直从事数据分析工作。从今天开始,与大家持续分享关于数据分析的学习内容。
本文是第 7 篇,也是【SQL 周周练】系列的第 6 篇。该系列是挑选或自创具有一些难度的 SQL 题目,一周至少更新一篇。后续创作的内容,初步规划的方向包括:
后续内容规划
1.利用 Streamlit 实现 Hive 元数据展示
、SQL 编辑器
、 结合Docker 沙箱实现数据分析 Agent
2.时间序列异常识别、异动归因算法
3.留存率拟合、预测、建模
4.学习 AB 实验
、复杂实验设计等
5.自动化机器学习
、自动化特征工程
6.因果推断
学习
7. ……
欢迎关注,一起学习。
第 6 期题目
题目来源:纯自创题目,受到《The SQL Murder Mystery》的启发,它是一个用 SQL 来寻找凶手的题目;我玩过之后,很受启发,想出一个【SQL 破案系列】,但是我想象力不够、文笔一般。剧情又要设计为 SQL 解题,只能放弃这个方向。今天拿出来一道之前设计的题目:尽管文笔和设计上有很多不足,但有点意思。最关键,我在 8 年的数据分析经历,从来没有在 SQL 中使用过三角函数,而这道题目就需要使用三角函数
一、题目介绍
大家可以先看看故事背景,这是两个多月前写的。写作方法是我提一个梗概,然后让 Deepseek 或者 Qwen 润色和发散;等它们返回来结果,我再吸收和更改;然后再提问再更改,如此往复五次以上。我还没有尝试过 Gemini 2.5 pro 或 GPT 来写。
对故事不感兴趣的同学可以跳过,只是故事情节对题目理解略有帮助:
凌晨三点,T 市西郊分局的走廊映着惨白的荧光,袭来一种不真实感。刑警队长王泽宇脚步急促,径直走向拘留室区域,值班台的警员小李正在踱步抵抗困意。
“王队,您亲自来了,要提审谁?” 小李看到王队后一个激灵。话音未落,警员老张默默地拉开通道的铁门。
向两名值班警察点头示意后,王队走入了通道,脚步声不多时便被走廊吞噬。只剩下微弱的电流声在空旷中回荡。
七拐八拐的,王队停在了一间隐蔽的拘留室,这片区域只关押着白辉 —— 一个游走在灰色地带的小混混,也是王队的线人。
隔着铁栅栏望过去,白辉仰面躺在床板上,双眼盯着天花板。显然,已经察觉到了有人过来。
“怎么,什么话不能在外边说?非得喝酒闹事,这个点在这儿见面。” 王队有些嗔怪。
白辉佝偻着背坐起身来,无奈笑道,“道上兄弟打个喷嚏,隔天一圈全得流感。今天我收别人的风,明天别人就放我的料。”
王队跨前半步,余光撇了眼走廊的摄像头,压低声音,“说吧。”
“来了批过江龙,硬点子。可能 H 省的。” 白辉顿了顿,“老 K 您知道吧?”
王泽宇点头示意他继续,“老 K 的架生和烟花生意别说在 T 市,北方也是一号啊。遇到这帮人,也怕了。”
“这伙人找老 K 买枪弹和炸Y,搞黑吃黑?”王队眉头紧锁,脑海中飞速思考。
白辉咧咧嘴,“老 K 这种老狐狸,卖架生,小弟们都背着雷管。”
“这种交易钱货两清必须快,没想到这伙人在交易地点附近开车绕圈。” 白辉食指划了两个圈,“外围望风小弟立刻通知老 K,老 K 都准备撤了”
“结果一道红色激光瞄准了老 K 的胸口…… “ 白辉差点呛住 ”虽然最后交易成功了,但老 K 到家腿还软着。“
“不是老 K 吓坏,这消息能走漏么。” 白辉嘘了口气。
王队追问“还有什么消息。”,焦急情绪溢于言表。
白辉摇摇头,“只听说有一伙人定了两条大飞,三天后。”
三分钟后,王队已经赶到了技术科,分析师 J 正揉着发酸的后颈。
“情况紧急,小 J…… 你立刻用道路监控系统分析近期全市车辆的轨迹,排查可疑车辆。” 老王语气急促。
“我刚才已经通知了局长,局长命令半小时后开会”。
分析师 J 飞速敲着键盘,道路监视系统采集的海量视频影像,经过算法逐渐转为结构化数据流入集群。
“还有 10 分钟”,J 撇了眼屏幕右下角的时间,没注意到王泽宇已经离开,心想。“用这伙人踩点的特征去追踪……”
有一张数据仓库的表,里面是道路影像视频资料根据 CV 算法分析得到的(咱这儿就当小说不管现实可行性)。表里有如下的数据:时间
,车牌号
,纬度
,经度
(假设摄像头拍到车牌后,根据单目测距算法和摄像头本身的坐标计算出车辆的坐标)
我们假设这个故事中摄像头密度很高,利用不同摄像头记录的车辆的坐标和时间差,计算这一小段距离的平均速度。如果平均速度在某个范围内视为“疑似踩点”,在另一个范围视为“正常行驶”;并且故事设计犯罪分子会驾车不多不少刚好围着作案地点附近绕行一圈,这样利用坐标的均值就可以求得“质心/中心点”。
题目规定就是求出这个“质心/中心点” —— 也就是谋划犯罪的地点。
ts | string | 时间 (为了计算准确,这里精确到微秒) |
licence_plate | string | 车牌 |
lagtitude | double | 纬度 |
longtitude | double | 经度 |
is_case_the_join | tinyint | 是否有为踩点 (做题时不用,为了验证数据的 1-是,0-否) |
部分样例数据(完整生成逻辑参见第三节)
2025-06-01 09:45:40.846060 | J-9876 | 39.116034 | 117.194557 | 0 |
2025-06-01 09:45:44.176520 | J-9876 | 39.116105 | 117.194074 | 0 |
2025-06-01 09:45:47.831298 | J-9876 | 39.116346 | 117.194396 | 0 |
2025-06-01 09:45:52.131025 | J-9876 | 39.116621 | 117.194836 | 0 |
... | ... | ... | ... | ... |
... | ... | ... | ... | ... |
2025-06-01 10:13:39.177957 | J-9876 | 39.115997 | 117.195361 | 0 |
2025-06-01 10:13:43.865901 | J-9876 | 39.11603 | 117.194675 | 0 |
注意,模拟数据时为了简化,只设置了一个辆车即一个车牌,但是我写 SQL 的时候没有忽略掉这个维度,按照有多辆车的写法来处理。 另外,我在模拟数据时,让这个车绕了多个地点 —— 多个地方“踩点”。
二、题目思路
想要答题的同学,可以先思考答案🤔。
.……
.……
.……
我来谈谈我的思路,这道题目其实还是“断点分组”类问题 1.“断点分组”类问题,顾名思义,里面有两个词一个是“断点”一个是“分组”。“分组”最容易想到将一类有相同“维度”(分组标识)的数据放在一起统计,对于这道题目来说,就是“踩点”的某个“点”,那么这个属于这个“踩点”的所有轨迹点应该放在同一组,进而求平均得到“中心点”
2.“断点”体现在哪里?题目设计时,规定了犯罪分子驾车会有两个速度范围,一个是正常驾驶范围,一个是“踩点”低速驾驶行为(忽略在现实中的不合理性,因为这道题设计更偏趣味)。所以这两个速度范围的切换点,就是那个“断点”。就需要先求出来速度
3.摄像头拍到车,并给出坐标和时刻 —— 速度就是距离除以时间差,而距离需要用坐标来计算。如果是 Postgre
、SQL Server
或者某些数据库,可能利用 GIS
的插件/函数来计算,我没有用过。而且此题目本意也是为了练习 Hive
三角函数,根据搜索,推荐使用 Haversine公式
来处理。
其中 是地球半径
对这个公式证明感兴趣的同学,可以自行搜索。说实话,我没看证明。我还尝试了类似平面坐标系中计算欧式距离和曼哈顿距离的方法:在 Python 中我验证了后两者的 Haversine
的差距,因为这些坐标之间本身就很近,使用欧式距离也可以处理。
有人论述过反正切计算开销大,若非必要可以近似求解 —— 感兴趣的同学可以自行搜索
下面,我用 NumPy
和 Scipy
生成模拟的数据集:
三、用 Python 生成模拟数据
只关心 SQL 代码的同学,可以跳转到第四节(我在工作中使用 Hive
较多,因此采用 Hive
的语法)
模拟代码如下: 1.引入必要的包,定义“踩点”和正常的速度范围,前者4~6m/s,后者8~16m/s:
import math
from datetime import datetime
import numpy as np
import pandas as pd
import scipy
# scipy 的随机数种子
RANDOM_SEED = 2025
# 踩点速度和正常速度
SPEED_CASE = (4, 6)
SPEED_NORMAL = (8, 16)
# 起始点日期范围
DATE_BEGIN = ("2025-06-01 08:00:00.000", "2025-06-01 21:00:00.000")
TIMESTAMP_BEGIN = [
datetime.strptime(d, "%Y-%m-%d %H:%M:%S.%f").timestamp()
for d in DATE_BEGIN
]
2.path
指向的文件,里面存的数据是我在某网站一个一个点的坐标轨迹。使用 shift
函数移动一行,方便计算两个相邻行的距离:
path = "sql题目经纬度.xlsx"
df = pd.read_excel(
path,
sheet_name="Sheet1",
header=None,
names=["licencePlate", "lagtitude", "longtitude", "isCaseTheJoint"],
)
# 下移一行,使得每一行拥有上一行的坐标数据
df[["lastLagtitude", "lastLongtitude"]] = df.groupby(by=["licencePlate"])[
["lagtitude", "longtitude"]
].shift(1)
3.定义 haversine
和近似计算 approx_distance
的函数,并求结果进行对比。其中 haversine
的代码,抄的 —— 来源是,“模型视角”的《学点几何 | 计算球面距离的哈弗塞恩公式》;approx_distance
近似计算的方法,是我自己写的:
# 定义哈弗赛恩公式函数
def haversine_distance(gps1, gps2):
lat1, lon1 = gps1
lat2, lon2 = gps2
R = 6371 * 1000 # 地球半径,单位为公里
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (
math.sin(dlat / 2) ** 2
+ math.cos(math.radians(lat1))
* math.cos(math.radians(lat2))
* math.sin(dlon / 2) ** 2
)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
# c2 = 2 * math.asin(math.sqrt(a))
distance = R * c
return distance
# 计算近似距离
def approx_distance(gps1, gps2):
lat1, lon1 = gps1
lat2, lon2 = gps2
R = 6371 * 1000
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
avg_lat = math.radians((lat1 + lat2) / 2)
dx = dlon * math.cos(avg_lat) * R
dy = dlat * R
euclidean_approx = math.sqrt(pow(dx, 2) + pow(dy, 2))
manhattan_approx = abs(dx) + abs(dy)
return euclidean_approx, manhattan_approx
df["haversineDistance"] = df.apply(
lambda row: haversine_distance(
(row["lagtitude"], row["longtitude"]),
(row["lastLagtitude"], row["lastLongtitude"]),
),
axis=1,
)
# 如果路本身是斜的,曼哈顿距离比欧式距离要“虚高”较多
# 但类似路口拐角,曼哈顿距离更加准确
df["approxDistance"] = df.apply(
lambda row: approx_distance(
(row["lagtitude"], row["longtitude"]),
(row["lastLagtitude"], row["lastLongtitude"]),
),
axis=1,
)
df["euclideanDistance"] = df["approxDistance"].apply(lambda row: row[0])
df["manhattanDistance"] = df["approxDistance"].apply(lambda row: row[1])
df.drop(columns=["approxDistance"], inplace=True)
从图中可以看出可以看出 haversine 和欧式距离,在几十米的距离中,结果几乎一样(至少计算结果小数点后 6 位一致)
4.使用标准正态分布,并使用线性变换,随机生成踩点和正常驾驶的速度,使用 clip
裁剪避免数据超出这个范围;假设 4~6m/s 之间是踩点的速度,8~16m/s 之间是正常驾驶的速度:
# 假设 4 ~6m/s 之间是踩点的速度,8~16m/s 之间是正常驾驶的速度
# 用 havsersineDistance 来求,不考虑红绿灯或者堵车等情况
# 先不单独处理起始点,即 lastLagtitude 为 NaN 的点,这样代码写着更简洁,最后再赋 NaN
df["speed"] = scipy.stats.norm.rvs(
loc=0, scale=1, size=df.shape[0], random_state=RANDOM_SEED
)
# 正态分布线性变换
df.loc[df["isCaseTheJoint"] == 1, "speed"] = (
df["speed"] * (SPEED_CASE[1] - SPEED_CASE[0]) / 8
+ (SPEED_CASE[0] + SPEED_CASE[1]) / 2
)
df.loc[df["isCaseTheJoint"] == 0, "speed"] = (
df["speed"] * (SPEED_NORMAL[1] - SPEED_NORMAL[0]) / 8
+ (SPEED_NORMAL[0] + SPEED_NORMAL[1]) / 2
)
# 使用 clip 裁剪速度数据
df.loc[df["isCaseTheJoint"] == 1, "speed"] = df.loc[
df["isCaseTheJoint"] == 1, "speed"
].clip(lower=SPEED_CASE[0], upper=SPEED_CASE[1])
df.loc[df["isCaseTheJoint"] == 0, "speed"] = df.loc[
df["isCaseTheJoint"] == 0, "speed"
].clip(lower=SPEED_NORMAL[0], upper=SPEED_NORMAL[1])
5.利用距离和速度,求出时间差。随机生成起始点出发的时间,并用 cumsum
实现累加,生成 csv
文件:
# 注意单位是毫秒
df["ts"] = df["haversineDistance"] / df["speed"]
df.loc[df["lastLagtitude"].isna(), "ts"] = scipy.stats.uniform.rvs(
loc=TIMESTAMP_BEGIN[0],
scale=TIMESTAMP_BEGIN[1]-TIMESTAMP_BEGIN[0],
size=df["lastLagtitude"].isna().sum(),
random_state=RANDOM_SEED,
)
df["ts"] = df.groupby(by=["licencePlate"])["ts"].cumsum()
def timestamp_to_datestr(ts: int) -> str:
"""
注意 ts 是毫秒,需要除以 1000
"""
date = datetime.fromtimestamp(ts)
date_str = date.strftime("%Y-%m-%d %H:%M:%S.%f")
return date_str
df["ts"] = df["ts"].apply(timestamp_to_datestr)
out_csv_path = "dwd_vehicle_track.csv"
df[["ts", "licencePlate", "lagtitude", "longtitude", "isCaseTheJoint"]].to_csv(
out_csv_path, encoding="utf-8-sig", header=False, index=False
)
# 在 Jupyter 环境下展示 dataframe,其他环境执行可能报错
display(df[["ts", "licencePlate", "lagtitude", "longtitude", "isCaseTheJoint"]])
图中的数据是我在 Streamlit
中使用 st.map
绘制,该部分代码我没有贴进来,在另外 py
文件中。st.map
底层用的 mapbox
,而后者这里用的 OpenStreetMap
。这里就涉及到高德使用的是 GCJ02
坐标系,国外是 WGS84
坐标系。需要进行转换,我也是网上找的转换代码(我将在最后结果展示时,进行转换)为什么不直接使用高德地图(因为暂时没有这方面的需求,没有申请;后续有需求再说 | 我是在网上别人开发的网站手工点的坐标,人家的底图用的是高德)
6.利用 pyhive
创建新的 Hive
表,并将数据 load
到表中:
from pyhive import hive
# 配置连接参数
host_ip = "127.0.0.1"
port = 10000
username = "Jiang"
with hive.Connection(host=host_ip, port=port) as conn:
cursor = conn.cursor()
hive_table_name = "data_exercise.dwd_vehicle_track"
drop_table_sql = f"""
drop table if exists {hive_table_name}
"""
print('删除表语句:\n', drop_table_sql)
cursor.execute(drop_table_sql)
create_table_sql = f"""
create table if not exists `{hive_table_name}`(
ts string comment "时间",
licence_plate string comment "车牌",
lagtitude double comment "纬度",
longtitude double comment "经度",
is_case_the_joint tinyint comment "是否为踩点(1-是,0-否)"
)
comment "利用行车轨迹分析犯罪分子作案地点 | author:蒋点数分 | 文章编号:c9029700"
row format delimited fields terminated by ","
stored as textfile
"""
print('创建表语句:\n', create_table_sql)
cursor.execute(create_table_sql)
import os
load_data_sql = f"""
load data local inpath "{os.path.abspath(out_csv_path)}"
overwrite into table {hive_table_name}
"""
print('覆盖式导入数据语句:\n',load_data_sql)
cursor.execute(load_data_sql)
cursor.close()
我通过使用
PyHive
包实现 Python 操作Hive
。我个人电脑部署了Hadoop
及Hive
,但是没有开启认证,企业里一般常用Kerberos
来进行大数据集群的认证。
四、SQL 解答
我使用 CTE
的语法,这样将步骤串行展示,逻辑比较清晰,下面分成几部分解释 SQL 语句:
1.这部分代码的逻辑是,先做基本的数据处理:比如将字符串格式的时间转为时间戳,unix_timestamp
返回精度到秒,因此需要将微秒部分的提取出来单独转换;使用 lag
提取上一行的坐标,用来后续计算;第二部分使用 haversine
公式计算距离,注意三角函数使用前要将坐标用 radians
转换为弧度
split
函数的官方文档说明,“Splits str around pat (pat is a regular expression)”。即它是用正则表达式来切分,而不是简单的字符串,所以需要对'.'转义,为什么要用两个反斜杠来转义,可以自行搜索原因。
-- 踩点速度和正常速度
-- SPEED_CASE = (4, 6)
-- SPEED_NORMAL = (8, 16)
with calc_last_gps_and_convert_ts as (
select
-- unix_timestamp 返回的精度是秒级,需要将微妙精度的数据单独提取出来
-- split(string str, string pat) (pat is a regular expression)
-- split 函数的第二个参数,其实是一个正则表达式,而不是一个单纯的字符串
-- 所以 "." 需要转义,为什么要用两个反斜杠转义,请自行搜索
unix_timestamp(ts) + cast(concat('0.',split(ts,'\\.')[1]) as double) as ts
, licence_plate, lagtitude, longtitude, is_case_the_joint
, lag(lagtitude, 1, null) over(partition by licence_plate order by ts asc) as last_lagtitude
, lag(longtitude, 1, null) over(partition by licence_plate order by ts asc) as last_longtitude
from data_exercise.dwd_vehicle_track
)
, calc_haversine_distance as (
select
ts, licence_plate, lagtitude, longtitude, is_case_the_joint
, lag(ts, 1, null) over(partition by licence_plate order by ts asc) as lastts
, last_lagtitude, last_longtitude
-- haversine 公式,注意要用 radians 转为弧度再用三角函数
, 2 * 6371 * 1000 * asin(
sqrt(
pow( sin( radians(lagtitude-last_lagtitude)/2 ), 2 ) + cos( radians(last_lagtitude) ) *
cos( radians(lagtitude) ) * pow( sin( radians(longtitude-last_longtitude)/2 ), 2 )
)
) as haversine_distance
from calc_last_gps_and_convert_ts
)
……
2.这部分代码计算速度,并打标,没啥说的:
……
, calc_speed_table as (
select
ts, licence_plate, lagtitude, longtitude, is_case_the_joint
, haversine_distance
, haversine_distance / (ts - lastts) as speed
from calc_haversine_distance
)
, classify_speed_table as (
select
ts, licence_plate, lagtitude, longtitude, is_case_the_joint
, haversine_distance, speed
, (case
when speed between 4 and 6 then 1
when speed between 8 and 16 then 0
when speed is null then 0 -- 起始点当成普通点
else -1
end) as calc_case
from calc_speed_table
)
……
3.实现“断点分组”的逻辑,也是 SQL 常见的考题。套路还是那个套路,最后将坐标点求平均就得到中心点 —— 这需要坐标均匀分布(转整数圈是基本要求):
……
, mark_group_cut_point as (
-- 将普通点和踩点的分隔位置标记出来
select
ts, licence_plate, lagtitude, longtitude, is_case_the_joint
, haversine_distance, speed, calc_case
, (case
when calc_case <> lag(calc_case, 1, null) over(partition by licence_plate order by ts) then 1
else 0
end ) as cut_point
from classify_speed_table
)
, mark_group_number as (
select
ts, licence_plate, lagtitude, longtitude, is_case_the_joint
, haversine_distance, speed, calc_case, cut_point
, sum(cut_point) over(partition by licence_plate order by ts) as group_number
from mark_group_cut_point
)
select
licence_plate, group_number
--, collect_list(lagtitude) as lagtitude_list
--, collect_list(longtitude) as longtitude_list
, avg(lagtitude) as center_lagtitude
, avg(longtitude) as center_longtitude
from mark_group_number
where calc_case = 1
group by licence_plate, group_number
4.完整的 SQL 语句:
with calc_last_gps_and_convert_ts as (
select
-- unix_timestamp 返回的精度是秒级,需要将微妙精度的数据单独提取出来
-- split(string str, string pat) (pat is a regular expression)
-- split 函数的第二个参数,其实是一个正则表达式,而不是一个单纯的字符串
-- 所以 "." 需要转义,为什么要用两个反斜杠转义,请自行搜索
unix_timestamp(ts) + cast(concat('0.',split(ts,'\\.')[1]) as double) as ts
, licence_plate, lagtitude, longtitude, is_case_the_joint
, lag(lagtitude, 1, null) over(partition by licence_plate order by ts asc) as last_lagtitude
, lag(longtitude, 1, null) over(partition by licence_plate order by ts asc) as last_longtitude
from data_exercise.dwd_vehicle_track
)
, calc_haversine_distance as (
select
ts, licence_plate, lagtitude, longtitude, is_case_the_joint
, lag(ts, 1, null) over(partition by licence_plate order by ts asc) as lastts
, last_lagtitude, last_longtitude
-- haversine 公式,注意要用 radians 转为弧度再用三角函数
, 2 * 6371 * 1000 * asin(
sqrt(
pow( sin( radians(lagtitude-last_lagtitude)/2 ), 2 ) + cos( radians(last_lagtitude) ) *
cos( radians(lagtitude) ) * pow( sin( radians(longtitude-last_longtitude)/2 ), 2 )
)
) as haversine_distance
from calc_last_gps_and_convert_ts
)
, calc_speed_table as (
select
ts, licence_plate, lagtitude, longtitude, is_case_the_joint
, haversine_distance
, haversine_distance / (ts - lastts) as speed
from calc_haversine_distance
)
, classify_speed_table as (
select
ts, licence_plate, lagtitude, longtitude, is_case_the_joint
, haversine_distance, speed
, (case
when speed between 4 and 6 then 1
when speed between 8 and 16 then 0
when speed is null then 0 -- 起始点当成普通点
else -1
end) as calc_case
from calc_speed_table
)
, mark_group_cut_point as (
-- 将普通点和踩点的分隔位置标记出来
select
ts, licence_plate, lagtitude, longtitude, is_case_the_joint
, haversine_distance, speed, calc_case
, (case
when calc_case <> lag(calc_case, 1, null) over(partition by licence_plate order by ts) then 1
else 0
end ) as cut_point
from classify_speed_table
)
, mark_group_number as (
select
ts, licence_plate, lagtitude, longtitude, is_case_the_joint
, haversine_distance, speed, calc_case, cut_point
, sum(cut_point) over(partition by licence_plate order by ts) as group_number
from mark_group_cut_point
)
select
licence_plate, group_number
--, collect_list(lagtitude) as lagtitude_list
--, collect_list(longtitude) as longtitude_list
, avg(lagtitude) as center_lagtitude
, avg(longtitude) as center_longtitude
from mark_group_number
where calc_case = 1
group by licence_plate, group_number
最终结果可视化展示:
绿色点是正常行驶,红色点是踩点数据;橙色点就是 SQL 求得“中心点”
😁😁😁 我现在正在求职数据类工作(主要是数据分析或数据科学);如果您有合适的机会,即时到岗,不限城市。
#牛客创作赏金赛#【SQL周周练】是精选或者自编自创一系列具有挑战性或趣味性的 SQL 题目,对于数据分析校招生或者初中级数据分析师非常值得一看。