yanchang
yanchang
发布于 2025-08-12 / 21 阅读
0
0

微信聊天记录提取

使用github开源工具chatlog

官网页面:https://github.com/sjzar/chatlog?tab=readme-ov-file#http-api

访问 Releases 页面下载适合您系统的预编译版本。

依次使用

  1. 安装 Chatlog:下载预编译版本,或使用 Go 安装

  2. 运行程序:执行 chatlog 启动 Terminal UI 界面

  3. 解密数据:选择 解密数据 菜单项

  4. 开启 HTTP 服务:选择 开启 HTTP 服务 菜单项

  5. 访问数据:通过 HTTP API 或 MCP 集成访问聊天记录

然后将结果复制到文件中使用python进一步处理。

PS:当前这个应用导出为 csv 或 jsonl 格式都有问题,至少在我的测试中导不出来。不过没关系,不影响后续处理。然后使用如下代码合并消息,把同一个人连续发送的消息连接在一起。

处理数据

import re

def _append_merged_record(records, sender, time, fragments):
    """Append one merged record to *records*, joining *fragments* with a space."""
    records.append({
        'sender': sender,
        'time': time,
        'message': " ".join(fragments),
    })


def merge_chat_messages(file_path):
    """Merge consecutive messages from the same sender into single records.

    The file is read line by line as UTF-8.  Blank lines are ignored.  A line
    is treated as a message header when it matches
    ``<sender> <digits/dashes> <digits/colons> [text]``, for example
    ``律政先锋🀄(wxid_m7ca7aue2jml22) 2023-01-20 11:25:38``; any other line is a
    continuation of the current message.

    Args:
        file_path: Path of the exported chat log text file.

    Returns:
        A list of dicts with the keys ``'sender'``, ``'time'`` (the timestamp
        of the first message in the run) and ``'message'`` (all text fragments
        of the run joined with a single space).

    Raises:
        OSError: If the file cannot be opened.
    """
    merged_records = []
    current_sender = None
    current_time = None
    current_message = []

    # Header line: sender, timestamp, optional text on the same line.
    pattern = re.compile(r'^(.*?)\s+([\d-]+\s+[\d:]+)\s*(.*)$')

    with open(file_path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue

            match = pattern.match(line)
            if match is None:
                # Continuation line; dropped when no header has been seen yet.
                if current_sender:
                    current_message.append(line)
                continue

            sender, time, message = (group.strip() for group in match.groups())

            if sender != current_sender:
                # Speaker changed: emit the finished run and start a new one.
                if current_sender:
                    _append_merged_record(
                        merged_records, current_sender, current_time,
                        current_message,
                    )
                current_sender = sender
                current_time = time
                current_message = []

            # A header usually has no text after the timestamp; skipping the
            # empty fragment avoids stray separators in the joined message.
            if message:
                current_message.append(message)

    # Emit the run that was still open when the file ended, using the same
    # separator as every other record.
    if current_sender:
        _append_merged_record(
            merged_records, current_sender, current_time, current_message,
        )

    return merged_records

def format_merged_records(records):
    """Render merged chat records as a single text blob.

    Each record becomes ``"<sender> <time>"`` on one line followed by its
    message on the next; records are separated by one blank line.
    """
    blocks = [
        f"{entry['sender']} {entry['time']}\n{entry['message']}"
        for entry in records
    ]
    return "\n\n".join(blocks)



# Input export and the destination for the merged log.
file_name = "chatlog.txt"
output_file_name = "merged_chat_log.txt"

# Merge consecutive messages, then render them back to plain text.
merged_result = merge_chat_messages(file_name)
formatted_output = format_merged_records(merged_result)

# Persist the result to the new file.
with open(output_file_name, mode="w", encoding="utf-8") as f:
    f.write(formatted_output)
print(f"合并后的聊天记录已保存到文件: {output_file_name}")

然后将上述的文件处理为适合机器学习的json格式

import json
import re

def convert_chatlog_to_json(file_path, peer_name="律政先锋🀄", self_name="我"):
    """Convert a plain-text chat log into instruction/output training pairs.

    The file is split on header lines of the form
    ``<speaker>[(id)] <date> HH:MM:SS``.  The text following a header, up to
    the next header, is that speaker's message.  Markdown image tags
    (``![alt](url)``) and bracketed tags such as ``[坏笑]`` are removed, and
    messages that end up empty are skipped.

    Each message from *self_name* is paired with the most recent unanswered
    message from *peer_name*: the peer's text becomes ``instruction`` and the
    reply becomes ``output``.  A later peer message overwrites an unanswered
    earlier one.

    Args:
        file_path: Path of the UTF-8 chat log text file.
        peer_name: Display name of the other party (source of instructions).
        self_name: Display name of the log owner (source of outputs).

    Returns:
        A list of dicts with the keys ``"instruction"``, ``"input"`` (always
        an empty string) and ``"output"``.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # Header line: speaker name, optional "(id)" suffix, then a timestamp that
    # ends in HH:MM:SS.  Anchoring with ^ under MULTILINE (rather than a
    # leading "\n") lets a header on the very first line of the file match.
    speakers = "|".join(re.escape(name) for name in (peer_name, self_name))
    header = re.compile(
        r'^(' + speakers + r')(?:\([^)\n]*\))? .*? \d{2}:\d{2}:\d{2}\n',
        re.MULTILINE,
    )
    # Image tags must be removed before the generic bracket tags; otherwise
    # "[图片]" is stripped first and the "!(url)" remainder is left behind.
    image_tag = re.compile(r'!\[[^\]]*\]\([^)]*\)')
    bracket_tag = re.compile(r'\[.*?\]')

    # With one capture group, split() yields
    # [text_before_first_header, speaker, body, speaker, body, ...].
    parts = header.split(content)

    chat_data = []
    pending_instruction = ""
    for speaker, body in zip(parts[1::2], parts[2::2]):
        message = image_tag.sub('', body)
        message = bracket_tag.sub('', message).strip()
        if not message:
            continue

        if speaker == peer_name:
            pending_instruction = message
        elif pending_instruction:
            # A reply from self_name completes the pair.
            chat_data.append({
                "instruction": pending_instruction,
                "input": "",
                "output": message,
            })
            pending_instruction = ""

    return chat_data

def save_to_json_file(data, output_file_path):
    """Write *data* to *output_file_path* as UTF-8 JSON.

    Non-ASCII characters are written as-is and the output is indented by two
    spaces.
    """
    with open(output_file_path, mode='w', encoding='utf-8') as sink:
        json.dump(data, sink, indent=2, ensure_ascii=False)

if __name__ == "__main__":
    # Source chat log and destination for the generated training pairs.
    input_file, output_file = 'chatlog.txt', 'output.json'

    json_data = convert_chatlog_to_json(input_file)
    save_to_json_file(json_data, output_file)

    print(f"Successfully converted '{input_file}' to '{output_file}'.")

然后删除所有的图片链接、视频链接、语音链接等
然后通过代码处理重复的“宝贝”:每条回复只保留第一个“宝贝”,其余的替换为“你”

import json

def process_chat_log(input_filename, output_filename):
    """Keep only the first '宝贝' in each entry's output and save the result.

    Loads a JSON list of entries from *input_filename*.  In every entry's
    ``'output'`` text, the first occurrence of '宝贝' is kept and each later
    occurrence is replaced with '你'.  Entries without an ``'output'`` key
    are passed through unchanged.  The processed list is written to
    *output_filename* as indented UTF-8 JSON.

    A missing or unparsable input file, or a failed write, is reported with
    a printed message instead of an exception.
    """
    try:
        with open(input_filename, 'r', encoding='utf-8') as src:
            entries = json.load(src)
    except FileNotFoundError:
        print(f"错误: 找不到文件 {input_filename}")
        return
    except json.JSONDecodeError:
        print(f"错误: 无法解析文件 {input_filename},请检查JSON格式是否正确。")
        return

    keyword = '宝贝'
    result = []
    for item in entries:
        # partition() isolates the first occurrence; everything after it has
        # its remaining occurrences swapped for '你'.
        head, found, tail = item.get('output', '').partition(keyword)
        if found:
            item['output'] = head + found + tail.replace(keyword, '你')
        result.append(item)

    try:
        with open(output_filename, 'w', encoding='utf-8') as dst:
            json.dump(result, dst, ensure_ascii=False, indent=2)
        print(f"文件已成功保存到 {output_filename}")
    except IOError:
        print(f"错误: 无法写入文件 {output_filename}")

# Example usage:
# read the original file 'chatlog.json' and save the processed copy as
# 'processed_chatlog.json'.
input_file, output_file = 'chatlog.json', 'processed_chatlog.json'

process_chat_log(input_file, output_file)


评论