refactor: 处理纵向合并的单元格
This commit is contained in:
parent
45bf3477d1
commit
3e3b77e34d
@ -19,26 +19,41 @@ class XlsSplitHandle(BaseParseTableHandle):
|
|||||||
def handle(self, file, get_buffer, save_image):
|
def handle(self, file, get_buffer, save_image):
|
||||||
buffer = get_buffer(file)
|
buffer = get_buffer(file)
|
||||||
try:
|
try:
|
||||||
wb = xlrd.open_workbook(file_contents=buffer)
|
wb = xlrd.open_workbook(file_contents=buffer, formatting_info=True)
|
||||||
result = []
|
result = []
|
||||||
sheets = wb.sheets()
|
sheets = wb.sheets()
|
||||||
for sheet in sheets:
|
for sheet in sheets:
|
||||||
|
# 获取合并单元格的范围信息
|
||||||
|
merged_cells = sheet.merged_cells
|
||||||
|
print(merged_cells)
|
||||||
|
data = []
|
||||||
paragraphs = []
|
paragraphs = []
|
||||||
rows = iter([sheet.row_values(i) for i in range(sheet.nrows)])
|
# 获取第一行作为标题行
|
||||||
if not rows: continue
|
headers = [sheet.cell_value(0, col_idx) for col_idx in range(sheet.ncols)]
|
||||||
ti = next(rows)
|
# 从第二行开始遍历每一行(跳过标题行)
|
||||||
for r in rows:
|
for row_idx in range(1, sheet.nrows):
|
||||||
l = []
|
row_data = {}
|
||||||
for i, c in enumerate(r):
|
for col_idx in range(sheet.ncols):
|
||||||
if not c:
|
cell_value = sheet.cell_value(row_idx, col_idx)
|
||||||
continue
|
|
||||||
t = str(ti[i]) if i < len(ti) else ""
|
# 检查是否为空单元格,如果为空检查是否在合并区域中
|
||||||
t += (": " if t else "") + str(c)
|
if cell_value == "":
|
||||||
l.append(t)
|
# 检查当前单元格是否在合并区域
|
||||||
l = "; ".join(l)
|
for (rlo, rhi, clo, chi) in merged_cells:
|
||||||
if sheet.name.lower().find("sheet") < 0:
|
if rlo <= row_idx < rhi and clo <= col_idx < chi:
|
||||||
l += " ——" + sheet.name
|
# 使用合并区域的左上角单元格的值
|
||||||
paragraphs.append({'title': '', 'content': l})
|
cell_value = sheet.cell_value(rlo, clo)
|
||||||
|
break
|
||||||
|
|
||||||
|
# 将标题作为键,单元格的值作为值存入字典
|
||||||
|
row_data[headers[col_idx]] = cell_value
|
||||||
|
data.append(row_data)
|
||||||
|
|
||||||
|
for row in data:
|
||||||
|
row_output = "; ".join([f"{key}: {value}" for key, value in row.items()])
|
||||||
|
# print(row_output)
|
||||||
|
paragraphs.append({'title': '', 'content': row_output})
|
||||||
|
|
||||||
result.append({'name': sheet.name, 'paragraphs': paragraphs})
|
result.append({'name': sheet.name, 'paragraphs': paragraphs})
|
||||||
|
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
|
|||||||
@ -17,6 +17,35 @@ class XlsxSplitHandle(BaseParseTableHandle):
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def fill_merged_cells(self, sheet, image_dict):
    """Convert a worksheet into a list of ``{header: value}`` row dicts.

    The first row supplies the column headers and every following row
    becomes one dict.  An empty cell that lies inside a merged range takes
    the value of that range's top-left cell.  A cell whose value is a key of
    ``image_dict`` (with a non-``None`` entry) is replaced by an empty string.

    Args:
        sheet: Worksheet-like object providing ``sheet[1]``, ``iter_rows``,
            ``cell`` and ``merged_cells.ranges``.
        image_dict: Mapping that is looked up with each resolved cell value.

    Returns:
        A list with one dict per data row, in sheet order.  Dict keys are
        the header values; a header that repeats an earlier one (including
        repeated blank headers) gets a ``_<column number>`` suffix so that
        no column overwrites another.
    """
    # Build unique dict keys from the header row.  The first occurrence of a
    # header keeps its original value; later duplicates are suffixed with
    # their 1-based column number so their data is not lost.
    headers = []
    used_keys = set()
    for column_number, header_cell in enumerate(sheet[1], start=1):
        key = header_cell.value
        if key in used_keys:
            key = f"{header_cell.value}_{column_number}"
            while key in used_keys:
                key += "_"
        used_keys.add(key)
        headers.append(key)

    # Resolve every merged range once: its bounds plus the value held by its
    # top-left cell.  This avoids re-parsing coordinates for each empty cell.
    merged_regions = [
        (
            region.min_row,
            region.max_row,
            region.min_col,
            region.max_col,
            sheet.cell(row=region.min_row, column=region.min_col).value,
        )
        for region in sheet.merged_cells.ranges
    ]

    data = []
    # Data starts on the second row; the first row is the header row.
    for row_number, row in enumerate(
        sheet.iter_rows(min_row=2, values_only=False), start=2
    ):
        row_data = {}
        for col_idx, cell in enumerate(row):
            cell_value = cell.value

            # An empty cell inside a merged range inherits the value of the
            # range's top-left cell (first matching range wins).
            if cell_value is None:
                column_number = col_idx + 1
                for min_row, max_row, min_col, max_col, anchor in merged_regions:
                    if (
                        min_row <= row_number <= max_row
                        and min_col <= column_number <= max_col
                    ):
                        cell_value = anchor
                        break

            # NOTE(review): the replacement visible here is an empty string;
            # confirm whether markup referencing the image was intended.
            if image_dict.get(cell_value) is not None:
                cell_value = ''

            row_data[headers[col_idx]] = cell_value
        data.append(row_data)

    return data
|
||||||
|
|
||||||
def handle(self, file, get_buffer, save_image):
|
def handle(self, file, get_buffer, save_image):
|
||||||
buffer = get_buffer(file)
|
buffer = get_buffer(file)
|
||||||
try:
|
try:
|
||||||
@ -30,25 +59,13 @@ class XlsxSplitHandle(BaseParseTableHandle):
|
|||||||
for sheetname in wb.sheetnames:
|
for sheetname in wb.sheetnames:
|
||||||
paragraphs = []
|
paragraphs = []
|
||||||
ws = wb[sheetname]
|
ws = wb[sheetname]
|
||||||
rows = list(ws.rows)
|
data = self.fill_merged_cells(ws, image_dict)
|
||||||
if not rows: continue
|
|
||||||
ti = list(rows[0])
|
for row in data:
|
||||||
for r in list(rows[1:]):
|
row_output = "; ".join([f"{key}: {value}" for key, value in row.items()])
|
||||||
l = []
|
# print(row_output)
|
||||||
for i, c in enumerate(r):
|
paragraphs.append({'title': '', 'content': row_output})
|
||||||
if not c.value:
|
|
||||||
continue
|
|
||||||
t = str(ti[i].value) if i < len(ti) else ""
|
|
||||||
content = str(c.value)
|
|
||||||
image = image_dict.get(content, None)
|
|
||||||
if image is not None:
|
|
||||||
content = f''
|
|
||||||
t += (": " if t else "") + content
|
|
||||||
l.append(t)
|
|
||||||
l = "; ".join(l)
|
|
||||||
if sheetname.lower().find("sheet") < 0:
|
|
||||||
l += " ——" + sheetname
|
|
||||||
paragraphs.append({'title': '', 'content': l})
|
|
||||||
result.append({'name': sheetname, 'paragraphs': paragraphs})
|
result.append({'name': sheetname, 'paragraphs': paragraphs})
|
||||||
|
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user